Skip to content

Commit c268569

Browse files
authored
feat(shard-distributor): Add ping verification to ephemeral shard creator (#7496)
**What changed?** The ephemeral shard creator now pings shard owners immediately after creation to verify end-to-end functionality. This also wires up the spectator, the dispatcher, and the fixed-namespace pinger. **Why?** This adds canary verification that validates: - Shards are created successfully - Owners can be reached via gRPC - Owners actually own the assigned shards Previously, the shard creator only verified that GetShardOwner returned successfully, but didn't verify that the executor was actually reachable or owned the shard. **How did you test it?** Unit tests **Potential risks** Low risk - this only affects the canary test environment and adds verification without changing core shard creation logic. **Documentation** --------- Signed-off-by: Jakob Haahr Taankvist <jht@uber.com>
1 parent 46d0f64 commit c268569

File tree

13 files changed

+267
-89
lines changed

13 files changed

+267
-89
lines changed

cmd/sharddistributor-canary/main.go

Lines changed: 58 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,27 @@
11
package main
22

33
import (
4+
"fmt"
5+
"net"
46
"os"
57
"time"
68

79
"github.com/uber-go/tally"
810
"github.com/urfave/cli/v2"
911
"go.uber.org/fx"
1012
"go.uber.org/yarpc"
13+
"go.uber.org/yarpc/api/peer"
1114
"go.uber.org/yarpc/transport/grpc"
1215
"go.uber.org/zap"
1316

17+
sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1"
1418
"github.com/uber/cadence/common/clock"
1519
"github.com/uber/cadence/common/log"
1620
"github.com/uber/cadence/service/sharddistributor/canary"
1721
"github.com/uber/cadence/service/sharddistributor/canary/executors"
1822
"github.com/uber/cadence/service/sharddistributor/client/clientcommon"
23+
"github.com/uber/cadence/service/sharddistributor/client/executorclient"
24+
"github.com/uber/cadence/service/sharddistributor/client/spectatorclient"
1925
"github.com/uber/cadence/service/sharddistributor/config"
2026
"github.com/uber/cadence/tools/common/commoncli"
2127
)
@@ -25,6 +31,7 @@ const (
2531
defaultShardDistributorEndpoint = "127.0.0.1:7943"
2632
defaultFixedNamespace = "shard-distributor-canary"
2733
defaultEphemeralNamespace = "shard-distributor-canary-ephemeral"
34+
defaultCanaryGRPCPort = 7953 // Port for canary to receive ping requests
2835

2936
shardDistributorServiceName = "cadence-shard-distributor"
3037
)
@@ -33,11 +40,12 @@ func runApp(c *cli.Context) {
3340
endpoint := c.String("endpoint")
3441
fixedNamespace := c.String("fixed-namespace")
3542
ephemeralNamespace := c.String("ephemeral-namespace")
43+
canaryGRPCPort := c.Int("canary-grpc-port")
3644

37-
fx.New(opts(fixedNamespace, ephemeralNamespace, endpoint)).Run()
45+
fx.New(opts(fixedNamespace, ephemeralNamespace, endpoint, canaryGRPCPort)).Run()
3846
}
3947

40-
func opts(fixedNamespace, ephemeralNamespace, endpoint string) fx.Option {
48+
func opts(fixedNamespace, ephemeralNamespace, endpoint string, canaryGRPCPort int) fx.Option {
4149
configuration := clientcommon.Config{
4250
Namespaces: []clientcommon.NamespaceConfig{
4351
{Namespace: fixedNamespace, HeartBeatInterval: 1 * time.Second, MigrationMode: config.MigrationModeONBOARDED},
@@ -49,22 +57,51 @@ func opts(fixedNamespace, ephemeralNamespace, endpoint string) fx.Option {
4957
},
5058
}
5159

60+
canaryGRPCAddress := fmt.Sprintf("127.0.0.1:%d", canaryGRPCPort)
61+
62+
// Create listener for GRPC inbound
63+
listener, err := net.Listen("tcp", canaryGRPCAddress)
64+
if err != nil {
65+
panic(err)
66+
}
67+
5268
transport := grpc.NewTransport()
53-
yarpcConfig := yarpc.Config{
54-
Name: "shard-distributor-canary",
55-
Outbounds: yarpc.Outbounds{
56-
shardDistributorServiceName: {
57-
Unary: transport.NewSingleOutbound(endpoint),
58-
},
59-
},
69+
70+
executorMetadata := executorclient.ExecutorMetadata{
71+
clientcommon.GrpcAddressMetadataKey: canaryGRPCAddress,
6072
}
6173

6274
return fx.Options(
6375
fx.Supply(
6476
fx.Annotate(tally.NoopScope, fx.As(new(tally.Scope))),
6577
fx.Annotate(clock.NewRealTimeSource(), fx.As(new(clock.TimeSource))),
66-
yarpcConfig,
6778
configuration,
79+
transport,
80+
executorMetadata,
81+
),
82+
83+
fx.Provide(func(peerChooser spectatorclient.SpectatorPeerChooserInterface) yarpc.Config {
84+
return yarpc.Config{
85+
Name: "shard-distributor-canary",
86+
Inbounds: yarpc.Inbounds{
87+
transport.NewInbound(listener), // Listen for incoming ping requests
88+
},
89+
Outbounds: yarpc.Outbounds{
90+
shardDistributorServiceName: {
91+
Unary: transport.NewSingleOutbound(endpoint),
92+
Stream: transport.NewSingleOutbound(endpoint),
93+
},
94+
// canary-to-canary outbound uses peer chooser to route to other canary instances
95+
"shard-distributor-canary": {
96+
Unary: transport.NewOutbound(peerChooser),
97+
Stream: transport.NewOutbound(peerChooser),
98+
},
99+
},
100+
}
101+
}),
102+
103+
fx.Provide(
104+
func(t *grpc.Transport) peer.Transport { return t },
68105
),
69106
fx.Provide(
70107
yarpc.NewDispatcher,
@@ -73,12 +110,17 @@ func opts(fixedNamespace, ephemeralNamespace, endpoint string) fx.Option {
73110
fx.Provide(zap.NewDevelopment),
74111
fx.Provide(log.NewLogger),
75112

113+
// Register canary procedures with dispatcher
114+
fx.Invoke(func(dispatcher *yarpc.Dispatcher, server sharddistributorv1.ShardDistributorExecutorCanaryAPIYARPCServer) {
115+
dispatcher.Register(sharddistributorv1.BuildShardDistributorExecutorCanaryAPIYARPCProcedures(server))
116+
}),
117+
76118
// Start the YARPC dispatcher
77119
fx.Invoke(func(lc fx.Lifecycle, dispatcher *yarpc.Dispatcher) {
78120
lc.Append(fx.StartStopHook(dispatcher.Start, dispatcher.Stop))
79121
}),
80122

81-
// Include the canary module
123+
// Include the canary module - it will set up spectator peer choosers and canary client
82124
canary.Module(canary.NamespacesNames{FixedNamespace: fixedNamespace, EphemeralNamespace: ephemeralNamespace, ExternalAssignmentNamespace: executors.ExternalAssignmentNamespace, SharddistributorServiceName: shardDistributorServiceName}),
83125
)
84126
}
@@ -110,6 +152,11 @@ func buildCLI() *cli.App {
110152
Value: defaultEphemeralNamespace,
111153
Usage: "namespace for ephemeral shard creation testing",
112154
},
155+
&cli.IntFlag{
156+
Name: "canary-grpc-port",
157+
Value: defaultCanaryGRPCPort,
158+
Usage: "port for canary to receive ping requests",
159+
},
113160
},
114161
Action: func(c *cli.Context) error {
115162
runApp(c)

cmd/sharddistributor-canary/main_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,5 @@ import (
88
)
99

1010
func TestDependenciesAreSatisfied(t *testing.T) {
11-
assert.NoError(t, fx.ValidateApp(opts(defaultFixedNamespace, defaultEphemeralNamespace, defaultShardDistributorEndpoint)))
11+
assert.NoError(t, fx.ValidateApp(opts(defaultFixedNamespace, defaultEphemeralNamespace, defaultShardDistributorEndpoint, defaultCanaryGRPCPort)))
1212
}

service/sharddistributor/canary/module.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,18 @@ package canary
22

33
import (
44
"go.uber.org/fx"
5+
"go.uber.org/yarpc"
56

67
sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1"
78
"github.com/uber/cadence/service/sharddistributor/canary/executors"
89
"github.com/uber/cadence/service/sharddistributor/canary/factory"
10+
"github.com/uber/cadence/service/sharddistributor/canary/handler"
11+
"github.com/uber/cadence/service/sharddistributor/canary/pinger"
912
"github.com/uber/cadence/service/sharddistributor/canary/processor"
1013
"github.com/uber/cadence/service/sharddistributor/canary/processorephemeral"
1114
"github.com/uber/cadence/service/sharddistributor/canary/sharddistributorclient"
1215
"github.com/uber/cadence/service/sharddistributor/client/executorclient"
16+
"github.com/uber/cadence/service/sharddistributor/client/spectatorclient"
1317
)
1418

1519
type NamespacesNames struct {
@@ -49,5 +53,40 @@ func opts(names NamespacesNames) fx.Option {
4953
executors.Module(names.FixedNamespace, names.EphemeralNamespace, names.ExternalAssignmentNamespace),
5054

5155
processorephemeral.ShardCreatorModule([]string{names.EphemeralNamespace}),
56+
57+
spectatorclient.Module(),
58+
fx.Provide(spectatorclient.NewSpectatorPeerChooser),
59+
fx.Invoke(func(chooser spectatorclient.SpectatorPeerChooserInterface, lc fx.Lifecycle) {
60+
lc.Append(fx.StartStopHook(chooser.Start, chooser.Stop))
61+
}),
62+
63+
// Create canary client using the dispatcher's client config
64+
fx.Provide(func(dispatcher *yarpc.Dispatcher) sharddistributorv1.ShardDistributorExecutorCanaryAPIYARPCClient {
65+
config := dispatcher.ClientConfig("shard-distributor-canary")
66+
return sharddistributorv1.NewShardDistributorExecutorCanaryAPIYARPCClient(config)
67+
}),
68+
69+
fx.Provide(func(params pinger.Params) *pinger.Pinger {
70+
return pinger.NewPinger(params, names.FixedNamespace, 32)
71+
}),
72+
fx.Invoke(func(p *pinger.Pinger, lc fx.Lifecycle) {
73+
lc.Append(fx.StartStopHook(p.Start, p.Stop))
74+
}),
75+
76+
// Register canary ping handler to receive ping requests from other executors
77+
fx.Provide(handler.NewPingHandler),
78+
fx.Provide(fx.Annotate(
79+
func(h *handler.PingHandler) sharddistributorv1.ShardDistributorExecutorCanaryAPIYARPCServer {
80+
return h
81+
},
82+
)),
83+
fx.Provide(sharddistributorv1.NewFxShardDistributorExecutorCanaryAPIYARPCProcedures()),
84+
85+
// There is a circular dependency between the spectator client and the peer chooser, since
86+
// the yarpc dispatcher needs the peer chooser, the peer chooser needs the spectators, and the spectators need the yarpc dispatcher.
87+
// To break the circular dependency, we set the spectators on the peer chooser here.
88+
fx.Invoke(func(chooser spectatorclient.SpectatorPeerChooserInterface, spectators *spectatorclient.Spectators) {
89+
chooser.SetSpectators(spectators)
90+
}),
5291
)
5392
}

service/sharddistributor/canary/module_test.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"go.uber.org/fx"
1010
"go.uber.org/fx/fxtest"
1111
"go.uber.org/yarpc"
12+
"go.uber.org/yarpc/api/peer"
1213
"go.uber.org/yarpc/api/transport/transporttest"
1314
"go.uber.org/yarpc/transport/grpc"
1415
"go.uber.org/yarpc/yarpctest"
@@ -23,7 +24,8 @@ func TestModule(t *testing.T) {
2324
// Create mocks
2425
ctrl := gomock.NewController(t)
2526
mockClientConfig := transporttest.NewMockClientConfig(ctrl)
26-
outbound := grpc.NewTransport().NewOutbound(yarpctest.NewFakePeerList())
27+
transport := grpc.NewTransport()
28+
outbound := transport.NewOutbound(yarpctest.NewFakePeerList())
2729

2830
mockClientConfig.EXPECT().Caller().Return("test-executor").Times(2)
2931
mockClientConfig.EXPECT().Service().Return("shard-distributor").Times(2)
@@ -43,15 +45,27 @@ func TestModule(t *testing.T) {
4345
},
4446
}
4547

48+
// Create a mock dispatcher with the required outbound
49+
dispatcher := yarpc.NewDispatcher(yarpc.Config{
50+
Name: "test-canary",
51+
Outbounds: yarpc.Outbounds{
52+
"shard-distributor-canary": {
53+
Unary: outbound,
54+
},
55+
},
56+
})
57+
4658
// Create a test app with the library, check that it starts and stops
4759
fxtest.New(t,
4860
fx.Supply(
4961
fx.Annotate(tally.NoopScope, fx.As(new(tally.Scope))),
5062
fx.Annotate(clock.NewMockedTimeSource(), fx.As(new(clock.TimeSource))),
5163
fx.Annotate(log.NewNoop(), fx.As(new(log.Logger))),
5264
fx.Annotate(mockClientConfigProvider, fx.As(new(yarpc.ClientConfig))),
65+
fx.Annotate(transport, fx.As(new(peer.Transport))),
5366
zaptest.NewLogger(t),
5467
config,
68+
dispatcher,
5569
),
5670
Module(NamespacesNames{FixedNamespace: "shard-distributor-canary", EphemeralNamespace: "shard-distributor-canary-ephemeral", ExternalAssignmentNamespace: "test-external-assignment", SharddistributorServiceName: "cadence-shard-distributor"}),
5771
).RequireStart().RequireStop()
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package pinger
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"go.uber.org/yarpc"
8+
"go.uber.org/zap"
9+
10+
sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1"
11+
"github.com/uber/cadence/service/sharddistributor/client/spectatorclient"
12+
)
13+
14+
const (
15+
pingTimeout = 5 * time.Second
16+
)
17+
18+
func PingShard(ctx context.Context, canaryClient sharddistributorv1.ShardDistributorExecutorCanaryAPIYARPCClient, logger *zap.Logger, namespace, shardKey string) {
19+
request := &sharddistributorv1.PingRequest{
20+
ShardKey: shardKey,
21+
Namespace: namespace,
22+
}
23+
24+
ctx, cancel := context.WithTimeout(ctx, pingTimeout)
25+
defer cancel()
26+
27+
response, err := canaryClient.Ping(ctx, request, yarpc.WithShardKey(shardKey), yarpc.WithHeader(spectatorclient.NamespaceHeader, namespace))
28+
if err != nil {
29+
logger.Error("Failed to ping shard", zap.String("namespace", namespace), zap.String("shard_key", shardKey), zap.Error(err))
30+
return
31+
}
32+
33+
// Verify response
34+
if !response.GetOwnsShard() {
35+
logger.Warn("Executor does not own shard", zap.String("namespace", namespace), zap.String("shard_key", shardKey), zap.String("executor_id", response.GetExecutorId()))
36+
return
37+
}
38+
39+
logger.Info("Successfully pinged shard owner", zap.String("namespace", namespace), zap.String("shard_key", shardKey), zap.String("executor_id", response.GetExecutorId()))
40+
}

service/sharddistributor/canary/pinger/pinger.go

Lines changed: 1 addition & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -8,21 +8,18 @@ import (
88
"time"
99

1010
"go.uber.org/fx"
11-
"go.uber.org/yarpc"
1211
"go.uber.org/zap"
1312

1413
sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1"
1514
"github.com/uber/cadence/common/backoff"
1615
"github.com/uber/cadence/common/clock"
17-
"github.com/uber/cadence/service/sharddistributor/client/spectatorclient"
1816
)
1917

2018
//go:generate mockgen -package $GOPACKAGE -destination canary_client_mock.go github.com/uber/cadence/.gen/proto/sharddistributor/v1 ShardDistributorExecutorCanaryAPIYARPCClient
2119

2220
const (
2321
pingInterval = 1 * time.Second
2422
pingJitterCoeff = 0.1 // 10% jitter
25-
pingTimeout = 5 * time.Second
2623
)
2724

2825
// Pinger periodically pings shard owners in the fixed namespace
@@ -96,23 +93,5 @@ func (p *Pinger) pingRandomShard() {
9693
shardNum := rand.Intn(p.numShards)
9794
shardKey := fmt.Sprintf("%d", shardNum)
9895

99-
request := &sharddistributorv1.PingRequest{
100-
ShardKey: shardKey,
101-
Namespace: p.namespace,
102-
}
103-
104-
ctx, cancel := context.WithTimeout(p.ctx, pingTimeout)
105-
defer cancel()
106-
107-
response, err := p.canaryClient.Ping(ctx, request, yarpc.WithShardKey(shardKey), yarpc.WithHeader(spectatorclient.NamespaceHeader, p.namespace))
108-
if err != nil {
109-
p.logger.Error("Failed to ping shard", zap.String("namespace", p.namespace), zap.String("shard_key", shardKey), zap.Error(err))
110-
}
111-
112-
// Verify response
113-
if !response.GetOwnsShard() {
114-
p.logger.Warn("Executor does not own shard", zap.String("namespace", p.namespace), zap.String("shard_key", shardKey), zap.String("executor_id", response.GetExecutorId()))
115-
}
116-
117-
p.logger.Info("Successfully pinged shard owner", zap.String("namespace", p.namespace), zap.String("shard_key", shardKey), zap.String("executor_id", response.GetExecutorId()))
96+
PingShard(p.ctx, p.canaryClient, p.logger, p.namespace, shardKey)
11897
}

0 commit comments

Comments
 (0)