From 39c40e5a3d55728a5f0029b55bd70771319cfcd7 Mon Sep 17 00:00:00 2001 From: Ilia Demianenko Date: Wed, 15 Oct 2025 09:06:49 -0700 Subject: [PATCH 1/4] Classify mongo conn pool error, spew full structure for all errors --- flow/alerting/alerting.go | 12 +++++++++++- flow/alerting/classifier.go | 13 +++++++++++++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/flow/alerting/alerting.go b/flow/alerting/alerting.go index 22bfd4757f..3b1cd3fab9 100644 --- a/flow/alerting/alerting.go +++ b/flow/alerting/alerting.go @@ -8,11 +8,13 @@ import ( "io" "log/slog" "net" + "reflect" "slices" "strings" "time" "github.com/ClickHouse/clickhouse-go/v2" + "github.com/davecgh/go-spew/spew" "github.com/go-mysql-org/go-mysql/mysql" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" @@ -445,6 +447,11 @@ const ( flowErrorTypeError flowErrorType = "error" ) +var errSpew = spew.ConfigState{ + Indent: " ", + DisableMethods: true, // Don't call Error() method, show the actual fields +} + // logFlowErrorInternal pushes the error to the errors table and emits a metric as well as a telemetry message func (a *Alerter) logFlowErrorInternal( ctx context.Context, @@ -456,7 +463,10 @@ func (a *Alerter) logFlowErrorInternal( logger := internal.LoggerFromCtx(ctx) inErrWithStack := fmt.Sprintf("%+v", inErr) errError := inErr.Error() - loggerFunc(errError, slog.String("stack", inErrWithStack)) + loggerFunc(errError, + slog.String("stack", inErrWithStack), + slog.String("type", reflect.TypeOf(inErr).String()), + slog.String("spew", errSpew.Sdump(inErr))) if _, err := a.CatalogPool.Exec( ctx, "INSERT INTO peerdb_stats.flow_errors(flow_name,error_message,error_type) VALUES($1,$2,$3)", flowName, inErrWithStack, errorType.String(), diff --git a/flow/alerting/classifier.go b/flow/alerting/classifier.go index 918663ed2e..d8dfc678a7 100644 --- a/flow/alerting/classifier.go +++ b/flow/alerting/classifier.go @@ -650,6 +650,19 @@ func GetErrorClass(ctx context.Context, err error) (ErrorClass, ErrorInfo) { } } + var mongoDriverError driver.Error + if errors.As(err, &mongoDriverError) { + mongoErrorInfo := ErrorInfo{ + Source: ErrorSourceMongoDB, + Code: strconv.Itoa(int(mongoDriverError.Code)), + } + + // This should recover, but we notify if exceed default threshold + if mongoDriverError.HasErrorLabel(driver.TransientTransactionError) { + return ErrorNotifyConnectivity, mongoErrorInfo + } + } + var mongoMarshalErr mongo.MarshalError if errors.As(err, &mongoMarshalErr) { return ErrorOther, ErrorInfo{ From a13c5f6202aafc2d46e05e8328b94ca3798b75d2 Mon Sep 17 00:00:00 2001 From: Ilia Demianenko Date: Wed, 15 Oct 2025 13:42:14 -0700 Subject: [PATCH 2/4] no mongo, just spew --- flow/alerting/classifier.go | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/flow/alerting/classifier.go b/flow/alerting/classifier.go index d8dfc678a7..918663ed2e 100644 --- a/flow/alerting/classifier.go +++ b/flow/alerting/classifier.go @@ -650,19 +650,6 @@ func GetErrorClass(ctx context.Context, err error) (ErrorClass, ErrorInfo) { } } - var mongoDriverError driver.Error - if errors.As(err, &mongoDriverError) { - mongoErrorInfo := ErrorInfo{ - Source: ErrorSourceMongoDB, - Code: strconv.Itoa(int(mongoDriverError.Code)), - } - - // This should recover, but we notify if exceed default threshold - if mongoDriverError.HasErrorLabel(driver.TransientTransactionError) { - return ErrorNotifyConnectivity, mongoErrorInfo - } - } - var mongoMarshalErr mongo.MarshalError if errors.As(err, &mongoMarshalErr) { return ErrorOther, ErrorInfo{ From 68dca58770e4481a59e2846d54de2973ecbcab4d Mon Sep 17 00:00:00 2001 From: Ilia Demianenko Date: Wed, 15 Oct 2025 19:40:48 -0700 Subject: [PATCH 3/4] mod tidy --- flow/go.mod | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flow/go.mod b/flow/go.mod index b0934ac64e..fddcc60c12 100644 --- a/flow/go.mod +++ b/flow/go.mod @@ -90,7 +90,7 @@ require ( require ( cel.dev/expr v0.24.0 // indirect - cloud.google.com/go/auth v0.17.0 // indirect + cloud.google.com/go/auth v0.17.0 cloud.google.com/go/auth/oauth2adapt v0.2.8 // indirect cloud.google.com/go/compute/metadata v0.9.0 // indirect cloud.google.com/go/iam v1.5.3 // indirect @@ -143,7 +143,7 @@ require ( github.com/coreos/go-systemd/v22 v22.6.0 // indirect github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 // indirect github.com/danieljoos/wincred v1.2.3 // indirect - github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 // indirect github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da // indirect github.com/docker/go-units v0.5.0 // indirect From e2daa5e0266dea29ee263a4b34d510431f3d5019 Mon Sep 17 00:00:00 2001 From: Ilia Demianenko Date: Wed, 15 Oct 2025 23:07:35 -0700 Subject: [PATCH 4/4] remove type, it's useless --- flow/alerting/alerting.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/flow/alerting/alerting.go b/flow/alerting/alerting.go index 3b1cd3fab9..851ca07c99 100644 --- a/flow/alerting/alerting.go +++ b/flow/alerting/alerting.go @@ -8,7 +8,6 @@ import ( "io" "log/slog" "net" - "reflect" "slices" "strings" "time" @@ -465,7 +464,6 @@ func (a *Alerter) logFlowErrorInternal( errError := inErr.Error() loggerFunc(errError, slog.String("stack", inErrWithStack), - slog.String("type", reflect.TypeOf(inErr).String()), slog.String("spew", errSpew.Sdump(inErr))) if _, err := a.CatalogPool.Exec( ctx, "INSERT INTO peerdb_stats.flow_errors(flow_name,error_message,error_type) VALUES($1,$2,$3)",