9 changes: 6 additions & 3 deletions pkg/worker/analyzed_audit_job.go
@@ -38,7 +38,8 @@ func (w *Worker) AnalyzedAuditJob(ctx context.Context, args ...interface{}) erro
}
// Note, this will panic if the context doesn't have a faktory job context.
helper := faktory_worker.HelperFor(ctx)
logger := loggerWithFaktoryFields(w.Logger, helper, logfields.ModelPlanID(modelPlanID), logfields.Date(dayToAnalyze))
base := loggerWithFaktoryFields(w.Logger, helper, logfields.ModelPlanID(modelPlanID), logfields.Date(dayToAnalyze))
logger := RetryAwareLogger(ctx, base) // demotes Error->Warn unless final attempt
_, err = resolvers.AnalyzeModelPlanForAnalyzedAudit(ctx, w.Store, logger, dayToAnalyze, modelPlanID)

if err != nil {
@@ -52,7 +53,8 @@ func (w *Worker) AnalyzedAuditJob(ctx context.Context, args ...interface{}) erro
// args[0] date
func (w *Worker) AnalyzedAuditBatchJob(ctx context.Context, args ...interface{}) error {
helper := faktory_worker.HelperFor(ctx)
logger := loggerWithFaktoryFieldsWithoutBatchID(w.Logger, helper)
base := loggerWithFaktoryFieldsWithoutBatchID(w.Logger, helper)
logger := RetryAwareLogger(ctx, base) // demotes Error->Warn unless final attempt
logger.Info("starting analyzed audit batch job")

dayToAnalyze := args[0]
@@ -94,7 +96,8 @@ func (w *Worker) AnalyzedAuditBatchJob(ctx context.Context, args ...interface{})
func (w *Worker) AnalyzedAuditBatchJobSuccess(ctx context.Context, args ...interface{}) error {
dateAnalyzed := args[0]
help := faktory_worker.HelperFor(ctx)
logger := loggerWithFaktoryFields(w.Logger, help)
base := loggerWithFaktoryFields(w.Logger, help)
logger := RetryAwareLogger(ctx, base) // demotes Error->Warn unless final attempt

// Kick off DigestEmailBatchJob
return help.With(func(cl *faktory.Client) error {
11 changes: 6 additions & 5 deletions pkg/worker/digest_cron_job.go
@@ -6,6 +6,7 @@ import (

faktory "github.com/contribsys/faktory/client"
faktory_worker "github.com/contribsys/faktory_worker_go"
"go.uber.org/zap"
)

const (
@@ -14,12 +15,12 @@ const (

// DigestCronJob is the job the cron schedule calls
func (w *Worker) DigestCronJob(ctx context.Context, args ...interface{}) error {
dayToAnalyze := time.Now().AddDate(0, 0, -1).UTC().Format("2006-01-02")

// Call AnalyzedAuditBatchJob
helper := faktory_worker.HelperFor(ctx)
logger := loggerWithFaktoryFields(w.Logger, helper)
logger.Info("creating Daily Analyzed Audit Cron Job")
base := loggerWithFaktoryFields(w.Logger, helper)
logger := RetryAwareLogger(ctx, base) // demotes Error->Warn unless final attempt

dayToAnalyze := time.Now().AddDate(0, 0, -1).UTC().Format("2006-01-02")
logger.Info("creating Daily Analyzed Audit Cron Job", zap.String("day", dayToAnalyze))

return helper.With(func(cl *faktory.Client) error {
job := faktory.NewJob(analyzedAuditBatchJobName, dayToAnalyze)
13 changes: 9 additions & 4 deletions pkg/worker/digest_email_job.go
@@ -33,7 +33,8 @@ func (w *Worker) DigestEmailBatchJob(ctx context.Context, args ...interface{}) e
dateAnalyzed := args[0].(string)

helper := faktory_worker.HelperFor(ctx)
logger := loggerWithFaktoryFieldsWithoutBatchID(w.Logger, helper)
base := loggerWithFaktoryFieldsWithoutBatchID(w.Logger, helper)
logger := RetryAwareLogger(ctx, base) // demotes Error->Warn unless final attempt

logger.Info("getting collection of unique userIds that have favorited a model")

@@ -74,7 +75,8 @@ func (w *Worker) DigestEmailBatchJob(ctx context.Context, args ...interface{}) e
// args[0] date
func (w *Worker) DigestEmailBatchJobSuccess(ctx context.Context, args ...interface{}) error {
helper := faktory_worker.HelperFor(ctx)
logger := loggerWithFaktoryFields(w.Logger, helper)
base := loggerWithFaktoryFields(w.Logger, helper)
logger := RetryAwareLogger(ctx, base) // demotes Error->Warn unless final attempt
logger.Info("Digest Email Batch Job Succeeded")
// TODO: Add notification here if wanted in the future
return nil
@@ -95,7 +97,9 @@ func (w *Worker) DigestEmailJob(ctx context.Context, args ...interface{}) error
return err
}
helper := faktory_worker.HelperFor(ctx)
logger := loggerWithFaktoryFields(w.Logger, helper, logfields.Date(dateAnalyzed), logfields.UserID(userID))
base := loggerWithFaktoryFields(w.Logger, helper, logfields.Date(dateAnalyzed), logfields.UserID(userID))
logger := RetryAwareLogger(ctx, base) // demotes Error->Warn unless final attempt

logger.Info("preparing to send daily digest email")

// Note, if desired we can wrap this in a transaction so if there is a failure sending an email, the notification in the database also gets rolled back.
@@ -115,7 +119,8 @@ func (w *Worker) AggregatedDigestEmailJob(ctx context.Context, args ...interface
return err
}
helper := faktory_worker.HelperFor(ctx)
logger := loggerWithFaktoryFields(w.Logger, helper, logfields.Date(dateAnalyzed))
base := loggerWithFaktoryFields(w.Logger, helper, logfields.Date(dateAnalyzed))
logger := RetryAwareLogger(ctx, base) // demotes Error->Warn unless final attempt
logger.Info("preparing to send aggregated digest email")
err = AggregatedDigestEmailJob(
dateAnalyzed,
104 changes: 104 additions & 0 deletions pkg/worker/job_utilities.go
@@ -10,6 +10,8 @@ import (

"github.com/cms-enterprise/mint-app/pkg/apperrors"
"github.com/cms-enterprise/mint-app/pkg/logfields"

faktory "github.com/contribsys/faktory/client"
)

// JobWithPanicProtection wraps a faktory Job in a wrapper function that will return an error instead of stopping the application.
@@ -68,3 +70,105 @@ func loggerWithFaktoryStandardFields(logger *zap.Logger, jid string, jobType str
}, extraFields...)
return logger.With(fields...)
}

// RetryAwareLogging returns middleware that logs WARN if a failure will be retried,
// and ERROR only on the final failing attempt (i.e., no retries remain).
func RetryAwareLogging(logger *zap.Logger) faktory_worker.MiddlewareFunc {
return func(ctx context.Context, job *faktory.Job, next func(ctx context.Context) error) error {
help := faktory_worker.HelperFor(ctx)

maxRetries := defaultMaxRetries
if job.Retry != nil {
maxRetries = *job.Retry
}
failCount := 0
if job.Failure != nil {
failCount = job.Failure.RetryCount
}
isFinal := failCount >= maxRetries

// Put the flag in context so jobs/downstream can decide how to log
ctx = withIsFinalAttempt(ctx, isFinal)

// Run the job
err := next(ctx)
if err == nil {
return nil
}

// Also emit a structured summary line for the failure
log := loggerWithFaktoryFieldsWithoutBatchID(
logger, help,
zap.Int("fail_count_so_far", failCount),
zap.Int("max_retries", maxRetries),
)
if isFinal {
log.Error("job failed on final attempt; no retries remain", zap.Error(err))
} else {
log.Warn("job failed; will retry", zap.Error(err))
}
return err
}
}

// context key so jobs can know if this run is final
type retryCtxKey struct{}

func withIsFinalAttempt(ctx context.Context, isFinal bool) context.Context {
return context.WithValue(ctx, retryCtxKey{}, isFinal)
}

func isFinalAttempt(ctx context.Context) bool {
v := ctx.Value(retryCtxKey{})
if b, ok := v.(bool); ok {
return b
}
return false
}

// downgradeErrorsCore wraps a Core and, when enabled, converts ERROR logs to WARN.
type downgradeErrorsCore struct {
zapcore.Core
downgrade bool
}

func (d *downgradeErrorsCore) With(fields []zapcore.Field) zapcore.Core {
return &downgradeErrorsCore{Core: d.Core.With(fields), downgrade: d.downgrade}
}

func (d *downgradeErrorsCore) Check(ent zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry {
if d.downgrade && ent.Level == zapcore.ErrorLevel {
ent.Level = zapcore.WarnLevel
}
return d.Core.Check(ent, ce)
}

// RetryAwareLogger returns a logger that demotes Error to Warn unless the current run is the job's final attempt.
func RetryAwareLogger(ctx context.Context, base *zap.Logger) *zap.Logger {
if isFinalAttempt(ctx) {
return base // final run: keep ERRORs as ERROR
}
return base.WithOptions(zap.WrapCore(func(c zapcore.Core) zapcore.Core {
return &downgradeErrorsCore{Core: c, downgrade: true}
}))
}

// warnCapCore is a Core wrapper that forces any level above Warn down to Warn.
type warnCapCore struct{ zapcore.Core }

func (c warnCapCore) Check(ent zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry {
if ent.Level > zapcore.WarnLevel {
ent.Level = zapcore.WarnLevel
}
return c.Core.Check(ent, ce)
}

// CapAtWarn returns a copy of base that never logs above Warn.
// (logger.Error(...) will be emitted as a warn entry.)
func CapAtWarn(base *zap.Logger) *zap.Logger {
return base.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core {
return warnCapCore{Core: core}
}))
}
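
The hunks above add the middleware and the per-job wrapper, but they do not show where RetryAwareLogging is attached to the worker's Faktory manager. Below is a minimal wiring sketch, assuming a setup helper on Worker; the helper name and the single registered job are illustrative, not part of this diff, while mgr.Use and mgr.Register are the same calls the new tests exercise against a real Manager.

// Illustrative sketch only — newManagerWithRetryAwareLogging is a hypothetical
// helper; RetryAwareLogging and RetryAwareLogger come from this PR.
// Assumes: import faktory_worker "github.com/contribsys/faktory_worker_go"
func (w *Worker) newManagerWithRetryAwareLogging() *faktory_worker.Manager {
	mgr := faktory_worker.NewManager()

	// Attach once, globally: the middleware logs WARN when a failing job will
	// be retried, ERROR only on the final attempt, and stores the
	// is-final-attempt flag in the job context for downstream loggers.
	mgr.Use(RetryAwareLogging(w.Logger))

	// Registered jobs then wrap their decorated loggers per call, as the job
	// files in this diff do:
	//   base := loggerWithFaktoryFields(w.Logger, helper)
	//   logger := RetryAwareLogger(ctx, base)
	mgr.Register("AnalyzedAuditJob", w.AnalyzedAuditJob)

	return mgr
}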
128 changes: 128 additions & 0 deletions pkg/worker/job_utilities_test.go
@@ -3,11 +3,16 @@ package worker
import (
"context"
"encoding/json"
"errors"
"fmt"
"regexp"
"strings"
"testing"

faktory "github.com/contribsys/faktory/client"
worker "github.com/contribsys/faktory_worker_go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/cms-enterprise/mint-app/pkg/logfields"
)
@@ -114,3 +119,126 @@ func TestDecorateFaktoryLoggerStandardFields(t *testing.T) {

})
}

// A tiny helper to decode the *last* JSON log line in the buffer.
func lastLogLine(buf string, t *testing.T) map[string]interface{} {
t.Helper()
lines := strings.Split(strings.TrimSpace(buf), "\n")
require.NotEmpty(t, lines, "expected at least one log line")
var m map[string]interface{}
require.NoError(t, json.Unmarshal([]byte(lines[len(lines)-1]), &m))
return m
}

func TestRetryAwareLogging_WarnsWhenRetriesRemain(t *testing.T) {
ws, zl := createTestLogger()

// Wire middleware into a real Manager so HelperFor(ctx) is valid.
mgr := worker.NewManager()
mgr.Use(RetryAwareLogging(zl))

// Register a job that always fails.
mgr.Register("FailingJob", func(ctx context.Context, args ...interface{}) error {
return errors.New("boom")
})

// First failure: no Failure set yet -> fail_count_so_far = 0
job := faktory.NewJob("FailingJob")

// Execute inline (runs middleware + job)
err := mgr.InlineDispatch(job)
require.Error(t, err)

out := ws.GetBufferString()
ll := lastLogLine(out, t)

require.Equal(t, "warn", ll["level"])
require.Equal(t, "job failed; will retry", ll["msg"])
require.EqualValues(t, 0, ll["fail_count_so_far"])
require.EqualValues(t, defaultMaxRetries, ll["max_retries"])
require.NotEmpty(t, ll["jid"])
require.NotEmpty(t, ll["job_type"])
}

func TestRetryAwareLogging_ErrorsOnFinalAttempt_DefaultRetries(t *testing.T) {
ws, zl := createTestLogger()

mgr := worker.NewManager()
mgr.Use(RetryAwareLogging(zl))
mgr.Register("FailingJob", func(ctx context.Context, args ...interface{}) error {
return errors.New("boom")
})

// Simulate we already failed `defaultMaxRetries` times BEFORE this run.
job := faktory.NewJob("FailingJob")
job.Failure = &faktory.Failure{RetryCount: defaultMaxRetries}

err := mgr.InlineDispatch(job)
require.Error(t, err)

ll := lastLogLine(ws.GetBufferString(), t)
require.Equal(t, "error", ll["level"])
require.Equal(t, "job failed on final attempt; no retries remain", ll["msg"])
require.EqualValues(t, defaultMaxRetries, ll["fail_count_so_far"])
require.EqualValues(t, defaultMaxRetries, ll["max_retries"])
}

func TestRetryAwareLogging_RespectsPerJobRetryOverride(t *testing.T) {
ws, zl := createTestLogger()

mgr := worker.NewManager()
mgr.Use(RetryAwareLogging(zl))
mgr.Register("FailingJob", func(ctx context.Context, args ...interface{}) error {
return errors.New("boom")
})

// Per-job retry override: allow only 1 retry total.
one := 1
job := faktory.NewJob("FailingJob")
job.Retry = &one

// If we've already failed once BEFORE this run, this run is final.
job.Failure = &faktory.Failure{RetryCount: 1}

err := mgr.InlineDispatch(job)
require.Error(t, err)

ll := lastLogLine(ws.GetBufferString(), t)
require.Equal(t, "error", ll["level"])
require.Equal(t, "job failed on final attempt; no retries remain", ll["msg"])
require.EqualValues(t, 1, ll["fail_count_so_far"])
require.EqualValues(t, 1, ll["max_retries"])
}

func TestRetryAwareLogging_NoLogOnSuccess(t *testing.T) {
ws, zl := createTestLogger()
mgr := worker.NewManager()
mgr.Use(RetryAwareLogging(zl))
mgr.Register("Succeeds", func(ctx context.Context, args ...interface{}) error {
return nil
})

job := faktory.NewJob("Succeeds")
err := mgr.InlineDispatch(job)
require.NoError(t, err)

// Middleware should not emit warn/error on success.
require.Equal(t, "", strings.TrimSpace(ws.GetBufferString()))
}

// Optional: sanity check that helper fields are present in logs
func TestRetryAwareLogging_EmitsHelperFields(t *testing.T) {
ws, zl := createTestLogger()
mgr := worker.NewManager()
mgr.Use(RetryAwareLogging(zl))
mgr.Register("FailingJob", func(ctx context.Context, args ...interface{}) error {
return errors.New("boom")
})

job := faktory.NewJob("FailingJob")
_ = mgr.InlineDispatch(job)

ll := lastLogLine(ws.GetBufferString(), t)
require.NotEmpty(t, ll["jid"])
require.NotEmpty(t, ll["job_type"])
}
6 changes: 4 additions & 2 deletions pkg/worker/model_status_update_batch_job.go
@@ -24,7 +24,8 @@ const (
func (w *Worker) ModelStatusUpdateBatchJob(ctx context.Context, args ...interface{}) error {
helper := faktory_worker.HelperFor(ctx)
// decorate the logger, but exclude the bid, the bid will be decorated when we create the batch
logger := loggerWithFaktoryFieldsWithoutBatchID(w.Logger, helper)
base := loggerWithFaktoryFieldsWithoutBatchID(w.Logger, helper)
logger := RetryAwareLogger(ctx, base) // demotes Error->Warn unless final attempt
logger.Info("Getting collection of model plans that require status checking")

// TODO: Implement the logic to return the models to check? Or do we check every model plan?
@@ -85,7 +86,8 @@ func CreateModelStatusJobInBatch(logger *zap.Logger, w *Worker, batch *faktory.B
// ModelStatusUpdateBatchJobSuccess is called when the model status update job has completed.
func (w *Worker) ModelStatusUpdateBatchJobSuccess(ctx context.Context, args ...interface{}) error {
helper := faktory_worker.HelperFor(ctx)
logger := loggerWithFaktoryFields(w.Logger, helper)
base := loggerWithFaktoryFields(w.Logger, helper)
logger := RetryAwareLogger(ctx, base) // demotes Error->Warn unless final attempt
logger.Info("Model Status update job completed successfully")
return nil
}
3 changes: 2 additions & 1 deletion pkg/worker/model_status_update_cron_job.go
@@ -20,7 +20,8 @@ func (w *Worker) ModelStatusUpdateCronJob(ctx context.Context, args ...interface

now := time.Now()

logger := loggerWithFaktoryFieldsWithoutBatchID(w.Logger, helper)
base := loggerWithFaktoryFieldsWithoutBatchID(w.Logger, helper)
logger := RetryAwareLogger(ctx, base) // demotes Error->Warn unless final attempt
logger.Info("creating Model Status Update Cron Job")

// Note, this function doesn't need a param, adding so it can be distinguished from another batch job call