Skip to content

Commit 83bc4d6

Browse files
committed
mpmc: add NBLFQ implementation
Adds the NBLFQ lock-free queue implementation.
1 parent d97296d commit 83bc4d6

File tree

7 files changed

+738
-7
lines changed

7 files changed

+738
-7
lines changed

Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ mpmc_large = []
5959
# Enable crossbeam ArrayQueue MPMC implementation.
6060
mpmc_crossbeam = ["dep:crossbeam-utils"]
6161

62+
# Enable NBLFQ MPMC implementation.
63+
mpmc_nblfq = []
64+
6265
# Implement some alloc Vec interoperability
6366
alloc = []
6467

@@ -84,6 +87,7 @@ stable_deref_trait = { version = "1", default-features = false }
8487
[dev-dependencies]
8588
critical-section = { version = "1.1", features = ["std"] }
8689
static_assertions = "1.1.0"
90+
thread-priority = "3.0"
8791

8892
[target.'cfg(loom)'.dependencies]
8993
loom = "0.7.2"

src/c_string.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,7 @@ mod tests {
382382
let empty = CString::<1>::new();
383383

384384
assert_eq!(empty.as_c_str(), c"");
385-
assert_eq!(empty.as_bytes(), &[]);
385+
assert!(empty.as_bytes().is_empty());
386386
assert_eq!(empty.to_str(), Ok(""));
387387
}
388388

src/history_buf.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -670,7 +670,7 @@ mod tests {
670670
assert!(x.is_full());
671671

672672
let x: HistoryBuf<u8, 4> = HistoryBuf::new();
673-
assert_eq!(x.as_slice(), []);
673+
assert!(x.as_slice().is_empty());
674674
assert!(!x.is_full());
675675
}
676676

