Skip to content

Commit a9ecc6f

Browse files
committed
mpmc: Document #583 and add corresponding loom test
This patch documents #583 adds failing loom tests to exercise #583 hoping that a future algorithm change can make them pass. The tests are ran in CI but their results is ignored for now as they fail. Ideally the `loom` tests will cover more usage of `mpmc::Queue` in the future and also cover `spsc`.
1 parent e7fb25c commit a9ecc6f

File tree

5 files changed

+140
-1
lines changed

5 files changed

+140
-1
lines changed

.github/workflows/build.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,10 @@ jobs:
8686
- name: Run cargo test
8787
run: cargo test --features="alloc,defmt,mpmc_large,portable-atomic-critical-section,serde,ufmt,bytes,zeroize,embedded-io-v0.7"
8888

89+
- name: Run loom tests
90+
run: cargo test -- loom
91+
env:
92+
RUSTFLAGS: '--cfg loom'
8993
# Run cargo fmt --check
9094
style:
9195
name: style

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
1414
- Implement `TryFrom` for `Deque` from array.
1515
- Switch from `serde` to `serde_core` for enabling faster compilations.
1616
- Implement `Zeroize` trait for all data structures with the `zeroize` feature to securely clear sensitive data from memory.
17+
- `mpmc::Queue`: document non-lock free behaviour, and add loom tests
1718

1819
## [v0.9.1] - 2025-08-19
1920

Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,9 @@ stable_deref_trait = { version = "1", default-features = false }
8080
critical-section = { version = "1.1", features = ["std"] }
8181
static_assertions = "1.1.0"
8282

