diff --git a/balancer/rls/balancer_test.go b/balancer/rls/balancer_test.go
index fd53b731defd..fa4373753b5d 100644
--- a/balancer/rls/balancer_test.go
+++ b/balancer/rls/balancer_test.go
@@ -912,7 +912,9 @@ func (s) TestDataCachePurging(t *testing.T) {
 
 // TestControlChannelConnectivityStateMonitoring tests the scenario where the
 // control channel goes down and comes back up again and verifies that backoff
-// state is reset for cache entries in this scenario.
+// state is reset for cache entries in this scenario. It also verifies that
+// backoff is NOT reset when the control channel first becomes READY (i.e., the
+// initial CONNECTING → READY transition should not trigger a backoff reset).
 func (s) TestControlChannelConnectivityStateMonitoring(t *testing.T) {
 	// Create a restartable listener which can close existing connections.
 	l, err := testutils.LocalTCPListener()
@@ -973,6 +975,16 @@ func (s) TestControlChannelConnectivityStateMonitoring(t *testing.T) {
 	// Make sure an RLS request is sent out.
 	verifyRLSRequest(t, rlsReqCh, true)
 
+	// Verify that the initial READY state of the control channel did NOT trigger
+	// a backoff reset. The resetBackoffHook should only be called when
+	// transitioning from TRANSIENT_FAILURE to READY, not for the initial
+	// CONNECTING → READY transition.
+	select {
+	case <-resetBackoffDone:
+		t.Fatal("Backoff reset was triggered for initial READY state, want no reset")
+	default:
+	}
+
 	// Stop the RLS server.
 	lis.Stop()
 
diff --git a/balancer/rls/control_channel.go b/balancer/rls/control_channel.go
index 60e6a021d133..205534fab0e1 100644
--- a/balancer/rls/control_channel.go
+++ b/balancer/rls/control_channel.go
@@ -21,6 +21,7 @@ package rls
 import (
 	"context"
 	"fmt"
+	"sync"
 	"time"
 
 	"google.golang.org/grpc"
@@ -29,7 +30,6 @@ import (
 	"google.golang.org/grpc/connectivity"
 	"google.golang.org/grpc/credentials/insecure"
 	"google.golang.org/grpc/internal"
-	"google.golang.org/grpc/internal/buffer"
 	internalgrpclog "google.golang.org/grpc/internal/grpclog"
 	"google.golang.org/grpc/internal/grpcsync"
 	"google.golang.org/grpc/internal/pretty"
@@ -57,12 +57,14 @@ type controlChannel struct {
 	// hammering the RLS service while it is overloaded or down.
 	throttler adaptiveThrottler
 
-	cc                  *grpc.ClientConn
-	client              rlsgrpc.RouteLookupServiceClient
-	logger              *internalgrpclog.PrefixLogger
-	connectivityStateCh *buffer.Unbounded
-	unsubscribe         func()
-	monitorDoneCh       chan struct{}
+	cc          *grpc.ClientConn
+	client      rlsgrpc.RouteLookupServiceClient
+	logger      *internalgrpclog.PrefixLogger
+	unsubscribe func()
+
+	// All fields below are guarded by mu.
+	mu                   sync.Mutex
+	seenTransientFailure bool
 }
 
 // newControlChannel creates a controlChannel to rlsServerName and uses
@@ -70,11 +72,9 @@ type controlChannel struct {
 // gRPC channel.
 func newControlChannel(rlsServerName, serviceConfig string, rpcTimeout time.Duration, bOpts balancer.BuildOptions, backToReadyFunc func()) (*controlChannel, error) {
 	ctrlCh := &controlChannel{
-		rpcTimeout:          rpcTimeout,
-		backToReadyFunc:     backToReadyFunc,
-		throttler:           newAdaptiveThrottler(),
-		connectivityStateCh: buffer.NewUnbounded(),
-		monitorDoneCh:       make(chan struct{}),
+		rpcTimeout:      rpcTimeout,
+		backToReadyFunc: backToReadyFunc,
+		throttler:       newAdaptiveThrottler(),
 	}
 	ctrlCh.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-control-channel %p] ", ctrlCh))
 
@@ -92,7 +92,6 @@ func newControlChannel(rlsServerName, serviceConfig string, rpcTimeout time.Dura
 	ctrlCh.cc.Connect()
 	ctrlCh.client = rlsgrpc.NewRouteLookupServiceClient(ctrlCh.cc)
 	ctrlCh.logger.Infof("Control channel created to RLS server at: %v", rlsServerName)
-	go ctrlCh.monitorConnectivityState()
 	return ctrlCh, nil
 }
 
@@ -101,7 +100,40 @@ func (cc *controlChannel) OnMessage(msg any) {
 	if !ok {
 		panic(fmt.Sprintf("Unexpected message type %T , wanted connectectivity.State type", msg))
 	}
-	cc.connectivityStateCh.Put(st)
+
+	cc.mu.Lock()
+	defer cc.mu.Unlock()
+
+	switch st {
+	case connectivity.Ready:
+		// Only reset backoff when transitioning from TRANSIENT_FAILURE to READY.
+		// This indicates the RLS server has recovered from being unreachable, so
+		// we reset backoff state in all cache entries to allow pending RPCs to
+		// proceed immediately. We skip benign transitions like READY → IDLE → READY
+		// since those don't represent actual failures.
+		if cc.seenTransientFailure {
+			if cc.logger.V(2) {
+				cc.logger.Infof("Control channel back to READY after TRANSIENT_FAILURE")
+			}
+			cc.seenTransientFailure = false
+			if cc.backToReadyFunc != nil {
+				cc.backToReadyFunc()
+			}
+		} else {
+			if cc.logger.V(2) {
+				cc.logger.Infof("Control channel is READY")
+			}
+		}
+	case connectivity.TransientFailure:
+		// Track that we've entered TRANSIENT_FAILURE state so we know to reset
+		// backoffs when we recover to READY.
+		cc.logger.Warningf("Control channel is TRANSIENT_FAILURE")
+		cc.seenTransientFailure = true
+	default:
+		if cc.logger.V(2) {
+			cc.logger.Infof("Control channel connectivity state is %s", st)
+		}
+	}
 }
 
 // dialOpts constructs the dial options for the control plane channel.
@@ -148,68 +180,8 @@ func (cc *controlChannel) dialOpts(bOpts balancer.BuildOptions, serviceConfig st
 	return dopts, nil
 }
 
-func (cc *controlChannel) monitorConnectivityState() {
-	cc.logger.Infof("Starting connectivity state monitoring goroutine")
-	defer close(cc.monitorDoneCh)
-
-	// Since we use two mechanisms to deal with RLS server being down:
-	//   - adaptive throttling for the channel as a whole
-	//   - exponential backoff on a per-request basis
-	// we need a way to avoid double-penalizing requests by counting failures
-	// toward both mechanisms when the RLS server is unreachable.
-	//
-	// To accomplish this, we monitor the state of the control plane channel. If
-	// the state has been TRANSIENT_FAILURE since the last time it was in state
-	// READY, and it then transitions into state READY, we push on a channel
-	// which is being read by the LB policy.
-	//
-	// The LB the policy will iterate through the cache to reset the backoff
-	// timeouts in all cache entries. Specifically, this means that it will
-	// reset the backoff state and cancel the pending backoff timer. Note that
-	// when cancelling the backoff timer, just like when the backoff timer fires
-	// normally, a new picker is returned to the channel, to force it to
-	// re-process any wait-for-ready RPCs that may still be queued if we failed
-	// them while we were in backoff. However, we should optimize this case by
-	// returning only one new picker, regardless of how many backoff timers are
-	// cancelled.
-
-	// Wait for the control channel to become READY for the first time.
-	for s, ok := <-cc.connectivityStateCh.Get(); s != connectivity.Ready; s, ok = <-cc.connectivityStateCh.Get() {
-		if !ok {
-			return
-		}
-
-		cc.connectivityStateCh.Load()
-		if s == connectivity.Shutdown {
-			return
-		}
-	}
-	cc.connectivityStateCh.Load()
-	cc.logger.Infof("Connectivity state is READY")
-
-	for {
-		s, ok := <-cc.connectivityStateCh.Get()
-		if !ok {
-			return
-		}
-		cc.connectivityStateCh.Load()
-
-		if s == connectivity.Shutdown {
-			return
-		}
-		if s == connectivity.Ready {
-			cc.logger.Infof("Control channel back to READY")
-			cc.backToReadyFunc()
-		}
-
-		cc.logger.Infof("Connectivity state is %s", s)
-	}
-}
-
 func (cc *controlChannel) close() {
 	cc.unsubscribe()
-	cc.connectivityStateCh.Close()
-	<-cc.monitorDoneCh
 	cc.cc.Close()
 	cc.logger.Infof("Shutdown")
 }
diff --git a/balancer/rls/control_channel_test.go b/balancer/rls/control_channel_test.go
index 5a30820c3b47..28558951c420 100644
--- a/balancer/rls/control_channel_test.go
+++ b/balancer/rls/control_channel_test.go
@@ -463,3 +463,4 @@ func (s) TestNewControlChannelUnsupportedCredsBundle(t *testing.T) {
 		t.Fatal("newControlChannel succeeded when expected to fail")
 	}
 }
+
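
Reviewer note: below is a minimal, self-contained sketch of the state-tracking rule the new `OnMessage` implements, runnable outside the package. The `stateTracker` type and `onStateChange` method are hypothetical stand-ins (the real `OnMessage` receives `any` from the grpcsync subscriber and holds `controlChannel.mu`); only the `connectivity` package is the real dependency.

```go
package main

import (
	"fmt"
	"sync"

	"google.golang.org/grpc/connectivity"
)

// stateTracker mirrors the logic added to controlChannel.OnMessage: remember
// whether the channel has been in TRANSIENT_FAILURE, and fire onRecovery only
// on a TRANSIENT_FAILURE -> READY transition.
type stateTracker struct {
	mu                   sync.Mutex
	seenTransientFailure bool
	onRecovery           func()
}

func (t *stateTracker) onStateChange(st connectivity.State) {
	t.mu.Lock()
	defer t.mu.Unlock()
	switch st {
	case connectivity.Ready:
		if t.seenTransientFailure {
			t.seenTransientFailure = false
			if t.onRecovery != nil {
				t.onRecovery()
			}
		}
	case connectivity.TransientFailure:
		t.seenTransientFailure = true
	}
}

func main() {
	t := &stateTracker{onRecovery: func() { fmt.Println("backoff reset") }}

	// Initial CONNECTING -> READY: no reset fires.
	t.onStateChange(connectivity.Connecting)
	t.onStateChange(connectivity.Ready)

	// Benign READY -> IDLE -> READY round trip: still no reset.
	t.onStateChange(connectivity.Idle)
	t.onStateChange(connectivity.Ready)

	// Failure followed by recovery: reset fires exactly once.
	t.onStateChange(connectivity.TransientFailure)
	t.onStateChange(connectivity.Ready)
}
```

Running this prints "backoff reset" exactly once, for the TRANSIENT_FAILURE → READY transition, and nothing for the initial READY or the READY → IDLE → READY round trip.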
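The new test assertion relies on the non-blocking receive idiom: a `select` with a `default` case proves a signal has not fired yet, while a timed `select` bounds the wait for a signal that should fire. Here is a sketch of that pattern under stated assumptions: a package-level `resetBackoffHook` overridden to signal `resetBackoffDone` (both names come from the test's comment; the actual wiring in balancer_test.go may differ).

```go
package rls_test

import (
	"testing"
	"time"
)

// Stand-in for the package-level test hook; illustrative only, not the
// actual hook variable in balancer/rls.
var resetBackoffHook = func() {}

func TestBackoffResetSignal(t *testing.T) {
	resetBackoffDone := make(chan struct{}, 1)
	origHook := resetBackoffHook
	resetBackoffHook = func() { resetBackoffDone <- struct{}{} }
	defer func() { resetBackoffHook = origHook }()

	// After the channel first becomes READY, the hook must NOT have fired:
	// a non-blocking receive falls through to default when no signal exists.
	select {
	case <-resetBackoffDone:
		t.Fatal("backoff reset triggered for initial READY, want no reset")
	default:
	}

	// Simulate a TRANSIENT_FAILURE -> READY recovery, the only transition
	// that should invoke the hook.
	resetBackoffHook()

	// Now a signal is expected; bound the wait so the test cannot hang.
	select {
	case <-resetBackoffDone:
	case <-time.After(time.Second):
		t.Fatal("timeout waiting for backoff reset after recovery")
	}
}
```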