6161//! - The numbers reported correspond to the successful path, i.e. `dequeue` returning `Some`
6262//! and `enqueue` returning `Ok`.
6363//!
64+ //!
65+ //! <div class="warning">
66+ //!
67+ //! This implementation is not fully lock-free. If a thread or task gets preempted during
68+ //! a `dequeue` or `enqueue` operation, it may prevent other operations from succeeding until
69+ //! it's scheduled again to finish its operation.
70+ //!
71+ //! See <https://github.com/rust-embedded/heapless/issues/583> for more details.
72+ //!
73+ //! </div>
6474//! # References
6575//!
6676//! This is an implementation of Dmitry Vyukov's [bounded MPMC queue], minus the
7080
7181use core:: { cell:: UnsafeCell , mem:: MaybeUninit } ;
7282
73- #[ cfg( not( feature = "portable-atomic" ) ) ]
83+ #[ cfg( loom) ]
84+ use loom:: sync:: atomic;
85+
86+ #[ cfg( not( any( feature = "portable-atomic" , loom) ) ) ]
7487use core:: sync:: atomic;
7588#[ cfg( feature = "portable-atomic" ) ]
7689use portable_atomic as atomic;
@@ -113,6 +126,16 @@ pub struct QueueInner<T, S: Storage> {
113126/// </div>
114127///
115128/// The maximum value of `N` is 128 if the `mpmc_large` feature is not enabled.
129+ ///
130+ /// <div class="warning">
131+ ///
132+ /// This implementation is not fully lock-free. If a thread or task gets preempted during
133+ /// a `dequeue` or `enqueue` operation, it may prevent other operations from succeeding until
134+ /// it's scheduled again to finish its operation.
135+ ///
136+ /// See <https://github.com/rust-embedded/heapless/issues/583> for more details.
137+ ///
138+ /// </div>
116139pub type Queue < T , const N : usize > = QueueInner < T , OwnedStorage < N > > ;
117140
118141/// A [`Queue`] with dynamic capacity.
@@ -121,6 +144,7 @@ pub type Queue<T, const N: usize> = QueueInner<T, OwnedStorage<N>>;
121144pub type QueueView < T > = QueueInner < T , ViewStorage > ;
122145
123146impl < T , const N : usize > Queue < T , N > {
147+ #[ cfg( not( loom) ) ]
124148 /// Creates an empty queue.
125149 pub const fn new ( ) -> Self {
126150 const {
@@ -144,6 +168,26 @@ impl<T, const N: usize> Queue<T, N> {
144168 }
145169 }
146170
171+ /// Creates an empty queue.
172+ #[ cfg( loom) ]
173+ pub fn new ( ) -> Self {
174+ use core:: array;
175+
176+ const {
177+ assert ! ( N > 1 ) ;
178+ assert ! ( N . is_power_of_two( ) ) ;
179+ assert ! ( N < UintSize :: MAX as usize ) ;
180+ }
181+
182+ let result_cells: [ Cell < T > ; N ] = array:: from_fn ( |idx| Cell :: new ( idx) ) ;
183+
184+ Self {
185+ buffer : UnsafeCell :: new ( result_cells) ,
186+ dequeue_pos : AtomicTargetSize :: new ( 0 ) ,
187+ enqueue_pos : AtomicTargetSize :: new ( 0 ) ,
188+ }
189+ }
190+
147191 /// Used in `Storage` implementation.
148192 pub ( crate ) fn as_view_private ( & self ) -> & QueueView < T > {
149193 self
@@ -247,12 +291,20 @@ struct Cell<T> {
247291}
248292
249293impl < T > Cell < T > {
294+ #[ cfg( not( loom) ) ]
250295 const fn new ( seq : usize ) -> Self {
251296 Self {
252297 data : MaybeUninit :: uninit ( ) ,
253298 sequence : AtomicTargetSize :: new ( seq as UintSize ) ,
254299 }
255300 }
301+ #[ cfg( loom) ]
302+ fn new ( seq : usize ) -> Self {
303+ Self {
304+ data : MaybeUninit :: uninit ( ) ,
305+ sequence : AtomicTargetSize :: new ( seq as UintSize ) ,
306+ }
307+ }
256308}
257309
258310unsafe fn dequeue < T > (
@@ -342,6 +394,7 @@ unsafe fn enqueue<T>(
342394 Ok ( ( ) )
343395}
344396
397+ #[ cfg( not( loom) ) ]
345398#[ cfg( test) ]
346399mod tests {
347400 use static_assertions:: assert_not_impl_any;
@@ -420,3 +473,63 @@ mod tests {
420473 q. enqueue ( 0x55 ) . unwrap_err ( ) ;
421474 }
422475}
476+ #[ cfg( all( loom, test) ) ]
477+ mod tests_loom {
478+ use super :: * ;
479+ use std:: sync:: Arc ;
480+ const N : usize = 4 ;
481+
482+ #[ test]
483+ #[ cfg( loom) ]
484+ fn loom_issue_583_enqueue ( ) {
485+ loom:: model ( || {
486+ let q0 = Arc :: new ( Queue :: < u8 , N > :: new ( ) ) ;
487+ q0. enqueue ( 0 ) . unwrap ( ) ;
488+ q0. enqueue ( 1 ) . unwrap ( ) ;
489+ q0. enqueue ( 2 ) . unwrap ( ) ;
490+ q0. enqueue ( 3 ) . unwrap ( ) ;
491+ let model_thread = || {
492+ let q0 = q0. clone ( ) ;
493+ move || {
494+ for k in 0 ..10 {
495+ let Some ( i) = q0. dequeue ( ) else {
496+ panic ! ( "{k}" ) ;
497+ } ;
498+ if q0. enqueue ( k as u8 ) . is_err ( ) {
499+ panic ! ( "{i}" ) ;
500+ }
501+ }
502+ }
503+ } ;
504+
505+ let h1 = loom:: thread:: spawn ( model_thread ( ) ) ;
506+ let h2 = loom:: thread:: spawn ( model_thread ( ) ) ;
507+ h1. join ( ) . unwrap ( ) ;
508+ h2. join ( ) . unwrap ( ) ;
509+ } ) ;
510+ }
511+
512+ #[ test]
513+ #[ cfg( loom) ]
514+ fn loom_issue_583_dequeue ( ) {
515+ loom:: model ( || {
516+ let q0 = Arc :: new ( Queue :: < u8 , N > :: new ( ) ) ;
517+ let model_thread = || {
518+ let q0 = q0. clone ( ) ;
519+ move || {
520+ for k in 0 ..10 {
521+ q0. enqueue ( k as u8 ) . unwrap ( ) ;
522+ if q0. dequeue ( ) . is_none ( ) {
523+ panic ! ( "{k}" ) ;
524+ }
525+ }
526+ }
527+ } ;
528+
529+ let h1 = loom:: thread:: spawn ( model_thread ( ) ) ;
530+ let h2 = loom:: thread:: spawn ( model_thread ( ) ) ;
531+ h1. join ( ) . unwrap ( ) ;
532+ h2. join ( ) . unwrap ( ) ;
533+ } ) ;
534+ }
535+ }
0 commit comments