
Commit 956a742

fix(shard-distributor): send initial state to new subscribers (#7499)
**What changed?**
Modified the `Subscribe` method in `namespaceShardToExecutor` to send the initial executor state to new subscribers immediately upon subscription, and refactored `getExecutorState` into a separate method that safely returns a copy of the current state. Removed the initial-state send from the handler; that logic did not include the executors' metadata. Improved test infrastructure by updating `setupExecutorWithShards` to use atomic etcd transactions and to properly delete old test data.

**Why?**
New subscribers were not receiving the initial state when they subscribed to the executor state pub/sub, so they had to wait for the next update before getting any data. This led to delays before subscribers saw the current shard distribution. The test improvements make executor setup atomic and keep the tests maintainable by ensuring stale state is deleted.

**How did you test it?**
Unit tests

**Potential risks**

**Release notes**

**Documentation Changes**

---------

Signed-off-by: Jakob Haahr Taankvist <jht@uber.com>
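For readers who want the gist without walking the diff below, here is a minimal, self-contained Go sketch of the pattern this commit adopts: `Subscribe` hands back a channel and a goroutine immediately delivers a copy of the current state, so the subscriber does not wait for the next publish. This is not the Cadence implementation; `stateCache` and `snapshot` are hypothetical stand-ins for `namespaceShardToExecutor` and `getExecutorState`, and the map shape is simplified.

```go
// Illustrative sketch only (not the Cadence code): a cache whose Subscribe
// pushes a copy of the current state to the new subscriber asynchronously.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type stateCache struct {
	sync.RWMutex
	state map[string][]string // executorID -> shard IDs (simplified shape)
}

// snapshot returns a deep copy of the current state under a read lock,
// mirroring the idea behind getExecutorState.
func (c *stateCache) snapshot() map[string][]string {
	c.RLock()
	defer c.RUnlock()
	out := make(map[string][]string, len(c.state))
	for executor, shards := range c.state {
		cp := make([]string, len(shards))
		copy(cp, shards)
		out[executor] = cp
	}
	return out
}

// Subscribe returns a channel of state snapshots. A goroutine sends the
// initial state right away so the caller sees data immediately.
func (c *stateCache) Subscribe(ctx context.Context) <-chan map[string][]string {
	ch := make(chan map[string][]string, 1)
	go func() {
		initial := c.snapshot()
		select {
		case <-ctx.Done(): // subscriber went away before delivery
		case ch <- initial:
		}
	}()
	return ch
}

func main() {
	c := &stateCache{state: map[string][]string{"executor-1": {"shard-1"}}}
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	fmt.Println(<-c.Subscribe(ctx)) // prints the initial state without waiting for an update
}
```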
1 parent a88d516 commit 956a742

7 files changed (+179, −140 lines)

service/sharddistributor/handler/handler.go

Lines changed: 1 addition & 32 deletions
@@ -157,16 +157,6 @@ func (h *handlerImpl) WatchNamespaceState(request *types.WatchNamespaceStateRequ
         return &types.InternalServiceError{Message: fmt.Sprintf("failed to subscribe to namespace state: %v", err)}
     }
 
-    // Send initial state immediately so client doesn't have to wait for first update
-    state, err := h.storage.GetState(server.Context(), request.Namespace)
-    if err != nil {
-        return &types.InternalServiceError{Message: fmt.Sprintf("failed to get namespace state: %v", err)}
-    }
-    response := toWatchNamespaceStateResponse(state)
-    if err := server.Send(response); err != nil {
-        return fmt.Errorf("send initial state: %w", err)
-    }
-
     // Stream subsequent updates
     for {
         select {
@@ -177,7 +167,7 @@ func (h *handlerImpl) WatchNamespaceState(request *types.WatchNamespaceStateRequ
                 return fmt.Errorf("unexpected close of updates channel")
             }
             response := &types.WatchNamespaceStateResponse{
-                Executors: make([]*types.ExecutorShardAssignment, 0, len(state.ShardAssignments)),
+                Executors: make([]*types.ExecutorShardAssignment, 0, len(assignmentChanges)),
             }
             for executor, shardIDs := range assignmentChanges {
                 response.Executors = append(response.Executors, &types.ExecutorShardAssignment{
@@ -195,27 +185,6 @@ func (h *handlerImpl) WatchNamespaceState(request *types.WatchNamespaceStateRequ
     }
 }
 
-func toWatchNamespaceStateResponse(state *store.NamespaceState) *types.WatchNamespaceStateResponse {
-    response := &types.WatchNamespaceStateResponse{
-        Executors: make([]*types.ExecutorShardAssignment, 0, len(state.ShardAssignments)),
-    }
-
-    for executorID, assignment := range state.ShardAssignments {
-        // Extract shard IDs from the assigned shards map
-        shardIDs := make([]string, 0, len(assignment.AssignedShards))
-        for shardID := range assignment.AssignedShards {
-            shardIDs = append(shardIDs, shardID)
-        }
-
-        response.Executors = append(response.Executors, &types.ExecutorShardAssignment{
-            ExecutorID: executorID,
-            AssignedShards: WrapShards(shardIDs),
-            Metadata: state.Executors[executorID].Metadata,
-        })
-    }
-    return response
-}
-
 func WrapShards(shardIDs []string) []*types.Shard {
     shards := make([]*types.Shard, 0, len(shardIDs))
     for _, shardID := range shardIDs {

service/sharddistributor/handler/handler_test.go

Lines changed: 21 additions & 52 deletions
@@ -243,62 +243,31 @@ func TestWatchNamespaceState(t *testing.T) {
         startWG: sync.WaitGroup{},
     }
 
-    t.Run("successful streaming", func(t *testing.T) {
-        ctx, cancel := context.WithCancel(context.Background())
+    ctx, cancel := context.WithCancel(context.Background())
 
-        initialState := &store.NamespaceState{
-            ShardAssignments: map[string]store.AssignedState{
-                "executor-1": {
-                    AssignedShards: map[string]*types.ShardAssignment{
-                        "shard-1": {},
-                    },
-                },
-            },
-        }
-
-        updatesChan := make(chan map[*store.ShardOwner][]string, 1)
-        unsubscribe := func() { close(updatesChan) }
-
-        mockServer.EXPECT().Context().Return(ctx).AnyTimes()
-        mockStorage.EXPECT().GetState(gomock.Any(), "test-ns").Return(initialState, nil)
-        mockStorage.EXPECT().SubscribeToAssignmentChanges(gomock.Any(), "test-ns").Return(updatesChan, unsubscribe, nil)
-
-        // Expect initial state send
-        mockServer.EXPECT().Send(gomock.Any()).DoAndReturn(func(resp *types.WatchNamespaceStateResponse) error {
-            require.Len(t, resp.Executors, 1)
-            require.Equal(t, "executor-1", resp.Executors[0].ExecutorID)
-            return nil
-        })
-
-        // Expect update send
-        mockServer.EXPECT().Send(gomock.Any()).DoAndReturn(func(resp *types.WatchNamespaceStateResponse) error {
-            require.Len(t, resp.Executors, 1)
-            require.Equal(t, "executor-2", resp.Executors[0].ExecutorID)
-            return nil
-        })
+    updatesChan := make(chan map[*store.ShardOwner][]string, 1)
+    unsubscribe := func() { close(updatesChan) }
 
-        // Send update, then cancel
-        go func() {
-            time.Sleep(10 * time.Millisecond)
-            updatesChan <- map[*store.ShardOwner][]string{
-                {ExecutorID: "executor-2", Metadata: map[string]string{}}: {"shard-2"},
-            }
-            cancel()
-        }()
+    mockServer.EXPECT().Context().Return(ctx).AnyTimes()
+    mockStorage.EXPECT().SubscribeToAssignmentChanges(gomock.Any(), "test-ns").Return(updatesChan, unsubscribe, nil)
 
-        err := handler.WatchNamespaceState(&types.WatchNamespaceStateRequest{Namespace: "test-ns"}, mockServer)
-        require.Error(t, err)
-        require.ErrorIs(t, err, context.Canceled)
+    // Expect update send
+    mockServer.EXPECT().Send(gomock.Any()).DoAndReturn(func(resp *types.WatchNamespaceStateResponse) error {
+        require.Len(t, resp.Executors, 1)
+        require.Equal(t, "executor-1", resp.Executors[0].ExecutorID)
+        return nil
     })
 
-    t.Run("storage error on initial state", func(t *testing.T) {
-        ctx := context.Background()
-        mockServer.EXPECT().Context().Return(ctx).AnyTimes()
-        mockStorage.EXPECT().GetState(gomock.Any(), "test-ns").Return(nil, errors.New("storage error"))
-        mockStorage.EXPECT().SubscribeToAssignmentChanges(gomock.Any(), "test-ns").Return(make(chan map[*store.ShardOwner][]string), func() {}, nil)
+    // Send update, then cancel
+    go func() {
+        time.Sleep(10 * time.Millisecond)
+        updatesChan <- map[*store.ShardOwner][]string{
+            {ExecutorID: "executor-1", Metadata: map[string]string{}}: {"shard-1"},
+        }
+        cancel()
+    }()
 
-        err := handler.WatchNamespaceState(&types.WatchNamespaceStateRequest{Namespace: "test-ns"}, mockServer)
-        require.Error(t, err)
-        require.Contains(t, err.Error(), "failed to get namespace state: storage error")
-    })
+    err := handler.WatchNamespaceState(&types.WatchNamespaceStateRequest{Namespace: "test-ns"}, mockServer)
+    require.Error(t, err)
+    require.ErrorIs(t, err, context.Canceled)
 }

service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache.go

Lines changed: 23 additions & 4 deletions
@@ -97,7 +97,22 @@ func (n *namespaceShardToExecutor) GetExecutorModRevisionCmp() ([]clientv3.Cmp,
 }
 
 func (n *namespaceShardToExecutor) Subscribe(ctx context.Context) (<-chan map[*store.ShardOwner][]string, func()) {
-    return n.pubSub.subscribe(ctx)
+    subCh, unSub := n.pubSub.subscribe(ctx)
+
+    // The go routine sends the initial state to the subscriber.
+    go func() {
+        initialState := n.getExecutorState()
+
+        select {
+        case <-ctx.Done():
+            n.logger.Warn("context finnished before initial state was sent", tag.ShardNamespace(n.namespace))
+        case subCh <- initialState:
+            n.logger.Info("initial state sent to subscriber", tag.ShardNamespace(n.namespace), tag.Value(initialState))
+        }
+
+    }()
+
+    return subCh, unSub
 }
 
 func (n *namespaceShardToExecutor) nameSpaceRefreashLoop() {
@@ -135,16 +150,20 @@ func (n *namespaceShardToExecutor) refresh(ctx context.Context) error {
         return fmt.Errorf("refresh executor state: %w", err)
     }
 
+    n.pubSub.publish(n.getExecutorState())
+    return nil
+}
+
+func (n *namespaceShardToExecutor) getExecutorState() map[*store.ShardOwner][]string {
     n.RLock()
+    defer n.RUnlock()
     executorState := make(map[*store.ShardOwner][]string)
     for executor, shardIDs := range n.executorState {
        executorState[executor] = make([]string, len(shardIDs))
        copy(executorState[executor], shardIDs)
     }
-    n.RUnlock()
 
-    n.pubSub.publish(executorState)
-    return nil
+    return executorState
 }
 
 func (n *namespaceShardToExecutor) refreshExecutorState(ctx context.Context) error {

service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache_test.go

Lines changed: 127 additions & 44 deletions
@@ -9,69 +9,30 @@ import (
 
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
+    clientv3 "go.etcd.io/etcd/client/v3"
 
     "github.com/uber/cadence/common/log/testlogger"
     "github.com/uber/cadence/common/types"
+    "github.com/uber/cadence/service/sharddistributor/store"
    "github.com/uber/cadence/service/sharddistributor/store/etcd/etcdkeys"
    "github.com/uber/cadence/service/sharddistributor/store/etcd/etcdtypes"
    "github.com/uber/cadence/service/sharddistributor/store/etcd/testhelper"
 )
 
-// setupExecutorWithShards creates an executor in etcd with assigned shards and metadata
-func setupExecutorWithShards(t *testing.T, testCluster *testhelper.StoreTestCluster, namespace, executorID string, shards []string, metadata map[string]string) {
-    // Create assigned state
-    assignedState := &etcdtypes.AssignedState{
-        AssignedShards: make(map[string]*types.ShardAssignment),
-    }
-    for _, shardID := range shards {
-        assignedState.AssignedShards[shardID] = &types.ShardAssignment{Status: types.AssignmentStatusREADY}
-    }
-    assignedStateJSON, err := json.Marshal(assignedState)
-    require.NoError(t, err)
-
-    executorAssignedStateKey := etcdkeys.BuildExecutorKey(testCluster.EtcdPrefix, namespace, executorID, etcdkeys.ExecutorAssignedStateKey)
-    testCluster.Client.Put(context.Background(), executorAssignedStateKey, string(assignedStateJSON))
-
-    // Add metadata
-    for key, value := range metadata {
-        metadataKey := etcdkeys.BuildMetadataKey(testCluster.EtcdPrefix, namespace, executorID, key)
-        testCluster.Client.Put(context.Background(), metadataKey, value)
-    }
-}
-
-// verifyShardOwner checks that a shard has the expected owner and metadata
-func verifyShardOwner(t *testing.T, cache *namespaceShardToExecutor, shardID, expectedExecutorID string, expectedMetadata map[string]string) {
-    owner, err := cache.GetShardOwner(context.Background(), shardID)
-    require.NoError(t, err)
-    require.NotNil(t, owner)
-    assert.Equal(t, expectedExecutorID, owner.ExecutorID)
-    for key, expectedValue := range expectedMetadata {
-        assert.Equal(t, expectedValue, owner.Metadata[key])
-    }
-
-    executor, err := cache.GetExecutor(context.Background(), expectedExecutorID)
-    require.NoError(t, err)
-    require.NotNil(t, executor)
-    assert.Equal(t, expectedExecutorID, executor.ExecutorID)
-    for key, expectedValue := range expectedMetadata {
-        assert.Equal(t, expectedValue, executor.Metadata[key])
-    }
-}
-
 func TestNamespaceShardToExecutor_Lifecycle(t *testing.T) {
     testCluster := testhelper.SetupStoreTestCluster(t)
     logger := testlogger.New(t)
     stopCh := make(chan struct{})
     defer close(stopCh)
 
     // Setup: Create executor-1 with shard-1
-    setupExecutorWithShards(t, testCluster, "test-ns", "executor-1", []string{"shard-1"}, map[string]string{
+    setupExecutorWithShards(t, testCluster, "executor-1", []string{"shard-1"}, map[string]string{
         "hostname": "executor-1-host",
         "version": "v1.0.0",
     })
 
     // Start the cache
-    namespaceShardToExecutor, err := newNamespaceShardToExecutor(testCluster.EtcdPrefix, "test-ns", testCluster.Client, stopCh, logger)
+    namespaceShardToExecutor, err := newNamespaceShardToExecutor(testCluster.EtcdPrefix, testCluster.Namespace, testCluster.Client, stopCh, logger)
     assert.NoError(t, err)
     namespaceShardToExecutor.Start(&sync.WaitGroup{})
     time.Sleep(50 * time.Millisecond)
@@ -90,7 +51,7 @@ func TestNamespaceShardToExecutor_Lifecycle(t *testing.T) {
     namespaceShardToExecutor.RUnlock()
 
     // Add executor-2 with shard-2 to trigger watch update
-    setupExecutorWithShards(t, testCluster, "test-ns", "executor-2", []string{"shard-2"}, map[string]string{
+    setupExecutorWithShards(t, testCluster, "executor-2", []string{"shard-2"}, map[string]string{
         "hostname": "executor-2-host",
         "region": "us-west",
     })
@@ -109,3 +70,125 @@ func TestNamespaceShardToExecutor_Lifecycle(t *testing.T) {
         "region": "us-west",
     })
 }
+
+func TestNamespaceShardToExecutor_Subscribe(t *testing.T) {
+    testCluster := testhelper.SetupStoreTestCluster(t)
+    logger := testlogger.New(t)
+    stopCh := make(chan struct{})
+    defer close(stopCh)
+
+    // Setup: Create executor-1 with shard-1
+    setupExecutorWithShards(t, testCluster, "executor-1", []string{"shard-1"}, map[string]string{
+        "hostname": "executor-1-host",
+        "version": "v1.0.0",
+    })
+
+    // Start the cache
+    namespaceShardToExecutor, err := newNamespaceShardToExecutor(testCluster.EtcdPrefix, testCluster.Namespace, testCluster.Client, stopCh, logger)
+    assert.NoError(t, err)
+    namespaceShardToExecutor.Start(&sync.WaitGroup{})
+
+    // Refresh the cache to get the initial state
+    namespaceShardToExecutor.refresh(context.Background())
+
+    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+    defer cancel()
+    subCh, unSub := namespaceShardToExecutor.Subscribe(ctx)
+    defer unSub()
+
+    var wg sync.WaitGroup
+    wg.Add(1)
+
+    // start listener
+    go func() {
+        defer wg.Done()
+        // Check that we get the initial state
+        state := <-subCh
+        assert.Len(t, state, 1)
+        verifyExecutorInState(t, state, "executor-1", []string{"shard-1"}, map[string]string{
+            "hostname": "executor-1-host",
+            "version": "v1.0.0",
+        })
+
+        // Check that we get the updated state
+        state = <-subCh
+        assert.Len(t, state, 2)
+        verifyExecutorInState(t, state, "executor-1", []string{"shard-1"}, map[string]string{
+            "hostname": "executor-1-host",
+            "version": "v1.0.0",
+        })
+        verifyExecutorInState(t, state, "executor-2", []string{"shard-2"}, map[string]string{
+            "hostname": "executor-2-host",
+            "region": "us-west",
+        })
+    }()
+    time.Sleep(10 * time.Millisecond)
+
+    // Add executor-2 with shard-2 to trigger new subscription update
+    setupExecutorWithShards(t, testCluster, "executor-2", []string{"shard-2"}, map[string]string{
+        "hostname": "executor-2-host",
+        "region": "us-west",
+    })
+
+    wg.Wait()
+}
+
+// setupExecutorWithShards creates an executor in etcd with assigned shards and metadata
+func setupExecutorWithShards(t *testing.T, testCluster *testhelper.StoreTestCluster, executorID string, shards []string, metadata map[string]string) {
+    // Create assigned state
+    assignedState := &etcdtypes.AssignedState{
+        AssignedShards: make(map[string]*types.ShardAssignment),
+    }
+    for _, shardID := range shards {
+        assignedState.AssignedShards[shardID] = &types.ShardAssignment{Status: types.AssignmentStatusREADY}
+    }
+    assignedStateJSON, err := json.Marshal(assignedState)
+    require.NoError(t, err)
+
+    var operations []clientv3.Op
+
+    executorAssignedStateKey := etcdkeys.BuildExecutorKey(testCluster.EtcdPrefix, testCluster.Namespace, executorID, etcdkeys.ExecutorAssignedStateKey)
+    operations = append(operations, clientv3.OpPut(executorAssignedStateKey, string(assignedStateJSON)))
+
+    // Add metadata
+    for key, value := range metadata {
+        metadataKey := etcdkeys.BuildMetadataKey(testCluster.EtcdPrefix, testCluster.Namespace, executorID, key)
+        operations = append(operations, clientv3.OpPut(metadataKey, value))
+    }
+
+    txnResp, err := testCluster.Client.Txn(context.Background()).Then(operations...).Commit()
+    require.NoError(t, err)
+    require.True(t, txnResp.Succeeded)
+}
+
+func verifyExecutorInState(t *testing.T, state map[*store.ShardOwner][]string, executorID string, shards []string, metadata map[string]string) {
+    executorInState := false
+    for executor, executorShards := range state {
+        if executor.ExecutorID == executorID {
+            assert.Equal(t, shards, executorShards)
+            assert.Equal(t, metadata, executor.Metadata)
+            executorInState = true
+            break
+        }
+    }
+    assert.True(t, executorInState)
+}
+
+// verifyShardOwner checks that a shard has the expected owner and metadata
+func verifyShardOwner(t *testing.T, cache *namespaceShardToExecutor, shardID, expectedExecutorID string, expectedMetadata map[string]string) {
+    owner, err := cache.GetShardOwner(context.Background(), shardID)
+    require.NoError(t, err)
+    require.NotNil(t, owner)
+    assert.Equal(t, expectedExecutorID, owner.ExecutorID)
+    for key, expectedValue := range expectedMetadata {
+        assert.Equal(t, expectedValue, owner.Metadata[key])
+    }
+
+    executor, err := cache.GetExecutor(context.Background(), expectedExecutorID)
+    require.NoError(t, err)
+    require.NotNil(t, executor)
+    assert.Equal(t, expectedExecutorID, executor.ExecutorID)
+    for key, expectedValue := range expectedMetadata {
+        assert.Equal(t, expectedValue, executor.Metadata[key])
+    }
+}