83+
[target.'cfg(loom)'.dependencies]
84+
loom = "0.7.2"
85+
8386
[package.metadata.docs.rs]
8487
features = [
8588
"bytes",
@@ -93,3 +96,6 @@ features = [
9396
# for the pool module
9497
targets = ["i686-unknown-linux-gnu"]
9598
rustdoc-args = ["--cfg", "docsrs"]
99+
100+
[lints.rust]
101+
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(loom)'] }

src/mpmc.rs

Lines changed: 128 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,16 @@
6161
//! - The numbers reported correspond to the successful path, i.e. `dequeue` returning `Some`
6262
//! and `enqueue` returning `Ok`.
6363
//!
64+
//!
65+
//! <div class="warning">
66+
//!
67+
//! This implementation is not fully lock-free. If a thread or task gets preempted during
68+
//! a `dequeue` or `enqueue` operation, it may prevent other operations from succeeding until
69+
//! it's scheduled again to finish its operation.
70+
//!
71+
//! See <https://github.com/rust-embedded/heapless/issues/583> for more details.
72+
//!
73+
//! </div>
6474
//! # References
6575
//!
6676
//! This is an implementation of Dmitry Vyukov's [bounded MPMC queue], minus the
@@ -70,7 +80,13 @@
7080
7181
use core::{cell::UnsafeCell, mem::MaybeUninit};
7282

73-
#[cfg(not(feature = "portable-atomic"))]
83+
#[cfg(loom)]
84+
use loom::sync::atomic;
85+
86+
#[cfg(all(loom, feature = "portable-atomic"))]
87+
compile_error!("Loom can only be used in tests without portable-atomic");
88+
89+
#[cfg(not(any(feature = "portable-atomic", loom)))]
7490
use core::sync::atomic;
7591
#[cfg(feature = "portable-atomic")]
7692
use portable_atomic as atomic;
@@ -113,6 +129,16 @@ pub struct QueueInner<T, S: Storage> {
113129
/// </div>
114130
///
115131
/// The maximum value of `N` is 128 if the `mpmc_large` feature is not enabled.
132+
///
133+
/// <div class="warning">
134+
///
135+
/// This implementation is not fully lock-free. If a thread or task gets preempted during
136+
/// a `dequeue` or `enqueue` operation, it may prevent other operations from succeeding until
137+
/// it's scheduled again to finish its operation.
138+
///
139+
/// See <https://github.com/rust-embedded/heapless/issues/583> for more details.
140+
///
141+
/// </div>
116142
pub type Queue<T, const N: usize> = QueueInner<T, OwnedStorage<N>>;
117143

118144
/// A [`Queue`] with dynamic capacity.
@@ -121,6 +147,7 @@ pub type Queue<T, const N: usize> = QueueInner<T, OwnedStorage<N>>;
121147
pub type QueueView<T> = QueueInner<T, ViewStorage>;
122148

123149
impl<T, const N: usize> Queue<T, N> {
150+
#[cfg(not(loom))]
124151
/// Creates an empty queue.
125152
pub const fn new() -> Self {
126153
const {
@@ -144,6 +171,26 @@ impl<T, const N: usize> Queue<T, N> {
144171
}
145172
}
146173

174+
/// Creates an empty queue.
175+
#[cfg(loom)]
176+
pub fn new() -> Self {
177+
use core::array;
178+
179+
const {
180+
assert!(N > 1);
181+
assert!(N.is_power_of_two());
182+
assert!(N < UintSize::MAX as usize);
183+
}
184+
185+
let result_cells: [Cell<T>; N] = array::from_fn(|idx| Cell::new(idx));
186+
187+
Self {
188+
buffer: UnsafeCell::new(result_cells),
189+
dequeue_pos: AtomicTargetSize::new(0),
190+
enqueue_pos: AtomicTargetSize::new(0),
191+
}
192+
}
193+
147194
/// Used in `Storage` implementation.
148195
pub(crate) fn as_view_private(&self) -> &QueueView<T> {
149196
self
@@ -247,12 +294,20 @@ struct Cell<T> {
247294
}
248295

249296
impl<T> Cell<T> {
297+
#[cfg(not(loom))]
250298
const fn new(seq: usize) -> Self {
251299
Self {
252300
data: MaybeUninit::uninit(),
253301
sequence: AtomicTargetSize::new(seq as UintSize),
254302
}
255303
}
304+
#[cfg(loom)]
305+
fn new(seq: usize) -> Self {
306+
Self {
307+
data: MaybeUninit::uninit(),
308+
sequence: AtomicTargetSize::new(seq as UintSize),
309+
}
310+
}
256311
}
257312

258313
unsafe fn dequeue<T>(
@@ -286,6 +341,8 @@ unsafe fn dequeue<T>(
286341
return None;
287342
}
288343
core::cmp::Ordering::Greater => {
344+
#[cfg(loom)]
345+
loom::hint::spin_loop();
289346
pos = dequeue_pos.load(Ordering::Relaxed);
290347
}
291348
}
@@ -330,6 +387,8 @@ unsafe fn enqueue<T>(
330387
return Err(item);
331388
}
332389
core::cmp::Ordering::Greater => {
390+
#[cfg(loom)]
391+
loom::hint::spin_loop();
333392
pos = enqueue_pos.load(Ordering::Relaxed);
334393
}
335394
}
@@ -342,6 +401,7 @@ unsafe fn enqueue<T>(
342401
Ok(())
343402
}
344403

404+
#[cfg(not(loom))]
345405
#[cfg(test)]
346406
mod tests {
347407
use static_assertions::assert_not_impl_any;
@@ -420,3 +480,70 @@ mod tests {
420480
q.enqueue(0x55).unwrap_err();
421481
}
422482
}
483+
#[cfg(all(loom, test))]
484+
mod tests_loom {
485+
use super::*;
486+
use std::sync::Arc;
487+
const N: usize = 4;
488+
489+
// FIXME: This test should pass. See https://github.com/rust-embedded/heapless/issues/583
490+
#[test]
491+
#[cfg(loom)]
492+
#[should_panic(expected = "Test failed")]
493+
fn loom_issue_583_enqueue() {
494+
loom::model(|| {
495+
let q0 = Arc::new(Queue::<u8, N>::new());
496+
for i in 0..N {
497+
q0.enqueue(i as u8).unwrap();
498+
}
499+
let model_thread = || {
500+
let q0 = q0.clone();
501+
move || {
502+
for k in 0..N + 2 {
503+
let Some(i) = q0.dequeue() else {
504+
return;
505+
panic!("Test failed at dequeue");
506+
};
507+
if q0.enqueue(k as u8).is_err() {
508+
panic!("Test failed at enqueue: {i}");
509+
}
510+
}
511+
}
512+
};
513+
514+
let h1 = loom::thread::spawn(model_thread());
515+
let h2 = loom::thread::spawn(model_thread());
516+
h1.join().unwrap();
517+
h2.join().unwrap();
518+
});
519+
}
520+
521+
// FIXME: This test should pass. See https://github.com/rust-embedded/heapless/issues/583
522+
#[test]
523+
#[cfg(loom)]
524+
#[should_panic(expected = "Test failed")]
525+
fn loom_issue_583_dequeue() {
526+
loom::model(|| {
527+
let q0 = Arc::new(Queue::<u8, N>::new());
528+
let model_thread = || {
529+
let q0 = q0.clone();
530+
move || {
531+
for k in 0..N + 2 {
532+
if q0.enqueue(k as u8).is_err() {
533+
panic!("Test failed at enqueue: {k}");
534+
}
535+
if q0.dequeue().is_none() {
536+
return;
537+
panic!("Test failed at dequeue: {k}");
538+
}
539+
}
540+
}
541+
};
542+
543+
let h1 = loom::thread::spawn(model_thread());
544+
let h2 = loom::thread::spawn(model_thread());
545+
h1.join().unwrap();
546+
h2.join().unwrap();
547+
});
548+
}
549+
}

tests/tsan.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#![deny(rust_2018_compatibility)]
22
#![deny(rust_2018_idioms)]
3+
#![cfg(not(loom))]
34

45
use std::{ptr::addr_of_mut, thread};
56

0 commit comments

Comments
 (0)