From 07c98cec8ce1971ea48b4bc22ef44c19085fa11c Mon Sep 17 00:00:00 2001 From: Gaziza Yestemirova Date: Wed, 26 Nov 2025 14:35:37 +0100 Subject: [PATCH 1/8] [shard-distributor]Remove error logs from store level Signed-off-by: Gaziza Yestemirova --- .../store/wrappers/metered/base.go | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/service/sharddistributor/store/wrappers/metered/base.go b/service/sharddistributor/store/wrappers/metered/base.go index 85ec58d1fe9..ffe499c7f19 100644 --- a/service/sharddistributor/store/wrappers/metered/base.go +++ b/service/sharddistributor/store/wrappers/metered/base.go @@ -27,7 +27,6 @@ import ( "github.com/uber/cadence/common/clock" "github.com/uber/cadence/common/log" - "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/service/sharddistributor/store" ) @@ -38,18 +37,11 @@ type base struct { timeSource clock.TimeSource } -func (p *base) updateErrorMetricPerNamespace(scope metrics.ScopeIdx, err error, scopeWithNamespaceTags metrics.Scope, logger log.Logger) { - logger = logger.Helper() - - switch { - case errors.Is(err, store.ErrExecutorNotFound): +func (p *base) updateErrorMetricPerNamespace(err error, scopeWithNamespaceTags metrics.Scope) { + if errors.Is(err, store.ErrExecutorNotFound) { scopeWithNamespaceTags.IncCounter(metrics.ShardDistributorStoreExecutorNotFound) - logger.Error("Executor not found.", tag.Error(err), tag.MetricScope(int(scope))) - case errors.Is(err, store.ErrShardNotFound): - // this is expected, so we don't want to log it - default: - logger.Error("Store failed with internal error.", tag.Error(err), tag.MetricScope(int(scope))) // int??? 
} + scopeWithNamespaceTags.IncCounter(metrics.ShardDistributorStoreFailuresPerNamespace) } @@ -62,9 +54,8 @@ func (p *base) call(scope metrics.ScopeIdx, op func() error, tags ...metrics.Tag duration := p.timeSource.Since(before) metricsScope.RecordHistogramDuration(metrics.ShardDistributorStoreLatencyHistogramPerNamespace, duration) - logger := p.logger.Helper() if err != nil { - p.updateErrorMetricPerNamespace(scope, err, metricsScope, logger) + p.updateErrorMetricPerNamespace(err, metricsScope) } return err } From 6b28caab5de702a5df9d745dec375b15a6567b6e Mon Sep 17 00:00:00 2001 From: Gaziza Yestemirova Date: Wed, 26 Nov 2025 15:02:41 +0100 Subject: [PATCH 2/8] update tests Signed-off-by: Gaziza Yestemirova --- .../store/wrappers/metered/metered_test.go | 28 ++++--------------- 1 file changed, 6 insertions(+), 22 deletions(-) diff --git a/service/sharddistributor/store/wrappers/metered/metered_test.go b/service/sharddistributor/store/wrappers/metered/metered_test.go index 5874cc736f2..a7e1564becf 100644 --- a/service/sharddistributor/store/wrappers/metered/metered_test.go +++ b/service/sharddistributor/store/wrappers/metered/metered_test.go @@ -11,7 +11,6 @@ import ( "github.com/uber/cadence/common/clock" "github.com/uber/cadence/common/log" - "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/types" "github.com/uber/cadence/service/sharddistributor/store" @@ -31,33 +30,19 @@ func TestMeteredStore_GetHeartbeat(t *testing.T) { } tests := []struct { - name string - setupMocks func(logger *log.MockLogger) - error error + name string + error error }{ { - name: "Success", - setupMocks: func(logger *log.MockLogger) {}, - error: nil, + name: "Success", + error: nil, }, { - name: "NotFound", - setupMocks: func(logger *log.MockLogger) { - logger.EXPECT().Error( - "Executor not found.", - []tag.Tag{tag.Error(store.ErrExecutorNotFound), tag.MetricScope(int(metrics.ShardDistributorStoreGetHeartbeatScope))}, - 
- ).Times(1) - }, + name: "NotFound", error: store.ErrExecutorNotFound, }, { - name: "Failure", - setupMocks: func(logger *log.MockLogger) { - logger.EXPECT().Error( - "Store failed with internal error.", - []tag.Tag{tag.Error(&types.InternalServiceError{}), tag.MetricScope(int(metrics.ShardDistributorStoreGetHeartbeatScope))}, - ).Times(1) - }, + name: "Failure", error: &types.InternalServiceError{}, }, } @@ -79,7 +64,6 @@ func TestMeteredStore_GetHeartbeat(t *testing.T) { mockLogger.EXPECT().Helper().Return(mockLogger).AnyTimes() wrapped := NewStore(mockHandler, metricsClient, mockLogger, timeSource).(*meteredStore) - tt.setupMocks(mockLogger) gotHeartbeat, gotAssignedState, err := wrapped.GetHeartbeat(context.Background(), _testNamespace, _testExecutorID) From a68b74f367cd7e7723fa3d59ed27ce05bb859c1c Mon Sep 17 00:00:00 2001 From: eleonoradgr <32766443+eleonoradgr@users.noreply.github.com> Date: Wed, 26 Nov 2025 15:05:44 +0100 Subject: [PATCH 3/8] fix(shard-distributor): remove trimming of prefixes (#7490) **What changed?** Reverted the trimprefix option, since the values are compared against constants that include the prefix. **Why?** Constants that include the prefix are used to compare these values, so the generated names must keep the prefix. **How did you test it?** Deployed in staging **Potential risks** Corruption of db, which is already the case. 
**Release notes** **Documentation Changes** --------- Signed-off-by: edigregorio Signed-off-by: Gaziza Yestemirova --- common/types/sharddistributor.go | 2 +- ...rddistributor_statuses_enumer_generated.go | 110 +++++++++--------- .../store/etcd/etcdtypes/state_test.go | 2 +- 3 files changed, 57 insertions(+), 57 deletions(-) diff --git a/common/types/sharddistributor.go b/common/types/sharddistributor.go index 3e4b960f69d..5bbddf478f0 100644 --- a/common/types/sharddistributor.go +++ b/common/types/sharddistributor.go @@ -24,7 +24,7 @@ package types import "fmt" -//go:generate enumer -type=ExecutorStatus,ShardStatus,AssignmentStatus,MigrationMode -trimprefix=ExecutorStatus,ShardStatus,AssignmentStatus,MigrationMode -json -output sharddistributor_statuses_enumer_generated.go +//go:generate enumer -type=ExecutorStatus,ShardStatus,AssignmentStatus,MigrationMode -json -output sharddistributor_statuses_enumer_generated.go type GetShardOwnerRequest struct { ShardKey string diff --git a/common/types/sharddistributor_statuses_enumer_generated.go b/common/types/sharddistributor_statuses_enumer_generated.go index b120dfe7b66..9e1f0f873e0 100644 --- a/common/types/sharddistributor_statuses_enumer_generated.go +++ b/common/types/sharddistributor_statuses_enumer_generated.go @@ -1,4 +1,4 @@ -// Code generated by "enumer -type=ExecutorStatus,ShardStatus,AssignmentStatus,MigrationMode -trimprefix=ExecutorStatus,ShardStatus,AssignmentStatus,MigrationMode -json -output sharddistributor_statuses_enumer_generated.go"; DO NOT EDIT. +// Code generated by "enumer -type=ExecutorStatus,ShardStatus,AssignmentStatus,MigrationMode -json -output sharddistributor_statuses_enumer_generated.go"; DO NOT EDIT. 
package types @@ -8,11 +8,11 @@ import ( "strings" ) -const _ExecutorStatusName = "INVALIDACTIVEDRAININGDRAINED" +const _ExecutorStatusName = "ExecutorStatusINVALIDExecutorStatusACTIVEExecutorStatusDRAININGExecutorStatusDRAINED" -var _ExecutorStatusIndex = [...]uint8{0, 7, 13, 21, 28} +var _ExecutorStatusIndex = [...]uint8{0, 21, 41, 63, 84} -const _ExecutorStatusLowerName = "invalidactivedrainingdrained" +const _ExecutorStatusLowerName = "executorstatusinvalidexecutorstatusactiveexecutorstatusdrainingexecutorstatusdrained" func (i ExecutorStatus) String() string { if i < 0 || i >= ExecutorStatus(len(_ExecutorStatusIndex)-1) { @@ -34,21 +34,21 @@ func _ExecutorStatusNoOp() { var _ExecutorStatusValues = []ExecutorStatus{ExecutorStatusINVALID, ExecutorStatusACTIVE, ExecutorStatusDRAINING, ExecutorStatusDRAINED} var _ExecutorStatusNameToValueMap = map[string]ExecutorStatus{ - _ExecutorStatusName[0:7]: ExecutorStatusINVALID, - _ExecutorStatusLowerName[0:7]: ExecutorStatusINVALID, - _ExecutorStatusName[7:13]: ExecutorStatusACTIVE, - _ExecutorStatusLowerName[7:13]: ExecutorStatusACTIVE, - _ExecutorStatusName[13:21]: ExecutorStatusDRAINING, - _ExecutorStatusLowerName[13:21]: ExecutorStatusDRAINING, - _ExecutorStatusName[21:28]: ExecutorStatusDRAINED, - _ExecutorStatusLowerName[21:28]: ExecutorStatusDRAINED, + _ExecutorStatusName[0:21]: ExecutorStatusINVALID, + _ExecutorStatusLowerName[0:21]: ExecutorStatusINVALID, + _ExecutorStatusName[21:41]: ExecutorStatusACTIVE, + _ExecutorStatusLowerName[21:41]: ExecutorStatusACTIVE, + _ExecutorStatusName[41:63]: ExecutorStatusDRAINING, + _ExecutorStatusLowerName[41:63]: ExecutorStatusDRAINING, + _ExecutorStatusName[63:84]: ExecutorStatusDRAINED, + _ExecutorStatusLowerName[63:84]: ExecutorStatusDRAINED, } var _ExecutorStatusNames = []string{ - _ExecutorStatusName[0:7], - _ExecutorStatusName[7:13], - _ExecutorStatusName[13:21], - _ExecutorStatusName[21:28], + _ExecutorStatusName[0:21], + _ExecutorStatusName[21:41], + 
_ExecutorStatusName[41:63], + _ExecutorStatusName[63:84], } // ExecutorStatusString retrieves an enum value from the enum constants string name. @@ -103,11 +103,11 @@ func (i *ExecutorStatus) UnmarshalJSON(data []byte) error { return err } -const _ShardStatusName = "INVALIDREADYDONE" +const _ShardStatusName = "ShardStatusINVALIDShardStatusREADYShardStatusDONE" -var _ShardStatusIndex = [...]uint8{0, 7, 12, 16} +var _ShardStatusIndex = [...]uint8{0, 18, 34, 49} -const _ShardStatusLowerName = "invalidreadydone" +const _ShardStatusLowerName = "shardstatusinvalidshardstatusreadyshardstatusdone" func (i ShardStatus) String() string { if i < 0 || i >= ShardStatus(len(_ShardStatusIndex)-1) { @@ -128,18 +128,18 @@ func _ShardStatusNoOp() { var _ShardStatusValues = []ShardStatus{ShardStatusINVALID, ShardStatusREADY, ShardStatusDONE} var _ShardStatusNameToValueMap = map[string]ShardStatus{ - _ShardStatusName[0:7]: ShardStatusINVALID, - _ShardStatusLowerName[0:7]: ShardStatusINVALID, - _ShardStatusName[7:12]: ShardStatusREADY, - _ShardStatusLowerName[7:12]: ShardStatusREADY, - _ShardStatusName[12:16]: ShardStatusDONE, - _ShardStatusLowerName[12:16]: ShardStatusDONE, + _ShardStatusName[0:18]: ShardStatusINVALID, + _ShardStatusLowerName[0:18]: ShardStatusINVALID, + _ShardStatusName[18:34]: ShardStatusREADY, + _ShardStatusLowerName[18:34]: ShardStatusREADY, + _ShardStatusName[34:49]: ShardStatusDONE, + _ShardStatusLowerName[34:49]: ShardStatusDONE, } var _ShardStatusNames = []string{ - _ShardStatusName[0:7], - _ShardStatusName[7:12], - _ShardStatusName[12:16], + _ShardStatusName[0:18], + _ShardStatusName[18:34], + _ShardStatusName[34:49], } // ShardStatusString retrieves an enum value from the enum constants string name. 
@@ -194,11 +194,11 @@ func (i *ShardStatus) UnmarshalJSON(data []byte) error { return err } -const _AssignmentStatusName = "INVALIDREADY" +const _AssignmentStatusName = "AssignmentStatusINVALIDAssignmentStatusREADY" -var _AssignmentStatusIndex = [...]uint8{0, 7, 12} +var _AssignmentStatusIndex = [...]uint8{0, 23, 44} -const _AssignmentStatusLowerName = "invalidready" +const _AssignmentStatusLowerName = "assignmentstatusinvalidassignmentstatusready" func (i AssignmentStatus) String() string { if i < 0 || i >= AssignmentStatus(len(_AssignmentStatusIndex)-1) { @@ -218,15 +218,15 @@ func _AssignmentStatusNoOp() { var _AssignmentStatusValues = []AssignmentStatus{AssignmentStatusINVALID, AssignmentStatusREADY} var _AssignmentStatusNameToValueMap = map[string]AssignmentStatus{ - _AssignmentStatusName[0:7]: AssignmentStatusINVALID, - _AssignmentStatusLowerName[0:7]: AssignmentStatusINVALID, - _AssignmentStatusName[7:12]: AssignmentStatusREADY, - _AssignmentStatusLowerName[7:12]: AssignmentStatusREADY, + _AssignmentStatusName[0:23]: AssignmentStatusINVALID, + _AssignmentStatusLowerName[0:23]: AssignmentStatusINVALID, + _AssignmentStatusName[23:44]: AssignmentStatusREADY, + _AssignmentStatusLowerName[23:44]: AssignmentStatusREADY, } var _AssignmentStatusNames = []string{ - _AssignmentStatusName[0:7], - _AssignmentStatusName[7:12], + _AssignmentStatusName[0:23], + _AssignmentStatusName[23:44], } // AssignmentStatusString retrieves an enum value from the enum constants string name. 
@@ -281,11 +281,11 @@ func (i *AssignmentStatus) UnmarshalJSON(data []byte) error { return err } -const _MigrationModeName = "INVALIDLOCALPASSTHROUGHLOCALPASSTHROUGHSHADOWDISTRIBUTEDPASSTHROUGHONBOARDED" +const _MigrationModeName = "MigrationModeINVALIDMigrationModeLOCALPASSTHROUGHMigrationModeLOCALPASSTHROUGHSHADOWMigrationModeDISTRIBUTEDPASSTHROUGHMigrationModeONBOARDED" -var _MigrationModeIndex = [...]uint8{0, 7, 23, 45, 67, 76} +var _MigrationModeIndex = [...]uint8{0, 20, 49, 84, 119, 141} -const _MigrationModeLowerName = "invalidlocalpassthroughlocalpassthroughshadowdistributedpassthroughonboarded" +const _MigrationModeLowerName = "migrationmodeinvalidmigrationmodelocalpassthroughmigrationmodelocalpassthroughshadowmigrationmodedistributedpassthroughmigrationmodeonboarded" func (i MigrationMode) String() string { if i < 0 || i >= MigrationMode(len(_MigrationModeIndex)-1) { @@ -308,24 +308,24 @@ func _MigrationModeNoOp() { var _MigrationModeValues = []MigrationMode{MigrationModeINVALID, MigrationModeLOCALPASSTHROUGH, MigrationModeLOCALPASSTHROUGHSHADOW, MigrationModeDISTRIBUTEDPASSTHROUGH, MigrationModeONBOARDED} var _MigrationModeNameToValueMap = map[string]MigrationMode{ - _MigrationModeName[0:7]: MigrationModeINVALID, - _MigrationModeLowerName[0:7]: MigrationModeINVALID, - _MigrationModeName[7:23]: MigrationModeLOCALPASSTHROUGH, - _MigrationModeLowerName[7:23]: MigrationModeLOCALPASSTHROUGH, - _MigrationModeName[23:45]: MigrationModeLOCALPASSTHROUGHSHADOW, - _MigrationModeLowerName[23:45]: MigrationModeLOCALPASSTHROUGHSHADOW, - _MigrationModeName[45:67]: MigrationModeDISTRIBUTEDPASSTHROUGH, - _MigrationModeLowerName[45:67]: MigrationModeDISTRIBUTEDPASSTHROUGH, - _MigrationModeName[67:76]: MigrationModeONBOARDED, - _MigrationModeLowerName[67:76]: MigrationModeONBOARDED, + _MigrationModeName[0:20]: MigrationModeINVALID, + _MigrationModeLowerName[0:20]: MigrationModeINVALID, + _MigrationModeName[20:49]: MigrationModeLOCALPASSTHROUGH, + 
_MigrationModeLowerName[20:49]: MigrationModeLOCALPASSTHROUGH, + _MigrationModeName[49:84]: MigrationModeLOCALPASSTHROUGHSHADOW, + _MigrationModeLowerName[49:84]: MigrationModeLOCALPASSTHROUGHSHADOW, + _MigrationModeName[84:119]: MigrationModeDISTRIBUTEDPASSTHROUGH, + _MigrationModeLowerName[84:119]: MigrationModeDISTRIBUTEDPASSTHROUGH, + _MigrationModeName[119:141]: MigrationModeONBOARDED, + _MigrationModeLowerName[119:141]: MigrationModeONBOARDED, } var _MigrationModeNames = []string{ - _MigrationModeName[0:7], - _MigrationModeName[7:23], - _MigrationModeName[23:45], - _MigrationModeName[45:67], - _MigrationModeName[67:76], + _MigrationModeName[0:20], + _MigrationModeName[20:49], + _MigrationModeName[49:84], + _MigrationModeName[84:119], + _MigrationModeName[119:141], } // MigrationModeString retrieves an enum value from the enum constants string name. diff --git a/service/sharddistributor/store/etcd/etcdtypes/state_test.go b/service/sharddistributor/store/etcd/etcdtypes/state_test.go index c2f104235f2..14f12e1dce1 100644 --- a/service/sharddistributor/store/etcd/etcdtypes/state_test.go +++ b/service/sharddistributor/store/etcd/etcdtypes/state_test.go @@ -112,7 +112,7 @@ func TestAssignedState_FromAssignedState(t *testing.T) { } func TestAssignedState_JSONMarshalling(t *testing.T) { - const jsonStr = `{"assigned_shards":{"1":{"status":"READY"}},"last_updated":"2025-11-18T12:00:00.123456789Z","mod_revision":42}` + const jsonStr = `{"assigned_shards":{"1":{"status":"AssignmentStatusREADY"}},"last_updated":"2025-11-18T12:00:00.123456789Z","mod_revision":42}` state := &AssignedState{ AssignedShards: map[string]*types.ShardAssignment{ From b94764d828df2b7a8870e9d607bc2068132df979 Mon Sep 17 00:00:00 2001 From: Jakob Haahr Taankvist Date: Fri, 28 Nov 2025 09:38:07 +0100 Subject: [PATCH 4/8] feat(shard-distributor): return executor metadata from spectator GetShardOwner (#7476) **What changed?** Changed `GetShardOwner` to return an `ExecutorOwnership` struct containing 
both executor ID and metadata map, instead of just the executor ID string. Also adds a Spectators group so we can easily pass around all spectators. **Why?** Enables callers to access additional executor information like gRPC address for peer routing, without requiring separate lookups. This is needed for implementing canary peer chooser that routes requests to executors based on their addresses. **How did you test it?** Updated all tests to verify metadata is included in responses. Verified locally that ownership information includes metadata. **Potential risks** Low - this is an API enhancement that maintains backward compatibility by returning the same executor ID, just with additional metadata. **Release notes** **Documentation Changes** None --------- Signed-off-by: Jakob Haahr Taankvist Signed-off-by: Gaziza Yestemirova --- .../client/spectatorclient/client.go | 63 +++++++++++++------ .../client/spectatorclient/clientimpl.go | 14 +++-- .../client/spectatorclient/clientimpl_test.go | 32 +++++++--- .../client/spectatorclient/interface_mock.go | 4 +- 4 files changed, 81 insertions(+), 32 deletions(-) diff --git a/service/sharddistributor/client/spectatorclient/client.go b/service/sharddistributor/client/spectatorclient/client.go index c4b19eb094a..8f6e2f1e8b3 100644 --- a/service/sharddistributor/client/spectatorclient/client.go +++ b/service/sharddistributor/client/spectatorclient/client.go @@ -20,13 +20,52 @@ import ( //go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination interface_mock.go . 
Spectator +type Spectators struct { + spectators map[string]Spectator +} + +func (s *Spectators) ForNamespace(namespace string) (Spectator, error) { + spectator, ok := s.spectators[namespace] + if !ok { + return nil, fmt.Errorf("spectator not found for namespace %s", namespace) + } + return spectator, nil +} + +func (s *Spectators) Start(ctx context.Context) error { + for namespace, spectator := range s.spectators { + if err := spectator.Start(ctx); err != nil { + return fmt.Errorf("start spectator for namespace %s: %w", namespace, err) + } + } + return nil +} + +func (s *Spectators) Stop() { + for _, spectator := range s.spectators { + spectator.Stop() + } +} + +func NewSpectators(params Params) (*Spectators, error) { + spectators := make(map[string]Spectator) + for _, namespace := range params.Config.Namespaces { + spectator, err := NewSpectatorWithNamespace(params, namespace.Namespace) + if err != nil { + return nil, fmt.Errorf("create spectator for namespace %s: %w", namespace.Namespace, err) + } + + spectators[namespace.Namespace] = spectator + } + return &Spectators{spectators: spectators}, nil +} + type Spectator interface { Start(ctx context.Context) error Stop() - // GetShardOwner returns the owner of a shard. It first checks the local cache, - // and if not found, falls back to querying the shard distributor directly. 
- GetShardOwner(ctx context.Context, shardKey string) (string, error) + // GetShardOwner returns the owner of a shard + GetShardOwner(ctx context.Context, shardKey string) (*ShardOwner, error) } type Params struct { @@ -109,21 +148,9 @@ func createShardDistributorClient(yarpcClient sharddistributorv1.ShardDistributo // Module creates a spectator module using auto-selection (single namespace only) func Module() fx.Option { return fx.Module("shard-distributor-spectator-client", - fx.Provide(NewSpectator), - fx.Invoke(func(spectator Spectator, lc fx.Lifecycle) { - lc.Append(fx.StartStopHook(spectator.Start, spectator.Stop)) - }), - ) -} - -// ModuleWithNamespace creates a spectator module for a specific namespace -func ModuleWithNamespace(namespace string) fx.Option { - return fx.Module(fmt.Sprintf("shard-distributor-spectator-client-%s", namespace), - fx.Provide(func(params Params) (Spectator, error) { - return NewSpectatorWithNamespace(params, namespace) - }), - fx.Invoke(func(spectator Spectator, lc fx.Lifecycle) { - lc.Append(fx.StartStopHook(spectator.Start, spectator.Stop)) + fx.Provide(NewSpectators), + fx.Invoke(func(spectators *Spectators, lc fx.Lifecycle) { + lc.Append(fx.StartStopHook(spectators.Start, spectators.Stop)) }), ) } diff --git a/service/sharddistributor/client/spectatorclient/clientimpl.go b/service/sharddistributor/client/spectatorclient/clientimpl.go index 88934cdadf6..15c6d32d8b5 100644 --- a/service/sharddistributor/client/spectatorclient/clientimpl.go +++ b/service/sharddistributor/client/spectatorclient/clientimpl.go @@ -103,6 +103,7 @@ func (s *spectatorImpl) watchLoop() { // Server shutdown or network issue - recreate stream (load balancer will route to new server) s.logger.Info("Stream ended, reconnecting", tag.ShardNamespace(s.namespace)) + s.timeSource.Sleep(backoff.JitDuration(streamRetryInterval, streamRetryJitterCoeff)) } } @@ -163,10 +164,10 @@ func (s *spectatorImpl) handleResponse(response *types.WatchNamespaceStateRespon 
tag.Counter(len(response.Executors))) } -// GetShardOwner returns the executor ID for a given shard. +// GetShardOwner returns the full owner information including metadata for a given shard. // It first waits for the initial state to be received, then checks the cache. // If not found in cache, it falls back to querying the shard distributor directly. -func (s *spectatorImpl) GetShardOwner(ctx context.Context, shardKey string) (string, error) { +func (s *spectatorImpl) GetShardOwner(ctx context.Context, shardKey string) (*ShardOwner, error) { // Wait for first state to be received to avoid flooding shard distributor on startup s.firstStateWG.Wait() @@ -176,7 +177,7 @@ func (s *spectatorImpl) GetShardOwner(ctx context.Context, shardKey string) (str s.stateMu.RUnlock() if owner != nil { - return owner.ExecutorID, nil + return owner, nil } // Cache miss - fall back to RPC call @@ -189,8 +190,11 @@ func (s *spectatorImpl) GetShardOwner(ctx context.Context, shardKey string) (str ShardKey: shardKey, }) if err != nil { - return "", fmt.Errorf("get shard owner from shard distributor: %w", err) + return nil, fmt.Errorf("get shard owner from shard distributor: %w", err) } - return response.Owner, nil + return &ShardOwner{ + ExecutorID: response.Owner, + Metadata: response.Metadata, + }, nil } diff --git a/service/sharddistributor/client/spectatorclient/clientimpl_test.go b/service/sharddistributor/client/spectatorclient/clientimpl_test.go index a0c60aa2706..637883b1647 100644 --- a/service/sharddistributor/client/spectatorclient/clientimpl_test.go +++ b/service/sharddistributor/client/spectatorclient/clientimpl_test.go @@ -44,6 +44,9 @@ func TestWatchLoopBasicFlow(t *testing.T) { Executors: []*types.ExecutorShardAssignment{ { ExecutorID: "executor-1", + Metadata: map[string]string{ + "grpc_address": "127.0.0.1:7953", + }, AssignedShards: []*types.Shard{ {ShardKey: "shard-1"}, {ShardKey: "shard-2"}, @@ -72,11 +75,12 @@ func TestWatchLoopBasicFlow(t *testing.T) { // Query 
shard owner owner, err := spectator.GetShardOwner(context.Background(), "shard-1") assert.NoError(t, err) - assert.Equal(t, "executor-1", owner) + assert.Equal(t, "executor-1", owner.ExecutorID) + assert.Equal(t, "127.0.0.1:7953", owner.Metadata["grpc_address"]) owner, err = spectator.GetShardOwner(context.Background(), "shard-2") assert.NoError(t, err) - assert.Equal(t, "executor-1", owner) + assert.Equal(t, "executor-1", owner.ExecutorID) } func TestGetShardOwner_CacheMiss_FallbackToRPC(t *testing.T) { @@ -103,7 +107,13 @@ func TestGetShardOwner_CacheMiss_FallbackToRPC(t *testing.T) { // First Recv returns state mockStream.EXPECT().Recv().Return(&types.WatchNamespaceStateResponse{ Executors: []*types.ExecutorShardAssignment{ - {ExecutorID: "executor-1", AssignedShards: []*types.Shard{{ShardKey: "shard-1"}}}, + { + ExecutorID: "executor-1", + Metadata: map[string]string{ + "grpc_address": "127.0.0.1:7953", + }, + AssignedShards: []*types.Shard{{ShardKey: "shard-1"}}, + }, }, }, nil) @@ -122,7 +132,12 @@ func TestGetShardOwner_CacheMiss_FallbackToRPC(t *testing.T) { Namespace: "test-ns", ShardKey: "unknown-shard", }). 
- Return(&types.GetShardOwnerResponse{Owner: "executor-2"}, nil) + Return(&types.GetShardOwnerResponse{ + Owner: "executor-2", + Metadata: map[string]string{ + "grpc_address": "127.0.0.1:7954", + }, + }, nil) spectator.Start(context.Background()) defer spectator.Stop() @@ -132,12 +147,13 @@ func TestGetShardOwner_CacheMiss_FallbackToRPC(t *testing.T) { // Cache hit owner, err := spectator.GetShardOwner(context.Background(), "shard-1") assert.NoError(t, err) - assert.Equal(t, "executor-1", owner) + assert.Equal(t, "executor-1", owner.ExecutorID) // Cache miss - should trigger RPC owner, err = spectator.GetShardOwner(context.Background(), "unknown-shard") assert.NoError(t, err) - assert.Equal(t, "executor-2", owner) + assert.Equal(t, "executor-2", owner.ExecutorID) + assert.Equal(t, "127.0.0.1:7954", owner.Metadata["grpc_address"]) } func TestStreamReconnection(t *testing.T) { @@ -188,7 +204,9 @@ func TestStreamReconnection(t *testing.T) { spectator.Start(context.Background()) defer spectator.Stop() - // Advance time for retry + // Wait for the goroutine to be blocked in Sleep, then advance time + mockTimeSource.BlockUntil(1) // Wait for 1 goroutine to be blocked in Sleep mockTimeSource.Advance(2 * time.Second) + spectator.firstStateWG.Wait() } diff --git a/service/sharddistributor/client/spectatorclient/interface_mock.go b/service/sharddistributor/client/spectatorclient/interface_mock.go index 5b1eaaa5500..0e68d476608 100644 --- a/service/sharddistributor/client/spectatorclient/interface_mock.go +++ b/service/sharddistributor/client/spectatorclient/interface_mock.go @@ -41,10 +41,10 @@ func (m *MockSpectator) EXPECT() *MockSpectatorMockRecorder { } // GetShardOwner mocks base method. 
-func (m *MockSpectator) GetShardOwner(ctx context.Context, shardKey string) (string, error) { +func (m *MockSpectator) GetShardOwner(ctx context.Context, shardKey string) (*ShardOwner, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetShardOwner", ctx, shardKey) - ret0, _ := ret[0].(string) + ret0, _ := ret[0].(*ShardOwner) ret1, _ := ret[1].(error) return ret0, ret1 } From af3b6fa85f933241359cc20091f17294c0cc872a Mon Sep 17 00:00:00 2001 From: Gaziza Yestemirova Date: Fri, 28 Nov 2025 10:42:06 +0100 Subject: [PATCH 5/8] handle context cancelled Signed-off-by: Gaziza Yestemirova --- .../leader/process/processor.go | 21 +++++++++++++++++++ .../store/etcd/executorstore/etcdstore.go | 12 +++++++++++ 2 files changed, 33 insertions(+) diff --git a/service/sharddistributor/leader/process/processor.go b/service/sharddistributor/leader/process/processor.go index 76abb573900..a6572cb5a01 100644 --- a/service/sharddistributor/leader/process/processor.go +++ b/service/sharddistributor/leader/process/processor.go @@ -2,6 +2,7 @@ package process import ( "context" + "errors" "fmt" "maps" "math/rand" @@ -214,6 +215,9 @@ func (p *namespaceProcessor) runRebalancingLoop(ctx context.Context) { err = p.rebalanceShards(ctx) } if err != nil { + if isCancelledOrDeadlineExceeded(err) { + return + } p.logger.Error("rebalance failed", tag.Error(err)) } } @@ -233,6 +237,9 @@ func (p *namespaceProcessor) runShardStatsCleanupLoop(ctx context.Context) { p.logger.Info("Periodic shard stats cleanup triggered.") namespaceState, err := p.shardStore.GetState(ctx, p.namespaceCfg.Name) if err != nil { + if isCancelledOrDeadlineExceeded(err) { + return + } p.logger.Error("Failed to get state for shard stats cleanup", tag.Error(err)) continue } @@ -242,6 +249,9 @@ func (p *namespaceProcessor) runShardStatsCleanupLoop(ctx context.Context) { continue } if err := p.shardStore.DeleteShardStats(ctx, p.namespaceCfg.Name, staleShardStats, p.election.Guard()); err != nil { + if 
isCancelledOrDeadlineExceeded(err) { + return + } p.logger.Error("Failed to delete stale shard stats", tag.Error(err)) } } @@ -340,6 +350,9 @@ func (p *namespaceProcessor) rebalanceShardsImpl(ctx context.Context, metricsLoo namespaceState, err := p.shardStore.GetState(ctx, p.namespaceCfg.Name) if err != nil { + if isCancelledOrDeadlineExceeded(err) { + return err + } return fmt.Errorf("get state: %w", err) } @@ -386,6 +399,9 @@ func (p *namespaceProcessor) rebalanceShardsImpl(ctx context.Context, metricsLoo ExecutorsToDelete: staleExecutors, }, p.election.Guard()) if err != nil { + if isCancelledOrDeadlineExceeded(err) { + return err + } return fmt.Errorf("assign shards: %w", err) } @@ -586,3 +602,8 @@ func makeShards(num int64) []string { } return shards } + +func isCancelledOrDeadlineExceeded(err error) bool { + return errors.Is(err, context.Canceled) || + errors.Is(err, context.DeadlineExceeded) +} diff --git a/service/sharddistributor/store/etcd/executorstore/etcdstore.go b/service/sharddistributor/store/etcd/executorstore/etcdstore.go index 4c4a2f7462f..5b5add49415 100644 --- a/service/sharddistributor/store/etcd/executorstore/etcdstore.go +++ b/service/sharddistributor/store/etcd/executorstore/etcdstore.go @@ -233,8 +233,14 @@ func (s *executorStoreImpl) GetState(ctx context.Context, namespace string) (*st executorPrefix := etcdkeys.BuildExecutorsPrefix(s.prefix, namespace) resp, err := s.client.Get(ctx, executorPrefix, clientv3.WithPrefix()) if err != nil { + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return nil, ctx.Err() + } return nil, fmt.Errorf("get executor data: %w", err) } + if ctxErr := ctx.Err(); errors.Is(ctxErr, context.Canceled) || errors.Is(ctxErr, context.DeadlineExceeded) { + return nil, ctxErr + } for _, kv := range resp.Kvs { key := string(kv.Key) @@ -276,8 +282,14 @@ func (s *executorStoreImpl) GetState(ctx context.Context, namespace string) (*st shardsPrefix := etcdkeys.BuildShardsPrefix(s.prefix, 
namespace) shardResp, err := s.client.Get(ctx, shardsPrefix, clientv3.WithPrefix()) if err != nil { + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return nil, ctx.Err() + } return nil, fmt.Errorf("get shard data: %w", err) } + if ctxErr := ctx.Err(); errors.Is(ctxErr, context.Canceled) || errors.Is(ctxErr, context.DeadlineExceeded) { + return nil, ctxErr + } for _, kv := range shardResp.Kvs { shardID, shardKeyType, err := etcdkeys.ParseShardKey(s.prefix, namespace, string(kv.Key)) if err != nil { From ffa7b1daadf8216d5613c95173b3112d1d75b6c0 Mon Sep 17 00:00:00 2001 From: Gaziza Yestemirova Date: Mon, 1 Dec 2025 12:44:42 +0100 Subject: [PATCH 6/8] move IsContextCancellation Signed-off-by: Gaziza Yestemirova --- .../sharddistributor/leader/process/processor.go | 15 +++++---------- .../store/etcd/executorstore/etcdstore.go | 8 ++++---- service/sharddistributor/store/store.go | 11 +++++++++++ 3 files changed, 20 insertions(+), 14 deletions(-) diff --git a/service/sharddistributor/leader/process/processor.go b/service/sharddistributor/leader/process/processor.go index 630a7a44c86..fca6fc97e32 100644 --- a/service/sharddistributor/leader/process/processor.go +++ b/service/sharddistributor/leader/process/processor.go @@ -215,7 +215,7 @@ func (p *namespaceProcessor) runRebalancingLoop(ctx context.Context) { err = p.rebalanceShards(ctx) } if err != nil { - if isCancelledOrDeadlineExceeded(err) { + if store.IsContextCancellation(err) { return } p.logger.Error("rebalance failed", tag.Error(err)) @@ -237,7 +237,7 @@ func (p *namespaceProcessor) runShardStatsCleanupLoop(ctx context.Context) { p.logger.Info("Periodic shard stats cleanup triggered.") namespaceState, err := p.shardStore.GetState(ctx, p.namespaceCfg.Name) if err != nil { - if isCancelledOrDeadlineExceeded(err) { + if store.IsContextCancellation(err) { return } p.logger.Error("Failed to get state for shard stats cleanup", tag.Error(err)) @@ -249,7 +249,7 @@ func (p 
*namespaceProcessor) runShardStatsCleanupLoop(ctx context.Context) { continue } if err := p.shardStore.DeleteShardStats(ctx, p.namespaceCfg.Name, staleShardStats, p.election.Guard()); err != nil { - if isCancelledOrDeadlineExceeded(err) { + if store.IsContextCancellation(err) { return } p.logger.Error("Failed to delete stale shard stats", tag.Error(err)) @@ -349,7 +349,7 @@ func (p *namespaceProcessor) rebalanceShards(ctx context.Context) (err error) { func (p *namespaceProcessor) rebalanceShardsImpl(ctx context.Context, metricsLoopScope metrics.Scope) (err error) { namespaceState, err := p.shardStore.GetState(ctx, p.namespaceCfg.Name) if err != nil { - if isCancelledOrDeadlineExceeded(err) { + if store.IsContextCancellation(err) { return err } return fmt.Errorf("get state: %w", err) @@ -398,7 +398,7 @@ func (p *namespaceProcessor) rebalanceShardsImpl(ctx context.Context, metricsLoo ExecutorsToDelete: staleExecutors, }, p.election.Guard()) if err != nil { - if isCancelledOrDeadlineExceeded(err) { + if store.IsContextCancellation(err) { return err } return fmt.Errorf("assign shards: %w", err) @@ -679,8 +679,3 @@ func makeShards(num int64) []string { } return shards } - -func isCancelledOrDeadlineExceeded(err error) bool { - return errors.Is(err, context.Canceled) || - errors.Is(err, context.DeadlineExceeded) -} diff --git a/service/sharddistributor/store/etcd/executorstore/etcdstore.go b/service/sharddistributor/store/etcd/executorstore/etcdstore.go index 5b5add49415..c0a09b8a642 100644 --- a/service/sharddistributor/store/etcd/executorstore/etcdstore.go +++ b/service/sharddistributor/store/etcd/executorstore/etcdstore.go @@ -233,12 +233,12 @@ func (s *executorStoreImpl) GetState(ctx context.Context, namespace string) (*st executorPrefix := etcdkeys.BuildExecutorsPrefix(s.prefix, namespace) resp, err := s.client.Get(ctx, executorPrefix, clientv3.WithPrefix()) if err != nil { - if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + if 
store.IsContextCancellation(err) { return nil, ctx.Err() } return nil, fmt.Errorf("get executor data: %w", err) } - if ctxErr := ctx.Err(); errors.Is(ctxErr, context.Canceled) || errors.Is(ctxErr, context.DeadlineExceeded) { + if ctxErr := ctx.Err(); store.IsContextCancellation(ctxErr) { return nil, ctxErr } @@ -282,12 +282,12 @@ func (s *executorStoreImpl) GetState(ctx context.Context, namespace string) (*st shardsPrefix := etcdkeys.BuildShardsPrefix(s.prefix, namespace) shardResp, err := s.client.Get(ctx, shardsPrefix, clientv3.WithPrefix()) if err != nil { - if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + if store.IsContextCancellation(err) { return nil, ctx.Err() } return nil, fmt.Errorf("get shard data: %w", err) } - if ctxErr := ctx.Err(); errors.Is(ctxErr, context.Canceled) || errors.Is(ctxErr, context.DeadlineExceeded) { + if ctxErr := ctx.Err(); store.IsContextCancellation(ctxErr) { return nil, ctxErr } for _, kv := range shardResp.Kvs { diff --git a/service/sharddistributor/store/store.go b/service/sharddistributor/store/store.go index f2b2d3d0774..378f0e18a1a 100644 --- a/service/sharddistributor/store/store.go +++ b/service/sharddistributor/store/store.go @@ -2,6 +2,7 @@ package store import ( "context" + "errors" "fmt" ) @@ -48,6 +49,16 @@ func NopGuard() GuardFunc { } } +// IsContextCancellation reports whether the provided error indicates the caller's context +// has been cancelled or its deadline has been exceeded. +func IsContextCancellation(err error) bool { + if err == nil { + return false + } + return errors.Is(err, context.Canceled) || + errors.Is(err, context.DeadlineExceeded) +} + // AssignShardsRequest is a request to assign shards to executors, and remove unused shards. type AssignShardsRequest struct { // NewState is the new state of the namespace, containing the new assignments of shards to executors. 
From fcf94ce79a36f28a73768be451813f450ca2f2a5 Mon Sep 17 00:00:00 2001 From: Gaziza Yestemirova Date: Mon, 1 Dec 2025 15:18:51 +0100 Subject: [PATCH 7/8] remove etcdstore changes Signed-off-by: Gaziza Yestemirova --- .../store/etcd/executorstore/etcdstore.go | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/service/sharddistributor/store/etcd/executorstore/etcdstore.go b/service/sharddistributor/store/etcd/executorstore/etcdstore.go index c0a09b8a642..14b87c84e0a 100644 --- a/service/sharddistributor/store/etcd/executorstore/etcdstore.go +++ b/service/sharddistributor/store/etcd/executorstore/etcdstore.go @@ -233,14 +233,8 @@ func (s *executorStoreImpl) GetState(ctx context.Context, namespace string) (*st executorPrefix := etcdkeys.BuildExecutorsPrefix(s.prefix, namespace) resp, err := s.client.Get(ctx, executorPrefix, clientv3.WithPrefix()) if err != nil { - if store.IsContextCancellation(err) { - return nil, ctx.Err() - } return nil, fmt.Errorf("get executor data: %w", err) } - if ctxErr := ctx.Err(); store.IsContextCancellation(ctxErr) { - return nil, ctxErr - } for _, kv := range resp.Kvs { key := string(kv.Key) @@ -282,14 +276,9 @@ func (s *executorStoreImpl) GetState(ctx context.Context, namespace string) (*st shardsPrefix := etcdkeys.BuildShardsPrefix(s.prefix, namespace) shardResp, err := s.client.Get(ctx, shardsPrefix, clientv3.WithPrefix()) if err != nil { - if store.IsContextCancellation(err) { - return nil, ctx.Err() - } return nil, fmt.Errorf("get shard data: %w", err) } - if ctxErr := ctx.Err(); store.IsContextCancellation(ctxErr) { - return nil, ctxErr - } + for _, kv := range shardResp.Kvs { shardID, shardKeyType, err := etcdkeys.ParseShardKey(s.prefix, namespace, string(kv.Key)) if err != nil { From cf23b9f8a72b821ea995734ca1c33ba1d9c359a1 Mon Sep 17 00:00:00 2001 From: Gaziza Yestemirova Date: Mon, 1 Dec 2025 15:22:51 +0100 Subject: [PATCH 8/8] rm space Signed-off-by: Gaziza Yestemirova --- 
service/sharddistributor/store/etcd/executorstore/etcdstore.go | 1 - 1 file changed, 1 deletion(-) diff --git a/service/sharddistributor/store/etcd/executorstore/etcdstore.go b/service/sharddistributor/store/etcd/executorstore/etcdstore.go index 14b87c84e0a..4c4a2f7462f 100644 --- a/service/sharddistributor/store/etcd/executorstore/etcdstore.go +++ b/service/sharddistributor/store/etcd/executorstore/etcdstore.go @@ -278,7 +278,6 @@ func (s *executorStoreImpl) GetState(ctx context.Context, namespace string) (*st if err != nil { return nil, fmt.Errorf("get shard data: %w", err) } - for _, kv := range shardResp.Kvs { shardID, shardKeyType, err := etcdkeys.ParseShardKey(s.prefix, namespace, string(kv.Key)) if err != nil {