6868//!
6969//! [bounded MPMC queue]: http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
7070
71+ use crate :: storage:: ViewStorage ;
72+
73+ #[ cfg( not( feature = "portable-atomic" ) ) ]
74+ use core:: sync:: atomic;
75+ #[ cfg( feature = "portable-atomic" ) ]
76+ use portable_atomic as atomic;
77+
78+ #[ cfg( feature = "mpmc_large" ) ]
79+ type AtomicTargetSize = atomic:: AtomicUsize ;
80+ #[ cfg( not( feature = "mpmc_large" ) ) ]
81+ type AtomicTargetSize = atomic:: AtomicU8 ;
82+
83+ #[ cfg( feature = "mpmc_large" ) ]
84+ type UintSize = usize ;
85+ #[ cfg( not( feature = "mpmc_large" ) ) ]
86+ type UintSize = u8 ;
87+
7188#[ cfg( feature = "mpmc_crossbeam" ) ]
7289pub mod crossbeam_array_queue;
7390#[ cfg( feature = "mpmc_crossbeam" ) ]
@@ -77,3 +94,141 @@ pub use crossbeam_array_queue::*;
7794mod original;
7895#[ cfg( not( feature = "mpmc_crossbeam" ) ) ]
7996pub use original:: * ;
97+
98+ /// A [`Queue`] with dynamic capacity.
99+ ///
100+ /// [`Queue`] coerces to `QueueView`. `QueueView` is `!Sized`, meaning it can only ever be used by reference.
101+ pub type QueueView < T > = QueueInner < T , ViewStorage > ;
102+
103+ #[ cfg( test) ]
104+ mod tests {
105+ use static_assertions:: assert_not_impl_any;
106+
107+ use super :: { Queue , QueueView } ;
108+
109+ // Ensure a `Queue` containing `!Send` values stays `!Send` itself.
110+ assert_not_impl_any ! ( Queue <* const ( ) , 4 >: Send ) ;
111+
112+ fn to_vec < T > ( q : & QueueView < T > ) -> Vec < T > {
113+ // inaccurate
114+ let mut ret = vec ! [ ] ;
115+ while let Some ( v) = q. dequeue ( ) {
116+ ret. push ( v) ;
117+ }
118+ ret
119+ }
120+
121+ #[ test]
122+ fn memory_leak ( ) {
123+ droppable ! ( ) ;
124+
125+ let q = Queue :: < _ , 2 > :: new ( ) ;
126+ q. enqueue ( Droppable :: new ( ) ) . unwrap_or_else ( |_| panic ! ( ) ) ;
127+ q. enqueue ( Droppable :: new ( ) ) . unwrap_or_else ( |_| panic ! ( ) ) ;
128+ drop ( q) ;
129+
130+ assert_eq ! ( Droppable :: count( ) , 0 ) ;
131+ }
132+
133+ #[ test]
134+ fn sanity ( ) {
135+ let q = Queue :: < _ , 2 > :: new ( ) ;
136+ q. enqueue ( 0 ) . unwrap ( ) ;
137+ q. enqueue ( 1 ) . unwrap ( ) ;
138+ assert ! ( q. enqueue( 2 ) . is_err( ) ) ;
139+
140+ assert_eq ! ( q. dequeue( ) , Some ( 0 ) ) ;
141+ assert_eq ! ( q. dequeue( ) , Some ( 1 ) ) ;
142+ assert_eq ! ( q. dequeue( ) , None ) ;
143+ }
144+
145+ #[ test]
146+ fn drain_at_pos255 ( ) {
147+ let q = Queue :: < _ , 2 > :: new ( ) ;
148+ for _ in 0 ..255 {
149+ assert ! ( q. enqueue( 0 ) . is_ok( ) ) ;
150+ assert_eq ! ( q. dequeue( ) , Some ( 0 ) ) ;
151+ }
152+
153+ // Queue is empty, this should not block forever.
154+ assert_eq ! ( q. dequeue( ) , None ) ;
155+ }
156+
157+ #[ test]
158+ fn full_at_wrapped_pos0 ( ) {
159+ let q = Queue :: < _ , 2 > :: new ( ) ;
160+ for _ in 0 ..254 {
161+ assert ! ( q. enqueue( 0 ) . is_ok( ) ) ;
162+ assert_eq ! ( q. dequeue( ) , Some ( 0 ) ) ;
163+ }
164+ assert ! ( q. enqueue( 0 ) . is_ok( ) ) ;
165+ assert ! ( q. enqueue( 0 ) . is_ok( ) ) ;
166+ // this should not block forever
167+ assert ! ( q. enqueue( 0 ) . is_err( ) ) ;
168+ }
169+
170+ #[ test]
171+ fn enqueue_full ( ) {
172+ #[ cfg( not( feature = "mpmc_large" ) ) ]
173+ const CAPACITY : usize = 128 ;
174+
175+ #[ cfg( feature = "mpmc_large" ) ]
176+ const CAPACITY : usize = 256 ;
177+
178+ let q: Queue < u8 , CAPACITY > = Queue :: new ( ) ;
179+
180+ assert_eq ! ( q. capacity( ) , CAPACITY ) ;
181+
182+ for _ in 0 ..CAPACITY {
183+ q. enqueue ( 0xAA ) . unwrap ( ) ;
184+ }
185+
186+ // Queue is full, this should not block forever.
187+ q. enqueue ( 0x55 ) . unwrap_err ( ) ;
188+ }
189+
190+ #[ test]
191+ fn issue_583_enqueue ( ) {
192+ const N : usize = 4 ;
193+
194+ let q0 = Queue :: < u8 , N > :: new ( ) ;
195+ for i in 0 ..N {
196+ q0. enqueue ( i as u8 ) . expect ( "new enqueue" ) ;
197+ }
198+ eprintln ! ( "start!" ) ;
199+
200+ std:: thread:: scope ( |sc| {
201+ for _ in 0 ..2 {
202+ sc. spawn ( || {
203+ for k in 0 ..1000_000 {
204+ if let Some ( v) = q0. dequeue ( ) {
205+ q0. enqueue ( v) . unwrap_or_else ( |v| {
206+ panic ! ( "{k}: q0 -> q0: {v}, {:?}" , to_vec( & q0) )
207+ } ) ;
208+ }
209+ }
210+ } ) ;
211+ }
212+ } ) ;
213+ }
214+
215+ #[ test]
216+ fn issue_583_dequeue ( ) {
217+ const N : usize = 4 ;
218+
219+ let q0 = Queue :: < u8 , N > :: new ( ) ;
220+ eprintln ! ( "start!" ) ;
221+ std:: thread:: scope ( |sc| {
222+ for _ in 0 ..2 {
223+ sc. spawn ( || {
224+ for k in 0 ..1000_000 {
225+ q0. enqueue ( k as u8 ) . unwrap ( ) ;
226+ if q0. dequeue ( ) . is_none ( ) {
227+ panic ! ( "{k}" ) ;
228+ }
229+ }
230+ } ) ;
231+ }
232+ } ) ;
233+ }
234+ }
0 commit comments