Skip to content

Commit 3a11e3e

Browse files
committed
use a list.List for the queue of requests
1 parent 6ff8f6d commit 3a11e3e

File tree

3 files changed

+97
-137
lines changed

3 files changed

+97
-137
lines changed

internal/xds/clients/internal/buffer/unbounded.go

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -115,23 +115,3 @@ func (b *Unbounded) Close() {
115115
close(b.c)
116116
}
117117
}
118-
119-
// Reset clears all buffered data in the unbounded buffer. This does not close
120-
// the buffer, and new data may be Put() into it after a call to this method.
121-
//
122-
// It's expected to be used in scenarios where the buffered data is no longer
123-
// relevant, and needs to be cleared.
124-
func (b *Unbounded) Reset() {
125-
b.mu.Lock()
126-
defer b.mu.Unlock()
127-
128-
if b.closing {
129-
return
130-
}
131-
132-
b.backlog = b.backlog[:0]
133-
select {
134-
case <-b.c:
135-
default:
136-
}
137-
}

internal/xds/clients/internal/buffer/unbounded_test.go

Lines changed: 0 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
"sort"
2222
"sync"
2323
"testing"
24-
"time"
2524

2625
"github.com/google/go-cmp/cmp"
2726
"google.golang.org/grpc/internal/grpctest"
@@ -147,37 +146,3 @@ func (s) TestClose(t *testing.T) {
147146
}
148147
ub.Close() // ignored
149148
}
150-
151-
// TestReset resets the buffer and makes sure that the buffer can be used after
152-
// the reset.
153-
func (s) TestReset(t *testing.T) {
154-
ub := NewUnbounded()
155-
if err := ub.Put(1); err != nil {
156-
t.Fatalf("Unbounded.Put() = %v; want nil", err)
157-
}
158-
if err := ub.Put(2); err != nil {
159-
t.Fatalf("Unbounded.Put() = %v; want nil", err)
160-
}
161-
if v, ok := <-ub.Get(); !ok {
162-
t.Errorf("Unbounded.Get() = %v, %v, want %v, %v", v, ok, 1, true)
163-
}
164-
ub.Load()
165-
ub.Reset()
166-
167-
// Make sure the buffer is empty after the reset. Wait for a short duration
168-
// to make sure that no value is received on the read channel.
169-
select {
170-
case v, ok := <-ub.Get():
171-
t.Errorf("Unbounded.Get() = %v, %v; want no value", v, ok)
172-
case <-time.After(10 * time.Millisecond):
173-
}
174-
175-
// Make sure the buffer can be used after the reset.
176-
if err := ub.Put(1); err != nil {
177-
t.Fatalf("Unbounded.Put() = %v; want nil", err)
178-
}
179-
if v, ok := <-ub.Get(); !ok {
180-
t.Errorf("Unbounded.Get() = %v, %v, want %v, %v", v, ok, 1, true)
181-
}
182-
ub.Load()
183-
}

internal/xds/clients/xdsclient/ads_stream.go

Lines changed: 97 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package xdsclient
2020

