Skip to content

Commit 7f0fc41

Browse files
authored
benchmark/client: add context for cancellation (#8614)
Fixes: #8596 RELEASE NOTES: None
1 parent 0b57abb commit 7f0fc41

File tree

2 files changed

+14
-19
lines changed

2 files changed

+14
-19
lines changed

benchmark/worker/benchmark_client.go

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,6 @@ func (h *lockingHistogram) mergeInto(merged *stats.Histogram) {
7373

7474
type benchmarkClient struct {
7575
closeConns func()
76-
stop chan bool
7776
lastResetTime time.Time
7877
histogramOptions stats.HistogramOptions
7978
lockingHistograms []lockingHistogram
@@ -168,7 +167,7 @@ func createConns(config *testpb.ClientConfig) ([]*grpc.ClientConn, func(), error
168167
}, nil
169168
}
170169

171-
func performRPCs(config *testpb.ClientConfig, conns []*grpc.ClientConn, bc *benchmarkClient) error {
170+
func performRPCs(ctx context.Context, config *testpb.ClientConfig, conns []*grpc.ClientConn, bc *benchmarkClient) error {
172171
// Read payload size and type from config.
173172
var (
174173
payloadReqSize, payloadRespSize int
@@ -212,17 +211,17 @@ func performRPCs(config *testpb.ClientConfig, conns []*grpc.ClientConn, bc *benc
212211

213212
switch config.RpcType {
214213
case testpb.RpcType_UNARY:
215-
bc.unaryLoop(conns, rpcCountPerConn, payloadReqSize, payloadRespSize, poissonLambda)
214+
bc.unaryLoop(ctx, conns, rpcCountPerConn, payloadReqSize, payloadRespSize, poissonLambda)
216215
case testpb.RpcType_STREAMING:
217-
bc.streamingLoop(conns, rpcCountPerConn, payloadReqSize, payloadRespSize, payloadType, poissonLambda)
216+
bc.streamingLoop(ctx, conns, rpcCountPerConn, payloadReqSize, payloadRespSize, payloadType, poissonLambda)
218217
default:
219218
return status.Errorf(codes.InvalidArgument, "unknown rpc type: %v", config.RpcType)
220219
}
221220

222221
return nil
223222
}
224223

225-
func startBenchmarkClient(config *testpb.ClientConfig) (*benchmarkClient, error) {
224+
func startBenchmarkClient(ctx context.Context, config *testpb.ClientConfig) (*benchmarkClient, error) {
226225
printClientConfig(config)
227226

228227
// Set running environment like how many cores to use.
@@ -243,13 +242,12 @@ func startBenchmarkClient(config *testpb.ClientConfig) (*benchmarkClient, error)
243242
},
244243
lockingHistograms: make([]lockingHistogram, rpcCountPerConn*len(conns)),
245244

246-
stop: make(chan bool),
247245
lastResetTime: time.Now(),
248246
closeConns: closeConns,
249247
rusageLastReset: syscall.GetRusage(),
250248
}
251249

252-
if err = performRPCs(config, conns, bc); err != nil {
250+
if err = performRPCs(ctx, config, conns, bc); err != nil {
253251
// Close all connections if performRPCs failed.
254252
closeConns()
255253
return nil, err
@@ -258,7 +256,7 @@ func startBenchmarkClient(config *testpb.ClientConfig) (*benchmarkClient, error)
258256
return bc, nil
259257
}
260258

261-
func (bc *benchmarkClient) unaryLoop(conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int, poissonLambda *float64) {
259+
func (bc *benchmarkClient) unaryLoop(ctx context.Context, conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int, poissonLambda *float64) {
262260
for ic, conn := range conns {
263261
client := testgrpc.NewBenchmarkServiceClient(conn)
264262
// For each connection, create rpcCountPerConn goroutines to do rpc.
@@ -274,10 +272,8 @@ func (bc *benchmarkClient) unaryLoop(conns []*grpc.ClientConn, rpcCountPerConn i
274272
// before starting benchmark.
275273
if poissonLambda == nil { // Closed loop.
276274
for {
277-
select {
278-
case <-bc.stop:
279-
return
280-
default:
275+
if ctx.Err() != nil {
276+
break
281277
}
282278
start := time.Now()
283279
if err := benchmark.DoUnaryCall(client, reqSize, respSize); err != nil {
@@ -292,13 +288,12 @@ func (bc *benchmarkClient) unaryLoop(conns []*grpc.ClientConn, rpcCountPerConn i
292288
bc.poissonUnary(client, idx, reqSize, respSize, *poissonLambda)
293289
})
294290
}
295-
296291
}(idx)
297292
}
298293
}
299294
}
300295

301-
func (bc *benchmarkClient) streamingLoop(conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int, payloadType string, poissonLambda *float64) {
296+
func (bc *benchmarkClient) streamingLoop(ctx context.Context, conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int, payloadType string, poissonLambda *float64) {
302297
var doRPC func(testgrpc.BenchmarkService_StreamingCallClient, int, int) error
303298
if payloadType == "bytebuf" {
304299
doRPC = benchmark.DoByteBufStreamingRoundTrip
@@ -329,10 +324,8 @@ func (bc *benchmarkClient) streamingLoop(conns []*grpc.ClientConn, rpcCountPerCo
329324
}
330325
elapse := time.Since(start)
331326
bc.lockingHistograms[idx].add(int64(elapse))
332-
select {
333-
case <-bc.stop:
327+
if ctx.Err() != nil {
334328
return
335-
default:
336329
}
337330
}
338331
}(idx)
@@ -364,6 +357,7 @@ func (bc *benchmarkClient) poissonUnary(client testgrpc.BenchmarkServiceClient,
364357
func (bc *benchmarkClient) poissonStreaming(stream testgrpc.BenchmarkService_StreamingCallClient, idx int, reqSize int, respSize int, lambda float64, doRPC func(testgrpc.BenchmarkService_StreamingCallClient, int, int) error) {
365358
go func() {
366359
start := time.Now()
360+
367361
if err := doRPC(stream, reqSize, respSize); err != nil {
368362
return
369363
}
@@ -430,6 +424,5 @@ func (bc *benchmarkClient) getStats(reset bool) *testpb.ClientStats {
430424
}
431425

432426
func (bc *benchmarkClient) shutdown() {
433-
close(bc.stop)
434427
bc.closeConns()
435428
}

benchmark/worker/main.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,9 @@ func (s *workerServer) RunServer(stream testgrpc.WorkerService_RunServerServer)
139139

140140
func (s *workerServer) RunClient(stream testgrpc.WorkerService_RunClientServer) error {
141141
var bc *benchmarkClient
142+
ctx, cancel := context.WithCancel(stream.Context())
142143
defer func() {
144+
cancel()
143145
// Shut down benchmark client when stream ends.
144146
logger.Infof("shutting down benchmark client")
145147
if bc != nil {
@@ -163,7 +165,7 @@ func (s *workerServer) RunClient(stream testgrpc.WorkerService_RunClientServer)
163165
logger.Infof("client setup received when client already exists, shutting down the existing client")
164166
bc.shutdown()
165167
}
166-
bc, err = startBenchmarkClient(t.Setup)
168+
bc, err = startBenchmarkClient(ctx, t.Setup)
167169
if err != nil {
168170
return err
169171
}

0 commit comments

Comments
 (0)