Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 58 additions & 11 deletions cmd/sharddistributor-canary/main.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,27 @@
package main

import (
"fmt"
"net"
"os"
"time"

"github.com/uber-go/tally"
"github.com/urfave/cli/v2"
"go.uber.org/fx"
"go.uber.org/yarpc"
"go.uber.org/yarpc/api/peer"
"go.uber.org/yarpc/transport/grpc"
"go.uber.org/zap"

sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/service/sharddistributor/canary"
"github.com/uber/cadence/service/sharddistributor/canary/executors"
"github.com/uber/cadence/service/sharddistributor/client/clientcommon"
"github.com/uber/cadence/service/sharddistributor/client/executorclient"
"github.com/uber/cadence/service/sharddistributor/client/spectatorclient"
"github.com/uber/cadence/service/sharddistributor/config"
"github.com/uber/cadence/tools/common/commoncli"
)
Expand All @@ -25,6 +31,7 @@ const (
defaultShardDistributorEndpoint = "127.0.0.1:7943"
defaultFixedNamespace = "shard-distributor-canary"
defaultEphemeralNamespace = "shard-distributor-canary-ephemeral"
defaultCanaryGRPCPort = 7953 // Port for canary to receive ping requests

shardDistributorServiceName = "cadence-shard-distributor"
)
Expand All @@ -33,11 +40,12 @@ func runApp(c *cli.Context) {
endpoint := c.String("endpoint")
fixedNamespace := c.String("fixed-namespace")
ephemeralNamespace := c.String("ephemeral-namespace")
canaryGRPCPort := c.Int("canary-grpc-port")

fx.New(opts(fixedNamespace, ephemeralNamespace, endpoint)).Run()
fx.New(opts(fixedNamespace, ephemeralNamespace, endpoint, canaryGRPCPort)).Run()
}

