7070
7171use core:: { cell:: UnsafeCell , mem:: MaybeUninit } ;
7272
73- #[ cfg( not( feature = "portable-atomic" ) ) ]
73+ #[ cfg( all ( not( feature = "portable-atomic" ) , not ( feature = "loom" ) ) ) ]
7474use core:: sync:: atomic;
75- #[ cfg( feature = "portable-atomic" ) ]
75+ #[ cfg( feature = "loom" ) ]
76+ use loom:: sync:: atomic;
77+ #[ cfg( all( feature = "portable-atomic" , not( feature = "loom" ) ) ) ]
7678use portable_atomic as atomic;
7779
7880use atomic:: Ordering ;
@@ -122,6 +124,7 @@ pub type QueueView<T> = QueueInner<T, ViewStorage>;
122124
123125impl < T , const N : usize > Queue < T , N > {
124126 /// Creates an empty queue.
127+ #[ cfg( not( feature = "loom" ) ) ]
125128 pub const fn new ( ) -> Self {
126129 const {
127130 assert ! ( N > 1 ) ;
@@ -144,6 +147,24 @@ impl<T, const N: usize> Queue<T, N> {
144147 }
145148 }
146149
150+ /// Creates an empty queue.
151+ #[ cfg( feature = "loom" ) ]
152+ pub fn new ( ) -> Self {
153+ const {
154+ assert ! ( N > 1 ) ;
155+ assert ! ( N . is_power_of_two( ) ) ;
156+ assert ! ( N < UintSize :: MAX as usize ) ;
157+ }
158+
159+ let result_cells: [ Cell < T > ; N ] = core:: array:: from_fn ( Cell :: new) ;
160+
161+ Self {
162+ buffer : UnsafeCell :: new ( result_cells) ,
163+ dequeue_pos : AtomicTargetSize :: new ( 0 ) ,
164+ enqueue_pos : AtomicTargetSize :: new ( 0 ) ,
165+ }
166+ }
167+
147168 /// Used in `Storage` implementation.
148169 pub ( crate ) fn as_view_private ( & self ) -> & QueueView < T > {
149170 self
@@ -247,12 +268,21 @@ struct Cell<T> {
247268}
248269
249270impl < T > Cell < T > {
271+ #[ cfg( not( feature = "loom" ) ) ]
250272 const fn new ( seq : usize ) -> Self {
251273 Self {
252274 data : MaybeUninit :: uninit ( ) ,
253275 sequence : AtomicTargetSize :: new ( seq as UintSize ) ,
254276 }
255277 }
278+
279+ #[ cfg( feature = "loom" ) ]
280+ fn new ( seq : usize ) -> Self {
281+ Self {
282+ data : MaybeUninit :: uninit ( ) ,
283+ sequence : AtomicTargetSize :: new ( seq as UintSize ) ,
284+ }
285+ }
256286}
257287
258288unsafe fn dequeue < T > (
@@ -346,12 +376,22 @@ unsafe fn enqueue<T>(
346376mod tests {
347377 use static_assertions:: assert_not_impl_any;
348378
349- use super :: Queue ;
379+ use super :: { Queue , QueueView } ;
350380
351381 // Ensure a `Queue` containing `!Send` values stays `!Send` itself.
352382 assert_not_impl_any ! ( Queue <* const ( ) , 4 >: Send ) ;
353383
384+ fn to_vec < T > ( q : & QueueView < T > ) -> Vec < T > {
385+ // inaccurate
386+ let mut ret = vec ! [ ] ;
387+ while let Some ( v) = q. dequeue ( ) {
388+ ret. push ( v) ;
389+ }
390+ ret
391+ }
392+
354393 #[ test]
394+ #[ cfg( not( feature = "loom" ) ) ]
355395 fn memory_leak ( ) {
356396 droppable ! ( ) ;
357397
@@ -364,6 +404,7 @@ mod tests {
364404 }
365405
366406 #[ test]
407+ #[ cfg( not( feature = "loom" ) ) ]
367408 fn sanity ( ) {
368409 let q = Queue :: < _ , 2 > :: new ( ) ;
369410 q. enqueue ( 0 ) . unwrap ( ) ;
@@ -376,6 +417,7 @@ mod tests {
376417 }
377418
378419 #[ test]
420+ #[ cfg( not( feature = "loom" ) ) ]
379421 fn drain_at_pos255 ( ) {
380422 let q = Queue :: < _ , 2 > :: new ( ) ;
381423 for _ in 0 ..255 {
@@ -388,6 +430,7 @@ mod tests {
388430 }
389431
390432 #[ test]
433+ #[ cfg( not( feature = "loom" ) ) ]
391434 fn full_at_wrapped_pos0 ( ) {
392435 let q = Queue :: < _ , 2 > :: new ( ) ;
393436 for _ in 0 ..254 {
@@ -401,6 +444,7 @@ mod tests {
401444 }
402445
403446 #[ test]
447+ #[ cfg( not( feature = "loom" ) ) ]
404448 fn enqueue_full ( ) {
405449 #[ cfg( not( feature = "mpmc_large" ) ) ]
406450 const CAPACITY : usize = 128 ;
@@ -419,4 +463,116 @@ mod tests {
419463 // Queue is full, this should not block forever.
420464 q. enqueue ( 0x55 ) . unwrap_err ( ) ;
421465 }
466+
467+ #[ test]
468+ #[ cfg( not( feature = "loom" ) ) ]
469+ fn issue_583_enqueue ( ) {
470+ const N : usize = 4 ;
471+
472+ let q0 = Queue :: < u8 , N > :: new ( ) ;
473+ for i in 0 ..N {
474+ q0. enqueue ( i as u8 ) . expect ( "new enqueue" ) ;
475+ }
476+ eprintln ! ( "start!" ) ;
477+
478+ std:: thread:: scope ( |sc| {
479+ for _ in 0 ..2 {
480+ sc. spawn ( || {
481+ for k in 0 ..1_000_000 {
482+ if let Some ( v) = q0. dequeue ( ) {
483+ q0. enqueue ( v) . unwrap_or_else ( |v| {
484+ panic ! ( "{k}: q0 -> q0: {v}, {:?}" , to_vec( & q0) )
485+ } ) ;
486+ }
487+ }
488+ } ) ;
489+ }
490+ } ) ;
491+ }
492+
493+ #[ test]
494+ #[ cfg( not( feature = "loom" ) ) ]
495+ fn issue_583_dequeue ( ) {
496+ const N : usize = 4 ;
497+
498+ let q0 = Queue :: < u8 , N > :: new ( ) ;
499+ eprintln ! ( "start!" ) ;
500+ std:: thread:: scope ( |sc| {
501+ for _ in 0 ..2 {
502+ sc. spawn ( || {
503+ for k in 0 ..1_000_000 {
504+ q0. enqueue ( k as u8 ) . unwrap ( ) ;
505+ if q0. dequeue ( ) . is_none ( ) {
506+ panic ! ( "{k}" ) ;
507+ }
508+ }
509+ } ) ;
510+ }
511+ } ) ;
512+ }
513+
514+ #[ test]
515+ #[ cfg( feature = "loom" ) ]
516+ fn issue_583_enqueue_loom ( ) {
517+ loom:: model ( || {
518+ const N : usize = 4 ;
519+
520+ let q0 = loom:: sync:: Arc :: new ( Queue :: < u8 , N > :: new ( ) ) ;
521+ for i in 0 ..N {
522+ q0. enqueue ( i as u8 ) . expect ( "new enqueue" ) ;
523+ }
524+ eprintln ! ( "start!" ) ;
525+
526+ let q1 = q0. clone ( ) ;
527+ loom:: thread:: spawn ( move || {
528+ for k in 0 ..1000_000 {
529+ if let Some ( v) = q0. dequeue ( ) {
530+ q0. enqueue ( v)
531+ . unwrap_or_else ( |v| panic ! ( "{k}: q0 -> q0: {v}, {:?}" , to_vec( & * q0) ) ) ;
532+ }
533+ }
534+ } ) ;
535+
536+ loom:: thread:: spawn ( move || {
537+ for k in 0 ..1000_000 {
538+ if let Some ( v) = q1. dequeue ( ) {
539+ q1. enqueue ( v)
540+ . unwrap_or_else ( |v| panic ! ( "{k}: q0 -> q0: {v}, {:?}" , to_vec( & * q1) ) ) ;
541+ }
542+ }
543+ } ) ;
544+ } ) ;
545+ }
546+
547+ #[ test]
548+ #[ cfg( feature = "loom" ) ]
549+ fn issue_583_dequeue_loom ( ) {
550+ loom:: model ( || {
551+ const N : usize = 4 ;
552+
553+ let q0 = loom:: sync:: Arc :: new ( Queue :: < u8 , N > :: new ( ) ) ;
554+
555+ eprintln ! ( "start!" ) ;
556+
557+ let q1 = q0. clone ( ) ;
558+
559+ loom:: thread:: spawn ( move || {
560+ for k in 0 ..1000_000 {
561+ q0. enqueue ( k as u8 ) . unwrap ( ) ;
562+ if q0. dequeue ( ) . is_none ( ) {
563+ panic ! ( "{k}" ) ;
564+ }
565+ }
566+ } ) ;
567+
568+ loom:: thread:: spawn ( move || {
569+ for k in 0 ..1000_000 {
570+ q1. enqueue ( k as u8 ) . unwrap ( ) ;
571+ if q1. dequeue ( ) . is_none ( ) {
572+ panic ! ( "{k}" ) ;
573+ }
574+ }
575+ } ) ;
576+ } ) ;
577+ }
422578}
0 commit comments