diff --git a/service/sharddistributor/leader/process/processor.go b/service/sharddistributor/leader/process/processor.go index 64451a40631..fca6fc97e32 100644 --- a/service/sharddistributor/leader/process/processor.go +++ b/service/sharddistributor/leader/process/processor.go @@ -215,6 +215,9 @@ func (p *namespaceProcessor) runRebalancingLoop(ctx context.Context) { err = p.rebalanceShards(ctx) } if err != nil { + if store.IsContextCancellation(err) { + return + } p.logger.Error("rebalance failed", tag.Error(err)) } } @@ -234,6 +237,9 @@ func (p *namespaceProcessor) runShardStatsCleanupLoop(ctx context.Context) { p.logger.Info("Periodic shard stats cleanup triggered.") namespaceState, err := p.shardStore.GetState(ctx, p.namespaceCfg.Name) if err != nil { + if store.IsContextCancellation(err) { + return + } p.logger.Error("Failed to get state for shard stats cleanup", tag.Error(err)) continue } @@ -243,6 +249,9 @@ func (p *namespaceProcessor) runShardStatsCleanupLoop(ctx context.Context) { continue } if err := p.shardStore.DeleteShardStats(ctx, p.namespaceCfg.Name, staleShardStats, p.election.Guard()); err != nil { + if store.IsContextCancellation(err) { + return + } p.logger.Error("Failed to delete stale shard stats", tag.Error(err)) } } @@ -340,6 +349,9 @@ func (p *namespaceProcessor) rebalanceShards(ctx context.Context) (err error) { func (p *namespaceProcessor) rebalanceShardsImpl(ctx context.Context, metricsLoopScope metrics.Scope) (err error) { namespaceState, err := p.shardStore.GetState(ctx, p.namespaceCfg.Name) if err != nil { + if store.IsContextCancellation(err) { + return err + } return fmt.Errorf("get state: %w", err) } @@ -386,6 +398,9 @@ func (p *namespaceProcessor) rebalanceShardsImpl(ctx context.Context, metricsLoo ExecutorsToDelete: staleExecutors, }, p.election.Guard()) if err != nil { + if store.IsContextCancellation(err) { + return err + } return fmt.Errorf("assign shards: %w", err) } diff --git a/service/sharddistributor/store/store.go b/service/sharddistributor/store/store.go index f2b2d3d0774..378f0e18a1a 100644 --- a/service/sharddistributor/store/store.go +++ b/service/sharddistributor/store/store.go @@ -2,6 +2,7 @@ package store import ( "context" + "errors" "fmt" ) @@ -48,6 +49,16 @@ func NopGuard() GuardFunc { } } +// IsContextCancellation reports whether the provided error indicates the caller's context +// has been cancelled or its deadline has been exceeded. +func IsContextCancellation(err error) bool { + if err == nil { + return false + } + return errors.Is(err, context.Canceled) || + errors.Is(err, context.DeadlineExceeded) +} + // AssignShardsRequest is a request to assign shards to executors, and remove unused shards. type AssignShardsRequest struct { // NewState is the new state of the namespace, containing the new assignments of shards to executors.