func opts(fixedNamespace, ephemeralNamespace, endpoint string) fx.Option {
func opts(fixedNamespace, ephemeralNamespace, endpoint string, canaryGRPCPort int) fx.Option {
configuration := clientcommon.Config{
Namespaces: []clientcommon.NamespaceConfig{
{Namespace: fixedNamespace, HeartBeatInterval: 1 * time.Second, MigrationMode: config.MigrationModeONBOARDED},
Expand All @@ -49,22 +57,51 @@ func opts(fixedNamespace, ephemeralNamespace, endpoint string) fx.Option {
},
}

canaryGRPCAddress := fmt.Sprintf("127.0.0.1:%d", canaryGRPCPort)

// Create listener for GRPC inbound
listener, err := net.Listen("tcp", canaryGRPCAddress)
if err != nil {
panic(err)
}

transport := grpc.NewTransport()
yarpcConfig := yarpc.Config{
Name: "shard-distributor-canary",
Outbounds: yarpc.Outbounds{
shardDistributorServiceName: {
Unary: transport.NewSingleOutbound(endpoint),
},
},

executorMetadata := executorclient.ExecutorMetadata{
clientcommon.GrpcAddressMetadataKey: canaryGRPCAddress,
}

return fx.Options(
fx.Supply(
fx.Annotate(tally.NoopScope, fx.As(new(tally.Scope))),
fx.Annotate(clock.NewRealTimeSource(), fx.As(new(clock.TimeSource))),
yarpcConfig,
configuration,
transport,
executorMetadata,
),

fx.Provide(func(peerChooser spectatorclient.SpectatorPeerChooserInterface) yarpc.Config {
return yarpc.Config{
Name: "shard-distributor-canary",
Inbounds: yarpc.Inbounds{
transport.NewInbound(listener), // Listen for incoming ping requests
},
Outbounds: yarpc.Outbounds{
shardDistributorServiceName: {
Unary: transport.NewSingleOutbound(endpoint),
Stream: transport.NewSingleOutbound(endpoint),
},
// canary-to-canary outbound uses peer chooser to route to other canary instances
"shard-distributor-canary": {
Unary: transport.NewOutbound(peerChooser),
Stream: transport.NewOutbound(peerChooser),
},
},
}
}),

fx.Provide(
func(t *grpc.Transport) peer.Transport { return t },
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have the transport in fx.Supply; do we need this? It seems redundant.

),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have the transport in fx.Supply; do we need this? It seems redundant.

fx.Provide(
yarpc.NewDispatcher,
Expand All @@ -73,12 +110,17 @@ func opts(fixedNamespace, ephemeralNamespace, endpoint string) fx.Option {
fx.Provide(zap.NewDevelopment),
fx.Provide(log.NewLogger),

// Register canary procedures with dispatcher
fx.Invoke(func(dispatcher *yarpc.Dispatcher, server sharddistributorv1.ShardDistributorExecutorCanaryAPIYARPCServer) {
dispatcher.Register(sharddistributorv1.BuildShardDistributorExecutorCanaryAPIYARPCProcedures(server))
}),

// Start the YARPC dispatcher
fx.Invoke(func(lc fx.Lifecycle, dispatcher *yarpc.Dispatcher) {
lc.Append(fx.StartStopHook(dispatcher.Start, dispatcher.Stop))
}),

// Include the canary module
// Include the canary module - it will set up spectator peer choosers and canary client
canary.Module(canary.NamespacesNames{FixedNamespace: fixedNamespace, EphemeralNamespace: ephemeralNamespace, ExternalAssignmentNamespace: executors.ExternalAssignmentNamespace, SharddistributorServiceName: shardDistributorServiceName}),
)
}
Expand Down Expand Up @@ -110,6 +152,11 @@ func buildCLI() *cli.App {
Value: defaultEphemeralNamespace,
Usage: "namespace for ephemeral shard creation testing",
},
&cli.IntFlag{
Name: "canary-grpc-port",
Value: defaultCanaryGRPCPort,
Usage: "port for canary to receive ping requests",
},
},
Action: func(c *cli.Context) error {
runApp(c)
Expand Down
2 changes: 1 addition & 1 deletion cmd/sharddistributor-canary/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ import (
)

func TestDependenciesAreSatisfied(t *testing.T) {
assert.NoError(t, fx.ValidateApp(opts(defaultFixedNamespace, defaultEphemeralNamespace, defaultShardDistributorEndpoint)))
assert.NoError(t, fx.ValidateApp(opts(defaultFixedNamespace, defaultEphemeralNamespace, defaultShardDistributorEndpoint, defaultCanaryGRPCPort)))
}
39 changes: 39 additions & 0 deletions service/sharddistributor/canary/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@ package canary

import (
"go.uber.org/fx"
"go.uber.org/yarpc"

sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1"
"github.com/uber/cadence/service/sharddistributor/canary/executors"
"github.com/uber/cadence/service/sharddistributor/canary/factory"
"github.com/uber/cadence/service/sharddistributor/canary/handler"
"github.com/uber/cadence/service/sharddistributor/canary/pinger"
"github.com/uber/cadence/service/sharddistributor/canary/processor"
"github.com/uber/cadence/service/sharddistributor/canary/processorephemeral"
"github.com/uber/cadence/service/sharddistributor/canary/sharddistributorclient"
"github.com/uber/cadence/service/sharddistributor/client/executorclient"
"github.com/uber/cadence/service/sharddistributor/client/spectatorclient"
)

type NamespacesNames struct {
Expand Down Expand Up @@ -49,5 +53,40 @@ func opts(names NamespacesNames) fx.Option {
executors.Module(names.FixedNamespace, names.EphemeralNamespace, names.ExternalAssignmentNamespace),

processorephemeral.ShardCreatorModule([]string{names.EphemeralNamespace}),

spectatorclient.Module(),
fx.Provide(spectatorclient.NewSpectatorPeerChooser),
fx.Invoke(func(chooser spectatorclient.SpectatorPeerChooserInterface, lc fx.Lifecycle) {
lc.Append(fx.StartStopHook(chooser.Start, chooser.Stop))
}),

// Create canary client using the dispatcher's client config
fx.Provide(func(dispatcher *yarpc.Dispatcher) sharddistributorv1.ShardDistributorExecutorCanaryAPIYARPCClient {
config := dispatcher.ClientConfig("shard-distributor-canary")
return sharddistributorv1.NewShardDistributorExecutorCanaryAPIYARPCClient(config)
}),

fx.Provide(func(params pinger.Params) *pinger.Pinger {
return pinger.NewPinger(params, names.FixedNamespace, 32)
}),
fx.Invoke(func(p *pinger.Pinger, lc fx.Lifecycle) {
lc.Append(fx.StartStopHook(p.Start, p.Stop))
}),

// Register canary ping handler to receive ping requests from other executors
fx.Provide(handler.NewPingHandler),
fx.Provide(fx.Annotate(
func(h *handler.PingHandler) sharddistributorv1.ShardDistributorExecutorCanaryAPIYARPCServer {
return h
},
)),
fx.Provide(sharddistributorv1.NewFxShardDistributorExecutorCanaryAPIYARPCProcedures()),

// There is a circular dependency between the spectator client and the peer chooser, since
// the yarpc dispatcher needs the peer chooser and the peer chooser needs the spectators, which needs the yarpc dispatcher.
// To break the circular dependency, we set the spectators on the peer chooser here.
fx.Invoke(func(chooser spectatorclient.SpectatorPeerChooserInterface, spectators *spectatorclient.Spectators) {
chooser.SetSpectators(spectators)
}),
)
}
16 changes: 15 additions & 1 deletion service/sharddistributor/canary/module_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"go.uber.org/fx"
"go.uber.org/fx/fxtest"
"go.uber.org/yarpc"
"go.uber.org/yarpc/api/peer"
"go.uber.org/yarpc/api/transport/transporttest"
"go.uber.org/yarpc/transport/grpc"
"go.uber.org/yarpc/yarpctest"
Expand All @@ -23,7 +24,8 @@ func TestModule(t *testing.T) {
// Create mocks
ctrl := gomock.NewController(t)
mockClientConfig := transporttest.NewMockClientConfig(ctrl)
outbound := grpc.NewTransport().NewOutbound(yarpctest.NewFakePeerList())
transport := grpc.NewTransport()
outbound := transport.NewOutbound(yarpctest.NewFakePeerList())

mockClientConfig.EXPECT().Caller().Return("test-executor").Times(2)
mockClientConfig.EXPECT().Service().Return("shard-distributor").Times(2)
Expand All @@ -43,15 +45,27 @@ func TestModule(t *testing.T) {
},
}

// Create a mock dispatcher with the required outbound
dispatcher := yarpc.NewDispatcher(yarpc.Config{
Name: "test-canary",
Outbounds: yarpc.Outbounds{
"shard-distributor-canary": {
Unary: outbound,
},
},
})

// Create a test app with the library, check that it starts and stops
fxtest.New(t,
fx.Supply(
fx.Annotate(tally.NoopScope, fx.As(new(tally.Scope))),
fx.Annotate(clock.NewMockedTimeSource(), fx.As(new(clock.TimeSource))),
fx.Annotate(log.NewNoop(), fx.As(new(log.Logger))),
fx.Annotate(mockClientConfigProvider, fx.As(new(yarpc.ClientConfig))),
fx.Annotate(transport, fx.As(new(peer.Transport))),
zaptest.NewLogger(t),
config,
dispatcher,
),
Module(NamespacesNames{FixedNamespace: "shard-distributor-canary", EphemeralNamespace: "shard-distributor-canary-ephemeral", ExternalAssignmentNamespace: "test-external-assignment", SharddistributorServiceName: "cadence-shard-distributor"}),
).RequireStart().RequireStop()
Expand Down
40 changes: 40 additions & 0 deletions service/sharddistributor/canary/pinger/pingAndLog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package pinger

import (
"context"
"time"

"go.uber.org/yarpc"
"go.uber.org/zap"

sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1"
"github.com/uber/cadence/service/sharddistributor/client/spectatorclient"
)

const (
// pingTimeout is the deadline applied to each Ping call issued by PingShard.
pingTimeout = 5 * time.Second
)

// PingShard issues one Ping call for shardKey in namespace through canaryClient
// and reports the outcome on logger; it returns nothing to the caller.
//
// The call runs under a context derived from ctx and limited by pingTimeout.
// shardKey is passed via yarpc.WithShardKey and namespace is sent in the
// spectatorclient.NamespaceHeader header. The result is logged as follows:
//   - the call returned an error: error level, with the error attached;
//   - the response reports OwnsShard == false: warn level, with the executor ID;
//   - otherwise: info level, with the executor ID.
func PingShard(ctx context.Context, canaryClient sharddistributorv1.ShardDistributorExecutorCanaryAPIYARPCClient, logger *zap.Logger, namespace, shardKey string) {
	pingCtx, stop := context.WithTimeout(ctx, pingTimeout)
	defer stop()

	// Every log line emitted below starts with the same identifying fields.
	fields := []zap.Field{
		zap.String("namespace", namespace),
		zap.String("shard_key", shardKey),
	}

	resp, err := canaryClient.Ping(
		pingCtx,
		&sharddistributorv1.PingRequest{
			ShardKey:  shardKey,
			Namespace: namespace,
		},
		yarpc.WithShardKey(shardKey),
		yarpc.WithHeader(spectatorclient.NamespaceHeader, namespace),
	)

	switch {
	case err != nil:
		logger.Error("Failed to ping shard", append(fields, zap.Error(err))...)
	case !resp.GetOwnsShard():
		// The call succeeded, but the responder says the shard is not its own.
		logger.Warn("Executor does not own shard", append(fields, zap.String("executor_id", resp.GetExecutorId()))...)
	default:
		logger.Info("Successfully pinged shard owner", append(fields, zap.String("executor_id", resp.GetExecutorId()))...)
	}
}
23 changes: 1 addition & 22 deletions service/sharddistributor/canary/pinger/pinger.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,18 @@ import (
"time"

"go.uber.org/fx"
"go.uber.org/yarpc"
"go.uber.org/zap"

sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1"
"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/service/sharddistributor/client/spectatorclient"
)

//go:generate mockgen -package $GOPACKAGE -destination canary_client_mock.go github.com/uber/cadence/.gen/proto/sharddistributor/v1 ShardDistributorExecutorCanaryAPIYARPCClient

const (
pingInterval = 1 * time.Second
pingJitterCoeff = 0.1 // 10% jitter
pingTimeout = 5 * time.Second
)

// Pinger periodically pings shard owners in the fixed namespace
Expand Down Expand Up @@ -96,23 +93,5 @@ func (p *Pinger) pingRandomShard() {
shardNum := rand.Intn(p.numShards)
shardKey := fmt.Sprintf("%d", shardNum)

request := &sharddistributorv1.PingRequest{
ShardKey: shardKey,
Namespace: p.namespace,
}

ctx, cancel := context.WithTimeout(p.ctx, pingTimeout)
defer cancel()

response, err := p.canaryClient.Ping(ctx, request, yarpc.WithShardKey(shardKey), yarpc.WithHeader(spectatorclient.NamespaceHeader, p.namespace))
if err != nil {
p.logger.Error("Failed to ping shard", zap.String("namespace", p.namespace), zap.String("shard_key", shardKey), zap.Error(err))
}

// Verify response
if !response.GetOwnsShard() {
p.logger.Warn("Executor does not own shard", zap.String("namespace", p.namespace), zap.String("shard_key", shardKey), zap.String("executor_id", response.GetExecutorId()))
}

p.logger.Info("Successfully pinged shard owner", zap.String("namespace", p.namespace), zap.String("shard_key", shardKey), zap.String("executor_id", response.GetExecutorId()))
PingShard(p.ctx, p.canaryClient, p.logger, p.namespace, shardKey)
}
Loading