Skip to content
15 changes: 15 additions & 0 deletions service/sharddistributor/leader/process/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,9 @@ func (p *namespaceProcessor) runRebalancingLoop(ctx context.Context) {
err = p.rebalanceShards(ctx)
}
if err != nil {
if store.IsContextCancellation(err) {
return
}
p.logger.Error("rebalance failed", tag.Error(err))
}
}
Expand All @@ -234,6 +237,9 @@ func (p *namespaceProcessor) runShardStatsCleanupLoop(ctx context.Context) {
p.logger.Info("Periodic shard stats cleanup triggered.")
namespaceState, err := p.shardStore.GetState(ctx, p.namespaceCfg.Name)
if err != nil {
if store.IsContextCancellation(err) {
return
}
p.logger.Error("Failed to get state for shard stats cleanup", tag.Error(err))
continue
}
Expand All @@ -243,6 +249,9 @@ func (p *namespaceProcessor) runShardStatsCleanupLoop(ctx context.Context) {
continue
}
if err := p.shardStore.DeleteShardStats(ctx, p.namespaceCfg.Name, staleShardStats, p.election.Guard()); err != nil {
if store.IsContextCancellation(err) {
return
}
p.logger.Error("Failed to delete stale shard stats", tag.Error(err))
}
}
Expand Down Expand Up @@ -340,6 +349,9 @@ func (p *namespaceProcessor) rebalanceShards(ctx context.Context) (err error) {
func (p *namespaceProcessor) rebalanceShardsImpl(ctx context.Context, metricsLoopScope metrics.Scope) (err error) {
namespaceState, err := p.shardStore.GetState(ctx, p.namespaceCfg.Name)
if err != nil {
if store.IsContextCancellation(err) {
return err
}
return fmt.Errorf("get state: %w", err)
}

Expand Down Expand Up @@ -386,6 +398,9 @@ func (p *namespaceProcessor) rebalanceShardsImpl(ctx context.Context, metricsLoo
ExecutorsToDelete: staleExecutors,
}, p.election.Guard())
if err != nil {
if store.IsContextCancellation(err) {
return err
}
return fmt.Errorf("assign shards: %w", err)
}

Expand Down
12 changes: 12 additions & 0 deletions service/sharddistributor/store/etcd/executorstore/etcdstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,14 @@ func (s *executorStoreImpl) GetState(ctx context.Context, namespace string) (*st
executorPrefix := etcdkeys.BuildExecutorsPrefix(s.prefix, namespace)
resp, err := s.client.Get(ctx, executorPrefix, clientv3.WithPrefix())
if err != nil {
if store.IsContextCancellation(err) {
return nil, ctx.Err()
}
return nil, fmt.Errorf("get executor data: %w", err)
}
if ctxErr := ctx.Err(); store.IsContextCancellation(ctxErr) {
return nil, ctxErr
}

for _, kv := range resp.Kvs {
key := string(kv.Key)
Expand Down Expand Up @@ -276,8 +282,14 @@ func (s *executorStoreImpl) GetState(ctx context.Context, namespace string) (*st
shardsPrefix := etcdkeys.BuildShardsPrefix(s.prefix, namespace)
shardResp, err := s.client.Get(ctx, shardsPrefix, clientv3.WithPrefix())
if err != nil {
if store.IsContextCancellation(err) {
return nil, ctx.Err()
}
return nil, fmt.Errorf("get shard data: %w", err)
}
if ctxErr := ctx.Err(); store.IsContextCancellation(ctxErr) {
return nil, ctxErr
}
for _, kv := range shardResp.Kvs {
shardID, shardKeyType, err := etcdkeys.ParseShardKey(s.prefix, namespace, string(kv.Key))
if err != nil {
Expand Down
11 changes: 11 additions & 0 deletions service/sharddistributor/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package store

import (
"context"
"errors"
"fmt"
)

Expand Down Expand Up @@ -48,6 +49,16 @@ func NopGuard() GuardFunc {
}
}

// IsContextCancellation reports whether the provided error indicates the caller's context
// has been cancelled or its deadline has been exceeded.
func IsContextCancellation(err error) bool {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's better to be some common/utils package
store/ looks like related to persistance, while context cancellation has nothing to do about it in general

if err == nil {
return false
}
return errors.Is(err, context.Canceled) ||
errors.Is(err, context.DeadlineExceeded)
}

// AssignShardsRequest is a request to assign shards to executors, and remove unused shards.
type AssignShardsRequest struct {
// NewState is the new state of the namespace, containing the new assignments of shards to executors.
Expand Down
Loading