@@ -7,15 +7,17 @@ Description : This module provides an API for managing worker threads in the IDE
77see Note [Serializing runs in separate thread]
88-}
99module Development.IDE.Core.WorkerThread
10- (withWorkerQueue , awaitRunInThread )
10+ (withWorkerQueue , awaitRunInThread , withWorkerQueueOfOne , WorkerQueue , writeWorkerQueue )
1111 where
1212
1313import Control.Concurrent.Async (withAsync )
1414import Control.Concurrent.STM
1515import Control.Concurrent.Strict (newBarrier , signalBarrier ,
1616 waitBarrier )
17+ import Control.Exception (finally )
1718import Control.Monad (forever )
1819import Control.Monad.Cont (ContT (ContT ))
20+ import Control.Monad.IO.Class (liftIO )
1921
2022{-
2123Note [Serializing runs in separate thread]
@@ -28,27 +30,59 @@ Originally we used various ways to implement this, but it was hard to maintain a
2830Moreover, we can not stop these threads uniformly when we are shutting down the server.
2931-}
3032
31- -- | 'withWorkerQueue' creates a new 'TQueue', and launches a worker
33+ data WorkerQueue a = WorkerQueueOfOne (TMVar a ) | WorkerQueueOfMany (TQueue a )
34+
35+ writeWorkerQueue :: WorkerQueue a -> a -> STM ()
36+ writeWorkerQueue (WorkerQueueOfOne tvar) action = putTMVar tvar action
37+ writeWorkerQueue (WorkerQueueOfMany tqueue) action = writeTQueue tqueue action
38+
39+ newWorkerQueue :: STM (WorkerQueue a )
40+ newWorkerQueue = WorkerQueueOfMany <$> newTQueue
41+
42+ newWorkerQueueOfOne :: STM (WorkerQueue a )
43+ newWorkerQueueOfOne = WorkerQueueOfOne <$> newEmptyTMVar
44+
45+
46+ -- | 'withWorkerQueue' creates a new 'WorkerQueue', and launches a worker
3247-- thread which polls the queue for requests and runs the given worker
3348-- function on them.
34- withWorkerQueue :: (t -> IO a ) -> ContT () IO (TQueue t )
35- withWorkerQueue workerAction = ContT $ \ mainAction -> do
36- q <- newTQueueIO
49+ withWorkerQueue :: (t -> IO a ) -> ContT () IO (WorkerQueue t )
50+ withWorkerQueue workerAction = do
51+ q <- liftIO $ atomically newWorkerQueue
52+ runWorkerQueue q workerAction
53+
54+ -- | 'withWorkerQueueOfOne' creates a new 'WorkerQueue' that only allows one action to be queued at a time.
55+ -- and one action can only be queued after the previous action has been done.
56+ -- this is useful when we want to cancel the action waiting in the queue, if it's thread is cancelled.
57+ -- e.g. session loading in session loader. When a shake session is restarted, we want to cancel the previous pending session loading.
58+ withWorkerQueueOfOne :: (t -> IO a ) -> ContT () IO (WorkerQueue t )
59+ withWorkerQueueOfOne workerAction = do
60+ q <- liftIO $ atomically newWorkerQueueOfOne
61+ runWorkerQueue q workerAction
62+
63+ runWorkerQueue :: WorkerQueue t -> (t -> IO a ) -> ContT () IO (WorkerQueue t )
64+ runWorkerQueue q workerAction = ContT $ \ mainAction -> do
3765 withAsync (writerThread q) $ \ _ -> mainAction q
3866 where
3967 writerThread q =
4068 forever $ do
41- l <- atomically $ readTQueue q
42- workerAction l
69+ case q of
70+ -- only remove the action from the queue after it has been run if it is a one-shot queue
71+ WorkerQueueOfOne tvar -> do
72+ l <- atomically $ readTMVar tvar
73+ workerAction l `finally` atomically (takeTMVar tvar)
74+ WorkerQueueOfMany q -> do
75+ l <- atomically $ readTQueue q
76+ workerAction l
4377
4478-- | 'awaitRunInThread' queues up an 'IO' action to be run by a worker thread,
4579-- and then blocks until the result is computed.
46- awaitRunInThread :: TQueue (IO () ) -> IO result -> IO result
80+ awaitRunInThread :: WorkerQueue (IO () ) -> IO result -> IO result
4781awaitRunInThread q act = do
4882 -- Take an action from TQueue, run it and
4983 -- use barrier to wait for the result
5084 barrier <- newBarrier
51- atomically $ writeTQueue q $ do
85+ atomically $ writeWorkerQueue q $ do
5286 res <- act
5387 signalBarrier barrier res
5488 waitBarrier barrier
0 commit comments