2121
import (
22+
"container/list"
2223
"context"
2324
"fmt"
2425
"sync"
@@ -28,7 +29,6 @@ import (
2829
igrpclog "google.golang.org/grpc/internal/grpclog"
2930
"google.golang.org/grpc/internal/xds/clients"
3031
"google.golang.org/grpc/internal/xds/clients/internal/backoff"
31-
"google.golang.org/grpc/internal/xds/clients/internal/buffer"
3232
"google.golang.org/grpc/internal/xds/clients/internal/pretty"
3333
"google.golang.org/grpc/internal/xds/clients/xdsclient/internal/xdsresource"
3434

@@ -104,7 +104,6 @@ type adsStreamImpl struct {
104104
// The following fields are initialized in the constructor and are not
105105
// written to afterwards, and hence can be accessed without a mutex.
106106
streamCh chan clients.Stream // New ADS streams are pushed here.
107-
requestCh *buffer.Unbounded // Subscriptions and unsubscriptions are pushed here.
108107
runnerDoneCh chan struct{} // Notify completion of runner goroutine.
109108
cancel context.CancelFunc // To cancel the context passed to the runner goroutine.
110109
fc *adsFlowControl // Flow control for ADS stream.
@@ -113,6 +112,8 @@ type adsStreamImpl struct {
113112
mu sync.Mutex
114113
resourceTypeState map[ResourceType]*resourceTypeState // Map of resource types to their state.
115114
firstRequest bool // False after the first request is sent out.
115+
queuedReqs *list.List // Queued requests waiting to be sent.
116+
queuedReqsExist *sync.Cond // Condition variable for waiting on queued requests.
116117
}
117118

118119
// adsStreamOpts contains the options for creating a new ADS Stream.
@@ -137,11 +138,12 @@ func newADSStreamImpl(opts adsStreamOpts) *adsStreamImpl {
137138
watchExpiryTimeout: opts.watchExpiryTimeout,
138139

139140
streamCh: make(chan clients.Stream, 1),
140-
requestCh: buffer.NewUnbounded(),
141141
runnerDoneCh: make(chan struct{}),
142142
fc: newADSFlowControl(),
143143
resourceTypeState: make(map[ResourceType]*resourceTypeState),
144+
queuedReqs: list.New(),
144145
}
146+
s.queuedReqsExist = sync.NewCond(&s.mu)
145147

146148
l := grpclog.Component("xds")
147149
s.logger = igrpclog.NewPrefixLogger(l, opts.logPrefix+fmt.Sprintf("[ads-stream %p] ", s))
@@ -156,7 +158,10 @@ func newADSStreamImpl(opts adsStreamOpts) *adsStreamImpl {
156158
func (s *adsStreamImpl) Stop() {
157159
s.cancel()
158160
s.fc.stop()
159-
s.requestCh.Close()
161+
// Unblock the sender goroutine which might be blocked waiting for queued
162+
// requests to be sent out. It is allowed but not required to hold the lock
163+
// when signalling.
164+
s.queuedReqsExist.Signal()
160165
<-s.runnerDoneCh
161166
s.logger.Infof("Shutdown ADS stream")
162167
}
@@ -185,8 +190,13 @@ func (s *adsStreamImpl) subscribe(typ ResourceType, name string) {
185190
// be started when a request for this resource is actually sent out.
186191
state.subscribedResources[name] = &xdsresource.ResourceWatchState{State: xdsresource.ResourceWatchStateStarted}
187192

188-
// Send a request for the resource type with updated subscriptions.
189-
s.requestCh.Put(request{typ: typ, resourceNames: resourceNames(state.subscribedResources)})
193+
// Queue a request for the resource type with updated subscriptions.
194+
resourceNames := resourceNames(state.subscribedResources)
195+
if s.logger.V(2) {
196+
s.logger.Infof("Queueing a request for resources %q of type %q", resourceNames, typ.TypeName)
197+
}
198+
s.queuedReqs.PushBack(request{typ: typ, resourceNames: resourceNames})
199+
s.queuedReqsExist.Signal()
190200
}
191201

192202
// unsubscribe cancels the subscription to the given resource. It is a no-op if
@@ -215,8 +225,13 @@ func (s *adsStreamImpl) unsubscribe(typ ResourceType, name string) {
215225
}
216226
delete(state.subscribedResources, name)
217227

218-
// Send a request for the resource type with updated subscriptions.
219-
s.requestCh.Put(request{typ: typ, resourceNames: resourceNames(state.subscribedResources)})
228+
// Queue a request for the resource type with updated subscriptions.
229+
resourceNames := resourceNames(state.subscribedResources)
230+
if s.logger.V(2) {
231+
s.logger.Infof("Queueing a request for resources %q of type %q", resourceNames, typ.TypeName)
232+
}
233+
s.queuedReqs.PushBack(request{typ: typ, resourceNames: resourceNames})
234+
s.queuedReqsExist.Signal()
220235
}
221236

222237
// runner is a long-running goroutine that handles the lifecycle of the ADS
@@ -227,8 +242,6 @@ func (s *adsStreamImpl) unsubscribe(typ ResourceType, name string) {
227242
func (s *adsStreamImpl) runner(ctx context.Context) {
228243
defer close(s.runnerDoneCh)
229244

230-
go s.send(ctx)
231-
232245
runStreamWithBackoff := func() error {
233246
stream, err := s.transport.NewStream(ctx, "/envoy.service.discovery.v3.AggregatedDiscoveryService/StreamAggregatedResources")
234247
if err != nil {
@@ -242,93 +255,95 @@ func (s *adsStreamImpl) runner(ctx context.Context) {
242255

243256
s.mu.Lock()
244257
s.firstRequest = true
258+
if err := s.sendExistingLocked(stream); err != nil {
259+
s.logger.Warningf("Failed to send existing resources on newly created stream: %v", err)
260+
s.mu.Unlock()
261+
return nil
262+
}
245263
s.mu.Unlock()
246264

247-
// Ensure that the most recently created stream is pushed on the
248-
// channel for the `send` goroutine to consume.
249-
select {
250-
case <-s.streamCh:
251-
default:
252-
}
253-
s.streamCh <- stream
265+
// Spawn the sending goroutine that runs until the context is done, or
266+
// writing to the stream fails. When the latter happens, the next
267+
// iteration of the loop in the runner goroutine will spawn another
268+
// sending goroutine.
269+
sendDoneCh := make(chan struct{})
270+
recvDoneCh := make(chan struct{})
271+
go func() {
272+
defer close(sendDoneCh)
273+
274+
for ctx.Err() == nil {
275+
// Spawn a goroutine to wait for queued requests to be available
276+
// for sending. This is required to exit the sending goroutine
277+
// blocked on the condition variable when the receiving is done.
278+
waitCh := make(chan struct{})
279+
go func() {
280+
s.mu.Lock()
281+
defer s.mu.Unlock()
282+
283+
// Wait on the condition variable only if there are no
284+
// queued requests. A call to `Signal` will be a no-op if
285+
// there are no blocked goroutines at that point in time.
286+
// There could be newly queued requests that come in after
287+
// we release the lock at the end of the loop, and signal
288+
// before we get here.
289+
if s.queuedReqs.Len() == 0 {
290+
s.queuedReqsExist.Wait()
291+
}
292+
close(waitCh)
293+
}()
294+
295+
select {
296+
case <-waitCh:
297+
// Queued requests are now available, continue with sending.
298+
case <-recvDoneCh:
299+
// Receiving is done. Ensure that the goroutine waiting on
300+
// the condition variable exits.
301+
s.queuedReqsExist.Signal()
302+
<-waitCh
303+
return
304+
}
305+
306+
// Iterate and consume the list of queued requests.
307+
s.mu.Lock()
308+
for s.queuedReqs.Len() > 0 {
309+
elem := s.queuedReqs.Front()
310+
req := elem.Value.(request)
311+
state := s.resourceTypeState[req.typ]
312+
if err := s.sendMessageLocked(stream, req.resourceNames, req.typ.TypeURL, state.version, state.nonce, nil); err != nil {
313+
s.logger.Warningf("Failed to send queued request for resources %q of type %q: %v", req.resourceNames, req.typ.TypeName, err)
314+
s.mu.Unlock()
315+
return
316+
}
317+
s.queuedReqs.Remove(elem)
318+
s.startWatchTimersLocked(req.typ, req.resourceNames)
319+
}
320+
s.mu.Unlock()
321+
}
322+
}()
254323

255324
// Backoff state is reset upon successful receipt of at least one
256325
// message from the server.
326+
err = nil
257327
if s.recv(stream) {
258-
return backoff.ErrResetBackoff
328+
err = backoff.ErrResetBackoff
259329
}
260-
return nil
261-
}
262-
backoff.RunF(ctx, runStreamWithBackoff, s.backoff)
263-
}
264-
265-
// send is a long running goroutine that handles sending discovery requests for
266-
// two scenarios:
267-
// - a new subscription or unsubscription request is received
268-
// - a new stream is created after the previous one failed
269-
func (s *adsStreamImpl) send(ctx context.Context) {
270-
// Stores the most recent stream instance received on streamCh.
271-
var stream clients.Stream
272-
for {
273-
select {
274-
case <-ctx.Done():
275-
return
276-
case stream = <-s.streamCh:
277-
if err := s.sendExisting(stream); err != nil {
278-
// Send failed, clear the current stream. Attempt to resend will
279-
// only be made after a new stream is created.
280-
stream = nil
281-
continue
282-
}
283-
case r, ok := <-s.requestCh.Get():
284-
if !ok {
285-
return
286-
}
287-
s.requestCh.Load()
330+
close(recvDoneCh)
288331

289-
req := r.(request)
290-
if err := s.sendNew(stream, req.typ, req.resourceNames); err != nil {
291-
stream = nil
292-
continue
293-
}
294-
}
295-
}
296-
}
297-
298-
// sendNew attempts to send a discovery request based on a new subscription or
299-
// unsubscription. This method also starts the watch expiry timer for resources
300-
// that were sent in the request for the first time, i.e. their watch state is
301-
// `watchStateStarted`.
302-
func (s *adsStreamImpl) sendNew(stream clients.Stream, typ ResourceType, names []string) error {
303-
s.mu.Lock()
304-
defer s.mu.Unlock()
305-
306-
// If there's no stream yet, skip the request. This request will be resent
307-
// when a new stream is created. If no stream is created, the watcher will
308-
// timeout (same as server not sending response back).
309-
if stream == nil {
310-
return nil
311-
}
312-
313-
state := s.resourceTypeState[typ]
314-
if err := s.sendMessageLocked(stream, names, typ.TypeURL, state.version, state.nonce, nil); err != nil {
332+
<-sendDoneCh
315333
return err
316334
}
317-
s.startWatchTimersLocked(typ, names)
318-
return nil
335+
backoff.RunF(ctx, runStreamWithBackoff, s.backoff)
319336
}
320337

321-
// sendExisting sends out discovery requests for existing resources when
322-
// recovering from a broken stream.
338+
// sendExistingLocked sends out discovery requests for existing resources when
339+
// recovering from a broken stream. The stream argument is guaranteed to be
340+
// non-nil.
323341
//
324-
// The stream argument is guaranteed to be non-nil.
325-
func (s *adsStreamImpl) sendExisting(stream clients.Stream) error {
326-
s.mu.Lock()
327-
defer s.mu.Unlock()
328-
342+
// Caller needs to hold c.mu.
343+
func (s *adsStreamImpl) sendExistingLocked(stream clients.Stream) error {
329344
// Clear any queued requests. Previously subscribed to resources will be
330345
// resent below.
331-
s.requestCh.Reset()
346+
s.queuedReqs.Init()
332347

333348
for typ, state := range s.resourceTypeState {
334349
// Reset only the nonces map when the stream restarts.

0 commit comments

Comments
 (0)