-
Notifications
You must be signed in to change notification settings - Fork 26
Ldz/add log0721 #1570
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Ldz/add log0721 #1570
Changes from all commits
577f342
4420252
2e654b6
b45be21
6e8e884
ba33880
4723eb2
5edb527
ff850a6
2985c44
e78d731
f70ac1a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -124,6 +124,14 @@ type subscriptionStat struct { | |
eventCh *chann.UnlimitedChannel[eventWithCallback, uint64] | ||
// data <= checkpointTs can be deleted | ||
checkpointTs atomic.Uint64 | ||
// whether the subStat has received any resolved ts | ||
initialized atomic.Bool | ||
// the time when the subscription receives latest resolved ts | ||
lastAdvanceTime atomic.Int64 | ||
// the time when the subscription receives latest dml event | ||
// it is used to calculate the idle time of the subscription | ||
// 0 means the subscription has not received any dml event | ||
lastReceiveDMLTime atomic.Int64 | ||
// the resolveTs persisted in the store | ||
resolvedTs atomic.Uint64 | ||
// the max commit ts of dml event in the store | ||
|
@@ -478,6 +486,7 @@ func (e *eventStore) RegisterDispatcher( | |
maxCommitTs = kv.CRTs | ||
} | ||
} | ||
subStat.lastReceiveDMLTime.Store(time.Now().UnixMilli()) | ||
util.CompareAndMonotonicIncrease(&subStat.maxEventCommitTs, maxCommitTs) | ||
subStat.eventCh.Push(eventWithCallback{ | ||
subID: subStat.subID, | ||
|
@@ -494,6 +503,9 @@ func (e *eventStore) RegisterDispatcher( | |
if ts <= currentResolvedTs { | ||
return | ||
} | ||
now := time.Now().UnixMilli() | ||
subStat.lastAdvanceTime.Store(now) | ||
subStat.initialized.Store(true) | ||
// just do CompareAndSwap once, if failed, it means another goroutine has updated resolvedTs | ||
if subStat.resolvedTs.CompareAndSwap(currentResolvedTs, ts) { | ||
subStat.dispatchers.Lock() | ||
|
@@ -502,6 +514,7 @@ func (e *eventStore) RegisterDispatcher( | |
notifier(ts, subStat.maxEventCommitTs.Load()) | ||
} | ||
CounterResolved.Inc() | ||
metrics.EventStoreNotifyDispatcherDurationHist.Observe(float64(time.Since(start).Milliseconds()) / 1000) | ||
} | ||
} | ||
|
||
|
@@ -805,6 +818,22 @@ func (e *eventStore) updateMetricsOnce() { | |
resolvedTs := subStat.resolvedTs.Load() | ||
resolvedPhyTs := oracle.ExtractPhysical(resolvedTs) | ||
resolvedLag := float64(pdPhyTs-resolvedPhyTs) / 1e3 | ||
if subStat.initialized.Load() && resolvedLag >= 10 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The magic number const largeResolvedTsLagInSecs = 10
if subStat.initialized.Load() && resolvedLag >= largeResolvedTsLagInSecs { |
||
lastReceiveDMLTimeRepr := "never" | ||
if lastReceiveDMLTime := subStat.lastReceiveDMLTime.Load(); lastReceiveDMLTime > 0 { | ||
lastReceiveDMLTimeRepr = time.UnixMilli(lastReceiveDMLTime).String() | ||
} | ||
log.Warn("resolved ts lag is too large for initialized subscription", | ||
zap.Uint64("subID", uint64(subStat.subID)), | ||
zap.Int64("tableID", subStat.tableSpan.TableID), | ||
zap.Uint64("resolvedTs", resolvedTs), | ||
zap.Float64("resolvedLag(s)", resolvedLag), | ||
zap.Stringer("lastAdvanceTime", time.UnixMilli(subStat.lastAdvanceTime.Load())), | ||
zap.String("lastReceiveDMLTime", lastReceiveDMLTimeRepr), | ||
zap.String("tableSpan", common.FormatTableSpan(subStat.tableSpan)), | ||
zap.Uint64("checkpointTs", subStat.checkpointTs.Load()), | ||
zap.Uint64("maxEventCommitTs", subStat.maxEventCommitTs.Load())) | ||
} | ||
metrics.EventStoreDispatcherResolvedTsLagHist.Observe(float64(resolvedLag)) | ||
if minResolvedTs == 0 || resolvedTs < minResolvedTs { | ||
minResolvedTs = resolvedTs | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,7 @@ import ( | |
"github.com/pingcap/ticdc/pkg/common" | ||
"github.com/pingcap/ticdc/pkg/metrics" | ||
"github.com/pingcap/ticdc/utils/dynstream" | ||
"github.com/tikv/client-go/v2/oracle" | ||
"go.uber.org/zap" | ||
) | ||
|
||
|
@@ -178,7 +179,7 @@ func (h *regionEventHandler) handleRegionError(state *regionFeedState, worker *r | |
stepsToRemoved := state.markRemoved() | ||
err := state.takeError() | ||
if err != nil { | ||
log.Debug("region event handler get a region error", | ||
log.Info("region event handler get a region error", | ||
zap.Uint64("workerID", worker.workerID), | ||
zap.Uint64("subscriptionID", uint64(state.region.subscribedSpan.subID)), | ||
zap.Uint64("regionID", state.region.verID.GetID()), | ||
|
@@ -218,7 +219,7 @@ func handleEventEntries(span *subscribedSpan, state *regionFeedState, entries *c | |
switch entry.Type { | ||
case cdcpb.Event_INITIALIZED: | ||
state.setInitialized() | ||
log.Debug("region is initialized", | ||
log.Info("region is initialized", | ||
zap.Int64("tableID", span.span.TableID), | ||
zap.Uint64("regionID", regionID), | ||
zap.Uint64("requestID", state.requestID), | ||
|
@@ -306,19 +307,32 @@ func handleResolvedTs(span *subscribedSpan, state *regionFeedState, resolvedTs u | |
lastAdvance := span.lastAdvanceTime.Load() | ||
if now-lastAdvance >= span.advanceInterval && span.lastAdvanceTime.CompareAndSwap(lastAdvance, now) { | ||
ts := span.rangeLock.GetHeapMinTs() | ||
// ts, _ := span.rangeLock.ResolvedTs() | ||
if ts > 0 && span.initialized.CompareAndSwap(false, true) { | ||
log.Info("subscription client is initialized", | ||
zap.Uint64("subscriptionID", uint64(span.subID)), | ||
zap.Uint64("regionID", regionID), | ||
zap.Uint64("resolvedTs", ts)) | ||
} | ||
lastResolvedTs := span.resolvedTs.Load() | ||
nextResolvedPhyTs := oracle.ExtractPhysical(ts) | ||
// Generally, we don't want to send duplicate resolved ts, | ||
// so we check whether `ts` is larger than `lastResolvedTs` before send it. | ||
// but when `ts` == `lastResolvedTs` == `span.startTs`, | ||
// the span may just be initialized and have not receive any resolved ts before, | ||
// so we also send ts in this case for quick notification to downstream. | ||
if ts > lastResolvedTs || (ts == lastResolvedTs && lastResolvedTs == span.startTs) { | ||
resolvedPhyTs := oracle.ExtractPhysical(lastResolvedTs) | ||
decreaseLag := float64(nextResolvedPhyTs-resolvedPhyTs) / 1e3 | ||
if decreaseLag > 10 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The magic number const largeResolvedTsAdvanceStepInSecs = 10
if decreaseLag > largeResolvedTsAdvanceStepInSecs { |
||
log.Warn("resolved ts advance step is too large", | ||
zap.Uint64("subID", uint64(span.subID)), | ||
zap.Int64("tableID", span.span.TableID), | ||
zap.Uint64("regionID", regionID), | ||
zap.Uint64("resolvedTs", ts), | ||
zap.Uint64("lastResolvedTs", lastResolvedTs), | ||
zap.Float64("decreaseLag(s)", decreaseLag)) | ||
} | ||
span.resolvedTs.Store(ts) | ||
span.resolvedTsUpdated.Store(time.Now().Unix()) | ||
return ts | ||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -428,6 +428,7 @@ func (s *subscriptionClient) Run(ctx context.Context) error { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
g.Go(func() error { return s.runResolveLockChecker(ctx) }) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
g.Go(func() error { return s.handleResolveLockTasks(ctx) }) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
g.Go(func() error { return s.logSlowRegions(ctx) }) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
g.Go(func() error { return s.logSlowSpan(ctx) }) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
g.Go(func() error { return s.errCache.dispatch(ctx) }) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
log.Info("subscription client starts") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -551,7 +552,7 @@ func (s *subscriptionClient) handleRegions(ctx context.Context, eg *errgroup.Gro | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
worker := store.getRequestWorker() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
worker.requestsCh <- region | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
log.Debug("subscription client will request a region", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
log.Info("subscription client will request a region", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
zap.Uint64("workID", worker.workerID), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
zap.Uint64("subscriptionID", uint64(region.subscribedSpan.subID)), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
zap.Uint64("regionID", region.verID.GetID()), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -568,7 +569,7 @@ func (s *subscriptionClient) attachRPCContextForRegion(ctx context.Context, regi | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return region, true | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
log.Debug("subscription client get rpc context fail", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
log.Info("subscription client get rpc context fail", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
zap.Uint64("subscriptionID", uint64(region.subscribedSpan.subID)), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
zap.Uint64("regionID", region.verID.GetID()), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
zap.Error(err)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -615,7 +616,7 @@ func (s *subscriptionClient) divideSpanAndScheduleRegionRequests( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
backoffBeforeLoad = false | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
log.Debug("subscription client is going to load regions", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
log.Info("subscription client is going to load regions", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
zap.Uint64("subscriptionID", uint64(subscribedSpan.subID)), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
zap.Any("span", nextSpan)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+619
to
621
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This log message "subscription client is going to load regions" is inside a loop. While it's before a network call, logging it at
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -733,7 +734,7 @@ func (s *subscriptionClient) doHandleError(ctx context.Context, errInfo regionEr | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
switch eerr := err.(type) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
case *eventError: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
innerErr := eerr.err | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
log.Debug("cdc region error", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
log.Info("cdc region error", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
zap.Uint64("subscriptionID", uint64(errInfo.subscribedSpan.subID)), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
zap.Stringer("error", innerErr)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -930,7 +931,7 @@ func (s *subscriptionClient) logSlowRegions(ctx context.Context) error { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ckptTime := oracle.GetTimeFromTS(attr.SlowestRegion.ResolvedTs) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if attr.SlowestRegion.Initialized { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if currTime.Sub(ckptTime) > 2*resolveLockMinInterval { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
log.Debug("subscription client finds a initialized slow region", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
log.Info("subscription client finds a initialized slow region", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
zap.Uint64("subscriptionID", uint64(subscriptionID)), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
zap.Any("slowRegion", attr.SlowestRegion)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -954,6 +955,38 @@ func (s *subscriptionClient) logSlowRegions(ctx context.Context) error { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
func (s *subscriptionClient) logSlowSpan(ctx context.Context) error { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ticker := time.NewTicker(10 * time.Second) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
defer ticker.Stop() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
for { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
select { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
case <-ctx.Done(): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return ctx.Err() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
case <-ticker.C: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
pdTime := s.pdClock.CurrentTime() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
pdPhyTs := oracle.GetPhysical(pdTime) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
s.totalSpans.RLock() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
for subID, rt := range s.totalSpans.spanMap { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if !rt.initialized.Load() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
continue | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
resolvedTs := rt.rangeLock.GetHeapMinTs() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
resolvedPhyTs := oracle.ExtractPhysical(resolvedTs) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
resolvedLag := float64(pdPhyTs-resolvedPhyTs) / 1e3 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if resolvedLag > 10 { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+959
to
+978
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This new function
These should be defined as named constants at the package level to improve readability and maintainability.
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
log.Warn("resolved ts lag is too large for initialized span", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
zap.Uint64("subID", uint64(subID)), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
zap.Int64("tableID", rt.span.TableID), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
zap.Uint64("resolvedTs", resolvedTs), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
zap.Float64("resolvedLag(s)", resolvedLag)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
s.totalSpans.RUnlock() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
func (s *subscriptionClient) newSubscribedSpan( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
subID SubscriptionID, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
span heartbeatpb.TableSpan, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -1014,7 +1047,7 @@ func (s *subscriptionClient) GetResolvedTsLag() float64 { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
func (r *subscribedSpan) resolveStaleLocks(targetTs uint64) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
util.MustCompareAndMonotonicIncrease(&r.staleLocksTargetTs, targetTs) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
res := r.rangeLock.IterAll(r.tryResolveLock) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
log.Debug("subscription client finds slow locked ranges", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
log.Info("subscription client finds slow locked ranges", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
zap.Uint64("subscriptionID", uint64(r.subID)), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
zap.Any("ranges", res)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -41,12 +41,13 @@ var ( | |||||
Name: "output_event_count", | ||||||
Help: "The number of events output by the sorter", | ||||||
}, []string{"type"}) // types : kv, resolved. | ||||||
|
||||||
EventStoreWriteDurationHistogram = prometheus.NewHistogram(prometheus.HistogramOpts{ | ||||||
Namespace: "ticdc", | ||||||
Subsystem: "event_store", | ||||||
Name: "write_duration", | ||||||
Help: "Bucketed histogram of event store write duration", | ||||||
Buckets: prometheus.ExponentialBuckets(0.00004, 2.0, 10), | ||||||
Buckets: prometheus.ExponentialBuckets(0.00004, 2.0, 10), // 0.1ms ~ 52s, | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The comment
Suggested change
|
||||||
}) | ||||||
|
||||||
EventStoreScanRequestsCount = prometheus.NewCounter( | ||||||
|
@@ -162,6 +163,15 @@ var ( | |||||
Help: "Bucketed histogram of event store sorter iterator read duration", | ||||||
Buckets: prometheus.ExponentialBuckets(0.00004, 2.0, 28), // 40us to 1.5h | ||||||
}, []string{"type"}) | ||||||
|
||||||
EventStoreNotifyDispatcherDurationHist = prometheus.NewHistogram( | ||||||
prometheus.HistogramOpts{ | ||||||
Namespace: "ticdc", | ||||||
Subsystem: "event_store", | ||||||
Name: "notify_dispatcher_duration", | ||||||
Help: "The duration of notifying dispatchers with resolved ts.", | ||||||
Buckets: prometheus.ExponentialBuckets(0.0001, 2, 20), // 0.1ms ~ 52s, | ||||||
}) | ||||||
) | ||||||
|
||||||
func InitEventStoreMetrics(registry *prometheus.Registry) { | ||||||
|
@@ -183,4 +193,5 @@ func InitEventStoreMetrics(registry *prometheus.Registry) { | |||||
registry.MustRegister(EventStoreWriteBatchSizeHist) | ||||||
registry.MustRegister(EventStoreWriteRequestsCount) | ||||||
registry.MustRegister(EventStoreReadDurationHistogram) | ||||||
registry.MustRegister(EventStoreNotifyDispatcherDurationHist) | ||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The metric
EventStoreNotifyDispatcherDurationHist
is being updated withtime.Since(start)
. Thestart
variable is captured from theRegisterDispatcher
function's scope, which means this will measure the time since the dispatcher was registered, not the duration of this specific notification. This will result in misleadingly large values for the metric.The duration should be measured within this
if
block to accurately reflect the notification time. For example, by recordingtime.Now()
at the beginning of theif
block and using it to calculate the duration.