Commit 4110738

GODRIVER-3659 Filter CMAP/SDAM events for awaitMinPoolSizeMS

1 parent 9bd07db · commit 4110738

5 files changed: +397, -42 lines

internal/integration/unified/client_entity.go

Lines changed: 147 additions & 28 deletions
@@ -38,6 +38,70 @@ var securitySensitiveCommands = []string{
 	"createUser", "updateUser", "copydbgetnonce", "copydbsaslstart", "copydb",
 }
 
+// eventSequencer allows for sequence-based event filtering for
+// awaitMinPoolSizeMS support.
+//
+// Per the unified test format spec, when awaitMinPoolSizeMS is specified, any
+// CMAP and SDAM events that occur during connection pool initialization
+// (before minPoolSize is reached) must be ignored. We track this by
+// assigning a monotonically increasing sequence number to each event as it's
+// recorded. After pool initialization completes, we set eventCutoffSeq to the
+// current sequence number. Event accessors for CMAP and SDAM types then
+// filter out any events with sequence <= eventCutoffSeq.
+//
+// Sequencing is thread-safe to support concurrent operations that may generate
+// events (e.g., connection checkouts generating CMAP events).
+type eventSequencer struct {
+	counter atomic.Int64
+	cutoff  atomic.Int64
+
+	mu sync.RWMutex
+
+	// pool events are heterogeneous, so we track their sequence separately
+	poolSeq        []int64
+	seqByEventType map[monitoringEventType][]int64
+}
+
+// setCutoff marks the current sequence as the filtering cutoff point.
+func (es *eventSequencer) setCutoff() {
+	es.cutoff.Store(es.counter.Load())
+}
+
+// recordEvent stores the sequence number for a given event type.
+func (es *eventSequencer) recordEvent(eventType monitoringEventType) {
+	next := es.counter.Add(1)
+
+	es.mu.Lock()
+	es.seqByEventType[eventType] = append(es.seqByEventType[eventType], next)
+	es.mu.Unlock()
+}
+
+func (es *eventSequencer) recordPooledEvent() {
+	next := es.counter.Add(1)
+
+	es.mu.Lock()
+	es.poolSeq = append(es.poolSeq, next)
+	es.mu.Unlock()
+}
+
+// shouldFilter returns true if the event at the given index should be filtered.
+func (es *eventSequencer) shouldFilter(eventType monitoringEventType, index int) bool {
+	cutoff := es.cutoff.Load()
+	if cutoff == 0 {
+		return false
+	}
+
+	es.mu.RLock()
+	defer es.mu.RUnlock()
+
+	seqs, ok := es.seqByEventType[eventType]
+	if !ok || index < 0 || index >= len(seqs) {
+		return false
+	}
+
+	return seqs[index] <= cutoff
+}
+
 // clientEntity is a wrapper for a mongo.Client object that also holds additional information required during test
 // execution.
 type clientEntity struct {
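The hunk above defines the sequencer's whole lifecycle: events record a monotonically increasing sequence number, setCutoff snapshots the counter once the pool is warm, and shouldFilter drops anything at or below the snapshot. A minimal, self-contained sketch of that flow (monitoringEventType is simplified to a string alias here; nothing below is part of the test runner):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type monitoringEventType = string

type eventSequencer struct {
	counter        atomic.Int64
	cutoff         atomic.Int64
	mu             sync.RWMutex
	seqByEventType map[monitoringEventType][]int64
}

func (es *eventSequencer) recordEvent(t monitoringEventType) {
	next := es.counter.Add(1)
	es.mu.Lock()
	es.seqByEventType[t] = append(es.seqByEventType[t], next)
	es.mu.Unlock()
}

// setCutoff marks everything recorded so far as pool-initialization noise.
func (es *eventSequencer) setCutoff() { es.cutoff.Store(es.counter.Load()) }

func (es *eventSequencer) shouldFilter(t monitoringEventType, i int) bool {
	cutoff := es.cutoff.Load()
	if cutoff == 0 {
		return false
	}
	es.mu.RLock()
	defer es.mu.RUnlock()
	seqs := es.seqByEventType[t]
	if i < 0 || i >= len(seqs) {
		return false
	}
	return seqs[i] <= cutoff
}

func main() {
	es := &eventSequencer{seqByEventType: make(map[monitoringEventType][]int64)}

	es.recordEvent("connectionReady") // seq 1: fired while the pool warms up
	es.recordEvent("connectionReady") // seq 2: fired while the pool warms up
	es.setCutoff()                    // minPoolSize reached; cutoff = 2
	es.recordEvent("connectionReady") // seq 3: an event the test cares about

	fmt.Println(es.shouldFilter("connectionReady", 0)) // true: seq 1 <= 2
	fmt.Println(es.shouldFilter("connectionReady", 2)) // false: seq 3 > 2
}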
@@ -72,30 +136,8 @@ type clientEntity struct {
 
 	entityMap *EntityMap
 
-	logQueue chan orderedLogMessage
-}
-
-// awaitMinimumPoolSize waits for the client's connection pool to reach the
-// specified minimum size. This is a best effort operation that times out after
-// some predefined amount of time to avoid blocking tests indefinitely.
-func awaitMinimumPoolSize(ctx context.Context, entity *clientEntity, minPoolSize uint64) error {
-	// Don't spend longer than 500ms awaiting minPoolSize.
-	awaitCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
-	defer cancel()
-
-	ticker := time.NewTicker(100 * time.Millisecond)
-	defer ticker.Stop()
-
-	for {
-		select {
-		case <-awaitCtx.Done():
-			return fmt.Errorf("timed out waiting for client to reach minPoolSize")
-		case <-ticker.C:
-			if uint64(entity.eventsCount[connectionReadyEvent]) >= minPoolSize {
-				return nil
-			}
-		}
-	}
+	logQueue       chan orderedLogMessage
+	eventSequencer eventSequencer
 }
 
 func newClientEntity(ctx context.Context, em *EntityMap, entityOptions *entityOptions) (*clientEntity, error) {
@@ -118,6 +160,9 @@ func newClientEntity(ctx context.Context, em *EntityMap, entityOptions *entityOp
 		serverDescriptionChangedEventsCount: make(map[serverDescriptionChangedEventInfo]int32),
 		entityMap:                           em,
 		observeSensitiveCommands:            entityOptions.ObserveSensitiveCommands,
+		eventSequencer: eventSequencer{
+			seqByEventType: make(map[monitoringEventType][]int64),
+		},
 	}
 	entity.setRecordEvents(true)
 
@@ -226,8 +271,9 @@ func newClientEntity(ctx context.Context, em *EntityMap, entityOptions *entityOp
 		return nil, fmt.Errorf("error creating mongo.Client: %w", err)
 	}
 
-	if entityOptions.AwaitMinPoolSize && clientOpts.MinPoolSize != nil && *clientOpts.MinPoolSize > 0 {
-		if err := awaitMinimumPoolSize(ctx, entity, *clientOpts.MinPoolSize); err != nil {
+	if entityOptions.AwaitMinPoolSizeMS != nil && *entityOptions.AwaitMinPoolSizeMS > 0 &&
+		clientOpts.MinPoolSize != nil && *clientOpts.MinPoolSize > 0 {
+		if err := awaitMinimumPoolSize(ctx, entity, *clientOpts.MinPoolSize, *entityOptions.AwaitMinPoolSizeMS); err != nil {
 			return nil, err
 		}
 	}
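The new guard checks AwaitMinPoolSizeMS against both nil and zero because optional spec fields decode into pointer types, which distinguish "option absent" from "explicitly set to 0". A hypothetical illustration of that decoding (the entityOptions struct and JSON tag below are stand-ins; the real definitions live elsewhere in the test runner):

package main

import (
	"encoding/json"
	"fmt"
)

// Stand-in for the runner's entityOptions, reduced to the one field at issue.
type entityOptions struct {
	AwaitMinPoolSizeMS *int `json:"awaitMinPoolSizeMS"`
}

func main() {
	docs := []string{`{}`, `{"awaitMinPoolSizeMS": 0}`, `{"awaitMinPoolSizeMS": 500}`}
	for _, doc := range docs {
		var opts entityOptions
		if err := json.Unmarshal([]byte(doc), &opts); err != nil {
			panic(err)
		}
		// Mirrors the gate in the diff: absent (nil) and explicit 0 both skip the wait.
		wait := opts.AwaitMinPoolSizeMS != nil && *opts.AwaitMinPoolSizeMS > 0
		fmt.Printf("%-30s wait=%v\n", doc, wait)
	}
}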
@@ -326,8 +372,47 @@ func (c *clientEntity) failedEvents() []*event.CommandFailedEvent {
 	return events
 }
 
-func (c *clientEntity) poolEvents() []*event.PoolEvent {
-	return c.pooled
+// filterEventsBySeq filters events by sequence number for the given eventType.
+// See comments on eventSequencer for more details.
+func filterEventsBySeq[T any](c *clientEntity, events []T, eventType monitoringEventType) []T {
+	cutoff := c.eventSequencer.cutoff.Load()
+	if cutoff == 0 {
+		return events
+	}
+
+	// Lock order: eventProcessMu -> eventSequencer.mu (matches writers)
+	c.eventProcessMu.RLock()
+	c.eventSequencer.mu.RLock()
+
+	// Snapshot to minimize time under locks and avoid races
+	localEvents := append([]T(nil), events...)
+
+	var seqSlice []int64
+	if eventType == poolAnyEvent {
+		seqSlice = c.eventSequencer.poolSeq
+	} else {
+		seqSlice = c.eventSequencer.seqByEventType[eventType]
+	}
+
+	localSeqs := append([]int64(nil), seqSlice...)
+
+	c.eventSequencer.mu.RUnlock()
+	c.eventProcessMu.RUnlock()
+
+	// guard against index out of range.
+	n := len(localEvents)
+	if len(localSeqs) < n {
+		n = len(localSeqs)
+	}
+
+	filtered := make([]T, 0, n)
+	for i := 0; i < n; i++ {
+		if localSeqs[i] > cutoff {
+			filtered = append(filtered, localEvents[i])
+		}
+	}
+
+	return filtered
 }
 
 func (c *clientEntity) numberConnectionsCheckedOut() int32 {
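filterEventsBySeq copies both the event slice and the sequence slice while holding the read locks, releases the locks, and only then runs the O(n) filter loop over its private copies, so concurrent writers never race with or wait on the loop. The same snapshot-then-filter pattern reduced to a runnable sketch (store and filtered are illustrative names, not driver code):

package main

import (
	"fmt"
	"sync"
)

type store struct {
	mu     sync.RWMutex
	events []string
	seqs   []int64
}

func (s *store) filtered(cutoff int64) []string {
	s.mu.RLock()
	localEvents := append([]string(nil), s.events...) // snapshot under lock
	localSeqs := append([]int64(nil), s.seqs...)
	s.mu.RUnlock()

	// Guard against the two slices disagreeing in length.
	n := len(localEvents)
	if len(localSeqs) < n {
		n = len(localSeqs)
	}

	out := make([]string, 0, n)
	for i := 0; i < n; i++ {
		if localSeqs[i] > cutoff { // keep only events recorded after the cutoff
			out = append(out, localEvents[i])
		}
	}
	return out
}

func main() {
	s := &store{
		events: []string{"ConnectionCreated", "ConnectionReady", "ConnectionCheckedOut"},
		seqs:   []int64{1, 2, 3},
	}
	fmt.Println(s.filtered(2)) // [ConnectionCheckedOut]
}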
@@ -516,7 +601,10 @@ func (c *clientEntity) processPoolEvent(evt *event.PoolEvent) {
 
 	eventType := monitoringEventTypeFromPoolEvent(evt)
 	if _, ok := c.observedEvents[eventType]; ok {
+		c.eventProcessMu.Lock()
 		c.pooled = append(c.pooled, evt)
+		c.eventSequencer.recordPooledEvent()
+		c.eventProcessMu.Unlock()
 	}
 
 	c.addEventsCount(eventType)
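Note that the writer above takes eventProcessMu before the sequencer's internal mutex, which is the same order the reader in filterEventsBySeq uses. A reduced sketch of why that rule matters, with illustrative names (recorder, write, read are not driver code): as long as every goroutine acquires the two locks in one agreed order, the classic AB/BA deadlock cannot occur.

package main

import (
	"fmt"
	"sync"
)

// Illustrative stand-ins for eventProcessMu (outer) and eventSequencer.mu (inner).
type recorder struct {
	outer  sync.RWMutex
	inner  sync.RWMutex
	events []int
	seqs   []int64
}

// write acquires outer then inner, matching the order readers use. If one
// goroutine took inner first while another took outer first, each could end
// up waiting on the lock the other holds.
func (r *recorder) write(i int) {
	r.outer.Lock()
	r.events = append(r.events, i)
	r.inner.Lock()
	r.seqs = append(r.seqs, int64(i))
	r.inner.Unlock()
	r.outer.Unlock()
}

// read also goes outer -> inner.
func (r *recorder) read() (int, int) {
	r.outer.RLock()
	r.inner.RLock()
	defer r.inner.RUnlock()
	defer r.outer.RUnlock()
	return len(r.events), len(r.seqs)
}

func main() {
	r := &recorder{}
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(i int) { defer wg.Done(); r.write(i) }(i)
	}
	wg.Wait()
	e, s := r.read()
	fmt.Println(e, s) // 100 100
}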
@@ -539,6 +627,7 @@ func (c *clientEntity) processServerDescriptionChangedEvent(evt *event.ServerDes
 
 	if _, ok := c.observedEvents[serverDescriptionChangedEvent]; ok {
 		c.serverDescriptionChanged = append(c.serverDescriptionChanged, evt)
+		c.eventSequencer.recordEvent(serverDescriptionChangedEvent)
 	}
 
 	// Record object-specific unified spec test data on an event.
@@ -558,6 +647,7 @@ func (c *clientEntity) processServerHeartbeatFailedEvent(evt *event.ServerHeartb
 
 	if _, ok := c.observedEvents[serverHeartbeatFailedEvent]; ok {
 		c.serverHeartbeatFailedEvent = append(c.serverHeartbeatFailedEvent, evt)
+		c.eventSequencer.recordEvent(serverHeartbeatFailedEvent)
 	}
 
 	c.addEventsCount(serverHeartbeatFailedEvent)
@@ -573,6 +663,7 @@ func (c *clientEntity) processServerHeartbeatStartedEvent(evt *event.ServerHeart
 
 	if _, ok := c.observedEvents[serverHeartbeatStartedEvent]; ok {
 		c.serverHeartbeatStartedEvent = append(c.serverHeartbeatStartedEvent, evt)
+		c.eventSequencer.recordEvent(serverHeartbeatStartedEvent)
 	}
 
 	c.addEventsCount(serverHeartbeatStartedEvent)
@@ -588,6 +679,7 @@ func (c *clientEntity) processServerHeartbeatSucceededEvent(evt *event.ServerHea
 
 	if _, ok := c.observedEvents[serverHeartbeatSucceededEvent]; ok {
 		c.serverHeartbeatSucceeded = append(c.serverHeartbeatSucceeded, evt)
+		c.eventSequencer.recordEvent(serverHeartbeatSucceededEvent)
 	}
 
 	c.addEventsCount(serverHeartbeatSucceededEvent)
@@ -603,6 +695,7 @@ func (c *clientEntity) processTopologyDescriptionChangedEvent(evt *event.Topolog
 
 	if _, ok := c.observedEvents[topologyDescriptionChangedEvent]; ok {
 		c.topologyDescriptionChanged = append(c.topologyDescriptionChanged, evt)
+		c.eventSequencer.recordEvent(topologyDescriptionChangedEvent)
 	}
 
 	c.addEventsCount(topologyDescriptionChangedEvent)
@@ -618,6 +711,7 @@ func (c *clientEntity) processTopologyOpeningEvent(evt *event.TopologyOpeningEve
 
 	if _, ok := c.observedEvents[topologyOpeningEvent]; ok {
 		c.topologyOpening = append(c.topologyOpening, evt)
+		c.eventSequencer.recordEvent(topologyOpeningEvent)
 	}
 
 	c.addEventsCount(topologyOpeningEvent)
@@ -633,6 +727,7 @@ func (c *clientEntity) processTopologyClosedEvent(evt *event.TopologyClosedEvent
 
 	if _, ok := c.observedEvents[topologyClosedEvent]; ok {
 		c.topologyClosed = append(c.topologyClosed, evt)
+		c.eventSequencer.recordEvent(topologyClosedEvent)
 	}
 
 	c.addEventsCount(topologyClosedEvent)
@@ -724,3 +819,27 @@ func evaluateUseMultipleMongoses(clientOpts *options.ClientOptions, useMultipleM
 	}
 	return nil
 }
+
+// awaitMinimumPoolSize waits for the client's connection pool to reach the
+// specified minimum size, then clears all CMAP and SDAM events that occurred
+// during pool initialization.
+func awaitMinimumPoolSize(ctx context.Context, entity *clientEntity, minPoolSize uint64, timeoutMS int) error {
+	awaitCtx, cancel := context.WithTimeout(ctx, time.Duration(timeoutMS)*time.Millisecond)
+	defer cancel()
+
+	ticker := time.NewTicker(100 * time.Millisecond)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-awaitCtx.Done():
+			return fmt.Errorf("timed out waiting for client to reach minPoolSize")
+		case <-ticker.C:
+			if uint64(entity.getEventCount(connectionReadyEvent)) >= minPoolSize {
+				entity.eventSequencer.setCutoff()
+
+				return nil
+			}
+		}
+	}
+}
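The rewritten awaitMinimumPoolSize is a plain poll loop: check a condition every 100ms until it holds or a caller-supplied budget (timeoutMS, taken from awaitMinPoolSizeMS) expires. The same shape in isolation, with illustrative names (waitFor and ready are not part of the runner):

package main

import (
	"context"
	"fmt"
	"time"
)

// waitFor polls ready() every 100ms until it returns true or timeoutMS expires.
func waitFor(ctx context.Context, timeoutMS int, ready func() bool) error {
	ctx, cancel := context.WithTimeout(ctx, time.Duration(timeoutMS)*time.Millisecond)
	defer cancel()

	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return fmt.Errorf("timed out waiting for condition")
		case <-ticker.C:
			if ready() {
				return nil
			}
		}
	}
}

func main() {
	start := time.Now()
	err := waitFor(context.Background(), 500, func() bool {
		// Stands in for "connectionReady count reached minPoolSize".
		return time.Since(start) > 250*time.Millisecond
	})
	fmt.Println(err) // <nil>
}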
