From 577f3425a38f54e46c9532d85c47954b7c410a48 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Mon, 21 Jul 2025 17:05:59 +0800 Subject: [PATCH 01/12] add some log --- logservice/eventstore/event_store.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index 1629d40ee..80a1bb163 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -124,6 +124,10 @@ type subscriptionStat struct { eventCh *chann.UnlimitedChannel[eventWithCallback, uint64] // data <= checkpointTs can be deleted checkpointTs atomic.Uint64 + // whether the subStat has received any resolved ts + initialized atomic.Bool + // the time when the subscription receives latest resolved ts + lastAdvanceTime atomic.Int64 // the resolveTs persisted in the store resolvedTs atomic.Uint64 // the max commit ts of dml event in the store @@ -494,6 +498,9 @@ func (e *eventStore) RegisterDispatcher( if ts <= currentResolvedTs { return } + now := time.Now().UnixMilli() + subStat.lastAdvanceTime.Store(now) + subStat.initialized.Store(true) // just do CompareAndSwap once, if failed, it means another goroutine has updated resolvedTs if subStat.resolvedTs.CompareAndSwap(currentResolvedTs, ts) { subStat.dispatchers.Lock() @@ -805,6 +812,17 @@ func (e *eventStore) updateMetricsOnce() { resolvedTs := subStat.resolvedTs.Load() resolvedPhyTs := oracle.ExtractPhysical(resolvedTs) resolvedLag := float64(pdPhyTs-resolvedPhyTs) / 1e3 + if subStat.initialized.Load() && resolvedLag >= 10 { + log.Warn("resolved ts lag is too large for initialized subscription", + zap.Uint64("subID", uint64(subStat.subID)), + zap.Int64("tableID", subStat.tableSpan.TableID), + zap.Uint64("resolvedTs", resolvedTs), + zap.Float64("resolvedLag", resolvedLag), + zap.Stringer("lastAdvanceTime", time.UnixMilli(subStat.lastAdvanceTime.Load())), + zap.String("tableSpan", common.FormatTableSpan(subStat.tableSpan)), + 
zap.Uint64("checkpointTs", subStat.checkpointTs.Load()), + zap.Uint64("maxEventCommitTs", subStat.maxEventCommitTs.Load())) + } metrics.EventStoreDispatcherResolvedTsLagHist.Observe(float64(resolvedLag)) if minResolvedTs == 0 || resolvedTs < minResolvedTs { minResolvedTs = resolvedTs From 4420252ed86f0324716700702dded353eead2eae Mon Sep 17 00:00:00 2001 From: lidezhu Date: Mon, 21 Jul 2025 20:51:47 +0800 Subject: [PATCH 02/12] add more log --- logservice/eventstore/event_store.go | 9 ++++++++- pkg/metrics/event_store.go | 13 ++++++++++++- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index 80a1bb163..795640970 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -128,6 +128,10 @@ type subscriptionStat struct { initialized atomic.Bool // the time when the subscription receives latest resolved ts lastAdvanceTime atomic.Int64 + // the time when the subscription receives latest dml event + // it is used to calculate the idle time of the subscription + // 0 means the subscription has not received any dml event + lastDMLTime atomic.Int64 // the resolveTs persisted in the store resolvedTs atomic.Uint64 // the max commit ts of dml event in the store @@ -482,6 +486,7 @@ func (e *eventStore) RegisterDispatcher( maxCommitTs = kv.CRTs } } + subStat.lastDMLTime.Store(time.Now().UnixMilli()) util.CompareAndMonotonicIncrease(&subStat.maxEventCommitTs, maxCommitTs) subStat.eventCh.Push(eventWithCallback{ subID: subStat.subID, @@ -509,6 +514,7 @@ func (e *eventStore) RegisterDispatcher( notifier(ts, subStat.maxEventCommitTs.Load()) } CounterResolved.Inc() + metrics.EventStoreNotifyDispatcherDurationHist.Observe(float64(time.Since(start).Milliseconds()) / 1000) } } @@ -817,8 +823,9 @@ func (e *eventStore) updateMetricsOnce() { zap.Uint64("subID", uint64(subStat.subID)), zap.Int64("tableID", subStat.tableSpan.TableID), zap.Uint64("resolvedTs", 
resolvedTs), - zap.Float64("resolvedLag", resolvedLag), + zap.Float64("resolvedLag(s)", resolvedLag), zap.Stringer("lastAdvanceTime", time.UnixMilli(subStat.lastAdvanceTime.Load())), + zap.Stringer("lastDMLTime", time.UnixMilli(subStat.lastDMLTime.Load())), zap.String("tableSpan", common.FormatTableSpan(subStat.tableSpan)), zap.Uint64("checkpointTs", subStat.checkpointTs.Load()), zap.Uint64("maxEventCommitTs", subStat.maxEventCommitTs.Load())) diff --git a/pkg/metrics/event_store.go b/pkg/metrics/event_store.go index 795983604..9707b33e2 100644 --- a/pkg/metrics/event_store.go +++ b/pkg/metrics/event_store.go @@ -41,12 +41,13 @@ var ( Name: "output_event_count", Help: "The number of events output by the sorter", }, []string{"type"}) // types : kv, resolved. + EventStoreWriteDurationHistogram = prometheus.NewHistogram(prometheus.HistogramOpts{ Namespace: "ticdc", Subsystem: "event_store", Name: "write_duration", Help: "Bucketed histogram of event store write duration", - Buckets: prometheus.ExponentialBuckets(0.00004, 2.0, 10), + Buckets: prometheus.ExponentialBuckets(0.00004, 2.0, 10), // 40us to 20ms }) EventStoreScanRequestsCount = prometheus.NewCounter( @@ -162,6 +163,15 @@ var ( Help: "Bucketed histogram of event store sorter iterator read duration", Buckets: prometheus.ExponentialBuckets(0.00004, 2.0, 28), // 40us to 1.5h }, []string{"type"}) + + EventStoreNotifyDispatcherDurationHist = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: "ticdc", + Subsystem: "event_store", + Name: "notify_dispatcher_duration", + Help: "The duration of notifying dispatchers with resolved ts.", + Buckets: prometheus.ExponentialBuckets(0.0001, 2, 20), // 0.1ms ~ 52s, + }) ) func InitEventStoreMetrics(registry *prometheus.Registry) { @@ -183,4 +193,5 @@ func InitEventStoreMetrics(registry *prometheus.Registry) { registry.MustRegister(EventStoreWriteBatchSizeHist) registry.MustRegister(EventStoreWriteRequestsCount) 
registry.MustRegister(EventStoreReadDurationHistogram) + registry.MustRegister(EventStoreNotifyDispatcherDurationHist) } From 2e654b6f4f013364c52c8afffd21b4c4cb8ec2d7 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Mon, 21 Jul 2025 22:52:56 +0800 Subject: [PATCH 03/12] more log --- logservice/eventstore/event_store.go | 10 +++++++--- logservice/logpuller/region_event_handler.go | 5 +++++ 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index 795640970..0a693556f 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -131,7 +131,7 @@ type subscriptionStat struct { // the time when the subscription receives latest dml event // it is used to calculate the idle time of the subscription // 0 means the subscription has not received any dml event - lastDMLTime atomic.Int64 + lastReceiveDMLTime atomic.Int64 // the resolveTs persisted in the store resolvedTs atomic.Uint64 // the max commit ts of dml event in the store @@ -486,7 +486,7 @@ func (e *eventStore) RegisterDispatcher( maxCommitTs = kv.CRTs } } - subStat.lastDMLTime.Store(time.Now().UnixMilli()) + subStat.lastReceiveDMLTime.Store(time.Now().UnixMilli()) util.CompareAndMonotonicIncrease(&subStat.maxEventCommitTs, maxCommitTs) subStat.eventCh.Push(eventWithCallback{ subID: subStat.subID, @@ -819,13 +819,17 @@ func (e *eventStore) updateMetricsOnce() { resolvedPhyTs := oracle.ExtractPhysical(resolvedTs) resolvedLag := float64(pdPhyTs-resolvedPhyTs) / 1e3 if subStat.initialized.Load() && resolvedLag >= 10 { + lastReceiveDMLTimeRepr := "never" + if lastReceiveDMLTime := subStat.lastReceiveDMLTime.Load(); lastReceiveDMLTime > 0 { + lastReceiveDMLTimeRepr = time.UnixMilli(lastReceiveDMLTime).String() + } log.Warn("resolved ts lag is too large for initialized subscription", zap.Uint64("subID", uint64(subStat.subID)), zap.Int64("tableID", subStat.tableSpan.TableID), zap.Uint64("resolvedTs", 
resolvedTs), zap.Float64("resolvedLag(s)", resolvedLag), zap.Stringer("lastAdvanceTime", time.UnixMilli(subStat.lastAdvanceTime.Load())), - zap.Stringer("lastDMLTime", time.UnixMilli(subStat.lastDMLTime.Load())), + zap.String("lastReceiveDMLTime", lastReceiveDMLTimeRepr), zap.String("tableSpan", common.FormatTableSpan(subStat.tableSpan)), zap.Uint64("checkpointTs", subStat.checkpointTs.Load()), zap.Uint64("maxEventCommitTs", subStat.maxEventCommitTs.Load())) diff --git a/logservice/logpuller/region_event_handler.go b/logservice/logpuller/region_event_handler.go index 2aa4bf4f0..fe6ac37a7 100644 --- a/logservice/logpuller/region_event_handler.go +++ b/logservice/logpuller/region_event_handler.go @@ -312,6 +312,11 @@ func handleResolvedTs(span *subscribedSpan, state *regionFeedState, resolvedTs u zap.Uint64("regionID", regionID), zap.Uint64("resolvedTs", ts)) } + log.Info("advance resolved ts", + zap.Uint64("subscriptionID", uint64(span.subID)), + zap.Uint64("regionID", regionID), + zap.Uint64("resolvedTs", ts), + zap.Uint64("lastResolvedTs", lastResolvedTs)) lastResolvedTs := span.resolvedTs.Load() // Generally, we don't want to send duplicate resolved ts, // so we check whether `ts` is larger than `lastResolvedTs` before send it. 
From b45be21b3436aede0697ede14e928610635e54b4 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Tue, 22 Jul 2025 12:36:15 +0800 Subject: [PATCH 04/12] remove log --- logservice/logpuller/region_event_handler.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/logservice/logpuller/region_event_handler.go b/logservice/logpuller/region_event_handler.go index fe6ac37a7..2aa4bf4f0 100644 --- a/logservice/logpuller/region_event_handler.go +++ b/logservice/logpuller/region_event_handler.go @@ -312,11 +312,6 @@ func handleResolvedTs(span *subscribedSpan, state *regionFeedState, resolvedTs u zap.Uint64("regionID", regionID), zap.Uint64("resolvedTs", ts)) } - log.Info("advance resolved ts", - zap.Uint64("subscriptionID", uint64(span.subID)), - zap.Uint64("regionID", regionID), - zap.Uint64("resolvedTs", ts), - zap.Uint64("lastResolvedTs", lastResolvedTs)) lastResolvedTs := span.resolvedTs.Load() // Generally, we don't want to send duplicate resolved ts, // so we check whether `ts` is larger than `lastResolvedTs` before send it. 
From 6e8e8846dd13025708e64ec089c6d453e420892c Mon Sep 17 00:00:00 2001 From: lidezhu Date: Tue, 22 Jul 2025 18:06:48 +0800 Subject: [PATCH 05/12] add more log --- logservice/logpuller/subscription_client.go | 31 +++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/logservice/logpuller/subscription_client.go b/logservice/logpuller/subscription_client.go index 3e8848d35..5ae5fa249 100644 --- a/logservice/logpuller/subscription_client.go +++ b/logservice/logpuller/subscription_client.go @@ -428,6 +428,7 @@ func (s *subscriptionClient) Run(ctx context.Context) error { g.Go(func() error { return s.runResolveLockChecker(ctx) }) g.Go(func() error { return s.handleResolveLockTasks(ctx) }) g.Go(func() error { return s.logSlowRegions(ctx) }) + g.Go(func() error { return s.logSlowSpan(ctx) }) g.Go(func() error { return s.errCache.dispatch(ctx) }) log.Info("subscription client starts") @@ -954,6 +955,36 @@ func (s *subscriptionClient) logSlowRegions(ctx context.Context) error { } } +func (s *subscriptionClient) logSlowSpan(ctx context.Context) error { + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + } + + pdTime := s.pdClock.CurrentTime() + pdPhyTs := oracle.GetPhysical(pdTime) + s.totalSpans.RLock() + for subID, rt := range s.totalSpans.spanMap { + if !rt.initialized.Load() { + continue + } + resolvedTs := rt.rangeLock.GetHeapMinTs() + resolvedPhyTs := oracle.ExtractPhysical(resolvedTs) + resolvedLag := float64(pdPhyTs-resolvedPhyTs) / 1e3 + log.Warn("resolved ts lag is too large for initialized subscription", + zap.Uint64("subID", uint64(subID)), + zap.Int64("tableID", rt.span.TableID), + zap.Uint64("resolvedTs", resolvedTs), + zap.Float64("resolvedLag(s)", resolvedLag)) + } + s.totalSpans.RUnlock() + } +} + func (s *subscriptionClient) newSubscribedSpan( subID SubscriptionID, span heartbeatpb.TableSpan, From ba338805d6ef4479bf528677b0f22d8df8c58547 Mon Sep 17 
00:00:00 2001 From: lidezhu Date: Tue, 22 Jul 2025 19:26:36 +0800 Subject: [PATCH 06/12] add more log --- logservice/logpuller/region_event_handler.go | 30 ++++++++++++++++++++ logservice/logpuller/subscription_client.go | 12 ++++---- 2 files changed, 37 insertions(+), 5 deletions(-) diff --git a/logservice/logpuller/region_event_handler.go b/logservice/logpuller/region_event_handler.go index 2aa4bf4f0..edec6bcd8 100644 --- a/logservice/logpuller/region_event_handler.go +++ b/logservice/logpuller/region_event_handler.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/ticdc/pkg/metrics" "github.com/pingcap/ticdc/utils/dynstream" + "github.com/tikv/client-go/v2/oracle" "go.uber.org/zap" ) @@ -313,12 +314,41 @@ func handleResolvedTs(span *subscribedSpan, state *regionFeedState, resolvedTs u zap.Uint64("resolvedTs", ts)) } lastResolvedTs := span.resolvedTs.Load() + curTime := time.Now() + curPhyTs := oracle.GetPhysical(curTime) + resolvedPhyTs := oracle.ExtractPhysical(lastResolvedTs) + resolvedLag := float64(curPhyTs-resolvedPhyTs) / 1e3 + if resolvedLag > 10 { + log.Warn("last resolved ts lag is too large", + zap.Uint64("subID", uint64(span.subID)), + zap.Int64("tableID", span.span.TableID), + zap.Uint64("resolvedTs", lastResolvedTs), + zap.Float64("resolvedLag(s)", resolvedLag)) + } + nextResolvedPhyTs := oracle.ExtractPhysical(ts) + nextResolvedLag := float64(curPhyTs-nextResolvedPhyTs) / 1e3 + if nextResolvedLag > 10 { + log.Warn("next resolved ts lag is too large", + zap.Uint64("subID", uint64(span.subID)), + zap.Int64("tableID", span.span.TableID), + zap.Uint64("resolvedTs", ts), + zap.Float64("resolvedLag(s)", nextResolvedLag)) + } // Generally, we don't want to send duplicate resolved ts, // so we check whether `ts` is larger than `lastResolvedTs` before send it. 
// but when `ts` == `lastResolvedTs` == `span.startTs`, // the span may just be initialized and have not receive any resolved ts before, // so we also send ts in this case for quick notification to downstream. if ts > lastResolvedTs || (ts == lastResolvedTs && lastResolvedTs == span.startTs) { + decreaseLag := float64(nextResolvedPhyTs-resolvedPhyTs) / 1e3 + if decreaseLag > 10 { + log.Warn("resolved ts advance step is too large", + zap.Uint64("subID", uint64(span.subID)), + zap.Int64("tableID", span.span.TableID), + zap.Uint64("resolvedTs", ts), + zap.Uint64("lastResolvedTs", lastResolvedTs), + zap.Float64("decreaseLag(s)", decreaseLag)) + } span.resolvedTs.Store(ts) span.resolvedTsUpdated.Store(time.Now().Unix()) return ts diff --git a/logservice/logpuller/subscription_client.go b/logservice/logpuller/subscription_client.go index 5ae5fa249..b7881ac2a 100644 --- a/logservice/logpuller/subscription_client.go +++ b/logservice/logpuller/subscription_client.go @@ -975,11 +975,13 @@ func (s *subscriptionClient) logSlowSpan(ctx context.Context) error { resolvedTs := rt.rangeLock.GetHeapMinTs() resolvedPhyTs := oracle.ExtractPhysical(resolvedTs) resolvedLag := float64(pdPhyTs-resolvedPhyTs) / 1e3 - log.Warn("resolved ts lag is too large for initialized subscription", - zap.Uint64("subID", uint64(subID)), - zap.Int64("tableID", rt.span.TableID), - zap.Uint64("resolvedTs", resolvedTs), - zap.Float64("resolvedLag(s)", resolvedLag)) + if resolvedLag > 10 { + log.Warn("resolved ts lag is too large for initialized span", + zap.Uint64("subID", uint64(subID)), + zap.Int64("tableID", rt.span.TableID), + zap.Uint64("resolvedTs", resolvedTs), + zap.Float64("resolvedLag(s)", resolvedLag)) + } } s.totalSpans.RUnlock() } From 4723eb2ca01289623c89e2624d8cffe51019449c Mon Sep 17 00:00:00 2001 From: lidezhu Date: Tue, 22 Jul 2025 19:35:50 +0800 Subject: [PATCH 07/12] add more log --- logservice/logpuller/region_event_handler.go | 3 +++ 1 file changed, 3 insertions(+) diff --git 
a/logservice/logpuller/region_event_handler.go b/logservice/logpuller/region_event_handler.go index edec6bcd8..d1d1240f3 100644 --- a/logservice/logpuller/region_event_handler.go +++ b/logservice/logpuller/region_event_handler.go @@ -322,6 +322,7 @@ func handleResolvedTs(span *subscribedSpan, state *regionFeedState, resolvedTs u log.Warn("last resolved ts lag is too large", zap.Uint64("subID", uint64(span.subID)), zap.Int64("tableID", span.span.TableID), + zap.Uint64("regionID", regionID), zap.Uint64("resolvedTs", lastResolvedTs), zap.Float64("resolvedLag(s)", resolvedLag)) } @@ -331,6 +332,7 @@ func handleResolvedTs(span *subscribedSpan, state *regionFeedState, resolvedTs u log.Warn("next resolved ts lag is too large", zap.Uint64("subID", uint64(span.subID)), zap.Int64("tableID", span.span.TableID), + zap.Uint64("regionID", regionID), zap.Uint64("resolvedTs", ts), zap.Float64("resolvedLag(s)", nextResolvedLag)) } @@ -345,6 +347,7 @@ func handleResolvedTs(span *subscribedSpan, state *regionFeedState, resolvedTs u log.Warn("resolved ts advance step is too large", zap.Uint64("subID", uint64(span.subID)), zap.Int64("tableID", span.span.TableID), + zap.Uint64("regionID", regionID), zap.Uint64("resolvedTs", ts), zap.Uint64("lastResolvedTs", lastResolvedTs), zap.Float64("decreaseLag(s)", decreaseLag)) From 5edb527f0ac2f993b410201ce897ffa6b9552876 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Tue, 22 Jul 2025 20:57:29 +0800 Subject: [PATCH 08/12] check --- logservice/logpuller/region_event_handler.go | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/logservice/logpuller/region_event_handler.go b/logservice/logpuller/region_event_handler.go index d1d1240f3..109ba1099 100644 --- a/logservice/logpuller/region_event_handler.go +++ b/logservice/logpuller/region_event_handler.go @@ -306,7 +306,8 @@ func handleResolvedTs(span *subscribedSpan, state *regionFeedState, resolvedTs u now := time.Now().UnixMilli() lastAdvance := 
span.lastAdvanceTime.Load() if now-lastAdvance >= span.advanceInterval && span.lastAdvanceTime.CompareAndSwap(lastAdvance, now) { - ts := span.rangeLock.GetHeapMinTs() + // ts := span.rangeLock.GetHeapMinTs() + ts := span.rangeLock.ResolvedTs() if ts > 0 && span.initialized.CompareAndSwap(false, true) { log.Info("subscription client is initialized", zap.Uint64("subscriptionID", uint64(span.subID)), @@ -316,16 +317,6 @@ func handleResolvedTs(span *subscribedSpan, state *regionFeedState, resolvedTs u lastResolvedTs := span.resolvedTs.Load() curTime := time.Now() curPhyTs := oracle.GetPhysical(curTime) - resolvedPhyTs := oracle.ExtractPhysical(lastResolvedTs) - resolvedLag := float64(curPhyTs-resolvedPhyTs) / 1e3 - if resolvedLag > 10 { - log.Warn("last resolved ts lag is too large", - zap.Uint64("subID", uint64(span.subID)), - zap.Int64("tableID", span.span.TableID), - zap.Uint64("regionID", regionID), - zap.Uint64("resolvedTs", lastResolvedTs), - zap.Float64("resolvedLag(s)", resolvedLag)) - } nextResolvedPhyTs := oracle.ExtractPhysical(ts) nextResolvedLag := float64(curPhyTs-nextResolvedPhyTs) / 1e3 if nextResolvedLag > 10 { From ff850a65cf4d43a117f5e7db945dd7b8b600ad3e Mon Sep 17 00:00:00 2001 From: lidezhu Date: Tue, 22 Jul 2025 20:59:52 +0800 Subject: [PATCH 09/12] fix build --- logservice/logpuller/region_event_handler.go | 1 + 1 file changed, 1 insertion(+) diff --git a/logservice/logpuller/region_event_handler.go b/logservice/logpuller/region_event_handler.go index 109ba1099..ed54e78df 100644 --- a/logservice/logpuller/region_event_handler.go +++ b/logservice/logpuller/region_event_handler.go @@ -333,6 +333,7 @@ func handleResolvedTs(span *subscribedSpan, state *regionFeedState, resolvedTs u // the span may just be initialized and have not receive any resolved ts before, // so we also send ts in this case for quick notification to downstream. 
if ts > lastResolvedTs || (ts == lastResolvedTs && lastResolvedTs == span.startTs) { + resolvedPhyTs := oracle.ExtractPhysical(lastResolvedTs) decreaseLag := float64(nextResolvedPhyTs-resolvedPhyTs) / 1e3 if decreaseLag > 10 { log.Warn("resolved ts advance step is too large", From 2985c44da60a28df4cde9c11ce668847c33965f5 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Tue, 22 Jul 2025 21:08:41 +0800 Subject: [PATCH 10/12] add more log --- logservice/logpuller/region_event_handler.go | 3 ++- logservice/logpuller/regionlock/region_range_lock.go | 7 +++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/logservice/logpuller/region_event_handler.go b/logservice/logpuller/region_event_handler.go index ed54e78df..f1abd4e14 100644 --- a/logservice/logpuller/region_event_handler.go +++ b/logservice/logpuller/region_event_handler.go @@ -307,7 +307,7 @@ func handleResolvedTs(span *subscribedSpan, state *regionFeedState, resolvedTs u lastAdvance := span.lastAdvanceTime.Load() if now-lastAdvance >= span.advanceInterval && span.lastAdvanceTime.CompareAndSwap(lastAdvance, now) { // ts := span.rangeLock.GetHeapMinTs() - ts := span.rangeLock.ResolvedTs() + ts, tsRegionID := span.rangeLock.ResolvedTs() if ts > 0 && span.initialized.CompareAndSwap(false, true) { log.Info("subscription client is initialized", zap.Uint64("subscriptionID", uint64(span.subID)), @@ -324,6 +324,7 @@ func handleResolvedTs(span *subscribedSpan, state *regionFeedState, resolvedTs u zap.Uint64("subID", uint64(span.subID)), zap.Int64("tableID", span.span.TableID), zap.Uint64("regionID", regionID), + zap.Uint64("tsRegionID", tsRegionID), zap.Uint64("resolvedTs", ts), zap.Float64("resolvedLag(s)", nextResolvedLag)) } diff --git a/logservice/logpuller/regionlock/region_range_lock.go b/logservice/logpuller/regionlock/region_range_lock.go index 3bb27b3ef..cc6dde710 100644 --- a/logservice/logpuller/regionlock/region_range_lock.go +++ b/logservice/logpuller/regionlock/region_range_lock.go @@ -261,15 
+261,17 @@ func (l *RangeLock) Len() int { // ResolvedTs calculates and returns the minimum resolvedTs // of all ranges in the RangeLock. -func (l *RangeLock) ResolvedTs() uint64 { +func (l *RangeLock) ResolvedTs() (uint64, uint64) { l.mu.RLock() defer l.mu.RUnlock() var minTs uint64 = math.MaxUint64 + var regionID uint64 l.lockedRanges.Ascend(func(item *rangeLockEntry) bool { ts := item.lockedRangeState.ResolvedTs.Load() if ts < minTs { minTs = ts + regionID = item.regionID } return true }) @@ -277,9 +279,10 @@ func (l *RangeLock) ResolvedTs() uint64 { unlockedMinTs := l.unlockedRanges.getMinTs() if unlockedMinTs < minTs { minTs = unlockedMinTs + regionID = 0 } - return minTs + return minTs, regionID } // RangeLockStatistics represents some statistics of a RangeLock. From e78d73184adbc2eb09c3c0c1a88d395bf547835a Mon Sep 17 00:00:00 2001 From: lidezhu Date: Wed, 23 Jul 2025 10:22:12 +0800 Subject: [PATCH 11/12] add more log --- logservice/logpuller/region_event_handler.go | 4 ++-- logservice/logpuller/region_request_worker.go | 6 +++--- logservice/logpuller/regionlock/region_range_lock.go | 4 ++-- logservice/logpuller/subscription_client.go | 12 ++++++------ 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/logservice/logpuller/region_event_handler.go b/logservice/logpuller/region_event_handler.go index f1abd4e14..d39a010e7 100644 --- a/logservice/logpuller/region_event_handler.go +++ b/logservice/logpuller/region_event_handler.go @@ -179,7 +179,7 @@ func (h *regionEventHandler) handleRegionError(state *regionFeedState, worker *r stepsToRemoved := state.markRemoved() err := state.takeError() if err != nil { - log.Debug("region event handler get a region error", + log.Info("region event handler get a region error", zap.Uint64("workerID", worker.workerID), zap.Uint64("subscriptionID", uint64(state.region.subscribedSpan.subID)), zap.Uint64("regionID", state.region.verID.GetID()), @@ -219,7 +219,7 @@ func handleEventEntries(span *subscribedSpan, state 
*regionFeedState, entries *c switch entry.Type { case cdcpb.Event_INITIALIZED: state.setInitialized() - log.Debug("region is initialized", + log.Info("region is initialized", zap.Int64("tableID", span.span.TableID), zap.Uint64("regionID", regionID), zap.Uint64("requestID", state.requestID), diff --git a/logservice/logpuller/region_request_worker.go b/logservice/logpuller/region_request_worker.go index 72e68cbf4..dd0089665 100644 --- a/logservice/logpuller/region_request_worker.go +++ b/logservice/logpuller/region_request_worker.go @@ -249,7 +249,7 @@ func (s *regionRequestWorker) dispatchRegionChangeEvents(events []*cdcpb.Event) // ignore continue case *cdcpb.Event_Error: - log.Debug("region request worker receives a region error", + log.Info("region request worker receives a region error", zap.Uint64("workerID", s.workerID), zap.Uint64("subscriptionID", uint64(subscriptionID)), zap.Uint64("regionID", event.RegionId), @@ -269,7 +269,7 @@ func (s *regionRequestWorker) dispatchRegionChangeEvents(events []*cdcpb.Event) switch event.Event.(type) { case *cdcpb.Event_Error: // it is normal to receive region error after deregister a subscription - log.Debug("region request worker receives an error for a stale region, ignore it", + log.Info("region request worker receives an error for a stale region, ignore it", zap.Uint64("workerID", s.workerID), zap.Uint64("subscriptionID", uint64(subscriptionID)), zap.Uint64("regionID", event.RegionId)) @@ -348,7 +348,7 @@ func (s *regionRequestWorker) processRegionSendTask( for { // TODO: can region be nil? 
subID := region.subscribedSpan.subID - log.Debug("region request worker gets a singleRegionInfo", + log.Info("region request worker gets a singleRegionInfo", zap.Uint64("workerID", s.workerID), zap.Uint64("subscriptionID", uint64(subID)), zap.Uint64("regionID", region.verID.GetID()), diff --git a/logservice/logpuller/regionlock/region_range_lock.go b/logservice/logpuller/regionlock/region_range_lock.go index cc6dde710..50446d687 100644 --- a/logservice/logpuller/regionlock/region_range_lock.go +++ b/logservice/logpuller/regionlock/region_range_lock.go @@ -244,7 +244,7 @@ func (l *RangeLock) UnlockRange( } l.unlockedRanges.set(startKey, endKey, newResolvedTs) - log.Debug("unlocked range", + log.Info("unlocked range", zap.Uint64("lockID", l.id), zap.Uint64("regionID", entry.regionID), zap.Uint64("resolvedTs", newResolvedTs), zap.String("startKey", hex.EncodeToString(startKey)), @@ -472,7 +472,7 @@ func (l *RangeLock) tryLockRange(startKey, endKey []byte, regionID, regionVersio l.lockedRangeStateHeap.AddOrUpdate(&newEntry.lockedRangeState) l.unlockedRanges.unset(startKey, endKey) - log.Debug("range locked", + log.Info("range locked", zap.Uint64("lockID", l.id), zap.Uint64("regionID", regionID), zap.Uint64("version", regionVersion), diff --git a/logservice/logpuller/subscription_client.go b/logservice/logpuller/subscription_client.go index b7881ac2a..9bed95380 100644 --- a/logservice/logpuller/subscription_client.go +++ b/logservice/logpuller/subscription_client.go @@ -552,7 +552,7 @@ func (s *subscriptionClient) handleRegions(ctx context.Context, eg *errgroup.Gro worker := store.getRequestWorker() worker.requestsCh <- region - log.Debug("subscription client will request a region", + log.Info("subscription client will request a region", zap.Uint64("workID", worker.workerID), zap.Uint64("subscriptionID", uint64(region.subscribedSpan.subID)), zap.Uint64("regionID", region.verID.GetID()), @@ -569,7 +569,7 @@ func (s *subscriptionClient) attachRPCContextForRegion(ctx 
context.Context, regi return region, true } if err != nil { - log.Debug("subscription client get rpc context fail", + log.Info("subscription client get rpc context fail", zap.Uint64("subscriptionID", uint64(region.subscribedSpan.subID)), zap.Uint64("regionID", region.verID.GetID()), zap.Error(err)) @@ -616,7 +616,7 @@ func (s *subscriptionClient) divideSpanAndScheduleRegionRequests( } backoffBeforeLoad = false } - log.Debug("subscription client is going to load regions", + log.Info("subscription client is going to load regions", zap.Uint64("subscriptionID", uint64(subscribedSpan.subID)), zap.Any("span", nextSpan)) @@ -734,7 +734,7 @@ func (s *subscriptionClient) doHandleError(ctx context.Context, errInfo regionEr switch eerr := err.(type) { case *eventError: innerErr := eerr.err - log.Debug("cdc region error", + log.Info("cdc region error", zap.Uint64("subscriptionID", uint64(errInfo.subscribedSpan.subID)), zap.Stringer("error", innerErr)) @@ -931,7 +931,7 @@ func (s *subscriptionClient) logSlowRegions(ctx context.Context) error { ckptTime := oracle.GetTimeFromTS(attr.SlowestRegion.ResolvedTs) if attr.SlowestRegion.Initialized { if currTime.Sub(ckptTime) > 2*resolveLockMinInterval { - log.Debug("subscription client finds a initialized slow region", + log.Info("subscription client finds a initialized slow region", zap.Uint64("subscriptionID", uint64(subscriptionID)), zap.Any("slowRegion", attr.SlowestRegion)) } @@ -1047,7 +1047,7 @@ func (s *subscriptionClient) GetResolvedTsLag() float64 { func (r *subscribedSpan) resolveStaleLocks(targetTs uint64) { util.MustCompareAndMonotonicIncrease(&r.staleLocksTargetTs, targetTs) res := r.rangeLock.IterAll(r.tryResolveLock) - log.Debug("subscription client finds slow locked ranges", + log.Info("subscription client finds slow locked ranges", zap.Uint64("subscriptionID", uint64(r.subID)), zap.Any("ranges", res)) } From f70ac1a50da13a6979900d49bd54598a683bec98 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Wed, 23 Jul 2025 15:11:47 
+0800 Subject: [PATCH 12/12] add more log --- logservice/logpuller/region_event_handler.go | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/logservice/logpuller/region_event_handler.go b/logservice/logpuller/region_event_handler.go index d39a010e7..f27bcc8a0 100644 --- a/logservice/logpuller/region_event_handler.go +++ b/logservice/logpuller/region_event_handler.go @@ -306,8 +306,8 @@ func handleResolvedTs(span *subscribedSpan, state *regionFeedState, resolvedTs u now := time.Now().UnixMilli() lastAdvance := span.lastAdvanceTime.Load() if now-lastAdvance >= span.advanceInterval && span.lastAdvanceTime.CompareAndSwap(lastAdvance, now) { - // ts := span.rangeLock.GetHeapMinTs() - ts, tsRegionID := span.rangeLock.ResolvedTs() + ts := span.rangeLock.GetHeapMinTs() + // ts, _ := span.rangeLock.ResolvedTs() if ts > 0 && span.initialized.CompareAndSwap(false, true) { log.Info("subscription client is initialized", zap.Uint64("subscriptionID", uint64(span.subID)), @@ -315,19 +315,7 @@ func handleResolvedTs(span *subscribedSpan, state *regionFeedState, resolvedTs u zap.Uint64("resolvedTs", ts)) } lastResolvedTs := span.resolvedTs.Load() - curTime := time.Now() - curPhyTs := oracle.GetPhysical(curTime) nextResolvedPhyTs := oracle.ExtractPhysical(ts) - nextResolvedLag := float64(curPhyTs-nextResolvedPhyTs) / 1e3 - if nextResolvedLag > 10 { - log.Warn("next resolved ts lag is too large", - zap.Uint64("subID", uint64(span.subID)), - zap.Int64("tableID", span.span.TableID), - zap.Uint64("regionID", regionID), - zap.Uint64("tsRegionID", tsRegionID), - zap.Uint64("resolvedTs", ts), - zap.Float64("resolvedLag(s)", nextResolvedLag)) - } // Generally, we don't want to send duplicate resolved ts, // so we check whether `ts` is larger than `lastResolvedTs` before send it. // but when `ts` == `lastResolvedTs` == `span.startTs`,