diff --git a/go.mod b/go.mod index 6f8eeac43..049d71852 100644 --- a/go.mod +++ b/go.mod @@ -13,10 +13,10 @@ require ( go.uber.org/atomic v1.4.0 // indirect go.uber.org/multierr v1.1.0 // indirect go.uber.org/zap v1.10.0 - golang.org/x/net v0.0.0-20190311183353-d8887717615a + golang.org/x/net v0.0.0-20190620200207-3b0461eec859 golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be - google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55 // indirect - google.golang.org/grpc v1.19.0 + golang.org/x/tools v0.0.0-20200131161117-97da75b46c2a // indirect + google.golang.org/grpc v1.27.0 ) go 1.13 diff --git a/go.sum b/go.sum index ff8a579c5..9c7e6e415 100644 --- a/go.sum +++ b/go.sum @@ -1,11 +1,14 @@ cloud.google.com/go v0.26.0 h1:e0WKqKTd5BnrG8aKH3J3h+QvEIQtSUcf2n5UZ5ZgLtQ= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/go-kit/kit v0.9.0 h1:wDJmvq38kDhkVxi50ni9ykkdUr1PKgqKOoi01fa0Mdk= github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-logfmt/logfmt v0.4.0 h1:MP4Eh7ZCb31lleYCFuwm0oe4/YGak+5l1vA2NOE80nA= @@ -21,6 
+24,7 @@ github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= @@ -33,6 +37,7 @@ github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -47,21 +52,29 @@ go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/ go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod 
h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a h1:oWX7TPOiFAMXLq8o0ikBYfCJVlRHBcsciT5bXOrH628= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be h1:vEDujvNQGv4jgYKudGeI/+DAX4Jffq6hpD55MmoEvKs= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys 
v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc= golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= @@ -69,6 +82,12 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135 h1:5Beo0mZN8dRzgrMMkDp0jc8YXQKx9DiJ2k1dkvGsn5A= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/tools v0.0.0-20200131161117-97da75b46c2a h1:CL+F3hTmB3BmfbL1mHgij0CRlggIsLC4cs9jy3h9uh4= +golang.org/x/tools v0.0.0-20200131161117-97da75b46c2a/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/appengine v1.1.0 h1:igQkv0AAhEIvTEpD5LIpAfav2eeVO9HBTjvKHVJPRSs= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0 h1:/wp5JvzpHIxhs/dumFmF7BXTf3Z+dd4uXta4kVyO508= @@ -79,8 +98,14 @@ google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55 
h1:gSJIx1SDwno+2El google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/grpc v1.19.0 h1:cfg4PD8YEdSFnm7qLV4++93WcmhH2nIUhMjhdCvl3j8= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.26.0 h1:2dTRdpdFEEhJYQD8EMLB61nnrzSCTbG38PhqdhvOltg= +google.golang.org/grpc v1.26.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/grpc v1.27.0 h1:rRYRFMVgRv6E0D70Skyfsr28tDXIuuPZyWGMPdMcnXg= +google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/retry/examples_test.go b/retry/examples_test.go index cd860bd01..fc75404dd 100644 --- a/retry/examples_test.go +++ b/retry/examples_test.go @@ -90,3 +90,17 @@ func ExampleWithPerRetryTimeout() { fmt.Printf("got pong: %v", pong) } + +// Simple example of using the reconnection configuration. 
+func Example_initializationWithReconnectionOoption() { + opts := []grpc_retry.CallOption{ + grpc_retry.WithCodes(codes.NotFound, codes.Aborted), + grpc_retry.WithMax(2), + grpc_retry.WithReconnectCodes(codes.Unavailable), + grpc_retry.WithReconnectMax(1), + } + grpc.Dial("myservice.example.com", + grpc.WithStreamInterceptor(grpc_retry.StreamClientInterceptor(opts...)), + grpc.WithUnaryInterceptor(grpc_retry.UnaryClientInterceptor(opts...)), + ) +} diff --git a/retry/options.go b/retry/options.go index 1b3fb1a52..664937110 100644 --- a/retry/options.go +++ b/retry/options.go @@ -17,6 +17,7 @@ var ( // `ResourceExhausted` means that the user quota, e.g. per-RPC limits, have been reached. // `Unavailable` means that system is currently unavailable and the client should retry again. DefaultRetriableCodes = []codes.Code{codes.ResourceExhausted, codes.Unavailable} + DefaultReconnectCodes = []codes.Code{codes.ResourceExhausted, codes.Unavailable, codes.Unknown, codes.DeadlineExceeded} defaultOptions = &options{ max: 0, // disabled @@ -26,6 +27,9 @@ var ( backoffFunc: BackoffFuncContext(func(ctx context.Context, attempt uint) time.Duration { return BackoffLinearWithJitter(50*time.Millisecond /*jitter*/, 0.10)(attempt) }), + + reconnectMax: 0, // ResetConnectBackoff is disabled + reconnectCodes: DefaultReconnectCodes, } ) @@ -49,7 +53,10 @@ type BackoffFuncContext func(ctx context.Context, attempt uint) time.Duration // // Its semantically the same to `WithMax` func Disable() CallOption { - return WithMax(0) + return CallOption{applyFunc: func(o *options) { + o.max = 0 + o.reconnectMax = 0 + }} } // WithMax sets the maximum number of retries on this call, or this interceptor. @@ -59,6 +66,13 @@ func WithMax(maxRetries uint) CallOption { }} } +// WithReconnectMax sets the maximum number of connection resets on this call, or this interceptor. 
+func WithReconnectMax(maxRetries uint) CallOption { + return CallOption{applyFunc: func(o *options) { + o.reconnectMax = maxRetries + }} +} + // WithBackoff sets the `BackoffFunc` used to control time between retries. func WithBackoff(bf BackoffFunc) CallOption { return CallOption{applyFunc: func(o *options) { @@ -86,6 +100,19 @@ func WithCodes(retryCodes ...codes.Code) CallOption { }} } +// WithReconnectCodes sets which codes should be used to trigger a connection reset. +// +// Please *use with care*, as you may be retrying non-idempotent calls. +// +// Please use it carefully and for network problems only. +// +// Default value is *codes.ResourceExhausted, codes.Unavailable, codes.Unknown, codes.DeadlineExceeded* +func WithReconnectCodes(retryCodes ...codes.Code) CallOption { + return CallOption{applyFunc: func(o *options) { + o.reconnectCodes = retryCodes + }} +} + // WithPerRetryTimeout sets the RPC timeout per call (including initial call) on this call, or this interceptor. // // The context.Deadline of the call takes precedence and sets the maximum time the whole invocation @@ -108,6 +135,9 @@ type options struct { includeHeader bool codes []codes.Code backoffFunc BackoffFuncContext + + reconnectMax uint + reconnectCodes []codes.Code } // CallOption is a grpc.CallOption that is local to grpc_retry. diff --git a/retry/retry.go b/retry/retry.go index 9a6b48d8f..484ef6946 100644 --- a/retry/retry.go +++ b/retry/retry.go @@ -24,7 +24,7 @@ const ( // UnaryClientInterceptor returns a new retrying unary client interceptor. // -// The default configuration of the interceptor is to not retry *at all*. This behaviour can be +// The default configuration of the interceptor is to not retry or reset connection *at all*. This behaviour can be // changed through options (e.g. WithMax) on creation of the interceptor or on call (through grpc.CallOptions). 
func UnaryClientInterceptor(optFuncs ...CallOption) grpc.UnaryClientInterceptor { intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs) @@ -32,35 +32,42 @@ func UnaryClientInterceptor(optFuncs ...CallOption) grpc.UnaryClientInterceptor grpcOpts, retryOpts := filterCallOptions(opts) callOpts := reuseOrNewWithCallOptions(intOpts, retryOpts) // short circuit for simplicity, and avoiding allocations. - if callOpts.max == 0 { + if callOpts.max == 0 && callOpts.reconnectMax == 0 { return invoker(parentCtx, method, req, reply, cc, grpcOpts...) } var lastErr error - for attempt := uint(0); attempt < callOpts.max; attempt++ { - if err := waitRetryBackoff(attempt, parentCtx, callOpts); err != nil { - return err - } - callCtx := perCallContext(parentCtx, callOpts, attempt) - lastErr = invoker(callCtx, method, req, reply, cc, grpcOpts...) - // TODO(mwitkow): Maybe dial and transport errors should be retriable? - if lastErr == nil { - return nil - } - logTrace(parentCtx, "grpc_retry attempt: %d, got err: %v", attempt, lastErr) - if isContextError(lastErr) { - if parentCtx.Err() != nil { - logTrace(parentCtx, "grpc_retry attempt: %d, parent context error: %v", attempt, parentCtx.Err()) - // its the parent context deadline or cancellation. - return lastErr - } else { - logTrace(parentCtx, "grpc_retry attempt: %d, context error from retry call", attempt) - // its the callCtx deadline or cancellation, in which case try again. - continue + for reconnectAttempt := uint(0); reconnectAttempt <= callOpts.reconnectMax; reconnectAttempt++ { + for attempt := uint(0); attempt < callOpts.max || (callOpts.reconnectMax > 0 && attempt < 1); attempt++ { + if err := waitRetryBackoff(attempt, parentCtx, callOpts); err != nil { + return err + } + callCtx := perCallContext(parentCtx, callOpts, attempt) + lastErr = invoker(callCtx, method, req, reply, cc, grpcOpts...) + // TODO(mwitkow): Maybe dial and transport errors should be retriable? 
+ if lastErr == nil { + return nil + } + logTrace(parentCtx, "grpc_retry attempt: %d, got err: %v", attempt, lastErr) + if isContextError(lastErr) { + if parentCtx.Err() != nil { + logTrace(parentCtx, "grpc_retry attempt: %d, parent context error: %v", attempt, parentCtx.Err()) + // its the parent context deadline or cancellation. + break + } else { + logTrace(parentCtx, "grpc_retry attempt: %d, context error from retry call", attempt) + // its the callCtx deadline or cancellation, in which case try again. + continue + } + } + if !isRetriable(lastErr, callOpts.codes) { + break } } - if !isRetriable(lastErr, callOpts) { + if callOpts.reconnectMax == 0 || !isRetriable(lastErr, callOpts.reconnectCodes) { return lastErr } + logTrace(parentCtx, "grpc_retry reset connection: %d, got err: %v", reconnectAttempt, lastErr) + cc.ResetConnectBackoff() } return lastErr } @@ -80,7 +87,7 @@ func StreamClientInterceptor(optFuncs ...CallOption) grpc.StreamClientIntercepto grpcOpts, retryOpts := filterCallOptions(opts) callOpts := reuseOrNewWithCallOptions(intOpts, retryOpts) // short circuit for simplicity, and avoiding allocations. - if callOpts.max == 0 { + if callOpts.max == 0 && callOpts.reconnectMax == 0 { return streamer(parentCtx, desc, cc, method, grpcOpts...) } if desc.ClientStreams { @@ -88,41 +95,47 @@ func StreamClientInterceptor(optFuncs ...CallOption) grpc.StreamClientIntercepto } var lastErr error - for attempt := uint(0); attempt < callOpts.max; attempt++ { - if err := waitRetryBackoff(attempt, parentCtx, callOpts); err != nil { - return nil, err - } - callCtx := perCallContext(parentCtx, callOpts, 0) - - var newStreamer grpc.ClientStream - newStreamer, lastErr = streamer(callCtx, desc, cc, method, grpcOpts...) 
- if lastErr == nil { - retryingStreamer := &serverStreamingRetryingStream{ - ClientStream: newStreamer, - callOpts: callOpts, - parentCtx: parentCtx, - streamerCall: func(ctx context.Context) (grpc.ClientStream, error) { - return streamer(ctx, desc, cc, method, grpcOpts...) - }, + for reconnectAttempt := uint(0); reconnectAttempt <= callOpts.reconnectMax; reconnectAttempt++ { + for attempt := uint(0); attempt < callOpts.max || (callOpts.reconnectMax > 0 && attempt < 1); attempt++ { + if err := waitRetryBackoff(attempt, parentCtx, callOpts); err != nil { + return nil, err } - return retryingStreamer, nil - } + callCtx := perCallContext(parentCtx, callOpts, 0) - logTrace(parentCtx, "grpc_retry attempt: %d, got err: %v", attempt, lastErr) - if isContextError(lastErr) { - if parentCtx.Err() != nil { - logTrace(parentCtx, "grpc_retry attempt: %d, parent context error: %v", attempt, parentCtx.Err()) - // its the parent context deadline or cancellation. - return nil, lastErr - } else { - logTrace(parentCtx, "grpc_retry attempt: %d, context error from retry call", attempt) - // its the callCtx deadline or cancellation, in which case try again. - continue + var newStreamer grpc.ClientStream + newStreamer, lastErr = streamer(callCtx, desc, cc, method, grpcOpts...) + if lastErr == nil { + retryingStreamer := &serverStreamingRetryingStream{ + ClientStream: newStreamer, + callOpts: callOpts, + parentCtx: parentCtx, + streamerCall: func(ctx context.Context) (grpc.ClientStream, error) { + return streamer(ctx, desc, cc, method, grpcOpts...) + }, + } + return retryingStreamer, nil + } + logTrace(parentCtx, "grpc_retry attempt: %d, got err: %v", attempt, lastErr) + if isContextError(lastErr) { + if parentCtx.Err() != nil { + logTrace(parentCtx, "grpc_retry attempt: %d, parent context error: %v", attempt, parentCtx.Err()) + // its the parent context deadline or cancellation. 
+ return nil, lastErr + } else { + logTrace(parentCtx, "grpc_retry attempt: %d, context error from retry call", attempt) + // its the callCtx deadline or cancellation, in which case try again. + continue + } + } + if !isRetriable(lastErr, callOpts.codes) { + break } } - if !isRetriable(lastErr, callOpts) { + if callOpts.reconnectMax == 0 || !isRetriable(lastErr, callOpts.reconnectCodes) { return nil, lastErr } + logTrace(parentCtx, "grpc_retry reset connection: %d, got err: %v", reconnectAttempt, lastErr) + cc.ResetConnectBackoff() } return nil, lastErr } @@ -226,7 +239,7 @@ func (s *serverStreamingRetryingStream) receiveMsgAndIndicateRetry(m interface{} return true, err } } - return isRetriable(err, s.callOpts), err + return isRetriable(err, s.callOpts.codes), err } @@ -270,13 +283,13 @@ func waitRetryBackoff(attempt uint, parentCtx context.Context, callOpts *options return nil } -func isRetriable(err error, callOpts *options) bool { - errCode := status.Code(err) +func isRetriable(err error, codes []codes.Code) bool { if isContextError(err) { // context errors are not retriable based on user settings. return false } - for _, code := range callOpts.codes { + errCode := status.Code(err) + for _, code := range codes { if code == errCode { return true } diff --git a/retry/retry_test.go b/retry/retry_test.go index b47194c16..c982e2005 100644 --- a/retry/retry_test.go +++ b/retry/retry_test.go @@ -169,6 +169,106 @@ func (s *RetrySuite) TestUnary_OverrideFromDialOpts() { require.EqualValues(s.T(), 5, s.srv.requestCount(), "five requests should have been made") } +func (s *RetrySuite) TestUnary_OverrideFromDialOpts_ResetConnection() { + + // error case 1. 
This tests 5 attempts connection to server with error without reconnect + s.srv.resetFailingConfiguration(14, codes.Aborted, noSleep) + out, err := s.Client.Ping(s.SimpleCtx(), goodPing, + grpc_retry.WithCodes(codes.Aborted), + grpc_retry.WithMax(5), + ) + require.Error(s.T(), err, "error must occur from the failing service") + require.Nil(s.T(), out, "Pong must be nil") + require.Equal(s.T(), codes.Aborted, status.Code(err), "failure code must come from retrier") + require.EqualValues(s.T(), 5, s.srv.requestCount(), "five requests should have been made") + + // error case 2. This tests 1 re-connection to server + s.srv.resetFailingConfiguration(14, codes.Aborted, noSleep) + out, err = s.Client.Ping(s.SimpleCtx(), goodPing, + grpc_retry.WithCodes(codes.Aborted), + grpc_retry.WithMax(5), + // by default ResourceExhausted, Unavailable, Unknown, DeadlineExceeded + grpc_retry.WithReconnectCodes(codes.Aborted), + grpc_retry.WithReconnectMax(1), + ) + require.Error(s.T(), err, "error must occur from the failing service") + require.Nil(s.T(), out, "Pong must be nil") + require.Equal(s.T(), codes.Aborted, status.Code(err), "failure code must come from retrier") + require.EqualValues(s.T(), 10, s.srv.requestCount(), "ten (5+5) requests should have been made") + + // success case. 
This tests 2 re-connections to server + s.srv.resetFailingConfiguration(14, codes.Aborted, noSleep) + out, err = s.Client.Ping(s.SimpleCtx(), goodPing, + grpc_retry.WithCodes(codes.Aborted), + grpc_retry.WithMax(5), + // by default ResourceExhausted, Unavailable, Unknown, DeadlineExceeded + grpc_retry.WithReconnectCodes(codes.Aborted), + grpc_retry.WithReconnectMax(2), + ) + require.NoError(s.T(), err, "the 14th invocation should succeed") + require.NotNil(s.T(), out, "Pong must be not nil") + require.EqualValues(s.T(), 14, s.srv.requestCount(), "fourteen (5+5+4) requests should have been made") +} + +func (s *RetrySuite) TestUnary_OverrideFromDialOpts_ResetConnection_Codes() { + // This tests that reconnection codes are used for retries. + + // error case + s.srv.resetFailingConfiguration(7, codes.Unavailable, noSleep) + out, err := s.Client.Ping(s.SimpleCtx(), goodPing, + grpc_retry.WithCodes(codes.DataLoss), + grpc_retry.WithMax(5), + ) + require.Error(s.T(), err, "error must occur from the failing service") + require.Nil(s.T(), out, "Pong must be nil") + require.Equal(s.T(), codes.Unavailable, status.Code(err), "failure code must come from retrier") + require.EqualValues(s.T(), 1, s.srv.requestCount(), "single requests should have been made") + + // success case + s.srv.resetFailingConfiguration(7, codes.Unavailable, noSleep) + out, err = s.Client.Ping(s.SimpleCtx(), goodPing, + grpc_retry.WithMax(5), + grpc_retry.WithCodes(codes.DataLoss, codes.Unavailable), + grpc_retry.WithReconnectMax(2), + // by default ResourceExhausted, Unavailable, Unknown, DeadlineExceeded + grpc_retry.WithReconnectCodes(codes.Unavailable), + ) + require.NoError(s.T(), err, "the 7th invocation should succeed") + require.NotNil(s.T(), out, "Pong must be not nil") + require.EqualValues(s.T(), 7, s.srv.requestCount(), "seven (5+2) requests should have been made") +} + +func (s *RetrySuite) TestUnary_OverrideFromDialOpts_ResetConnection_Codes2() { + // This tests that reconnection codes 
are used for retries. + // codes.DataLoss doesn't activate reset connection + + s.srv.resetFailingConfiguration(3, codes.DataLoss, noSleep) + out, err := s.Client.Ping(s.SimpleCtx(), goodPing, + grpc_retry.WithMax(5), + grpc_retry.WithCodes(codes.DataLoss), + grpc_retry.WithReconnectMax(2), + // by default ResourceExhausted, Unavailable, Unknown, DeadlineExceeded + grpc_retry.WithReconnectCodes(codes.Unavailable), + ) + require.NoError(s.T(), err, "the 3th invocation should succeed") + require.NotNil(s.T(), out, "Pong must be not nil") + require.EqualValues(s.T(), 3, s.srv.requestCount(), "three requests should have been made") +} + +func (s *RetrySuite) TestUnary_OverrideFromDialOpts_ResetConnection_Attempts() { + // This tests that at least one attempt should be requested for each reset of connection + s.srv.resetFailingConfiguration(2, codes.Unavailable, noSleep) + out, err := s.Client.Ping(s.SimpleCtx(), goodPing, + grpc_retry.WithMax(0), + grpc_retry.WithReconnectMax(1), + // by default ResourceExhausted, Unavailable, Unknown, DeadlineExceeded + grpc_retry.WithReconnectCodes(codes.Unavailable), + ) + require.NoError(s.T(), err, "the second invocation should succeed") + require.NotNil(s.T(), out, "Pong must be not nil") + require.EqualValues(s.T(), 2, s.srv.requestCount(), "two (1+1) requests should have been made") +} + func (s *RetrySuite) TestUnary_PerCallDeadline_Succeeds() { // This tests 5 requests, with first 4 sleeping for 10 millisecond, and the retry logic firing // a retry call with a 5 millisecond deadline. The 5th one doesn't sleep and succeeds.