@@ -694,7 +694,7 @@ mod tests {
694694
fn clear() {
695695
let mut x: HistoryBuf<u8, 4> = HistoryBuf::new_with(1);
696696
x.clear();
697-
assert_eq!(x.as_slice(), []);
697+
assert!(x.as_slice().is_empty());
698698

699699
let mut x: HistoryBuf<u8, 4> = HistoryBuf::new();
700700
x.clear_with(1);
@@ -782,7 +782,7 @@ mod tests {
782782
fn as_slice() {
783783
let mut x: HistoryBuf<u8, 4> = HistoryBuf::new();
784784

785-
assert_eq!(x.as_slice(), []);
785+
assert!(x.as_slice().is_empty());
786786

787787
x.extend([1, 2, 3, 4, 5].iter());
788788

src/mpmc.rs

Lines changed: 157 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,20 +99,27 @@ type UintSize = usize;
9999
type UintSize = u8;
100100

101101
#[cfg(feature = "mpmc_large")]
102+
#[allow(unused)]
102103
type IntSize = isize;
103104
#[cfg(not(feature = "mpmc_large"))]
105+
#[allow(unused)]
104106
type IntSize = i8;
105107

106-
#[cfg(not(feature = "mpmc_crossbeam"))]
108+
#[cfg(all(not(feature = "mpmc_nblfq"), not(feature = "mpmc_crossbeam")))]
107109
mod original;
108-
#[cfg(not(feature = "mpmc_crossbeam"))]
110+
#[cfg(all(not(feature = "mpmc_nblfq"), not(feature = "mpmc_crossbeam")))]
109111
pub use original::*;
110112

111113
#[cfg(feature = "mpmc_crossbeam")]
112114
mod crossbeam_array_queue;
113115
#[cfg(feature = "mpmc_crossbeam")]
114116
pub use crossbeam_array_queue::*;
115117

118+
#[cfg(all(feature = "mpmc_nblfq", not(feature = "mpmc_crossbeam")))]
119+
mod nblfq;
120+
#[cfg(all(feature = "mpmc_nblfq", not(feature = "mpmc_crossbeam")))]
121+
pub use nblfq::*;
122+
116123
/// A [`Queue`] with dynamic capacity.
117124
///
118125
/// [`Queue`] coerces to `QueueView`. `QueueView` is `!Sized`, meaning it can only ever be used by reference.
@@ -125,6 +132,8 @@ mod tests {
125132

126133
use super::Queue;
127134

135+
const N: usize = 4;
136+
128137
// Ensure a `Queue` containing `!Send` values stays `!Send` itself.
129138
assert_not_impl_any!(Queue<*const (), 4>: Send);
130139

@@ -196,7 +205,153 @@ mod tests {
196205
// Queue is full, this should not block forever.
197206
q.enqueue(0x55).unwrap_err();
198207
}
208+
209+
#[test]
210+
fn test_enqueue_contention_rt() {
211+
use thread_priority::*;
212+
213+
let q0 = std::sync::Arc::new(Queue::<u8, N>::new());
214+
215+
for i in 0..N {
216+
q0.enqueue(i as u8).expect("new enqueue");
217+
}
218+
219+
let model_thread = |q0: std::sync::Arc<Queue<u8, N>>| {
220+
for k in 0..N {
221+
match q0.dequeue() {
222+
Some(_i) => (),
223+
None if q0.is_empty() => (),
224+
None => panic!(
225+
"enqueue: Dequeue failed on iteration: {k}, empty queue?: {}, queue len: {}",
226+
q0.is_empty(),
227+
q0.len()
228+
),
229+
};
230+
231+
q0.enqueue(k as u8).unwrap();
232+
}
233+
};
234+
235+
let q1 = q0.clone();
236+
let h1 = ThreadBuilder::default()
237+
.name("h1")
238+
.priority(ThreadPriority::Max)
239+
.policy(ThreadSchedulePolicy::Realtime(
240+
RealtimeThreadSchedulePolicy::Fifo,
241+
))
242+
.spawn(move |_| model_thread(q1))
243+
.unwrap();
244+
245+
let q2 = q0.clone();
246+
let h2 = ThreadBuilder::default()
247+
.name("h2")
248+
.priority(ThreadPriority::Max)
249+
.policy(ThreadSchedulePolicy::Realtime(
250+
RealtimeThreadSchedulePolicy::Fifo,
251+
))
252+
.spawn(move |_| model_thread(q2))
253+
.unwrap();
254+
255+
h1.join().unwrap();
256+
h2.join().unwrap();
257+
}
258+
259+
#[test]
260+
fn test_dequeue_contention_rt() {
261+
use thread_priority::*;
262+
263+
let q0 = std::sync::Arc::new(Queue::<u8, N>::new());
264+
265+
let model_thread = |q0: std::sync::Arc<Queue<u8, N>>| {
266+
for k in 0..N {
267+
q0.enqueue(k as u8).unwrap();
268+
match q0.dequeue() {
269+
Some(_i) => (),
270+
None if q0.is_empty() => (),
271+
None => {
272+
panic!(
273+
"dequeue: Dequeue failed on iteration: {k}, queue is empty?: {}, queue len: {}",
274+
q0.is_empty(),
275+
q0.len()
276+
);
277+
}
278+
}
279+
}
280+
};
281+
282+
let q1 = q0.clone();
283+
let h1 = ThreadBuilder::default()
284+
.name("h1")
285+
.priority(ThreadPriority::Max)
286+
.policy(ThreadSchedulePolicy::Realtime(
287+
RealtimeThreadSchedulePolicy::Fifo,
288+
))
289+
.spawn(move |_| model_thread(q1))
290+
.unwrap();
291+
292+
let q2 = q0.clone();
293+
let h2 = ThreadBuilder::default()
294+
.name("h2")
295+
.priority(ThreadPriority::Max)
296+
.policy(ThreadSchedulePolicy::Realtime(
297+
RealtimeThreadSchedulePolicy::Fifo,
298+
))
299+
.spawn(move |_| model_thread(q2))
300+
.unwrap();
301+
302+
h1.join().unwrap();
303+
h2.join().unwrap();
304+
}
305+
306+
#[test]
307+
fn test_issue_583_enqueue_rt() {
308+
use thread_priority::*;
309+
310+
fn to_vec<T>(q: &Queue<T, N>) -> Vec<T> {
311+
// inaccurate
312+
let mut ret = vec![];
313+
while let Some(v) = q.dequeue() {
314+
ret.push(v);
315+
}
316+
ret
317+
}
318+
319+
let q0 = std::sync::Arc::new(Queue::<u8, N>::new());
320+
321+
let model_thread = move |q0: std::sync::Arc<Queue<u8, N>>| {
322+
for k in 0..1_000_000 {
323+
if let Some(v) = q0.dequeue() {
324+
q0.enqueue(v)
325+
.unwrap_or_else(|v| panic!("{}: q0 -> q0: {}, {:?}", k, v, to_vec(&q0)));
326+
}
327+
}
328+
};
329+
330+
let q1 = q0.clone();
331+
let h1 = ThreadBuilder::default()
332+
.name("h1")
333+
.priority(ThreadPriority::Max)
334+
.policy(ThreadSchedulePolicy::Realtime(
335+
RealtimeThreadSchedulePolicy::Fifo,
336+
))
337+
.spawn(move |_| model_thread(q1))
338+
.unwrap();
339+
340+
let q2 = q0.clone();
341+
let h2 = ThreadBuilder::default()
342+
.name("h2")
343+
.priority(ThreadPriority::Max)
344+
.policy(ThreadSchedulePolicy::Realtime(
345+
RealtimeThreadSchedulePolicy::Fifo,
346+
))
347+
.spawn(move |_| model_thread(q2))
348+
.unwrap();
349+
350+
h1.join().unwrap();
351+
h2.join().unwrap();
352+
}
199353
}
354+
200355
#[cfg(all(loom, test))]
201356
mod tests_loom {
202357
use super::*;

src/mpmc/crossbeam_array_queue.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -636,7 +636,7 @@ impl<T, const N: usize> Iterator for IntoIter<T, N> {
636636
// Set to `{ lap: lap.wrapping_add(1), index: 0 }`.
637637
lap.wrapping_add(value.one_lap)
638638
};
639-
value.head.store(new, Ordering::AcqRel);
639+
value.head.store(new, Ordering::Release);
640640
Some(val)
641641
} else {
642642
None

0 commit comments

Comments
 (0)