diff --git a/cmd/sharddistributor-canary/main.go b/cmd/sharddistributor-canary/main.go index 350cb3e4dbc..109768515d9 100644 --- a/cmd/sharddistributor-canary/main.go +++ b/cmd/sharddistributor-canary/main.go @@ -1,6 +1,8 @@ package main import ( + "fmt" + "net" "os" "time" @@ -8,14 +10,18 @@ import ( "github.com/urfave/cli/v2" "go.uber.org/fx" "go.uber.org/yarpc" + "go.uber.org/yarpc/api/peer" "go.uber.org/yarpc/transport/grpc" "go.uber.org/zap" + sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1" "github.com/uber/cadence/common/clock" "github.com/uber/cadence/common/log" "github.com/uber/cadence/service/sharddistributor/canary" "github.com/uber/cadence/service/sharddistributor/canary/executors" "github.com/uber/cadence/service/sharddistributor/client/clientcommon" + "github.com/uber/cadence/service/sharddistributor/client/executorclient" + "github.com/uber/cadence/service/sharddistributor/client/spectatorclient" "github.com/uber/cadence/service/sharddistributor/config" "github.com/uber/cadence/tools/common/commoncli" ) @@ -25,6 +31,7 @@ const ( defaultShardDistributorEndpoint = "127.0.0.1:7943" defaultFixedNamespace = "shard-distributor-canary" defaultEphemeralNamespace = "shard-distributor-canary-ephemeral" + defaultCanaryGRPCPort = 7953 // Port for canary to receive ping requests shardDistributorServiceName = "cadence-shard-distributor" ) @@ -33,11 +40,12 @@ func runApp(c *cli.Context) { endpoint := c.String("endpoint") fixedNamespace := c.String("fixed-namespace") ephemeralNamespace := c.String("ephemeral-namespace") + canaryGRPCPort := c.Int("canary-grpc-port") - fx.New(opts(fixedNamespace, ephemeralNamespace, endpoint)).Run() + fx.New(opts(fixedNamespace, ephemeralNamespace, endpoint, canaryGRPCPort)).Run() } -func opts(fixedNamespace, ephemeralNamespace, endpoint string) fx.Option { +func opts(fixedNamespace, ephemeralNamespace, endpoint string, canaryGRPCPort int) fx.Option { configuration := clientcommon.Config{ Namespaces: 
[]clientcommon.NamespaceConfig{ {Namespace: fixedNamespace, HeartBeatInterval: 1 * time.Second, MigrationMode: config.MigrationModeONBOARDED}, @@ -49,22 +57,51 @@ func opts(fixedNamespace, ephemeralNamespace, endpoint string) fx.Option { }, } + canaryGRPCAddress := fmt.Sprintf("127.0.0.1:%d", canaryGRPCPort) + + // Create listener for GRPC inbound + listener, err := net.Listen("tcp", canaryGRPCAddress) + if err != nil { + panic(err) + } + transport := grpc.NewTransport() - yarpcConfig := yarpc.Config{ - Name: "shard-distributor-canary", - Outbounds: yarpc.Outbounds{ - shardDistributorServiceName: { - Unary: transport.NewSingleOutbound(endpoint), - }, - }, + + executorMetadata := executorclient.ExecutorMetadata{ + clientcommon.GrpcAddressMetadataKey: canaryGRPCAddress, } return fx.Options( fx.Supply( fx.Annotate(tally.NoopScope, fx.As(new(tally.Scope))), fx.Annotate(clock.NewRealTimeSource(), fx.As(new(clock.TimeSource))), - yarpcConfig, configuration, + transport, + executorMetadata, + ), + + fx.Provide(func(peerChooser spectatorclient.SpectatorPeerChooserInterface) yarpc.Config { + return yarpc.Config{ + Name: "shard-distributor-canary", + Inbounds: yarpc.Inbounds{ + transport.NewInbound(listener), // Listen for incoming ping requests + }, + Outbounds: yarpc.Outbounds{ + shardDistributorServiceName: { + Unary: transport.NewSingleOutbound(endpoint), + Stream: transport.NewSingleOutbound(endpoint), + }, + // canary-to-canary outbound uses peer chooser to route to other canary instances + "shard-distributor-canary": { + Unary: transport.NewOutbound(peerChooser), + Stream: transport.NewOutbound(peerChooser), + }, + }, + } + }), + + fx.Provide( + func(t *grpc.Transport) peer.Transport { return t }, ), fx.Provide( yarpc.NewDispatcher, @@ -73,12 +110,17 @@ func opts(fixedNamespace, ephemeralNamespace, endpoint string) fx.Option { fx.Provide(zap.NewDevelopment), fx.Provide(log.NewLogger), + // Register canary procedures with dispatcher + fx.Invoke(func(dispatcher 
*yarpc.Dispatcher, server sharddistributorv1.ShardDistributorExecutorCanaryAPIYARPCServer) { + dispatcher.Register(sharddistributorv1.BuildShardDistributorExecutorCanaryAPIYARPCProcedures(server)) + }), + // Start the YARPC dispatcher fx.Invoke(func(lc fx.Lifecycle, dispatcher *yarpc.Dispatcher) { lc.Append(fx.StartStopHook(dispatcher.Start, dispatcher.Stop)) }), - // Include the canary module + // Include the canary module - it will set up spectator peer choosers and canary client canary.Module(canary.NamespacesNames{FixedNamespace: fixedNamespace, EphemeralNamespace: ephemeralNamespace, ExternalAssignmentNamespace: executors.ExternalAssignmentNamespace, SharddistributorServiceName: shardDistributorServiceName}), ) } @@ -110,6 +152,11 @@ func buildCLI() *cli.App { Value: defaultEphemeralNamespace, Usage: "namespace for ephemeral shard creation testing", }, + &cli.IntFlag{ + Name: "canary-grpc-port", + Value: defaultCanaryGRPCPort, + Usage: "port for canary to receive ping requests", + }, }, Action: func(c *cli.Context) error { runApp(c) diff --git a/cmd/sharddistributor-canary/main_test.go b/cmd/sharddistributor-canary/main_test.go index 3784469e312..8ead4019e05 100644 --- a/cmd/sharddistributor-canary/main_test.go +++ b/cmd/sharddistributor-canary/main_test.go @@ -8,5 +8,5 @@ import ( ) func TestDependenciesAreSatisfied(t *testing.T) { - assert.NoError(t, fx.ValidateApp(opts(defaultFixedNamespace, defaultEphemeralNamespace, defaultShardDistributorEndpoint))) + assert.NoError(t, fx.ValidateApp(opts(defaultFixedNamespace, defaultEphemeralNamespace, defaultShardDistributorEndpoint, defaultCanaryGRPCPort))) } diff --git a/service/sharddistributor/canary/module.go b/service/sharddistributor/canary/module.go index fc82740110f..bd50324a083 100644 --- a/service/sharddistributor/canary/module.go +++ b/service/sharddistributor/canary/module.go @@ -2,14 +2,18 @@ package canary import ( "go.uber.org/fx" + "go.uber.org/yarpc" sharddistributorv1 
"github.com/uber/cadence/.gen/proto/sharddistributor/v1" "github.com/uber/cadence/service/sharddistributor/canary/executors" "github.com/uber/cadence/service/sharddistributor/canary/factory" + "github.com/uber/cadence/service/sharddistributor/canary/handler" + "github.com/uber/cadence/service/sharddistributor/canary/pinger" "github.com/uber/cadence/service/sharddistributor/canary/processor" "github.com/uber/cadence/service/sharddistributor/canary/processorephemeral" "github.com/uber/cadence/service/sharddistributor/canary/sharddistributorclient" "github.com/uber/cadence/service/sharddistributor/client/executorclient" + "github.com/uber/cadence/service/sharddistributor/client/spectatorclient" ) type NamespacesNames struct { @@ -49,5 +53,40 @@ func opts(names NamespacesNames) fx.Option { executors.Module(names.FixedNamespace, names.EphemeralNamespace, names.ExternalAssignmentNamespace), processorephemeral.ShardCreatorModule([]string{names.EphemeralNamespace}), + + spectatorclient.Module(), + fx.Provide(spectatorclient.NewSpectatorPeerChooser), + fx.Invoke(func(chooser spectatorclient.SpectatorPeerChooserInterface, lc fx.Lifecycle) { + lc.Append(fx.StartStopHook(chooser.Start, chooser.Stop)) + }), + + // Create canary client using the dispatcher's client config + fx.Provide(func(dispatcher *yarpc.Dispatcher) sharddistributorv1.ShardDistributorExecutorCanaryAPIYARPCClient { + config := dispatcher.ClientConfig("shard-distributor-canary") + return sharddistributorv1.NewShardDistributorExecutorCanaryAPIYARPCClient(config) + }), + + fx.Provide(func(params pinger.Params) *pinger.Pinger { + return pinger.NewPinger(params, names.FixedNamespace, 32) + }), + fx.Invoke(func(p *pinger.Pinger, lc fx.Lifecycle) { + lc.Append(fx.StartStopHook(p.Start, p.Stop)) + }), + + // Register canary ping handler to receive ping requests from other executors + fx.Provide(handler.NewPingHandler), + fx.Provide(fx.Annotate( + func(h *handler.PingHandler) 
sharddistributorv1.ShardDistributorExecutorCanaryAPIYARPCServer { + return h + }, + )), + fx.Provide(sharddistributorv1.NewFxShardDistributorExecutorCanaryAPIYARPCProcedures()), + + // There is a circular dependency between the spectator client and the peer chooser, since + // the yarpc dispatcher needs the peer chooser and the peer chooser needs the spectators, which needs the yarpc dispatcher. + // To break the circular dependency, we set the spectators on the peer chooser here. + fx.Invoke(func(chooser spectatorclient.SpectatorPeerChooserInterface, spectators *spectatorclient.Spectators) { + chooser.SetSpectators(spectators) + }), ) } diff --git a/service/sharddistributor/canary/module_test.go b/service/sharddistributor/canary/module_test.go index f723c07a85d..f3b3c91bdc2 100644 --- a/service/sharddistributor/canary/module_test.go +++ b/service/sharddistributor/canary/module_test.go @@ -9,6 +9,7 @@ import ( "go.uber.org/fx" "go.uber.org/fx/fxtest" "go.uber.org/yarpc" + "go.uber.org/yarpc/api/peer" "go.uber.org/yarpc/api/transport/transporttest" "go.uber.org/yarpc/transport/grpc" "go.uber.org/yarpc/yarpctest" @@ -23,7 +24,8 @@ func TestModule(t *testing.T) { // Create mocks ctrl := gomock.NewController(t) mockClientConfig := transporttest.NewMockClientConfig(ctrl) - outbound := grpc.NewTransport().NewOutbound(yarpctest.NewFakePeerList()) + transport := grpc.NewTransport() + outbound := transport.NewOutbound(yarpctest.NewFakePeerList()) mockClientConfig.EXPECT().Caller().Return("test-executor").Times(2) mockClientConfig.EXPECT().Service().Return("shard-distributor").Times(2) @@ -43,6 +45,16 @@ func TestModule(t *testing.T) { }, } + // Create a mock dispatcher with the required outbound + dispatcher := yarpc.NewDispatcher(yarpc.Config{ + Name: "test-canary", + Outbounds: yarpc.Outbounds{ + "shard-distributor-canary": { + Unary: outbound, + }, + }, + }) + // Create a test app with the library, check that it starts and stops fxtest.New(t, fx.Supply( @@ -50,8 +62,10 
@@ func TestModule(t *testing.T) { fx.Annotate(clock.NewMockedTimeSource(), fx.As(new(clock.TimeSource))), fx.Annotate(log.NewNoop(), fx.As(new(log.Logger))), fx.Annotate(mockClientConfigProvider, fx.As(new(yarpc.ClientConfig))), + fx.Annotate(transport, fx.As(new(peer.Transport))), zaptest.NewLogger(t), config, + dispatcher, ), Module(NamespacesNames{FixedNamespace: "shard-distributor-canary", EphemeralNamespace: "shard-distributor-canary-ephemeral", ExternalAssignmentNamespace: "test-external-assignment", SharddistributorServiceName: "cadence-shard-distributor"}), ).RequireStart().RequireStop() diff --git a/service/sharddistributor/canary/pinger/pingAndLog.go b/service/sharddistributor/canary/pinger/pingAndLog.go new file mode 100644 index 00000000000..5b2033810e0 --- /dev/null +++ b/service/sharddistributor/canary/pinger/pingAndLog.go @@ -0,0 +1,40 @@ +package pinger + +import ( + "context" + "time" + + "go.uber.org/yarpc" + "go.uber.org/zap" + + sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1" + "github.com/uber/cadence/service/sharddistributor/client/spectatorclient" +) + +const ( + pingTimeout = 5 * time.Second +) + +func PingShard(ctx context.Context, canaryClient sharddistributorv1.ShardDistributorExecutorCanaryAPIYARPCClient, logger *zap.Logger, namespace, shardKey string) { + request := &sharddistributorv1.PingRequest{ + ShardKey: shardKey, + Namespace: namespace, + } + + ctx, cancel := context.WithTimeout(ctx, pingTimeout) + defer cancel() + + response, err := canaryClient.Ping(ctx, request, yarpc.WithShardKey(shardKey), yarpc.WithHeader(spectatorclient.NamespaceHeader, namespace)) + if err != nil { + logger.Error("Failed to ping shard", zap.String("namespace", namespace), zap.String("shard_key", shardKey), zap.Error(err)) + return + } + + // Verify response + if !response.GetOwnsShard() { + logger.Warn("Executor does not own shard", zap.String("namespace", namespace), zap.String("shard_key", shardKey), 
zap.String("executor_id", response.GetExecutorId())) + return + } + + logger.Info("Successfully pinged shard owner", zap.String("namespace", namespace), zap.String("shard_key", shardKey), zap.String("executor_id", response.GetExecutorId())) +} diff --git a/service/sharddistributor/canary/pinger/pinger.go b/service/sharddistributor/canary/pinger/pinger.go index 641fb8434ac..59efd45ac7c 100644 --- a/service/sharddistributor/canary/pinger/pinger.go +++ b/service/sharddistributor/canary/pinger/pinger.go @@ -8,13 +8,11 @@ import ( "time" "go.uber.org/fx" - "go.uber.org/yarpc" "go.uber.org/zap" sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1" "github.com/uber/cadence/common/backoff" "github.com/uber/cadence/common/clock" - "github.com/uber/cadence/service/sharddistributor/client/spectatorclient" ) //go:generate mockgen -package $GOPACKAGE -destination canary_client_mock.go github.com/uber/cadence/.gen/proto/sharddistributor/v1 ShardDistributorExecutorCanaryAPIYARPCClient @@ -22,7 +20,6 @@ import ( const ( pingInterval = 1 * time.Second pingJitterCoeff = 0.1 // 10% jitter - pingTimeout = 5 * time.Second ) // Pinger periodically pings shard owners in the fixed namespace @@ -96,23 +93,5 @@ func (p *Pinger) pingRandomShard() { shardNum := rand.Intn(p.numShards) shardKey := fmt.Sprintf("%d", shardNum) - request := &sharddistributorv1.PingRequest{ - ShardKey: shardKey, - Namespace: p.namespace, - } - - ctx, cancel := context.WithTimeout(p.ctx, pingTimeout) - defer cancel() - - response, err := p.canaryClient.Ping(ctx, request, yarpc.WithShardKey(shardKey), yarpc.WithHeader(spectatorclient.NamespaceHeader, p.namespace)) - if err != nil { - p.logger.Error("Failed to ping shard", zap.String("namespace", p.namespace), zap.String("shard_key", shardKey), zap.Error(err)) - } - - // Verify response - if !response.GetOwnsShard() { - p.logger.Warn("Executor does not own shard", zap.String("namespace", p.namespace), zap.String("shard_key", shardKey), 
zap.String("executor_id", response.GetExecutorId())) - } - - p.logger.Info("Successfully pinged shard owner", zap.String("namespace", p.namespace), zap.String("shard_key", shardKey), zap.String("executor_id", response.GetExecutorId())) + PingShard(p.ctx, p.canaryClient, p.logger, p.namespace, shardKey) } diff --git a/service/sharddistributor/canary/processorephemeral/canary_client_mock_test.go b/service/sharddistributor/canary/processorephemeral/canary_client_mock_test.go new file mode 100644 index 00000000000..7b17600a6ec --- /dev/null +++ b/service/sharddistributor/canary/processorephemeral/canary_client_mock_test.go @@ -0,0 +1,64 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/uber/cadence/.gen/proto/sharddistributor/v1 (interfaces: ShardDistributorExecutorCanaryAPIYARPCClient) +// +// Generated by this command: +// +// mockgen -package processorephemeral -destination canary_client_mock_test.go github.com/uber/cadence/.gen/proto/sharddistributor/v1 ShardDistributorExecutorCanaryAPIYARPCClient +// + +// Package processorephemeral is a generated GoMock package. +package processorephemeral + +import ( + context "context" + reflect "reflect" + + gomock "go.uber.org/mock/gomock" + yarpc "go.uber.org/yarpc" + + sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1" +) + +// MockShardDistributorExecutorCanaryAPIYARPCClient is a mock of ShardDistributorExecutorCanaryAPIYARPCClient interface. +type MockShardDistributorExecutorCanaryAPIYARPCClient struct { + ctrl *gomock.Controller + recorder *MockShardDistributorExecutorCanaryAPIYARPCClientMockRecorder + isgomock struct{} +} + +// MockShardDistributorExecutorCanaryAPIYARPCClientMockRecorder is the mock recorder for MockShardDistributorExecutorCanaryAPIYARPCClient. 
+type MockShardDistributorExecutorCanaryAPIYARPCClientMockRecorder struct { + mock *MockShardDistributorExecutorCanaryAPIYARPCClient +} + +// NewMockShardDistributorExecutorCanaryAPIYARPCClient creates a new mock instance. +func NewMockShardDistributorExecutorCanaryAPIYARPCClient(ctrl *gomock.Controller) *MockShardDistributorExecutorCanaryAPIYARPCClient { + mock := &MockShardDistributorExecutorCanaryAPIYARPCClient{ctrl: ctrl} + mock.recorder = &MockShardDistributorExecutorCanaryAPIYARPCClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockShardDistributorExecutorCanaryAPIYARPCClient) EXPECT() *MockShardDistributorExecutorCanaryAPIYARPCClientMockRecorder { + return m.recorder +} + +// Ping mocks base method. +func (m *MockShardDistributorExecutorCanaryAPIYARPCClient) Ping(arg0 context.Context, arg1 *sharddistributorv1.PingRequest, arg2 ...yarpc.CallOption) (*sharddistributorv1.PingResponse, error) { + m.ctrl.T.Helper() + varargs := []any{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "Ping", varargs...) + ret0, _ := ret[0].(*sharddistributorv1.PingResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Ping indicates an expected call of Ping. +func (mr *MockShardDistributorExecutorCanaryAPIYARPCClientMockRecorder) Ping(arg0, arg1 any, arg2 ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Ping", reflect.TypeOf((*MockShardDistributorExecutorCanaryAPIYARPCClient)(nil).Ping), varargs...) 
+} diff --git a/service/sharddistributor/canary/processorephemeral/shardcreator.go b/service/sharddistributor/canary/processorephemeral/shardcreator.go index 3a8e63fb859..fbe24d61fdf 100644 --- a/service/sharddistributor/canary/processorephemeral/shardcreator.go +++ b/service/sharddistributor/canary/processorephemeral/shardcreator.go @@ -9,44 +9,46 @@ import ( "go.uber.org/fx" "go.uber.org/zap" - "github.com/uber/cadence/client/sharddistributor" + sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1" "github.com/uber/cadence/common/clock" - "github.com/uber/cadence/common/types" + "github.com/uber/cadence/service/sharddistributor/canary/pinger" ) +//go:generate mockgen -package $GOPACKAGE -destination canary_client_mock_test.go github.com/uber/cadence/.gen/proto/sharddistributor/v1 ShardDistributorExecutorCanaryAPIYARPCClient + const ( shardCreationInterval = 1 * time.Second ) // ShardCreator creates shards at regular intervals for ephemeral canary testing type ShardCreator struct { - logger *zap.Logger - timeSource clock.TimeSource - shardDistributor sharddistributor.Client + logger *zap.Logger + timeSource clock.TimeSource + canaryClient sharddistributorv1.ShardDistributorExecutorCanaryAPIYARPCClient + namespaces []string stopChan chan struct{} goRoutineWg sync.WaitGroup - namespaces []string } // ShardCreatorParams contains the dependencies needed to create a ShardCreator type ShardCreatorParams struct { fx.In - Logger *zap.Logger - TimeSource clock.TimeSource - ShardDistributor sharddistributor.Client + Logger *zap.Logger + TimeSource clock.TimeSource + CanaryClient sharddistributorv1.ShardDistributorExecutorCanaryAPIYARPCClient } // NewShardCreator creates a new ShardCreator instance with the given parameters and namespace func NewShardCreator(params ShardCreatorParams, namespaces []string) *ShardCreator { return &ShardCreator{ - logger: params.Logger, - timeSource: params.TimeSource, - shardDistributor: params.ShardDistributor, - 
stopChan: make(chan struct{}), - goRoutineWg: sync.WaitGroup{}, - namespaces: namespaces, + logger: params.Logger, + timeSource: params.TimeSource, + canaryClient: params.CanaryClient, + stopChan: make(chan struct{}), + goRoutineWg: sync.WaitGroup{}, + namespaces: namespaces, } } @@ -92,15 +94,8 @@ func (s *ShardCreator) process(ctx context.Context) { for _, namespace := range s.namespaces { shardKey := uuid.New().String() s.logger.Info("Creating shard", zap.String("shardKey", shardKey), zap.String("namespace", namespace)) - response, err := s.shardDistributor.GetShardOwner(ctx, &types.GetShardOwnerRequest{ - ShardKey: shardKey, - Namespace: namespace, - }) - if err != nil { - s.logger.Error("create shard failed", zap.Error(err)) - continue - } - s.logger.Info("shard created", zap.String("shardKey", shardKey), zap.String("shardOwner", response.Owner), zap.String("namespace", response.Namespace)) + + pinger.PingShard(ctx, s.canaryClient, s.logger, namespace, shardKey) } } } diff --git a/service/sharddistributor/canary/processorephemeral/shardcreator_test.go b/service/sharddistributor/canary/processorephemeral/shardcreator_test.go index c4d851c260e..45d03ffad8e 100644 --- a/service/sharddistributor/canary/processorephemeral/shardcreator_test.go +++ b/service/sharddistributor/canary/processorephemeral/shardcreator_test.go @@ -9,52 +9,45 @@ import ( "go.uber.org/mock/gomock" "go.uber.org/zap/zaptest" - "github.com/uber/cadence/client/sharddistributor" + sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1" "github.com/uber/cadence/common/clock" - "github.com/uber/cadence/common/types" ) -func TestShardCreator_Lifecycle(t *testing.T) { +func TestShardCreator_PingsShards(t *testing.T) { goleak.VerifyNone(t) logger := zaptest.NewLogger(t) timeSource := clock.NewMockedTimeSource() ctrl := gomock.NewController(t) - mockShardDistributor := sharddistributor.NewMockClient(ctrl) namespace := "test-namespace" + mockCanaryClient := 
NewMockShardDistributorExecutorCanaryAPIYARPCClient(ctrl) - // Set up expectation for GetShardOwner calls that return errors - mockShardDistributor.EXPECT(). - GetShardOwner(gomock.Any(), gomock.Any()). - DoAndReturn(func(ctx interface{}, req *types.GetShardOwnerRequest, opts ...interface{}) (*types.GetShardOwnerResponse, error) { - // Verify the request contains the correct namespace even on error - assert.Equal(t, namespace, req.Namespace) + // Expect a Ping whose request carries a non-empty shard key and the test namespace + mockCanaryClient.EXPECT(). + Ping(gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func(ctx interface{}, req *sharddistributorv1.PingRequest, opts ...interface{}) (*sharddistributorv1.PingResponse, error) { assert.NotEmpty(t, req.ShardKey) - - return nil, assert.AnError // Using testify's AnError for consistency - }). - Times(2) + assert.Equal(t, namespace, req.Namespace) + return &sharddistributorv1.PingResponse{ + OwnsShard: true, + ExecutorId: "executor-1", + }, nil + }) params := ShardCreatorParams{ - Logger: logger, - TimeSource: timeSource, - ShardDistributor: mockShardDistributor, + Logger: logger, + TimeSource: timeSource, + CanaryClient: mockCanaryClient, } creator := NewShardCreator(params, []string{namespace}) creator.Start() - // Wait for the goroutine to start + // Wait for the goroutine to start and do its ping timeSource.BlockUntil(1) - - // Trigger shard creation that will fail - timeSource.Advance(shardCreationInterval + 100*time.Millisecond) - time.Sleep(10 * time.Millisecond) // Allow processing - - // Trigger another shard creation to ensure processing continues after error timeSource.Advance(shardCreationInterval + 100*time.Millisecond) - time.Sleep(10 * time.Millisecond) // Allow processing + time.Sleep(10 * time.Millisecond) creator.Stop() } diff --git a/service/sharddistributor/client/clientcommon/constants.go b/service/sharddistributor/client/clientcommon/constants.go new file mode 100644 index 00000000000..b1880d037d9 --- /dev/null +++ 
b/service/sharddistributor/client/clientcommon/constants.go @@ -0,0 +1,5 @@ +package clientcommon + +const ( + GrpcAddressMetadataKey = "grpc_address" +) diff --git a/service/sharddistributor/client/spectatorclient/peer_chooser.go b/service/sharddistributor/client/spectatorclient/peer_chooser.go index a46a60ac8cf..c3d854ac5b7 100644 --- a/service/sharddistributor/client/spectatorclient/peer_chooser.go +++ b/service/sharddistributor/client/spectatorclient/peer_chooser.go @@ -13,11 +13,11 @@ import ( "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" + "github.com/uber/cadence/service/sharddistributor/client/clientcommon" ) const ( - NamespaceHeader = "x-shard-distributor-namespace" - grpcAddressMetadataKey = "grpc_address" + NamespaceHeader = "x-shard-distributor-namespace" ) // SpectatorPeerChooserInterface extends peer.Chooser with SetSpectators method @@ -127,7 +127,7 @@ func (c *SpectatorPeerChooser) Choose(ctx context.Context, req *transport.Reques } // Extract GRPC address from owner metadata - grpcAddress, ok := owner.Metadata[grpcAddressMetadataKey] + grpcAddress, ok := owner.Metadata[clientcommon.GrpcAddressMetadataKey] if !ok || grpcAddress == "" { return nil, nil, yarpcerrors.InternalErrorf("no grpc_address in metadata for executor %s owning shard %s", owner.ExecutorID, req.ShardKey) } diff --git a/service/sharddistributor/client/spectatorclient/peer_chooser_test.go b/service/sharddistributor/client/spectatorclient/peer_chooser_test.go index 569c354d01f..91a6896ed0c 100644 --- a/service/sharddistributor/client/spectatorclient/peer_chooser_test.go +++ b/service/sharddistributor/client/spectatorclient/peer_chooser_test.go @@ -12,6 +12,7 @@ import ( "go.uber.org/yarpc/transport/grpc" "github.com/uber/cadence/common/log/testlogger" + "github.com/uber/cadence/service/sharddistributor/client/clientcommon" ) func TestSpectatorPeerChooser_Choose_MissingShardKey(t *testing.T) { @@ -128,7 +129,7 @@ func 
TestSpectatorPeerChooser_Choose_Success(t *testing.T) { Return(&ShardOwner{ ExecutorID: "executor-1", Metadata: map[string]string{ - grpcAddressMetadataKey: "127.0.0.1:7953", + clientcommon.GrpcAddressMetadataKey: "127.0.0.1:7953", }, }, nil) @@ -172,7 +173,7 @@ func TestSpectatorPeerChooser_Choose_ReusesPeer(t *testing.T) { Return(&ShardOwner{ ExecutorID: "executor-1", Metadata: map[string]string{ - grpcAddressMetadataKey: "127.0.0.1:7953", + clientcommon.GrpcAddressMetadataKey: "127.0.0.1:7953", }, }, nil).Times(2) diff --git a/service/sharddistributor/leader/process/processor.go b/service/sharddistributor/leader/process/processor.go index 64451a40631..7febba65b99 100644 --- a/service/sharddistributor/leader/process/processor.go +++ b/service/sharddistributor/leader/process/processor.go @@ -256,6 +256,7 @@ func (p *namespaceProcessor) identifyStaleExecutors(namespaceState *store.Namesp for executorID, state := range namespaceState.Executors { if now.Sub(state.LastHeartbeat) > p.cfg.HeartbeatTTL { + p.logger.Info("Executor has not reported a heartbeat recently", tag.ShardExecutor(executorID), tag.ShardNamespace(p.namespaceCfg.Name), tag.Value(state.LastHeartbeat)) expiredExecutors[executorID] = namespaceState.ShardAssignments[executorID].ModRevision } }