11{-# OPTIONS_GHC -fno-warn-name-shadowing #-}
22{-# LANGUAGE CPP, DeriveDataTypeable #-}
3+ {-# LANGUAGE BangPatterns #-}
34
45#if __GLASGOW_HASKELL__ >= 701
56{-# LANGUAGE Trustworthy #-}
@@ -50,57 +51,72 @@ import GHC.Conc
5051import Control.Monad (unless )
5152import Data.Typeable (Typeable )
5253
54+ data End a = End ! Int [a ]
55+
5356-- | 'TQueue' is an abstract type representing an unbounded FIFO channel.
5457--
5558-- @since 2.4
56- data TQueue a = TQueue {- # UNPACK #-} !(TVar [a ])
57- {- # UNPACK #-} !(TVar [a ])
59+ data TQueue a = TQueue {- # UNPACK #-} !(TVar Int )
60+ {- # UNPACK #-} !(TVar (End a ))
61+ {- # UNPACK #-} !(TVar (End a ))
5862 deriving Typeable
5963
6064instance Eq (TQueue a ) where
61- TQueue a _ == TQueue b _ = a == b
65+ TQueue a _ _ == TQueue b _ _ = a == b
6266
6367-- | Build and returns a new instance of 'TQueue'
6468newTQueue :: STM (TQueue a )
6569newTQueue = do
66- read <- newTVar []
67- write <- newTVar []
68- return (TQueue read write)
70+ old_len <- newTVar 0
71+ read <- newTVar (End 0 [] )
72+ write <- newTVar (End 0 [] )
73+ return (TQueue old_len read write)
6974
7075-- | @IO@ version of 'newTQueue'. This is useful for creating top-level
7176-- 'TQueue's using 'System.IO.Unsafe.unsafePerformIO', because using
7277-- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
7378-- possible.
7479newTQueueIO :: IO (TQueue a )
7580newTQueueIO = do
76- read <- newTVarIO []
77- write <- newTVarIO []
78- return (TQueue read write)
81+ old_len <- newTVarIO 0
82+ read <- newTVarIO (End 0 [] )
83+ write <- newTVarIO (End 0 [] )
84+ return (TQueue old_len read write)
7985
8086-- | Write a value to a 'TQueue'.
8187writeTQueue :: TQueue a -> a -> STM ()
82- writeTQueue (TQueue _read write) a = do
83- listend <- readTVar write
84- writeTVar write (a: listend)
88+ writeTQueue (TQueue old_len read write) a = do
89+ ol <- readTVar old_len
90+ End write_count listend <- readTVar write
91+ let write_count' = write_count + 1
92+ if 2 * write_count' >= ol
93+ then do
94+ End read_count front <- readTVar read
95+ let ! len = ol + write_count' - read_count
96+ writeTVar old_len len
97+ writeTVar read (End 0 (front ++ reverse listend ++ [a]))
98+ writeTVar write (End 0 [] )
99+ else writeTVar write (End write_count' (a: listend))
85100
86101-- | Read the next value from the 'TQueue'.
87102readTQueue :: TQueue a -> STM a
88- readTQueue (TQueue read write) = do
89- xs <- readTVar read
90- case xs of
91- (x: xs') -> do
92- writeTVar read xs'
93- return x
94- [] -> do
95- ys <- readTVar write
96- case ys of
97- [] -> retry
98- _ -> do
99- let (z: zs) = reverse ys -- NB. lazy: we want the transaction to be
100- -- short, otherwise it will conflict
101- writeTVar write []
102- writeTVar read zs
103- return z
103+ readTQueue (TQueue old_len read write) = do
104+ ol <- readTVar old_len
105+ End read_count front <- readTVar read
106+ case front of
107+ [] -> retry
108+ (a: as) -> do
109+ let read_count' = read_count + 1
110+ if 2 * read_count' >= ol
111+ then do
112+ End write_count listend <- readTVar write
113+ let ! len = ol + write_count - read_count'
114+ writeTVar old_len len
115+ writeTVar read (End 0 (as ++ reverse listend))
116+ writeTVar write (End 0 [] )
117+ else do
118+ writeTVar read (End read_count' as)
119+ return a
104120
105121-- | A version of 'readTQueue' which does not retry. Instead it
106122-- returns @Nothing@ if no value is available.
@@ -112,45 +128,42 @@ tryReadTQueue c = fmap Just (readTQueue c) `orElse` return Nothing
112128--
113129-- @since 2.4.5
114130flushTQueue :: TQueue a -> STM [a ]
115- flushTQueue (TQueue read write) = do
116- xs <- readTVar read
117- ys <- readTVar write
118- unless (null xs) $ writeTVar read []
119- unless (null ys) $ writeTVar write []
131+ flushTQueue (TQueue old_len read write) = do
132+ End read_count xs <- readTVar read
133+ End write_count ys <- readTVar write
134+ unless (read_count == 0 && null xs) $ writeTVar read (End 0 [] )
135+ unless (write_count == 0 && null ys) $ writeTVar write (End 0 [] )
136+ writeTVar old_len 0
120137 return (xs ++ reverse ys)
121138
122139-- | Get the next value from the @TQueue@ without removing it,
123140-- retrying if the channel is empty.
124141peekTQueue :: TQueue a -> STM a
125- peekTQueue c = do
126- x <- readTQueue c
127- unGetTQueue c x
128- return x
142+ peekTQueue (TQueue _old_len read _write) = do
143+ End _ xs <- readTVar read
144+ case xs of
145+ x: _ -> return x
146+ [] -> retry
129147
130148-- | A version of 'peekTQueue' which does not retry. Instead it
131149-- returns @Nothing@ if no value is available.
132150tryPeekTQueue :: TQueue a -> STM (Maybe a )
133- tryPeekTQueue c = do
134- m <- tryReadTQueue c
135- case m of
136- Nothing -> return Nothing
137- Just x -> do
138- unGetTQueue c x
139- return m
151+ tryPeekTQueue (TQueue _old_len read _write) = do
152+ End _ xs <- readTVar read
153+ case xs of
154+ x: _ -> return (Just x)
155+ [] -> return Nothing
140156
141157-- | Put a data item back onto a channel, where it will be the next item read.
142158unGetTQueue :: TQueue a -> a -> STM ()
143- unGetTQueue (TQueue read _write) a = do
144- xs <- readTVar read
145- writeTVar read (a: xs)
159+ unGetTQueue (TQueue _old_len read _write) a = do
160+ End read_count xs <- readTVar read
161+ writeTVar read (End (read_count - 1 ) ( a: xs) )
146162
147163-- | Returns 'True' if the supplied 'TQueue' is empty.
148164isEmptyTQueue :: TQueue a -> STM Bool
149- isEmptyTQueue (TQueue read write ) = do
150- xs <- readTVar read
165+ isEmptyTQueue (TQueue _old_len read _write ) = do
166+ End _ xs <- readTVar read
151167 case xs of
152168 (_: _) -> return False
153- [] -> do ys <- readTVar write
154- case ys of
155- [] -> return True
156- _ -> return False
169+ [] -> return True
0 commit comments