Ldz/add log0721 #1570

Open · wants to merge 12 commits into master
29 changes: 29 additions & 0 deletions logservice/eventstore/event_store.go
@@ -124,6 +124,14 @@ type subscriptionStat struct {
eventCh *chann.UnlimitedChannel[eventWithCallback, uint64]
// data <= checkpointTs can be deleted
checkpointTs atomic.Uint64
// whether the subStat has received any resolved ts
initialized atomic.Bool
// the time when the subscription receives latest resolved ts
lastAdvanceTime atomic.Int64
// the time when the subscription receives latest dml event
// it is used to calculate the idle time of the subscription
// 0 means the subscription has not received any dml event
lastReceiveDMLTime atomic.Int64
// the resolveTs persisted in the store
resolvedTs atomic.Uint64
// the max commit ts of dml event in the store
@@ -478,6 +486,7 @@ func (e *eventStore) RegisterDispatcher(
maxCommitTs = kv.CRTs
}
}
subStat.lastReceiveDMLTime.Store(time.Now().UnixMilli())
util.CompareAndMonotonicIncrease(&subStat.maxEventCommitTs, maxCommitTs)
subStat.eventCh.Push(eventWithCallback{
subID: subStat.subID,
@@ -494,6 +503,9 @@
if ts <= currentResolvedTs {
return
}
now := time.Now().UnixMilli()
subStat.lastAdvanceTime.Store(now)
subStat.initialized.Store(true)
// just do CompareAndSwap once, if failed, it means another goroutine has updated resolvedTs
if subStat.resolvedTs.CompareAndSwap(currentResolvedTs, ts) {
subStat.dispatchers.Lock()
@@ -502,6 +514,7 @@
notifier(ts, subStat.maxEventCommitTs.Load())
}
CounterResolved.Inc()
metrics.EventStoreNotifyDispatcherDurationHist.Observe(float64(time.Since(start).Milliseconds()) / 1000)

high

The metric EventStoreNotifyDispatcherDurationHist is being updated with time.Since(start). The start variable is captured from the RegisterDispatcher function's scope, which means this will measure the time since the dispatcher was registered, not the duration of this specific notification. This will result in misleadingly large values for the metric.

The duration should be measured within this if block to accurately reflect the notification time, for example by recording time.Now() at the beginning of the block and using it to compute the observed duration.
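
A minimal sketch of what that fix could look like, based on the surrounding diff; the notifyStart variable and the exact shape of the notifier loop (the dispatchers.notifiers field) are assumptions for illustration, not code from this PR:

    if subStat.resolvedTs.CompareAndSwap(currentResolvedTs, ts) {
        notifyStart := time.Now() // measure only this notification, not time since RegisterDispatcher
        subStat.dispatchers.Lock()
        for _, notifier := range subStat.dispatchers.notifiers {
            notifier(ts, subStat.maxEventCommitTs.Load())
        }
        subStat.dispatchers.Unlock()
        CounterResolved.Inc()
        metrics.EventStoreNotifyDispatcherDurationHist.Observe(time.Since(notifyStart).Seconds())
    }

This keeps the observed value in seconds, matching the existing float64(time.Since(start).Milliseconds()) / 1000 conversion.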

}
}

@@ -805,6 +818,22 @@ func (e *eventStore) updateMetricsOnce() {
resolvedTs := subStat.resolvedTs.Load()
resolvedPhyTs := oracle.ExtractPhysical(resolvedTs)
resolvedLag := float64(pdPhyTs-resolvedPhyTs) / 1e3
if subStat.initialized.Load() && resolvedLag >= 10 {

medium

The magic number 10 is used as a threshold for the resolved timestamp lag in seconds. To improve readability and maintainability, it's better to define this as a named constant at the package level. This value is used in multiple places in this pull request.

const largeResolvedTsLagInSecs = 10

            if subStat.initialized.Load() && resolvedLag >= largeResolvedTsLagInSecs {

lastReceiveDMLTimeRepr := "never"
if lastReceiveDMLTime := subStat.lastReceiveDMLTime.Load(); lastReceiveDMLTime > 0 {
lastReceiveDMLTimeRepr = time.UnixMilli(lastReceiveDMLTime).String()
}
log.Warn("resolved ts lag is too large for initialized subscription",
zap.Uint64("subID", uint64(subStat.subID)),
zap.Int64("tableID", subStat.tableSpan.TableID),
zap.Uint64("resolvedTs", resolvedTs),
zap.Float64("resolvedLag(s)", resolvedLag),
zap.Stringer("lastAdvanceTime", time.UnixMilli(subStat.lastAdvanceTime.Load())),
zap.String("lastReceiveDMLTime", lastReceiveDMLTimeRepr),
zap.String("tableSpan", common.FormatTableSpan(subStat.tableSpan)),
zap.Uint64("checkpointTs", subStat.checkpointTs.Load()),
zap.Uint64("maxEventCommitTs", subStat.maxEventCommitTs.Load()))
}
metrics.EventStoreDispatcherResolvedTsLagHist.Observe(float64(resolvedLag))
if minResolvedTs == 0 || resolvedTs < minResolvedTs {
minResolvedTs = resolvedTs
18 changes: 16 additions & 2 deletions logservice/logpuller/region_event_handler.go
@@ -23,6 +23,7 @@ import (
"github.com/pingcap/ticdc/pkg/common"
"github.com/pingcap/ticdc/pkg/metrics"
"github.com/pingcap/ticdc/utils/dynstream"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
)

@@ -178,7 +179,7 @@ func (h *regionEventHandler) handleRegionError(state *regionFeedState, worker *r
stepsToRemoved := state.markRemoved()
err := state.takeError()
if err != nil {
log.Debug("region event handler get a region error",
log.Info("region event handler get a region error",
zap.Uint64("workerID", worker.workerID),
zap.Uint64("subscriptionID", uint64(state.region.subscribedSpan.subID)),
zap.Uint64("regionID", state.region.verID.GetID()),
@@ -218,7 +219,7 @@ func handleEventEntries(span *subscribedSpan, state *regionFeedState, entries *c
switch entry.Type {
case cdcpb.Event_INITIALIZED:
state.setInitialized()
log.Debug("region is initialized",
log.Info("region is initialized",
zap.Int64("tableID", span.span.TableID),
zap.Uint64("regionID", regionID),
zap.Uint64("requestID", state.requestID),
@@ -306,19 +307,32 @@ func handleResolvedTs(span *subscribedSpan, state *regionFeedState, resolvedTs u
lastAdvance := span.lastAdvanceTime.Load()
if now-lastAdvance >= span.advanceInterval && span.lastAdvanceTime.CompareAndSwap(lastAdvance, now) {
ts := span.rangeLock.GetHeapMinTs()
// ts, _ := span.rangeLock.ResolvedTs()
if ts > 0 && span.initialized.CompareAndSwap(false, true) {
log.Info("subscription client is initialized",
zap.Uint64("subscriptionID", uint64(span.subID)),
zap.Uint64("regionID", regionID),
zap.Uint64("resolvedTs", ts))
}
lastResolvedTs := span.resolvedTs.Load()
nextResolvedPhyTs := oracle.ExtractPhysical(ts)
// Generally, we don't want to send duplicate resolved ts,
// so we check whether `ts` is larger than `lastResolvedTs` before send it.
// but when `ts` == `lastResolvedTs` == `span.startTs`,
// the span may just be initialized and have not receive any resolved ts before,
// so we also send ts in this case for quick notification to downstream.
if ts > lastResolvedTs || (ts == lastResolvedTs && lastResolvedTs == span.startTs) {
resolvedPhyTs := oracle.ExtractPhysical(lastResolvedTs)
decreaseLag := float64(nextResolvedPhyTs-resolvedPhyTs) / 1e3
if decreaseLag > 10 {

medium

The magic number 10 is used as a threshold for a large resolved timestamp advance step. This should be defined as a named constant to improve code clarity and maintainability, especially since this value is used elsewhere.

const largeResolvedTsAdvanceStepInSecs = 10

            if decreaseLag > largeResolvedTsAdvanceStepInSecs {

log.Warn("resolved ts advance step is too large",
zap.Uint64("subID", uint64(span.subID)),
zap.Int64("tableID", span.span.TableID),
zap.Uint64("regionID", regionID),
zap.Uint64("resolvedTs", ts),
zap.Uint64("lastResolvedTs", lastResolvedTs),
zap.Float64("decreaseLag(s)", decreaseLag))
}
span.resolvedTs.Store(ts)
span.resolvedTsUpdated.Store(time.Now().Unix())
return ts
6 changes: 3 additions & 3 deletions logservice/logpuller/region_request_worker.go
@@ -249,7 +249,7 @@ func (s *regionRequestWorker) dispatchRegionChangeEvents(events []*cdcpb.Event)
// ignore
continue
case *cdcpb.Event_Error:
log.Debug("region request worker receives a region error",
log.Info("region request worker receives a region error",
zap.Uint64("workerID", s.workerID),
zap.Uint64("subscriptionID", uint64(subscriptionID)),
zap.Uint64("regionID", event.RegionId),
@@ -269,7 +269,7 @@ func (s *regionRequestWorker) dispatchRegionChangeEvents(events []*cdcpb.Event)
switch event.Event.(type) {
case *cdcpb.Event_Error:
// it is normal to receive region error after deregister a subscription
log.Debug("region request worker receives an error for a stale region, ignore it",
log.Info("region request worker receives an error for a stale region, ignore it",
zap.Uint64("workerID", s.workerID),
zap.Uint64("subscriptionID", uint64(subscriptionID)),
zap.Uint64("regionID", event.RegionId))
@@ -348,7 +348,7 @@ func (s *regionRequestWorker) processRegionSendTask(
for {
// TODO: can region be nil?
subID := region.subscribedSpan.subID
log.Debug("region request worker gets a singleRegionInfo",
log.Info("region request worker gets a singleRegionInfo",
zap.Uint64("workerID", s.workerID),
zap.Uint64("subscriptionID", uint64(subID)),
zap.Uint64("regionID", region.verID.GetID()),
11 changes: 7 additions & 4 deletions logservice/logpuller/regionlock/region_range_lock.go
@@ -244,7 +244,7 @@ func (l *RangeLock) UnlockRange(
}

l.unlockedRanges.set(startKey, endKey, newResolvedTs)
log.Debug("unlocked range",
log.Info("unlocked range",
zap.Uint64("lockID", l.id), zap.Uint64("regionID", entry.regionID),
zap.Uint64("resolvedTs", newResolvedTs),
zap.String("startKey", hex.EncodeToString(startKey)),
@@ -261,25 +261,28 @@ func (l *RangeLock) Len() int {

// ResolvedTs calculates and returns the minimum resolvedTs
// of all ranges in the RangeLock.
func (l *RangeLock) ResolvedTs() uint64 {
func (l *RangeLock) ResolvedTs() (uint64, uint64) {
l.mu.RLock()
defer l.mu.RUnlock()

var minTs uint64 = math.MaxUint64
var regionID uint64
l.lockedRanges.Ascend(func(item *rangeLockEntry) bool {
ts := item.lockedRangeState.ResolvedTs.Load()
if ts < minTs {
minTs = ts
regionID = item.regionID
}
return true
})

unlockedMinTs := l.unlockedRanges.getMinTs()
if unlockedMinTs < minTs {
minTs = unlockedMinTs
regionID = 0
}

return minTs
return minTs, regionID
}

// RangeLockStatistics represents some statistics of a RangeLock.
@@ -469,7 +472,7 @@ func (l *RangeLock) tryLockRange(startKey, endKey []byte, regionID, regionVersio
l.lockedRangeStateHeap.AddOrUpdate(&newEntry.lockedRangeState)

l.unlockedRanges.unset(startKey, endKey)
log.Debug("range locked",
log.Info("range locked",
zap.Uint64("lockID", l.id),
zap.Uint64("regionID", regionID),
zap.Uint64("version", regionVersion),
45 changes: 39 additions & 6 deletions logservice/logpuller/subscription_client.go
@@ -428,6 +428,7 @@ func (s *subscriptionClient) Run(ctx context.Context) error {
g.Go(func() error { return s.runResolveLockChecker(ctx) })
g.Go(func() error { return s.handleResolveLockTasks(ctx) })
g.Go(func() error { return s.logSlowRegions(ctx) })
g.Go(func() error { return s.logSlowSpan(ctx) })
g.Go(func() error { return s.errCache.dispatch(ctx) })

log.Info("subscription client starts")
@@ -551,7 +552,7 @@ func (s *subscriptionClient) handleRegions(ctx context.Context, eg *errgroup.Gro
worker := store.getRequestWorker()
worker.requestsCh <- region

log.Debug("subscription client will request a region",
log.Info("subscription client will request a region",
zap.Uint64("workID", worker.workerID),
zap.Uint64("subscriptionID", uint64(region.subscribedSpan.subID)),
zap.Uint64("regionID", region.verID.GetID()),
@@ -568,7 +569,7 @@ func (s *subscriptionClient) attachRPCContextForRegion(ctx context.Context, regi
return region, true
}
if err != nil {
log.Debug("subscription client get rpc context fail",
log.Info("subscription client get rpc context fail",
zap.Uint64("subscriptionID", uint64(region.subscribedSpan.subID)),
zap.Uint64("regionID", region.verID.GetID()),
zap.Error(err))
@@ -615,7 +616,7 @@ func (s *subscriptionClient) divideSpanAndScheduleRegionRequests(
}
backoffBeforeLoad = false
}
log.Debug("subscription client is going to load regions",
log.Info("subscription client is going to load regions",
zap.Uint64("subscriptionID", uint64(subscribedSpan.subID)),
zap.Any("span", nextSpan))
Comment on lines +619 to 621

medium

This log message "subscription client is going to load regions" is inside a loop. While it's before a network call, logging it at Info level might still be too verbose if many small ranges are processed. Debug level would be more suitable for this kind of "about to do X" message.

Suggested change
log.Info("subscription client is going to load regions",
zap.Uint64("subscriptionID", uint64(subscribedSpan.subID)),
zap.Any("span", nextSpan))
log.Debug("subscription client is going to load regions",
zap.Uint64("subscriptionID", uint64(subscribedSpan.subID)),
zap.Any("span", nextSpan))


@@ -733,7 +734,7 @@ func (s *subscriptionClient) doHandleError(ctx context.Context, errInfo regionEr
switch eerr := err.(type) {
case *eventError:
innerErr := eerr.err
log.Debug("cdc region error",
log.Info("cdc region error",
zap.Uint64("subscriptionID", uint64(errInfo.subscribedSpan.subID)),
zap.Stringer("error", innerErr))

@@ -930,7 +931,7 @@ func (s *subscriptionClient) logSlowRegions(ctx context.Context) error {
ckptTime := oracle.GetTimeFromTS(attr.SlowestRegion.ResolvedTs)
if attr.SlowestRegion.Initialized {
if currTime.Sub(ckptTime) > 2*resolveLockMinInterval {
log.Debug("subscription client finds a initialized slow region",
log.Info("subscription client finds a initialized slow region",
zap.Uint64("subscriptionID", uint64(subscriptionID)),
zap.Any("slowRegion", attr.SlowestRegion))
}
@@ -954,6 +955,38 @@
}
}

func (s *subscriptionClient) logSlowSpan(ctx context.Context) error {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
}

pdTime := s.pdClock.CurrentTime()
pdPhyTs := oracle.GetPhysical(pdTime)
s.totalSpans.RLock()
for subID, rt := range s.totalSpans.spanMap {
if !rt.initialized.Load() {
continue
}
resolvedTs := rt.rangeLock.GetHeapMinTs()
resolvedPhyTs := oracle.ExtractPhysical(resolvedTs)
resolvedLag := float64(pdPhyTs-resolvedPhyTs) / 1e3
if resolvedLag > 10 {
Comment on lines +959 to +978

medium

This new function logSlowSpan uses a couple of magic numbers:

  • 10 * time.Second for the ticker interval (line 959)
  • 10 for the lag threshold in seconds (line 978)

These should be defined as named constants at the package level to improve readability and maintainability.

Suggested change
    const logSlowSpanTickerInterval = 10 * time.Second
    const resolvedLagThresholdInSecs = 10

    ticker := time.NewTicker(logSlowSpanTickerInterval)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-ticker.C:
        }

        pdTime := s.pdClock.CurrentTime()
        pdPhyTs := oracle.GetPhysical(pdTime)
        s.totalSpans.RLock()
        for subID, rt := range s.totalSpans.spanMap {
            if !rt.initialized.Load() {
                continue
            }
            resolvedTs := rt.rangeLock.GetHeapMinTs()
            resolvedPhyTs := oracle.ExtractPhysical(resolvedTs)
            resolvedLag := float64(pdPhyTs-resolvedPhyTs) / 1e3
            if resolvedLag > resolvedLagThresholdInSecs {

log.Warn("resolved ts lag is too large for initialized span",
zap.Uint64("subID", uint64(subID)),
zap.Int64("tableID", rt.span.TableID),
zap.Uint64("resolvedTs", resolvedTs),
zap.Float64("resolvedLag(s)", resolvedLag))
}
}
s.totalSpans.RUnlock()
}
}

func (s *subscriptionClient) newSubscribedSpan(
subID SubscriptionID,
span heartbeatpb.TableSpan,
Expand Down Expand Up @@ -1014,7 +1047,7 @@ func (s *subscriptionClient) GetResolvedTsLag() float64 {
func (r *subscribedSpan) resolveStaleLocks(targetTs uint64) {
util.MustCompareAndMonotonicIncrease(&r.staleLocksTargetTs, targetTs)
res := r.rangeLock.IterAll(r.tryResolveLock)
log.Debug("subscription client finds slow locked ranges",
log.Info("subscription client finds slow locked ranges",
zap.Uint64("subscriptionID", uint64(r.subID)),
zap.Any("ranges", res))
}
13 changes: 12 additions & 1 deletion pkg/metrics/event_store.go
@@ -41,12 +41,13 @@ var (
Name: "output_event_count",
Help: "The number of events output by the sorter",
}, []string{"type"}) // types : kv, resolved.

EventStoreWriteDurationHistogram = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "ticdc",
Subsystem: "event_store",
Name: "write_duration",
Help: "Bucketed histogram of event store write duration",
Buckets: prometheus.ExponentialBuckets(0.00004, 2.0, 10),
Buckets: prometheus.ExponentialBuckets(0.00004, 2.0, 10), // 0.1ms ~ 52s,

medium

The comment // 0.1ms ~ 52s, seems incorrect for the buckets defined by prometheus.ExponentialBuckets(0.00004, 2.0, 10). The buckets for this metric range from 40µs to approximately 20.48ms. The comment was likely misplaced from another metric definition and should be corrected or removed to avoid confusion.

Suggested change
Buckets: prometheus.ExponentialBuckets(0.00004, 2.0, 10), // 0.1ms ~ 52s,
Buckets: prometheus.ExponentialBuckets(0.00004, 2.0, 10),
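
For reference, a tiny standalone check of the actual bucket range (not part of this PR; it only uses prometheus.ExponentialBuckets from the standard client library):

    package main

    import (
        "fmt"

        "github.com/prometheus/client_golang/prometheus"
    )

    func main() {
        // ExponentialBuckets(start, factor, count) returns count upper bounds:
        // start, start*factor, ..., start*factor^(count-1).
        buckets := prometheus.ExponentialBuckets(0.00004, 2.0, 10)
        fmt.Println(buckets[0], buckets[len(buckets)-1]) // 4e-05 (40µs) and 0.02048 (≈20.5ms)
    }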

})

EventStoreScanRequestsCount = prometheus.NewCounter(
@@ -162,6 +163,15 @@
Help: "Bucketed histogram of event store sorter iterator read duration",
Buckets: prometheus.ExponentialBuckets(0.00004, 2.0, 28), // 40us to 1.5h
}, []string{"type"})

EventStoreNotifyDispatcherDurationHist = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "ticdc",
Subsystem: "event_store",
Name: "notify_dispatcher_duration",
Help: "The duration of notifying dispatchers with resolved ts.",
Buckets: prometheus.ExponentialBuckets(0.0001, 2, 20), // 0.1ms ~ 52s,
})
)

func InitEventStoreMetrics(registry *prometheus.Registry) {
@@ -183,4 +193,5 @@ func InitEventStoreMetrics(registry *prometheus.Registry) {
registry.MustRegister(EventStoreWriteBatchSizeHist)
registry.MustRegister(EventStoreWriteRequestsCount)
registry.MustRegister(EventStoreReadDurationHistogram)
registry.MustRegister(EventStoreNotifyDispatcherDurationHist)
}