Skip to content
21 changes: 21 additions & 0 deletions service/sharddistributor/leader/process/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package process

import (
"context"
"errors"
"fmt"
"maps"
"math/rand"
Expand Down Expand Up @@ -214,6 +215,9 @@ func (p *namespaceProcessor) runRebalancingLoop(ctx context.Context) {
err = p.rebalanceShards(ctx)
}
if err != nil {
if isCancelledOrDeadlineExceeded(err) {
return
}
p.logger.Error("rebalance failed", tag.Error(err))
}
}
Expand All @@ -233,6 +237,9 @@ func (p *namespaceProcessor) runShardStatsCleanupLoop(ctx context.Context) {
p.logger.Info("Periodic shard stats cleanup triggered.")
namespaceState, err := p.shardStore.GetState(ctx, p.namespaceCfg.Name)
if err != nil {
if isCancelledOrDeadlineExceeded(err) {
return
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is expected to be caught in case <-ctx.Done(): instead.
Which means, if ctx becomes Done during GetState, we won't see "Shard stats cleanup loop cancelled.", which is very unexpected.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree it’s a bit counterintuitive at first glance.
In practice the case <-ctx.Done() only fires if the cancellation happens before we enter the branch. In the noisy logs we’re seeing the ticker fire, we drop into the cleanup work, and then the context gets cancelled while GetState is still in flight.
At that point we’re already executing the branch, so the select won’t re-evaluate
the only place we can notice the cancellation is via the error returned from the store.

I added the inline guard so that path exits quietly instead of logging "get executor data: context canceled" since the message is expected any time leadership or shutdown interrupts the iteration.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand how it works technically. My point is — this can lead to the absence of what looks like an important log message about shutting down, the counterpart to the "starting" one.
We'd better preserve that, if it's important. Maybe by logging it outside of the select in some way.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. We can either log it in this case as well, just to make sure we are not missing the message, or go around the loop again; in the latter case there is a risk that we never hit the ctx.Done case — it should be minimal, but it exists.

p.logger.Error("Failed to get state for shard stats cleanup", tag.Error(err))
continue
}
Expand All @@ -242,6 +249,9 @@ func (p *namespaceProcessor) runShardStatsCleanupLoop(ctx context.Context) {
continue
}
if err := p.shardStore.DeleteShardStats(ctx, p.namespaceCfg.Name, staleShardStats, p.election.Guard()); err != nil {
if isCancelledOrDeadlineExceeded(err) {
return
}
p.logger.Error("Failed to delete stale shard stats", tag.Error(err))
}
}
Expand Down Expand Up @@ -340,6 +350,9 @@ func (p *namespaceProcessor) rebalanceShardsImpl(ctx context.Context, metricsLoo

namespaceState, err := p.shardStore.GetState(ctx, p.namespaceCfg.Name)
if err != nil {
if isCancelledOrDeadlineExceeded(err) {
return err
}
return fmt.Errorf("get state: %w", err)
}

Expand Down Expand Up @@ -386,6 +399,9 @@ func (p *namespaceProcessor) rebalanceShardsImpl(ctx context.Context, metricsLoo
ExecutorsToDelete: staleExecutors,
}, p.election.Guard())
if err != nil {
if isCancelledOrDeadlineExceeded(err) {
return err
}
return fmt.Errorf("assign shards: %w", err)
}

Expand Down Expand Up @@ -586,3 +602,8 @@ func makeShards(num int64) []string {
}
return shards
}

func isCancelledOrDeadlineExceeded(err error) bool {
return errors.Is(err, context.Canceled) ||
errors.Is(err, context.DeadlineExceeded)
}
12 changes: 12 additions & 0 deletions service/sharddistributor/store/etcd/executorstore/etcdstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,14 @@ func (s *executorStoreImpl) GetState(ctx context.Context, namespace string) (*st
executorPrefix := etcdkeys.BuildExecutorsPrefix(s.prefix, namespace)
resp, err := s.client.Get(ctx, executorPrefix, clientv3.WithPrefix())
if err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return nil, ctx.Err()
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I propose moving isCancelledOrDeadlineExceeded as it is a very pattern and re-use it here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved IsContextCancellation to store.go

return nil, fmt.Errorf("get executor data: %w", err)
}
if ctxErr := ctx.Err(); errors.Is(ctxErr, context.Canceled) || errors.Is(ctxErr, context.DeadlineExceeded) {
return nil, ctxErr
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this is required? I think it's already handles in the above if err != nil

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even though the call succeeds, the context might be cancelled immediately after Get returns, before we iterate the KVs.
By checking ctx.Err() again after the call, we avoid logging or processing stale data when the caller already abandoned the request.

Though I don't have a strong opinion here, I was just adding extra guards to prevent noisy logging.
If you think this is unnecessary I can remove that part.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can be cancelled in a middle of the KVs iteration as well.


for _, kv := range resp.Kvs {
key := string(kv.Key)
Expand Down Expand Up @@ -276,8 +282,14 @@ func (s *executorStoreImpl) GetState(ctx context.Context, namespace string) (*st
shardsPrefix := etcdkeys.BuildShardsPrefix(s.prefix, namespace)
shardResp, err := s.client.Get(ctx, shardsPrefix, clientv3.WithPrefix())
if err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return nil, ctx.Err()
}
return nil, fmt.Errorf("get shard data: %w", err)
}
if ctxErr := ctx.Err(); errors.Is(ctxErr, context.Canceled) || errors.Is(ctxErr, context.DeadlineExceeded) {
return nil, ctxErr
}
for _, kv := range shardResp.Kvs {
shardID, shardKeyType, err := etcdkeys.ParseShardKey(s.prefix, namespace, string(kv.Key))
if err != nil {
Expand Down
Loading