Skip to content

Commit f2e5003

Browse files
committed
[resp for #7] NewOverlappedRingBuffer returns RichOverlappedRingBuffer[T] now
which gives some new alternatives apis of `Enqueue` so that you can collect `overwrites` and `size` from the returning data. > The legacy codes keep its stable with the original `Enqueue`. > For a normal ringbuf, non-overlapped, nothing's changed since the current capacity (size) is not a very important value for measuring. > If not, issue me.
1 parent 9c14e68 commit f2e5003

File tree

5 files changed

+325
-100
lines changed

5 files changed

+325
-100
lines changed

mpmc/i.go

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,36 +10,46 @@ type Queue[T any] interface {
1010
CapReal() uint32
1111
// Size returns the quantity of items in the ring buffer queue
1212
Size() uint32
13-
IsEmpty() (b bool)
14-
IsFull() (b bool)
13+
IsEmpty() (empty bool)
14+
IsFull() (full bool)
1515
Reset()
1616
}
1717

18+
// RichOverlappedRingBuffer supplies a rich-measureable [EnqueueM] api.
19+
type RichOverlappedRingBuffer[T any] interface {
20+
RingBuffer[T]
21+
22+
// EnqueueM returns how many elements were overwritten by the
23+
// new incoming data, generally it would be 1 if overwriting
24+
// happened, or 0 for a normal insertion.
25+
// If error occurred, overwites field is undefined.
26+
EnqueueM(item T) (overwrites uint32, err error)
27+
// EnqueueMRich returns overwritten count of elements, and
28+
// current capacity of container.
29+
// If error occurred, both of these two fields are undefined.
30+
EnqueueMRich(item T) (size, overwrites uint32, err error)
31+
}
32+
1833
// RingBuffer interface provides a set of standard ring buffer operations
1934
type RingBuffer[T any] interface {
2035
Close()
2136

2237
// Queue[T]
2338

24-
Enqueue(item T) (err error)
25-
Dequeue() (item T, err error)
26-
// Cap returns the outer capacity of the ring buffer.
27-
Cap() uint32
28-
// CapReal returns the real (inner) capacity of the ring buffer.
29-
CapReal() uint32
30-
// Size returns the quantity of items in the ring buffer queue
31-
Size() uint32
32-
IsEmpty() (b bool)
33-
IsFull() (b bool)
39+
Enqueue(item T) (err error) // or [Put] as alternative
40+
Dequeue() (item T, err error) // or [Get] as alternative
41+
Cap() uint32 // Cap returns the outer capacity of the ring buffer.
42+
CapReal() uint32 // CapReal returns the real (inner) capacity of the ring buffer.
43+
Size() uint32 // Size returns the quantity of items in the ring buffer queue
44+
IsEmpty() (empty bool)
45+
IsFull() (full bool)
3446
Reset()
3547

3648
Put(item T) (err error)
3749
Get() (item T, err error)
3850

39-
// Quantity returns the quantity of items in the ring buffer queue
40-
Quantity() uint32
41-
42-
Debug(enabled bool) (lastState bool)
51+
Quantity() uint32 // Quantity returns the quantity of items in the ring buffer queue
4352

44-
ResetCounters()
53+
Debug(enabled bool) (lastState bool) // for internal debugging, see [Dbg] interface.
54+
ResetCounters() // for internal debugging, see [Dbg] interface.
4555
}

mpmc/new.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package mpmc
2+
3+
// New returns the RingBuffer object.
4+
//
5+
// It returns [ErrQueueFull] when you're trying to put a new
6+
// element into a full ring buffer.
7+
func New[T any](capacity uint32, opts ...Opt[T]) (ringBuffer RingBuffer[T]) {
8+
return newRingBuffer(func(capacity uint32, opts ...Opt[T]) (ringBuffer RingBuffer[T]) {
9+
size := roundUpToPower2(capacity)
10+
rb := &ringBuf[T]{
11+
data: make([]rbItem[T], size),
12+
cap: size,
13+
capModMask: size - 1, // = 2^n - 1
14+
}
15+
for _, opt := range opts {
16+
opt(rb)
17+
}
18+
ringBuffer = rb
19+
return
20+
}, capacity, opts...)
21+
}
22+
23+
// NewOverlappedRingBuffer make a new instance of the overlapped ring
24+
// buffer, which overwrites its head element if it's full.
25+
//
26+
// When an unexpected state occurred, the returned value could be nil.
27+
//
28+
// In this case, checking it for unavailable value is recommended.
29+
// This could happen if a physical core fault detected in high-freq,
30+
// high-pressure, and high-temperature place.
31+
//
32+
// For the normal runtime environment, unexpected state should be
33+
// impossible, so ignore it is safe.
34+
func NewOverlappedRingBuffer[T any](capacity uint32, opts ...Opt[T]) (ringBuffer RichOverlappedRingBuffer[T]) {
35+
return newOverlappedRingBuffer(func(capacity uint32, opts ...Opt[T]) (ringBuffer RichOverlappedRingBuffer[T]) {
36+
size := roundUpToPower2(capacity)
37+
rb := &orbuf[T]{
38+
ringBuf[T]{
39+
data: make([]rbItem[T], size),
40+
cap: size,
41+
capModMask: size - 1, // = 2^n - 1
42+
},
43+
}
44+
for _, opt := range opts {
45+
opt(&rb.ringBuf)
46+
}
47+
ringBuffer = rb
48+
return
49+
}, capacity, opts...)
50+
}
51+
52+
// Creator _
53+
type Creator[T any] func(capacity uint32, opts ...Opt[T]) (ringBuffer RingBuffer[T])
54+
type OverlappedCreator[T any] func(capacity uint32, opts ...Opt[T]) (ringBuffer RichOverlappedRingBuffer[T])
55+
56+
func newOverlappedRingBuffer[T any](creator OverlappedCreator[T], capacity uint32, opts ...Opt[T]) (ringBuffer RichOverlappedRingBuffer[T]) {
57+
if isInitialized() {
58+
ringBuffer = creator(capacity, opts...)
59+
}
60+
return
61+
}
62+
63+
func newRingBuffer[T any](creator Creator[T], capacity uint32, opts ...Opt[T]) (ringBuffer RingBuffer[T]) {
64+
if isInitialized() {
65+
ringBuffer = creator(capacity, opts...)
66+
}
67+
return
68+
}
69+
70+
// Opt interface the functional options
71+
type Opt[T any] func(rb *ringBuf[T])
72+
73+
// WithItemInitializer provides your custom initializer for each data item.
74+
func WithItemInitializer[T any](initializeable Initializeable[T]) Opt[T] {
75+
return func(buf *ringBuf[T]) {
76+
buf.initializer = initializeable
77+
}
78+
}
79+
80+
// WithDebugMode enables the internal debug mode for more logging output, and collect the metrics for debugging
81+
func WithDebugMode[T any](_ bool) Opt[T] {
82+
return func(_ *ringBuf[T]) {
83+
// buf.debugMode = debug
84+
}
85+
}
86+
87+
// // WithLogger setup a logger
88+
// func WithLogger[T any](logger log.Logger) Opt[T] {
89+
// return func(buf *ringBuf[T]) {
90+
// // buf.logger = logger
91+
// }
92+
// }

mpmc/rb.go

Lines changed: 0 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -9,89 +9,6 @@ import (
99
"github.com/hedzr/go-ringbuf/v2/mpmc/state"
1010
)
1111

12-
// New returns the RingBuffer object.
13-
//
14-
// It returns [ErrQueueFull] when you're trying to put a new
15-
// element into a full ring buffer.
16-
func New[T any](capacity uint32, opts ...Opt[T]) (ringBuffer RingBuffer[T]) {
17-
return newRingBuffer(func(capacity uint32, opts ...Opt[T]) (ringBuffer RingBuffer[T]) {
18-
size := roundUpToPower2(capacity)
19-
rb := &ringBuf[T]{
20-
data: make([]rbItem[T], size),
21-
cap: size,
22-
capModMask: size - 1, // = 2^n - 1
23-
}
24-
for _, opt := range opts {
25-
opt(rb)
26-
}
27-
ringBuffer = rb
28-
return
29-
}, capacity, opts...)
30-
}
31-
32-
// NewOverlappedRingBuffer make a new instance of the overlapped ring
33-
// buffer, which overwrites its head element if it's full.
34-
//
35-
// When an unexpected state occurred, the returned value could be nil.
36-
//
37-
// In this case, checking it for unavailable value is recommended.
38-
// This could happen if a physical core fault detected in high-freq,
39-
// high-pressure, and high-temperature place.
40-
//
41-
// For the normal runtime environment, unexpected state should be
42-
// impossible, so ignore it is safe.
43-
func NewOverlappedRingBuffer[T any](capacity uint32, opts ...Opt[T]) (ringBuffer RingBuffer[T]) {
44-
return newRingBuffer(func(capacity uint32, opts ...Opt[T]) (ringBuffer RingBuffer[T]) {
45-
size := roundUpToPower2(capacity)
46-
rb := &orbuf[T]{
47-
ringBuf[T]{
48-
data: make([]rbItem[T], size),
49-
cap: size,
50-
capModMask: size - 1, // = 2^n - 1
51-
},
52-
}
53-
for _, opt := range opts {
54-
opt(&rb.ringBuf)
55-
}
56-
ringBuffer = rb
57-
return
58-
}, capacity, opts...)
59-
}
60-
61-
// Creator _
62-
type Creator[T any] func(capacity uint32, opts ...Opt[T]) (ringBuffer RingBuffer[T])
63-
64-
func newRingBuffer[T any](creator Creator[T], capacity uint32, opts ...Opt[T]) (ringBuffer RingBuffer[T]) {
65-
if isInitialized() {
66-
ringBuffer = creator(capacity, opts...)
67-
}
68-
return
69-
}
70-
71-
// Opt interface the functional options
72-
type Opt[T any] func(rb *ringBuf[T])
73-
74-
// WithItemInitializer provides your custom initializer for each data item.
75-
func WithItemInitializer[T any](initializeable Initializeable[T]) Opt[T] {
76-
return func(buf *ringBuf[T]) {
77-
buf.initializer = initializeable
78-
}
79-
}
80-
81-
// WithDebugMode enables the internal debug mode for more logging output, and collect the metrics for debugging
82-
func WithDebugMode[T any](_ bool) Opt[T] {
83-
return func(_ *ringBuf[T]) {
84-
// buf.debugMode = debug
85-
}
86-
}
87-
88-
// // WithLogger setup a logger
89-
// func WithLogger[T any](logger log.Logger) Opt[T] {
90-
// return func(buf *ringBuf[T]) {
91-
// // buf.logger = logger
92-
// }
93-
// }
94-
9512
// ringBuf implements a circular buffer. It is a fixed size,
9613
// and new writes will be blocked when queue is full.
9714
type ringBuf[T any] struct {

mpmc/rb2.go

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,115 @@ type orbuf[T any] struct {
1313

1414
func (rb *orbuf[T]) Put(item T) (err error) { return rb.Enqueue(item) } //nolint:revive
1515

16+
func (rb *orbuf[T]) EnqueueM(item T) (overwrites uint32, err error) { //nolint:revive
17+
var tail, head, nt, nh uint32
18+
var holder *rbItem[T]
19+
for {
20+
head = atomic.LoadUint32(&rb.head)
21+
tail = atomic.LoadUint32(&rb.tail)
22+
nt = (tail + 1) & rb.capModMask
23+
24+
isEmpty := head == tail
25+
if isEmpty && head == MaxUint32 {
26+
err = ErrQueueNotReady
27+
return
28+
}
29+
30+
isFull := nt == head
31+
if isFull {
32+
nh = (head + 1) & rb.capModMask
33+
atomic.CompareAndSwapUint32(&rb.head, head, nh)
34+
overwrites++
35+
}
36+
37+
holder = &rb.data[tail]
38+
atomic.CompareAndSwapUint32(&rb.tail, tail, nt)
39+
retry:
40+
if !atomic.CompareAndSwapUint64(&holder.readWrite, 0, 2) { //nolint:gomnd
41+
if !atomic.CompareAndSwapUint64(&holder.readWrite, 1, 2) { //nolint:gomnd
42+
if atomic.LoadUint64(&holder.readWrite) == 0 {
43+
goto retry // sometimes, short circuit
44+
}
45+
runtime.Gosched() // time to time
46+
continue
47+
}
48+
}
49+
50+
if rb.initializer != nil {
51+
rb.initializer.CloneIn(item, &holder.value)
52+
} else {
53+
holder.value = item
54+
}
55+
if !atomic.CompareAndSwapUint64(&holder.readWrite, 2, 1) { //nolint:gomnd
56+
err = ErrRaced // runtime.Gosched() // never happens
57+
}
58+
59+
if state.VerboseEnabled {
60+
state.Verbose("[W] enqueued",
61+
"tail", tail, "new-tail", nt, "head", head, "value", toString(holder.value),
62+
"value(rb.data[0])", toString(rb.data[0].value),
63+
"value(rb.data[1])", toString(rb.data[1].value))
64+
}
65+
return
66+
}
67+
}
68+
69+
func (rb *orbuf[T]) EnqueueMRich(item T) (size, overwrites uint32, err error) { //nolint:revive
70+
var tail, head, nt, nh uint32
71+
var holder *rbItem[T]
72+
for {
73+
head = atomic.LoadUint32(&rb.head)
74+
tail = atomic.LoadUint32(&rb.tail)
75+
nt = (tail + 1) & rb.capModMask
76+
77+
isEmpty := head == tail
78+
if isEmpty && head == MaxUint32 {
79+
err = ErrQueueNotReady
80+
return
81+
}
82+
83+
isFull := nt == head
84+
if isFull {
85+
nh = (head + 1) & rb.capModMask
86+
atomic.CompareAndSwapUint32(&rb.head, head, nh)
87+
overwrites++
88+
size = rb.qty(head, tail) + 1
89+
}
90+
91+
holder = &rb.data[tail]
92+
atomic.CompareAndSwapUint32(&rb.tail, tail, nt)
93+
retry:
94+
if !atomic.CompareAndSwapUint64(&holder.readWrite, 0, 2) { //nolint:gomnd
95+
if !atomic.CompareAndSwapUint64(&holder.readWrite, 1, 2) { //nolint:gomnd
96+
if atomic.LoadUint64(&holder.readWrite) == 0 {
97+
goto retry // sometimes, short circuit
98+
}
99+
runtime.Gosched() // time to time
100+
continue
101+
}
102+
}
103+
104+
if rb.initializer != nil {
105+
rb.initializer.CloneIn(item, &holder.value)
106+
} else {
107+
holder.value = item
108+
}
109+
if !atomic.CompareAndSwapUint64(&holder.readWrite, 2, 1) { //nolint:gomnd
110+
err = ErrRaced // runtime.Gosched() // never happens
111+
}
112+
113+
if state.VerboseEnabled {
114+
state.Verbose("[W] enqueued",
115+
"tail", tail, "new-tail", nt, "head", head, "value", toString(holder.value),
116+
"value(rb.data[0])", toString(rb.data[0].value),
117+
"value(rb.data[1])", toString(rb.data[1].value))
118+
}
119+
120+
size = rb.qty(head, tail) + 1
121+
return
122+
}
123+
}
124+
16125
func (rb *orbuf[T]) Enqueue(item T) (err error) { //nolint:revive
17126
var tail, head, nt, nh uint32
18127
var holder *rbItem[T]

0 commit comments

Comments
 (0)