diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go
index 1629d40ee..0a693556f 100644
--- a/logservice/eventstore/event_store.go
+++ b/logservice/eventstore/event_store.go
@@ -124,6 +124,14 @@ type subscriptionStat struct {
 	eventCh *chann.UnlimitedChannel[eventWithCallback, uint64]
 	// data <= checkpointTs can be deleted
 	checkpointTs atomic.Uint64
+	// whether the subStat has received any resolved ts
+	initialized atomic.Bool
+	// the time when the subscription receives latest resolved ts
+	lastAdvanceTime atomic.Int64
+	// the time when the subscription receives latest dml event
+	// it is used to calculate the idle time of the subscription
+	// 0 means the subscription has not received any dml event
+	lastReceiveDMLTime atomic.Int64
 	// the resolveTs persisted in the store
 	resolvedTs atomic.Uint64
 	// the max commit ts of dml event in the store
@@ -478,6 +486,7 @@ func (e *eventStore) RegisterDispatcher(
 			maxCommitTs = kv.CRTs
 		}
 	}
+	subStat.lastReceiveDMLTime.Store(time.Now().UnixMilli())
 	util.CompareAndMonotonicIncrease(&subStat.maxEventCommitTs, maxCommitTs)
 	subStat.eventCh.Push(eventWithCallback{
 		subID: subStat.subID,
@@ -494,6 +503,9 @@ func (e *eventStore) RegisterDispatcher(
 	if ts <= currentResolvedTs {
 		return
 	}
+	now := time.Now().UnixMilli()
+	subStat.lastAdvanceTime.Store(now)
+	subStat.initialized.Store(true)
 	// just do CompareAndSwap once, if failed, it means another goroutine has updated resolvedTs
 	if subStat.resolvedTs.CompareAndSwap(currentResolvedTs, ts) {
 		subStat.dispatchers.Lock()
@@ -502,6 +514,7 @@ func (e *eventStore) RegisterDispatcher(
 			notifier(ts, subStat.maxEventCommitTs.Load())
 		}
 		CounterResolved.Inc()
+		metrics.EventStoreNotifyDispatcherDurationHist.Observe(float64(time.Since(start).Milliseconds()) / 1000)
 	}
 }
 
@@ -805,6 +818,22 @@ func (e *eventStore) updateMetricsOnce() {
 		resolvedTs := subStat.resolvedTs.Load()
 		resolvedPhyTs := oracle.ExtractPhysical(resolvedTs)
 		resolvedLag := float64(pdPhyTs-resolvedPhyTs) / 1e3
+		if subStat.initialized.Load() && resolvedLag >= 10 {
+			lastReceiveDMLTimeRepr := "never"
+			if lastReceiveDMLTime := subStat.lastReceiveDMLTime.Load(); lastReceiveDMLTime > 0 {
+				lastReceiveDMLTimeRepr = time.UnixMilli(lastReceiveDMLTime).String()
+			}
+			log.Warn("resolved ts lag is too large for initialized subscription",
+				zap.Uint64("subID", uint64(subStat.subID)),
+				zap.Int64("tableID", subStat.tableSpan.TableID),
+				zap.Uint64("resolvedTs", resolvedTs),
+				zap.Float64("resolvedLag(s)", resolvedLag),
+				zap.Stringer("lastAdvanceTime", time.UnixMilli(subStat.lastAdvanceTime.Load())),
+				zap.String("lastReceiveDMLTime", lastReceiveDMLTimeRepr),
+				zap.String("tableSpan", common.FormatTableSpan(subStat.tableSpan)),
+				zap.Uint64("checkpointTs", subStat.checkpointTs.Load()),
+				zap.Uint64("maxEventCommitTs", subStat.maxEventCommitTs.Load()))
+		}
 		metrics.EventStoreDispatcherResolvedTsLagHist.Observe(float64(resolvedLag))
 		if minResolvedTs == 0 || resolvedTs < minResolvedTs {
 			minResolvedTs = resolvedTs
diff --git a/logservice/logpuller/region_event_handler.go b/logservice/logpuller/region_event_handler.go
index 2aa4bf4f0..f27bcc8a0 100644
--- a/logservice/logpuller/region_event_handler.go
+++ b/logservice/logpuller/region_event_handler.go
@@ -23,6 +23,7 @@ import (
 	"github.com/pingcap/ticdc/pkg/common"
 	"github.com/pingcap/ticdc/pkg/metrics"
 	"github.com/pingcap/ticdc/utils/dynstream"
+	"github.com/tikv/client-go/v2/oracle"
 	"go.uber.org/zap"
 )
 
@@ -178,7 +179,7 @@ func (h *regionEventHandler) handleRegionError(state *regionFeedState, worker *r
 	stepsToRemoved := state.markRemoved()
 	err := state.takeError()
 	if err != nil {
-		log.Debug("region event handler get a region error",
+		log.Info("region event handler get a region error",
 			zap.Uint64("workerID", worker.workerID),
 			zap.Uint64("subscriptionID", uint64(state.region.subscribedSpan.subID)),
 			zap.Uint64("regionID", state.region.verID.GetID()),
@@ -218,7 +219,7 @@ func handleEventEntries(span *subscribedSpan, state *regionFeedState, entries *c
 		switch entry.Type {
 		case cdcpb.Event_INITIALIZED:
 			state.setInitialized()
-			log.Debug("region is initialized",
+			log.Info("region is initialized",
 				zap.Int64("tableID", span.span.TableID),
 				zap.Uint64("regionID", regionID),
 				zap.Uint64("requestID", state.requestID),
@@ -306,6 +307,7 @@ func handleResolvedTs(span *subscribedSpan, state *regionFeedState, resolvedTs u
 	lastAdvance := span.lastAdvanceTime.Load()
 	if now-lastAdvance >= span.advanceInterval && span.lastAdvanceTime.CompareAndSwap(lastAdvance, now) {
 		ts := span.rangeLock.GetHeapMinTs()
+		// ts, _ := span.rangeLock.ResolvedTs()
 		if ts > 0 && span.initialized.CompareAndSwap(false, true) {
 			log.Info("subscription client is initialized",
 				zap.Uint64("subscriptionID", uint64(span.subID)),
@@ -313,12 +315,24 @@ func handleResolvedTs(span *subscribedSpan, state *regionFeedState, resolvedTs u
 				zap.Uint64("resolvedTs", ts))
 		}
 		lastResolvedTs := span.resolvedTs.Load()
+		nextResolvedPhyTs := oracle.ExtractPhysical(ts)
 		// Generally, we don't want to send duplicate resolved ts,
 		// so we check whether `ts` is larger than `lastResolvedTs` before send it.
 		// but when `ts` == `lastResolvedTs` == `span.startTs`,
 		// the span may just be initialized and have not receive any resolved ts before,
 		// so we also send ts in this case for quick notification to downstream.
 		if ts > lastResolvedTs || (ts == lastResolvedTs && lastResolvedTs == span.startTs) {
+			resolvedPhyTs := oracle.ExtractPhysical(lastResolvedTs)
+			decreaseLag := float64(nextResolvedPhyTs-resolvedPhyTs) / 1e3
+			if decreaseLag > 10 {
+				log.Warn("resolved ts advance step is too large",
+					zap.Uint64("subID", uint64(span.subID)),
+					zap.Int64("tableID", span.span.TableID),
+					zap.Uint64("regionID", regionID),
+					zap.Uint64("resolvedTs", ts),
+					zap.Uint64("lastResolvedTs", lastResolvedTs),
+					zap.Float64("decreaseLag(s)", decreaseLag))
+			}
 			span.resolvedTs.Store(ts)
 			span.resolvedTsUpdated.Store(time.Now().Unix())
 			return ts
diff --git a/logservice/logpuller/region_request_worker.go b/logservice/logpuller/region_request_worker.go
index 72e68cbf4..dd0089665 100644
--- a/logservice/logpuller/region_request_worker.go
+++ b/logservice/logpuller/region_request_worker.go
@@ -249,7 +249,7 @@ func (s *regionRequestWorker) dispatchRegionChangeEvents(events []*cdcpb.Event)
 			// ignore
 			continue
 		case *cdcpb.Event_Error:
-			log.Debug("region request worker receives a region error",
+			log.Info("region request worker receives a region error",
 				zap.Uint64("workerID", s.workerID),
 				zap.Uint64("subscriptionID", uint64(subscriptionID)),
 				zap.Uint64("regionID", event.RegionId),
@@ -269,7 +269,7 @@ func (s *regionRequestWorker) dispatchRegionChangeEvents(events []*cdcpb.Event)
 		switch event.Event.(type) {
 		case *cdcpb.Event_Error:
 			// it is normal to receive region error after deregister a subscription
-			log.Debug("region request worker receives an error for a stale region, ignore it",
+			log.Info("region request worker receives an error for a stale region, ignore it",
 				zap.Uint64("workerID", s.workerID),
 				zap.Uint64("subscriptionID", uint64(subscriptionID)),
 				zap.Uint64("regionID", event.RegionId))
@@ -348,7 +348,7 @@ func (s *regionRequestWorker) processRegionSendTask(
 	for {
 		// TODO: can region be nil?
 		subID := region.subscribedSpan.subID
-		log.Debug("region request worker gets a singleRegionInfo",
+		log.Info("region request worker gets a singleRegionInfo",
 			zap.Uint64("workerID", s.workerID),
 			zap.Uint64("subscriptionID", uint64(subID)),
 			zap.Uint64("regionID", region.verID.GetID()),
diff --git a/logservice/logpuller/regionlock/region_range_lock.go b/logservice/logpuller/regionlock/region_range_lock.go
index 3bb27b3ef..50446d687 100644
--- a/logservice/logpuller/regionlock/region_range_lock.go
+++ b/logservice/logpuller/regionlock/region_range_lock.go
@@ -244,7 +244,7 @@ func (l *RangeLock) UnlockRange(
 	}
 
 	l.unlockedRanges.set(startKey, endKey, newResolvedTs)
-	log.Debug("unlocked range",
+	log.Info("unlocked range",
 		zap.Uint64("lockID", l.id), zap.Uint64("regionID", entry.regionID),
 		zap.Uint64("resolvedTs", newResolvedTs),
 		zap.String("startKey", hex.EncodeToString(startKey)),
@@ -261,15 +261,17 @@ func (l *RangeLock) Len() int {
 
 // ResolvedTs calculates and returns the minimum resolvedTs
 // of all ranges in the RangeLock.
-func (l *RangeLock) ResolvedTs() uint64 {
+func (l *RangeLock) ResolvedTs() (uint64, uint64) {
 	l.mu.RLock()
 	defer l.mu.RUnlock()
 
 	var minTs uint64 = math.MaxUint64
+	var regionID uint64
 	l.lockedRanges.Ascend(func(item *rangeLockEntry) bool {
 		ts := item.lockedRangeState.ResolvedTs.Load()
 		if ts < minTs {
 			minTs = ts
+			regionID = item.regionID
 		}
 		return true
 	})
@@ -277,9 +279,10 @@ func (l *RangeLock) ResolvedTs() uint64 {
 	unlockedMinTs := l.unlockedRanges.getMinTs()
 	if unlockedMinTs < minTs {
 		minTs = unlockedMinTs
+		regionID = 0
 	}
 
-	return minTs
+	return minTs, regionID
 }
 
 // RangeLockStatistics represents some statistics of a RangeLock.
@@ -469,7 +472,7 @@ func (l *RangeLock) tryLockRange(startKey, endKey []byte, regionID, regionVersio
 	l.lockedRangeStateHeap.AddOrUpdate(&newEntry.lockedRangeState)
 	l.unlockedRanges.unset(startKey, endKey)
 
-	log.Debug("range locked",
+	log.Info("range locked",
 		zap.Uint64("lockID", l.id),
 		zap.Uint64("regionID", regionID),
 		zap.Uint64("version", regionVersion),
diff --git a/logservice/logpuller/subscription_client.go b/logservice/logpuller/subscription_client.go
index 3e8848d35..9bed95380 100644
--- a/logservice/logpuller/subscription_client.go
+++ b/logservice/logpuller/subscription_client.go
@@ -428,6 +428,7 @@ func (s *subscriptionClient) Run(ctx context.Context) error {
 	g.Go(func() error { return s.runResolveLockChecker(ctx) })
 	g.Go(func() error { return s.handleResolveLockTasks(ctx) })
 	g.Go(func() error { return s.logSlowRegions(ctx) })
+	g.Go(func() error { return s.logSlowSpan(ctx) })
 	g.Go(func() error { return s.errCache.dispatch(ctx) })
 
 	log.Info("subscription client starts")
@@ -551,7 +552,7 @@ func (s *subscriptionClient) handleRegions(ctx context.Context, eg *errgroup.Gro
 			worker := store.getRequestWorker()
 			worker.requestsCh <- region
 
-			log.Debug("subscription client will request a region",
+			log.Info("subscription client will request a region",
 				zap.Uint64("workID", worker.workerID),
 				zap.Uint64("subscriptionID", uint64(region.subscribedSpan.subID)),
 				zap.Uint64("regionID", region.verID.GetID()),
@@ -568,7 +569,7 @@ func (s *subscriptionClient) attachRPCContextForRegion(ctx context.Context, regi
 		return region, true
 	}
 	if err != nil {
-		log.Debug("subscription client get rpc context fail",
+		log.Info("subscription client get rpc context fail",
 			zap.Uint64("subscriptionID", uint64(region.subscribedSpan.subID)),
 			zap.Uint64("regionID", region.verID.GetID()),
 			zap.Error(err))
@@ -615,7 +616,7 @@ func (s *subscriptionClient) divideSpanAndScheduleRegionRequests(
 			}
 			backoffBeforeLoad = false
 		}
-		log.Debug("subscription client is going to load regions",
+		log.Info("subscription client is going to load regions",
 			zap.Uint64("subscriptionID", uint64(subscribedSpan.subID)),
 			zap.Any("span", nextSpan))
 
@@ -733,7 +734,7 @@ func (s *subscriptionClient) doHandleError(ctx context.Context, errInfo regionEr
 	switch eerr := err.(type) {
 	case *eventError:
 		innerErr := eerr.err
-		log.Debug("cdc region error",
+		log.Info("cdc region error",
 			zap.Uint64("subscriptionID", uint64(errInfo.subscribedSpan.subID)),
 			zap.Stringer("error", innerErr))
 
@@ -930,7 +931,7 @@ func (s *subscriptionClient) logSlowRegions(ctx context.Context) error {
 			ckptTime := oracle.GetTimeFromTS(attr.SlowestRegion.ResolvedTs)
 			if attr.SlowestRegion.Initialized {
 				if currTime.Sub(ckptTime) > 2*resolveLockMinInterval {
-					log.Debug("subscription client finds a initialized slow region",
+					log.Info("subscription client finds a initialized slow region",
 						zap.Uint64("subscriptionID", uint64(subscriptionID)),
 						zap.Any("slowRegion", attr.SlowestRegion))
 				}
@@ -954,6 +955,38 @@ func (s *subscriptionClient) logSlowRegions(ctx context.Context) error {
 		}
 	}
 }
+func (s *subscriptionClient) logSlowSpan(ctx context.Context) error {
+	ticker := time.NewTicker(10 * time.Second)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-ticker.C:
+		}
+
+		pdTime := s.pdClock.CurrentTime()
+		pdPhyTs := oracle.GetPhysical(pdTime)
+		s.totalSpans.RLock()
+		for subID, rt := range s.totalSpans.spanMap {
+			if !rt.initialized.Load() {
+				continue
+			}
+			resolvedTs := rt.rangeLock.GetHeapMinTs()
+			resolvedPhyTs := oracle.ExtractPhysical(resolvedTs)
+			resolvedLag := float64(pdPhyTs-resolvedPhyTs) / 1e3
+			if resolvedLag > 10 {
+				log.Warn("resolved ts lag is too large for initialized span",
+					zap.Uint64("subID", uint64(subID)),
+					zap.Int64("tableID", rt.span.TableID),
+					zap.Uint64("resolvedTs", resolvedTs),
+					zap.Float64("resolvedLag(s)", resolvedLag))
+			}
+		}
+		s.totalSpans.RUnlock()
+	}
+}
+
 func (s *subscriptionClient) newSubscribedSpan(
 	subID SubscriptionID,
 	span heartbeatpb.TableSpan,
@@ -1014,7 +1047,7 @@ func (s *subscriptionClient) GetResolvedTsLag() float64 {
 func (r *subscribedSpan) resolveStaleLocks(targetTs uint64) {
 	util.MustCompareAndMonotonicIncrease(&r.staleLocksTargetTs, targetTs)
 	res := r.rangeLock.IterAll(r.tryResolveLock)
-	log.Debug("subscription client finds slow locked ranges",
+	log.Info("subscription client finds slow locked ranges",
 		zap.Uint64("subscriptionID", uint64(r.subID)),
 		zap.Any("ranges", res))
 }
diff --git a/pkg/metrics/event_store.go b/pkg/metrics/event_store.go
index 795983604..9707b33e2 100644
--- a/pkg/metrics/event_store.go
+++ b/pkg/metrics/event_store.go
@@ -41,12 +41,13 @@ var (
 			Name: "output_event_count",
 			Help: "The number of events output by the sorter",
 		}, []string{"type"}) // types : kv, resolved.
+
 	EventStoreWriteDurationHistogram = prometheus.NewHistogram(prometheus.HistogramOpts{
 		Namespace: "ticdc",
 		Subsystem: "event_store",
 		Name:      "write_duration",
 		Help:      "Bucketed histogram of event store write duration",
-		Buckets:   prometheus.ExponentialBuckets(0.00004, 2.0, 10),
+		Buckets:   prometheus.ExponentialBuckets(0.00004, 2.0, 10), // 40us ~ 20ms
 	})
 
 	EventStoreScanRequestsCount = prometheus.NewCounter(
@@ -162,6 +163,15 @@ var (
 			Help:    "Bucketed histogram of event store sorter iterator read duration",
 			Buckets: prometheus.ExponentialBuckets(0.00004, 2.0, 28), // 40us to 1.5h
 		}, []string{"type"})
+
+	EventStoreNotifyDispatcherDurationHist = prometheus.NewHistogram(
+		prometheus.HistogramOpts{
+			Namespace: "ticdc",
+			Subsystem: "event_store",
+			Name:      "notify_dispatcher_duration",
+			Help:      "The duration of notifying dispatchers with resolved ts.",
+			Buckets:   prometheus.ExponentialBuckets(0.0001, 2, 20), // 0.1ms ~ 52s
+		})
 )
 
 func InitEventStoreMetrics(registry *prometheus.Registry) {
@@ -183,4 +193,5 @@
 	registry.MustRegister(EventStoreWriteBatchSizeHist)
 	registry.MustRegister(EventStoreWriteRequestsCount)
 	registry.MustRegister(EventStoreReadDurationHistogram)
+	registry.MustRegister(EventStoreNotifyDispatcherDurationHist)
 }
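
Note on the RangeLock.ResolvedTs signature change above: callers now receive the region that currently pins the minimum resolved ts together with the ts itself (regionID 0 when the minimum comes from already-unlocked ranges), and the commented-out `ts, _ := span.rangeLock.ResolvedTs()` in handleResolvedTs shows the intended two-value call shape. The standalone sketch below is illustrative only and not part of this diff; the names lockedRange and minResolvedTs are hypothetical stand-ins that merely mirror the min-with-owner logic of the modified function.

package main

import (
	"fmt"
	"math"
)

// lockedRange is a hypothetical stand-in for the per-region state that
// RangeLock.ResolvedTs iterates over via lockedRanges.Ascend.
type lockedRange struct {
	regionID   uint64
	resolvedTs uint64
}

// minResolvedTs mirrors the updated ResolvedTs contract: it returns the
// minimum resolved ts and the region holding it; regionID 0 means the
// minimum comes from already-unlocked ranges.
func minResolvedTs(ranges []lockedRange, unlockedMinTs uint64) (uint64, uint64) {
	var minTs uint64 = math.MaxUint64
	var regionID uint64
	for _, r := range ranges {
		if r.resolvedTs < minTs {
			minTs = r.resolvedTs
			regionID = r.regionID
		}
	}
	if unlockedMinTs < minTs {
		minTs = unlockedMinTs
		regionID = 0
	}
	return minTs, regionID
}

func main() {
	ranges := []lockedRange{
		{regionID: 1, resolvedTs: 450},
		{regionID: 2, resolvedTs: 430},
	}
	ts, slowest := minResolvedTs(ranges, 460)
	fmt.Printf("minTs=%d slowestRegion=%d\n", ts, slowest) // minTs=430 slowestRegion=2
}

A plausible motivation, suggested by the commented-out call in handleResolvedTs, is to let slow-span diagnostics such as the logSlowSpan loop name the specific region holding a subscription back, rather than reporting only the lagging timestamp.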