Skip to content

Commit 2ccb148

Browse files
committed
fix race
1 parent 8a50583 commit 2ccb148

File tree

2 files changed

+11
-9
lines changed

2 files changed

+11
-9
lines changed

internal/telemetry/buffer.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88

99
const defaultCapacity = 100
1010

11-
// Buffer is a thread-safe ring buffer with overflow policies
11+
// Buffer is a thread-safe ring buffer with overflow policies.
1212
type Buffer[T any] struct {
1313
mu sync.RWMutex
1414
items []T
@@ -46,7 +46,7 @@ func (b *Buffer[T]) SetDroppedCallback(callback func(item T, reason string)) {
4646
b.onDropped = callback
4747
}
4848

49-
// Offer adds an item to the buffer, returns false if dropped due to overflow
49+
// Offer adds an item to the buffer, returns false if dropped due to overflow.
5050
func (b *Buffer[T]) Offer(item T) bool {
5151
atomic.AddInt64(&b.offered, 1)
5252

@@ -89,7 +89,7 @@ func (b *Buffer[T]) Offer(item T) bool {
8989
}
9090
}
9191

92-
// Poll removes and returns the oldest item, false if empty
92+
// Poll removes and returns the oldest item, false if empty.
9393
func (b *Buffer[T]) Poll() (T, bool) {
9494
b.mu.Lock()
9595
defer b.mu.Unlock()
@@ -251,14 +251,15 @@ func (b *Buffer[T]) Clear() {
251251
func (b *Buffer[T]) GetMetrics() BufferMetrics {
252252
b.mu.RLock()
253253
size := b.size
254+
util := float64(b.size) / float64(b.capacity)
254255
b.mu.RUnlock()
255256

256257
return BufferMetrics{
257258
Category: b.category,
258259
Priority: b.priority,
259260
Capacity: b.capacity,
260261
Size: size,
261-
Utilization: b.Utilization(),
262+
Utilization: util,
262263
OfferedCount: b.OfferedCount(),
263264
DroppedCount: b.DroppedCount(),
264265
AcceptedCount: b.AcceptedCount(),

internal/telemetry/buffer_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package telemetry
22

33
import (
44
"sync"
5+
"sync/atomic"
56
"testing"
67
"time"
78
)
@@ -348,7 +349,7 @@ func TestBufferStressTest(t *testing.T) {
348349

349350
// Start consumers
350351
wg.Add(numConsumers)
351-
consumedCount := int64(0)
352+
var consumedCount int64
352353
for i := 0; i < numConsumers; i++ {
353354
go func() {
354355
defer wg.Done()
@@ -361,13 +362,13 @@ func TestBufferStressTest(t *testing.T) {
361362
if !ok {
362363
break
363364
}
364-
consumedCount++
365+
atomic.AddInt64(&consumedCount, 1)
365366
}
366367
return
367368
default:
368369
_, ok := buffer.Poll()
369370
if ok {
370-
consumedCount++
371+
atomic.AddInt64(&consumedCount, 1)
371372
}
372373
}
373374
}
@@ -380,13 +381,13 @@ func TestBufferStressTest(t *testing.T) {
380381
wg.Wait()
381382

382383
t.Logf("Stress test results: offered=%d, dropped=%d, consumed=%d",
383-
buffer.OfferedCount(), buffer.DroppedCount(), consumedCount)
384+
buffer.OfferedCount(), buffer.DroppedCount(), atomic.LoadInt64(&consumedCount))
384385

385386
// Basic sanity checks
386387
if buffer.OfferedCount() <= 0 {
387388
t.Error("Expected some items to be offered")
388389
}
389-
if consumedCount <= 0 {
390+
if atomic.LoadInt64(&consumedCount) <= 0 {
390391
t.Error("Expected some items to be consumed")
391392
}
392393
}

0 commit comments

Comments
 (0)