Skip to content

Commit 8a50583

Browse files
committed
add ring buffer implementation
1 parent b07461a commit 8a50583

File tree

4 files changed

+1175
-0
lines changed

4 files changed

+1175
-0
lines changed

internal/telemetry/buffer.go

Lines changed: 281 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,281 @@
1+
package telemetry
2+
3+
import (
4+
"sync"
5+
"sync/atomic"
6+
"time"
7+
)
8+
9+
const defaultCapacity = 100
10+
11+
// Buffer is a thread-safe ring buffer with overflow policies
12+
type Buffer[T any] struct {
13+
mu sync.RWMutex
14+
items []T
15+
head int
16+
tail int
17+
size int
18+
capacity int
19+
20+
category DataCategory
21+
priority Priority
22+
overflowPolicy OverflowPolicy
23+
24+
offered int64
25+
dropped int64
26+
onDropped func(item T, reason string)
27+
}
28+
29+
func NewBuffer[T any](category DataCategory, capacity int, overflowPolicy OverflowPolicy) *Buffer[T] {
30+
if capacity <= 0 {
31+
capacity = defaultCapacity
32+
}
33+
34+
return &Buffer[T]{
35+
items: make([]T, capacity),
36+
capacity: capacity,
37+
category: category,
38+
priority: category.GetPriority(),
39+
overflowPolicy: overflowPolicy,
40+
}
41+
}
42+
43+
func (b *Buffer[T]) SetDroppedCallback(callback func(item T, reason string)) {
44+
b.mu.Lock()
45+
defer b.mu.Unlock()
46+
b.onDropped = callback
47+
}
48+
49+
// Offer adds an item to the buffer, returns false if dropped due to overflow
50+
func (b *Buffer[T]) Offer(item T) bool {
51+
atomic.AddInt64(&b.offered, 1)
52+
53+
b.mu.Lock()
54+
defer b.mu.Unlock()
55+
56+
if b.size < b.capacity {
57+
b.items[b.tail] = item
58+
b.tail = (b.tail + 1) % b.capacity
59+
b.size++
60+
return true
61+
}
62+
63+
switch b.overflowPolicy {
64+
case OverflowPolicyDropOldest:
65+
oldItem := b.items[b.head]
66+
b.items[b.head] = item
67+
b.head = (b.head + 1) % b.capacity
68+
b.tail = (b.tail + 1) % b.capacity
69+
70+
atomic.AddInt64(&b.dropped, 1)
71+
if b.onDropped != nil {
72+
b.onDropped(oldItem, "buffer_full_drop_oldest")
73+
}
74+
return true
75+
76+
case OverflowPolicyDropNewest:
77+
atomic.AddInt64(&b.dropped, 1)
78+
if b.onDropped != nil {
79+
b.onDropped(item, "buffer_full_drop_newest")
80+
}
81+
return false
82+
83+
default:
84+
atomic.AddInt64(&b.dropped, 1)
85+
if b.onDropped != nil {
86+
b.onDropped(item, "unknown_overflow_policy")
87+
}
88+
return false
89+
}
90+
}
91+
92+
// Poll removes and returns the oldest item, false if empty
93+
func (b *Buffer[T]) Poll() (T, bool) {
94+
b.mu.Lock()
95+
defer b.mu.Unlock()
96+
97+
var zero T
98+
if b.size == 0 {
99+
return zero, false
100+
}
101+
102+
item := b.items[b.head]
103+
b.items[b.head] = zero
104+
b.head = (b.head + 1) % b.capacity
105+
b.size--
106+
107+
return item, true
108+
}
109+
110+
// PollBatch removes and returns up to maxItems
111+
func (b *Buffer[T]) PollBatch(maxItems int) []T {
112+
if maxItems <= 0 {
113+
return nil
114+
}
115+
116+
b.mu.Lock()
117+
defer b.mu.Unlock()
118+
119+
if b.size == 0 {
120+
return nil
121+
}
122+
123+
itemCount := maxItems
124+
if itemCount > b.size {
125+
itemCount = b.size
126+
}
127+
128+
result := make([]T, itemCount)
129+
var zero T
130+
131+
for i := 0; i < itemCount; i++ {
132+
result[i] = b.items[b.head]
133+
b.items[b.head] = zero
134+
b.head = (b.head + 1) % b.capacity
135+
b.size--
136+
}
137+
138+
return result
139+
}
140+
141+
// Drain removes and returns all items
142+
func (b *Buffer[T]) Drain() []T {
143+
b.mu.Lock()
144+
defer b.mu.Unlock()
145+
146+
if b.size == 0 {
147+
return nil
148+
}
149+
150+
result := make([]T, b.size)
151+
index := 0
152+
var zero T
153+
154+
for i := 0; i < b.size; i++ {
155+
pos := (b.head + i) % b.capacity
156+
result[index] = b.items[pos]
157+
b.items[pos] = zero
158+
index++
159+
}
160+
161+
b.head = 0
162+
b.tail = 0
163+
b.size = 0
164+
165+
return result
166+
}
167+
168+
// Peek returns the oldest item without removing it, false if empty
169+
func (b *Buffer[T]) Peek() (T, bool) {
170+
b.mu.RLock()
171+
defer b.mu.RUnlock()
172+
173+
var zero T
174+
if b.size == 0 {
175+
return zero, false
176+
}
177+
178+
return b.items[b.head], true
179+
}
180+
181+
func (b *Buffer[T]) Size() int {
182+
b.mu.RLock()
183+
defer b.mu.RUnlock()
184+
return b.size
185+
}
186+
187+
func (b *Buffer[T]) Capacity() int {
188+
return b.capacity
189+
}
190+
191+
func (b *Buffer[T]) Category() DataCategory {
192+
return b.category
193+
}
194+
195+
func (b *Buffer[T]) Priority() Priority {
196+
return b.priority
197+
}
198+
199+
func (b *Buffer[T]) IsEmpty() bool {
200+
b.mu.RLock()
201+
defer b.mu.RUnlock()
202+
return b.size == 0
203+
}
204+
205+
func (b *Buffer[T]) IsFull() bool {
206+
b.mu.RLock()
207+
defer b.mu.RUnlock()
208+
return b.size == b.capacity
209+
}
210+
211+
func (b *Buffer[T]) Utilization() float64 {
212+
b.mu.RLock()
213+
defer b.mu.RUnlock()
214+
return float64(b.size) / float64(b.capacity)
215+
}
216+
217+
func (b *Buffer[T]) OfferedCount() int64 {
218+
return atomic.LoadInt64(&b.offered)
219+
}
220+
221+
func (b *Buffer[T]) DroppedCount() int64 {
222+
return atomic.LoadInt64(&b.dropped)
223+
}
224+
225+
func (b *Buffer[T]) AcceptedCount() int64 {
226+
return b.OfferedCount() - b.DroppedCount()
227+
}
228+
229+
func (b *Buffer[T]) DropRate() float64 {
230+
offered := b.OfferedCount()
231+
if offered == 0 {
232+
return 0.0
233+
}
234+
return float64(b.DroppedCount()) / float64(offered)
235+
}
236+
237+
func (b *Buffer[T]) Clear() {
238+
b.mu.Lock()
239+
defer b.mu.Unlock()
240+
241+
var zero T
242+
for i := 0; i < b.capacity; i++ {
243+
b.items[i] = zero
244+
}
245+
246+
b.head = 0
247+
b.tail = 0
248+
b.size = 0
249+
}
250+
251+
func (b *Buffer[T]) GetMetrics() BufferMetrics {
252+
b.mu.RLock()
253+
size := b.size
254+
b.mu.RUnlock()
255+
256+
return BufferMetrics{
257+
Category: b.category,
258+
Priority: b.priority,
259+
Capacity: b.capacity,
260+
Size: size,
261+
Utilization: b.Utilization(),
262+
OfferedCount: b.OfferedCount(),
263+
DroppedCount: b.DroppedCount(),
264+
AcceptedCount: b.AcceptedCount(),
265+
DropRate: b.DropRate(),
266+
LastUpdated: time.Now(),
267+
}
268+
}
269+
270+
type BufferMetrics struct {
271+
Category DataCategory `json:"category"`
272+
Priority Priority `json:"priority"`
273+
Capacity int `json:"capacity"`
274+
Size int `json:"size"`
275+
Utilization float64 `json:"utilization"`
276+
OfferedCount int64 `json:"offered_count"`
277+
DroppedCount int64 `json:"dropped_count"`
278+
AcceptedCount int64 `json:"accepted_count"`
279+
DropRate float64 `json:"drop_rate"`
280+
LastUpdated time.Time `json:"last_updated"`
281+
}

0 commit comments

Comments
 (0)