Skip to content

Commit 88d5d16

Browse files
committed
refactor(pool): replace min heap with circular queue / buffer
1 parent 9a9ad4a commit 88d5d16

File tree

6 files changed

+626
-155
lines changed

6 files changed

+626
-155
lines changed

clickhouse.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -234,8 +234,8 @@ func (ch *clickhouse) Stats() driver.Stats {
234234
Open: len(ch.open),
235235
MaxOpenConns: cap(ch.open),
236236

237-
Idle: ch.idle.Length(),
238-
MaxIdleConns: ch.idle.Capacity(),
237+
Idle: ch.idle.Len(),
238+
MaxIdleConns: ch.idle.Cap(),
239239
}
240240
}
241241

internal/circular/queue.go

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
package circular
2+
3+
import "iter"
4+
5+
// Queue is a bounded FIFO queue implemented using a circular array.
6+
// It uses head and tail pointers to avoid slice re-allocations.
7+
// When full, new elements are rejected rather than overwriting old ones.
8+
type Queue[T any] struct {
9+
data []T
10+
head int // index of the first element
11+
tail int // index where the next element will be inserted
12+
len int // number of elements in the queue
13+
}
14+
15+
// New creates a new circular queue with the given capacity.
16+
func New[T any](capacity int) *Queue[T] {
17+
return &Queue[T]{data: make([]T, capacity)}
18+
}
19+
20+
// Len returns the number of elements in the queue.
21+
func (q *Queue[T]) Len() int {
22+
return q.len
23+
}
24+
25+
// Cap returns the capacity of the queue.
26+
func (q *Queue[T]) Cap() int {
27+
return len(q.data)
28+
}
29+
30+
// IsFull returns true if the queue is at capacity.
31+
func (q *Queue[T]) IsFull() bool {
32+
return q.len == len(q.data)
33+
}
34+
35+
// IsEmpty returns true if the queue is empty.
36+
func (q *Queue[T]) IsEmpty() bool {
37+
return q.len == 0
38+
}
39+
40+
// Push adds an element to the tail of the queue.
41+
// Returns false if the queue is full.
42+
func (q *Queue[T]) Push(value T) bool {
43+
if q.IsFull() {
44+
return false
45+
}
46+
47+
q.data[q.tail] = value
48+
q.tail = q.next(q.tail)
49+
q.len++
50+
return true
51+
}
52+
53+
// Pull removes and returns an element from the head of the queue.
54+
// Returns the zero value and false if the queue is empty.
55+
func (q *Queue[T]) Pull() (value T, ok bool) {
56+
if q.IsEmpty() {
57+
return
58+
}
59+
60+
value = q.data[q.head]
61+
var zero T
62+
q.data[q.head] = zero
63+
q.head = q.next(q.head)
64+
q.len--
65+
return value, true
66+
}
67+
68+
// All returns an iterator over all elements in the queue in FIFO order.
69+
// The iterator yields (index, value) pairs where index is 0-based from the head.
70+
func (q *Queue[T]) All() iter.Seq2[int, T] {
71+
return func(yield func(int, T) bool) {
72+
if q.IsEmpty() {
73+
return
74+
}
75+
76+
current := q.head
77+
for idx := 0; idx < q.len; idx++ {
78+
if !yield(idx, q.data[current]) {
79+
return
80+
}
81+
current = q.next(current)
82+
}
83+
}
84+
}
85+
86+
// Compact removes elements from the queue based on a predicate function.
87+
// Returns an iterator over the removed elements.
88+
// Elements for which shouldRemove returns true are removed from the queue.
89+
func (q *Queue[T]) Compact(shouldRemove func(T) bool) iter.Seq[T] {
90+
return func(yield func(T) bool) {
91+
if q.IsEmpty() {
92+
return
93+
}
94+
95+
newTail := q.head
96+
current := q.head
97+
stopYielding := false
98+
newLen := 0
99+
100+
for i := 0; i < q.len; i++ {
101+
value := q.data[current]
102+
var zero T
103+
104+
if !shouldRemove(value) {
105+
// Keep this element - move it to newTail if needed
106+
if current != newTail {
107+
q.data[newTail] = value
108+
q.data[current] = zero
109+
}
110+
newTail = q.next(newTail)
111+
current = q.next(current)
112+
newLen++
113+
continue
114+
}
115+
116+
// Remove this element
117+
q.data[current] = zero
118+
current = q.next(current)
119+
120+
// Try to yield the removed value if we haven't stopped
121+
stopYielding = stopYielding || !yield(value)
122+
}
123+
124+
q.tail = newTail
125+
q.len = newLen
126+
}
127+
}
128+
129+
// Clear removes all elements from the queue.
130+
// Returns an iterator over the removed elements.
131+
func (q *Queue[T]) Clear() iter.Seq[T] {
132+
return func(yield func(T) bool) {
133+
if q.IsEmpty() {
134+
return
135+
}
136+
137+
current := q.head
138+
stopYielding := false
139+
140+
for i := 0; i < q.len; i++ {
141+
value := q.data[current]
142+
var zero T
143+
q.data[current] = zero
144+
current = q.next(current)
145+
146+
stopYielding = stopYielding || !yield(value)
147+
}
148+
149+
q.head = 0
150+
q.tail = 0
151+
q.len = 0
152+
}
153+
}
154+
155+
// next returns the next index in the circular queue.
156+
func (q *Queue[T]) next(index int) int {
157+
index++
158+
if index >= len(q.data) {
159+
return 0
160+
}
161+
return index
162+
}
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
package circular
2+
3+
import (
4+
"testing"
5+
)
6+
7+
// Benchmarks for the circular queue implementation.
8+
//
9+
// The circular queue maintains constant memory by reusing the same underlying array.
10+
11+
// BenchmarkQueue_MemoryConstancy demonstrates that the circular queue
12+
// maintains constant memory usage as elements are pushed and pulled,
13+
// without reallocating the underlying array.
14+
func BenchmarkQueue_MemoryConstancy(b *testing.B) {
15+
const capacity = 1000
16+
queue := New[int](capacity)
17+
18+
// Fill the queue initially
19+
for i := 0; i < capacity; i++ {
20+
queue.Push(i)
21+
}
22+
23+
b.ResetTimer()
24+
b.ReportAllocs()
25+
26+
for i := 0; i < b.N; i++ {
27+
queue.Pull()
28+
queue.Push(i)
29+
}
30+
}
31+
32+
// BenchmarkQueue_PushPull measures push/pull performance
33+
func BenchmarkQueue_PushPull(b *testing.B) {
34+
sizes := []struct {
35+
name string
36+
size int
37+
}{
38+
{"Size10", 10},
39+
{"Size100", 100},
40+
{"Size1000", 1000},
41+
{"Size10000", 10000},
42+
}
43+
44+
for _, s := range sizes {
45+
b.Run(s.name, func(b *testing.B) {
46+
queue := New[int](s.size)
47+
48+
// Fill queue to half capacity
49+
for i := 0; i < s.size/2; i++ {
50+
queue.Push(i)
51+
}
52+
53+
b.ResetTimer()
54+
b.ReportAllocs()
55+
56+
for i := 0; i < b.N; i++ {
57+
queue.Push(i)
58+
queue.Pull()
59+
}
60+
})
61+
}
62+
}
63+
64+
// BenchmarkQueue_Compact measures compaction performance
65+
func BenchmarkQueue_Compact(b *testing.B) {
66+
sizes := []struct {
67+
name string
68+
size int
69+
}{
70+
{"Size100", 100},
71+
{"Size1000", 1000},
72+
{"Size10000", 10000},
73+
}
74+
75+
for _, s := range sizes {
76+
b.Run(s.name, func(b *testing.B) {
77+
b.ResetTimer()
78+
b.ReportAllocs()
79+
80+
for i := 0; i < b.N; i++ {
81+
b.StopTimer()
82+
queue := New[int](s.size)
83+
for j := 0; j < s.size; j++ {
84+
queue.Push(j)
85+
}
86+
b.StartTimer()
87+
88+
// Remove even numbers
89+
for range queue.Compact(func(val int) bool { return val%2 == 0 }) {
90+
}
91+
}
92+
})
93+
}
94+
}
95+
96+
// BenchmarkQueue_All measures iteration performance
97+
func BenchmarkQueue_All(b *testing.B) {
98+
sizes := []struct {
99+
name string
100+
size int
101+
}{
102+
{"Size100", 100},
103+
{"Size1000", 1000},
104+
{"Size10000", 10000},
105+
}
106+
107+
for _, s := range sizes {
108+
b.Run(s.name, func(b *testing.B) {
109+
queue := New[int](s.size)
110+
for i := 0; i < s.size; i++ {
111+
queue.Push(i)
112+
}
113+
114+
b.ResetTimer()
115+
b.ReportAllocs()
116+
117+
for i := 0; i < b.N; i++ {
118+
for range queue.All() {
119+
}
120+
}
121+
})
122+
}
123+
}

0 commit comments

Comments
 (0)