6161//! - The numbers reported correspond to the successful path, i.e. `dequeue` returning `Some`
6262//! and `enqueue` returning `Ok`.
6363//!
64+ //!
65+ //! <div class="warning">
66+ //!
67+ //! This implementation is not fully lock-free. If a thread or task gets preempted during
68+ //! a `dequeue` or `enqueue` operation, it may prevent other operations from succeeding until
69+ //! it's scheduled again to finish its operation.
70+ //!
71+ //! See <https://github.com/rust-embedded/heapless/issues/583> for more details.
72+ //!
73+ //! </div>
6474//! # References
6575//!
6676//! This is an implementation of Dmitry Vyukov's [bounded MPMC queue], minus the
7080
7181use core:: { cell:: UnsafeCell , mem:: MaybeUninit } ;
7282
73- #[ cfg( not( feature = "portable-atomic" ) ) ]
83+ #[ cfg( loom) ]
84+ use loom:: sync:: atomic;
85+
86+ #[ cfg( all( loom, feature = "portable-atomic" ) ) ]
87+ compile_error ! ( "Loom can only be used in tests without portable-atomic" ) ;
88+
89+ #[ cfg( not( any( feature = "portable-atomic" , loom) ) ) ]
7490use core:: sync:: atomic;
7591#[ cfg( feature = "portable-atomic" ) ]
7692use portable_atomic as atomic;
@@ -113,6 +129,16 @@ pub struct QueueInner<T, S: Storage> {
113129/// </div>
114130///
115131/// The maximum value of `N` is 128 if the `mpmc_large` feature is not enabled.
132+ ///
133+ /// <div class="warning">
134+ ///
135+ /// This implementation is not fully lock-free. If a thread or task gets preempted during
136+ /// a `dequeue` or `enqueue` operation, it may prevent other operations from succeeding until
137+ /// it's scheduled again to finish its operation.
138+ ///
139+ /// See <https://github.com/rust-embedded/heapless/issues/583> for more details.
140+ ///
141+ /// </div>
116142pub type Queue < T , const N : usize > = QueueInner < T , OwnedStorage < N > > ;
117143
118144/// A [`Queue`] with dynamic capacity.
@@ -121,6 +147,7 @@ pub type Queue<T, const N: usize> = QueueInner<T, OwnedStorage<N>>;
121147pub type QueueView < T > = QueueInner < T , ViewStorage > ;
122148
123149impl < T , const N : usize > Queue < T , N > {
150+ #[ cfg( not( loom) ) ]
124151 /// Creates an empty queue.
125152 pub const fn new ( ) -> Self {
126153 const {
@@ -144,6 +171,26 @@ impl<T, const N: usize> Queue<T, N> {
144171 }
145172 }
146173
174+ /// Creates an empty queue.
175+ #[ cfg( loom) ]
176+ pub fn new ( ) -> Self {
177+ use core:: array;
178+
179+ const {
180+ assert ! ( N > 1 ) ;
181+ assert ! ( N . is_power_of_two( ) ) ;
182+ assert ! ( N < UintSize :: MAX as usize ) ;
183+ }
184+
185+ let result_cells: [ Cell < T > ; N ] = array:: from_fn ( |idx| Cell :: new ( idx) ) ;
186+
187+ Self {
188+ buffer : UnsafeCell :: new ( result_cells) ,
189+ dequeue_pos : AtomicTargetSize :: new ( 0 ) ,
190+ enqueue_pos : AtomicTargetSize :: new ( 0 ) ,
191+ }
192+ }
193+
147194 /// Used in `Storage` implementation.
148195 pub ( crate ) fn as_view_private ( & self ) -> & QueueView < T > {
149196 self
@@ -247,12 +294,20 @@ struct Cell<T> {
247294}
248295
249296impl < T > Cell < T > {
297+ #[ cfg( not( loom) ) ]
250298 const fn new ( seq : usize ) -> Self {
251299 Self {
252300 data : MaybeUninit :: uninit ( ) ,
253301 sequence : AtomicTargetSize :: new ( seq as UintSize ) ,
254302 }
255303 }
304+ #[ cfg( loom) ]
305+ fn new ( seq : usize ) -> Self {
306+ Self {
307+ data : MaybeUninit :: uninit ( ) ,
308+ sequence : AtomicTargetSize :: new ( seq as UintSize ) ,
309+ }
310+ }
256311}
257312
258313unsafe fn dequeue < T > (
@@ -342,6 +397,7 @@ unsafe fn enqueue<T>(
342397 Ok ( ( ) )
343398}
344399
400+ #[ cfg( not( loom) ) ]
345401#[ cfg( test) ]
346402mod tests {
347403 use static_assertions:: assert_not_impl_any;
@@ -420,3 +476,68 @@ mod tests {
420476 q. enqueue ( 0x55 ) . unwrap_err ( ) ;
421477 }
422478}
479+ #[ cfg( all( loom, test) ) ]
480+ mod tests_loom {
481+ use super :: * ;
482+ use std:: sync:: Arc ;
483+ const N : usize = 4 ;
484+
485+ // FIXME: This test should pass. See https://github.com/rust-embedded/heapless/issues/583
486+ #[ test]
487+ #[ cfg( loom) ]
488+ #[ should_panic]
489+ fn loom_issue_583_enqueue ( ) {
490+ loom:: model ( || {
491+ let q0 = Arc :: new ( Queue :: < u8 , N > :: new ( ) ) ;
492+ for i in 0 ..N {
493+ q0. enqueue ( i as u8 ) . unwrap ( ) ;
494+ }
495+ let model_thread = || {
496+ let q0 = q0. clone ( ) ;
497+ move || {
498+ for k in 0 ..N + 2 {
499+ let Some ( i) = q0. dequeue ( ) else {
500+ panic ! ( "Test failed at dequeue" ) ;
501+ } ;
502+ if q0. enqueue ( k as u8 ) . is_err ( ) {
503+ panic ! ( "Test failed at enqueue: {i}" ) ;
504+ }
505+ }
506+ }
507+ } ;
508+
509+ let h1 = loom:: thread:: spawn ( model_thread ( ) ) ;
510+ let h2 = loom:: thread:: spawn ( model_thread ( ) ) ;
511+ h1. join ( ) . unwrap ( ) ;
512+ h2. join ( ) . unwrap ( ) ;
513+ } ) ;
514+ }
515+
516+ // FIXME: This test should pass. See https://github.com/rust-embedded/heapless/issues/583
517+ #[ test]
518+ #[ cfg( loom) ]
519+ #[ should_panic]
520+ fn loom_issue_583_dequeue ( ) {
521+ loom:: model ( || {
522+ let q0 = Arc :: new ( Queue :: < u8 , N > :: new ( ) ) ;
523+ let model_thread = || {
524+ let q0 = q0. clone ( ) ;
525+ move || {
526+ for k in 0 ..N + 2 {
527+ if q0. enqueue ( k as u8 ) . is_err ( ) {
528+ panic ! ( "Test failed at enqueue: {k}" ) ;
529+ }
530+ if q0. dequeue ( ) . is_none ( ) {
531+ panic ! ( "Test failed at dequeue: {k}" ) ;
532+ }
533+ }
534+ }
535+ } ;
536+
537+ let h1 = loom:: thread:: spawn ( model_thread ( ) ) ;
538+ let h2 = loom:: thread:: spawn ( model_thread ( ) ) ;
539+ h1. join ( ) . unwrap ( ) ;
540+ h2. join ( ) . unwrap ( ) ;
541+ } ) ;
542+ }
543+ }
0 commit comments