Skip to content

Commit 3304cc8

Browse files
johanbrandhorstdomgreen
authored andcommitted
Add context functions to retry and recover (#172)
* retry: add new BackoffFuncContext type BackoffFuncContext adds the context parameter, allowing the retry function access to the contents of the parent context. Fixes #171 * recovery: add new RecoveryHandlerFuncContext type Allows the user to configure a custom recovery handler that has access to the request scoped values in the context. Fixes #168
1 parent 498ae20 commit 3304cc8

File tree

4 files changed

+44
-9
lines changed

4 files changed

+44
-9
lines changed

recovery/interceptors.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,17 @@ import (
1212
// RecoveryHandlerFunc is a function that recovers from the panic `p` by returning an `error`.
1313
type RecoveryHandlerFunc func(p interface{}) (err error)
1414

15+
// RecoveryHandlerFuncContext is a function that recovers from the panic `p` by returning an `error`.
16+
// The context can be used to extract request scoped metadata and context values.
17+
type RecoveryHandlerFuncContext func(ctx context.Context, p interface{}) (err error)
18+
1519
// UnaryServerInterceptor returns a new unary server interceptor for panic recovery.
1620
func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {
1721
o := evaluateOptions(opts)
1822
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (_ interface{}, err error) {
1923
defer func() {
2024
if r := recover(); r != nil {
21-
err = recoverFrom(r, o.recoveryHandlerFunc)
25+
err = recoverFrom(ctx, r, o.recoveryHandlerFunc)
2226
}
2327
}()
2428

@@ -32,17 +36,17 @@ func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor {
3236
return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) {
3337
defer func() {
3438
if r := recover(); r != nil {
35-
err = recoverFrom(r, o.recoveryHandlerFunc)
39+
err = recoverFrom(stream.Context(), r, o.recoveryHandlerFunc)
3640
}
3741
}()
3842

3943
return handler(srv, stream)
4044
}
4145
}
4246

43-
func recoverFrom(p interface{}, r RecoveryHandlerFunc) error {
47+
func recoverFrom(ctx context.Context, p interface{}, r RecoveryHandlerFuncContext) error {
4448
if r == nil {
4549
return grpc.Errorf(codes.Internal, "%s", p)
4650
}
47-
return r(p)
51+
return r(ctx, p)
4852
}

recovery/options.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,16 @@
33

44
package grpc_recovery
55

6+
import "golang.org/x/net/context"
7+
68
var (
79
defaultOptions = &options{
810
recoveryHandlerFunc: nil,
911
}
1012
)
1113

1214
type options struct {
13-
recoveryHandlerFunc RecoveryHandlerFunc
15+
recoveryHandlerFunc RecoveryHandlerFuncContext
1416
}
1517

1618
func evaluateOptions(opts []Option) *options {
@@ -26,6 +28,15 @@ type Option func(*options)
2628

2729
// WithRecoveryHandler customizes the function for recovering from a panic.
2830
func WithRecoveryHandler(f RecoveryHandlerFunc) Option {
31+
return func(o *options) {
32+
o.recoveryHandlerFunc = RecoveryHandlerFuncContext(func(ctx context.Context, p interface{}) error {
33+
return f(p)
34+
})
35+
}
36+
}
37+
38+
// WithRecoveryHandlerContext customizes the function for recovering from a panic.
39+
func WithRecoveryHandlerContext(f RecoveryHandlerFuncContext) Option {
2940
return func(o *options) {
3041
o.recoveryHandlerFunc = f
3142
}

retry/options.go

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package grpc_retry
66
import (
77
"time"
88

9+
"golang.org/x/net/context"
910
"google.golang.org/grpc"
1011
"google.golang.org/grpc/codes"
1112
)
@@ -22,7 +23,9 @@ var (
2223
perCallTimeout: 0, // disabled
2324
includeHeader: true,
2425
codes: DefaultRetriableCodes,
25-
backoffFunc: BackoffLinearWithJitter(50*time.Millisecond /*jitter*/, 0.10),
26+
backoffFunc: BackoffFuncContext(func(ctx context.Context, attempt uint) time.Duration {
27+
return BackoffLinearWithJitter(50*time.Millisecond /*jitter*/, 0.10)(attempt)
28+
}),
2629
}
2730
)
2831

@@ -34,6 +37,14 @@ var (
3437
// with the next iteration.
3538
type BackoffFunc func(attempt uint) time.Duration
3639

40+
// BackoffFuncContext denotes a family of functions that control the backoff duration between call retries.
41+
//
42+
// They are called with an identifier of the attempt, and should return a time the system client should
43+
// hold off for. If the time returned is longer than the `context.Context.Deadline` of the request
44+
// the deadline of the request takes precedence and the wait will be interrupted before proceeding
45+
// with the next iteration. The context can be used to extract request scoped metadata and context values.
46+
type BackoffFuncContext func(ctx context.Context, attempt uint) time.Duration
47+
3748
// Disable disables the retry behaviour on this call, or this interceptor.
3849
//
3950
// Its semantically the same to `WithMax`
@@ -48,8 +59,17 @@ func WithMax(maxRetries uint) CallOption {
4859
}}
4960
}
5061

51-
// WithBackoff sets the `BackoffFunc `used to control time between retries.
62+
// WithBackoff sets the `BackoffFunc` used to control time between retries.
5263
func WithBackoff(bf BackoffFunc) CallOption {
64+
return CallOption{applyFunc: func(o *options) {
65+
o.backoffFunc = BackoffFuncContext(func(ctx context.Context, attempt uint) time.Duration {
66+
return bf(attempt)
67+
})
68+
}}
69+
}
70+
71+
// WithBackoffContext sets the `BackoffFuncContext` used to control time between retries.
72+
func WithBackoffContext(bf BackoffFuncContext) CallOption {
5373
return CallOption{applyFunc: func(o *options) {
5474
o.backoffFunc = bf
5575
}}
@@ -87,7 +107,7 @@ type options struct {
87107
perCallTimeout time.Duration
88108
includeHeader bool
89109
codes []codes.Code
90-
backoffFunc BackoffFunc
110+
backoffFunc BackoffFuncContext
91111
}
92112

93113
// CallOption is a grpc.CallOption that is local to grpc_retry.

retry/retry.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ func (s *serverStreamingRetryingStream) reestablishStreamAndResendBuffer(callCtx
254254
func waitRetryBackoff(attempt uint, parentCtx context.Context, callOpts *options) error {
255255
var waitTime time.Duration = 0
256256
if attempt > 0 {
257-
waitTime = callOpts.backoffFunc(attempt)
257+
waitTime = callOpts.backoffFunc(parentCtx, attempt)
258258
}
259259
if waitTime > 0 {
260260
logTrace(parentCtx, "grpc_retry attempt: %d, backoff for %v", attempt, waitTime)

0 commit comments

Comments
 (0)