@@ -20,7 +20,8 @@ import Data.Foldable (fold)
2020import qualified Data.HashMap.Strict as Map
2121import Data.IORef
2222import Data.List (intercalate )
23- import Data.Maybe (fromMaybe , isNothing )
23+ import Data.Maybe (fromMaybe , isJust ,
24+ isNothing )
2425import Data.Set (Set )
2526import qualified Data.Set as S
2627import Data.Typeable
@@ -160,31 +161,39 @@ onKeyReverseDeps f it@KeyDetails{..} =
160161
161162type DBQue = TaskQueue (Either Dynamic (IO () ))
162163data Database = Database {
163- databaseExtra :: Dynamic ,
164+ databaseExtra :: Dynamic ,
164165
165- databaseThreads :: TVar [(DeliverStatus , Async () )],
166+ databaseThreads :: TVar [(DeliverStatus , Async () )],
166167
167- databaseRuntimeRevDep :: SMap. Map Key KeySet ,
168+ databaseRuntimeDep :: SMap. Map Key KeySet ,
168169 -- For each key, the set of keys that depend on it directly.
169170
170171 -- it is used to compute the transitive reverse deps, so
171172 -- if not in any of the transitive reverse deps of a dirty node, it is clean
172173 -- we can skip clean the threads.
173174 -- this is update right before we query the database for the key result.
174- dataBaseLogger :: String -> IO () ,
175+ dataBaseLogger :: String -> IO () ,
175176
176- databaseQueue :: DBQue ,
177+ databaseQueue :: DBQue ,
177178
178- databaseRules :: TheRules ,
179- databaseStep :: ! (TVar Step ),
179+ databaseRules :: TheRules ,
180+ databaseStep :: ! (TVar Step ),
180181
181- databaseValuesLock :: ! (TVar Bool ),
182+ databaseValuesLock :: ! (TVar Bool ),
182183 -- when we restart a build, we set this to False to block any other
183184 -- threads from reading databaseValues
184- databaseValues :: ! (Map Key KeyDetails )
185+ databaseValues :: ! (Map Key KeyDetails )
185186
186187 }
187188---------------------------------------------------------------------
189+ computeReverseRuntimeMap :: Database -> STM (Map Key KeySet )
190+ computeReverseRuntimeMap db = do
191+ -- Create a fresh STM Map and copy the current runtime reverse deps into it.
192+ -- This yields a stable snapshot that won't be mutated by concurrent updates.
193+ m <- SMap. new
194+ pairs <- ListT. toList $ SMap. listT (databaseRuntimeDep db)
195+ forM_ pairs $ \ (k, ks) -> SMap. insert ks k m
196+ pure m
188197-- compute to preserve asyncs
189198-- only the running stage 2 keys are actually running
190199-- so we only need to preserve them if they are not affected by the dirty set
@@ -193,34 +202,36 @@ data Database = Database {
193202-- all non-dirty running need to have an updated step,
194203-- so it won't be view as dirty when we restart the build
195204-- computeToPreserve :: Database -> KeySet -> STM ([(Key, Async ())], KeySet, [Key])
196- computeToPreserve db dirtySet = do
197- -- All keys that depend (directly or transitively) on any dirty key
198- affected <- computeTransitiveReverseDeps db dirtySet
199- allRunings <- getRunningKeys db
200- let allRuningkeys = map fst allRunings
201- let running2UnAffected = [ (k ,async) | (k, v) <- allRunings, not (k `memberKeySet` affected), Running _ _ _ (RunningStage2 async) <- [keyStatus v] ]
202- forM_ allRuningkeys $ \ k -> do
203- -- if not dirty, bump its step
204- unless (memberKeySet k affected) $ do
205- SMap. focus
206- ( Focus. alter $ \ case
207- Just kd@ KeyDetails {keyStatus = Running {runningStep, runningPrev, runningWait, runningStage}} ->
208- Just (kd {keyStatus = Running (runningStep + 1 ) runningPrev runningWait runningStage})
209- _ -> Nothing
210- )
211- k
212- (databaseValues db)
213- -- Keep only those whose key is NOT affected by the dirty set
214- pure ([kv | kv@ (k, _async) <- running2UnAffected, not (memberKeySet k affected)], allRuningkeys)
205+ -- computeToPreserve db dirtySet = do
206+ -- -- All keys that depend (directly or transitively) on any dirty key
207+ -- affected <- computeTransitiveReverseDeps db dirtySet
208+ -- allRunings <- getRunningKeys db
209+ -- let allRuningkeys = map fst allRunings
210+ -- let running2UnAffected = [ (k ,async) | (k, v) <- allRunings, not (k `memberKeySet` affected), Running _ _ _ (RunningStage2 async) <- [keyStatus v] ]
211+ -- forM_ allRuningkeys $ \k -> do
212+ -- -- if not dirty, bump its step
213+ -- unless (memberKeySet k affected) $ do
214+ -- SMap.focus
215+ -- ( Focus.alter $ \case
216+ -- Just kd@KeyDetails {keyStatus = Running {runningStep, runningPrev, runningWait, runningStage}} ->
217+ -- Just (kd {keyStatus = Running (runningStep + 1) runningPrev runningWait runningStage})
218+ -- _ -> Nothing
219+ -- )
220+ -- k
221+ -- (databaseValues db)
222+ -- -- Keep only those whose key is NOT affected by the dirty set
223+ -- pure ([kv | kv@(k, _async) <- running2UnAffected, not (memberKeySet k affected)], allRuningkeys)
215224
216225-- computeToPreserve1 :: Database -> KeySet -> STM ([(Key, Async ())], KeySet, [Key])
226+ computeToPreserve1 :: Database -> KeySet -> STM ([(Key , Async () )], [Key ])
217227computeToPreserve1 db dirtySet = do
218228 -- All keys that depend (directly or transitively) on any dirty key
219229 affected <- computeTransitiveReverseDeps db dirtySet
220230 let rootKey = newKey " root"
221231 threads <- readTVar $ databaseThreads db
222232 -- not root and not effected
223233 let uneffected = [(k, async) | (DeliverStatus _ k, async) <- threads, not (memberKeySet k affected), k /= rootKey]
234+
224235 let allRuningkeys = map (deliverName. fst ) threads
225236 forM_ allRuningkeys $ \ k -> do
226237 -- if not dirty, bump its step
@@ -236,17 +247,17 @@ computeToPreserve1 db dirtySet = do
236247 -- Keep only those whose key is NOT affected by the dirty set
237248 pure (uneffected, allRuningkeys)
238249
250+
239251getRunningKeys :: Database -> STM [(Key , KeyDetails )]
240252getRunningKeys db = do
241253 ListT. toList $ SMap. listT (databaseValues db)
242254
243255-- compute the transitive reverse dependencies of a set of keys
244- -- using databaseRuntimeRevDep in the Database
256+ -- using databaseRuntimeDep in the Database
245257computeTransitiveReverseDeps :: Database -> KeySet -> STM KeySet
246258computeTransitiveReverseDeps db seeds = do
247- let rev = databaseRuntimeRevDep db
248-
249- -- BFS worklist starting from all seed keys.
259+ rev <- computeReverseRuntimeMap db
260+ let -- BFS worklist starting from all seed keys.
250261 -- visited contains everything we've already enqueued (including seeds).
251262 go :: KeySet -> [Key ] -> STM KeySet
252263 go visited [] = pure visited
@@ -264,9 +275,13 @@ computeTransitiveReverseDeps db seeds = do
264275 go seeds (toListKeySet seeds)
265276
266277
267- insertdatabaseRuntimeRevDep :: Key -> Key -> Database -> STM ()
268- insertdatabaseRuntimeRevDep k pk db = do
269- SMap. focus (Focus. alter (Just . maybe (singletonKeySet pk) (insertKeySet pk))) k (databaseRuntimeRevDep db)
278+ insertdatabaseRuntimeDep :: Key -> Key -> Database -> STM ()
279+ insertdatabaseRuntimeDep k pk db = do
280+ SMap. focus (Focus. alter (Just . maybe (singletonKeySet k) (insertKeySet k))) pk (databaseRuntimeDep db)
281+
282+ deleteDatabaseRuntimeDep :: Key -> Database -> STM ()
283+ deleteDatabaseRuntimeDep k db = do
284+ SMap. delete k (databaseRuntimeDep db)
270285
271286---------------------------------------------------------------------
272287
@@ -333,7 +348,8 @@ instance Exception AsyncParentKill where
333348
334349shutDatabase :: Set (Async () ) -> Database -> IO ()
335350shutDatabase preserve Database {.. } = uninterruptibleMask $ \ unmask -> do
336- -- wait for all threads to finish
351+ -- prune
352+ pruneFinished Database {.. }
337353 asyncs <- readTVarIO databaseThreads
338354 step <- readTVarIO databaseStep
339355 tid <- myThreadId
@@ -349,7 +365,8 @@ shutDatabase preserve Database{..} = uninterruptibleMask $ \unmask -> do
349365 -- But if it takes more than 10 seconds, log to stderr
350366 unless (null asyncs) $ do
351367 let warnIfTakingTooLong = unmask $ forever $ do
352- sleep 5
368+ sleep 10
369+ -- prune finished asyncs to keep the TVar small and report only active ones
353370 as <- readTVarIO databaseThreads
354371 -- poll each async: Nothing => still running
355372 statuses <- forM as $ \ (d,a) -> do
@@ -364,6 +381,24 @@ shutDatabase preserve Database{..} = uninterruptibleMask $ \unmask -> do
364381-- waitForDatabaseRunningKeys :: Database -> IO ()
365382-- waitForDatabaseRunningKeys = getDatabaseValues >=> mapM_ (waitRunning . snd)
366383
384+ -- | Remove finished asyncs from 'databaseThreads' (non-blocking).
385+ -- Uses 'poll' to check completion without waiting.
386+ pruneFinished :: Database -> IO ()
387+ pruneFinished db@ Database {.. } = do
388+ threads <- readTVarIO databaseThreads
389+ statuses <- forM threads $ \ (d,a) -> do
390+ p <- poll a
391+ return (d,a,p)
392+ let still = [ (d,a) | (d,a,p) <- statuses, isNothing p ]
393+ -- deleteDatabaseRuntimeDep of finished async keys
394+ forM_ statuses $ \ (d,_,p) -> when (isJust p) $ do
395+ let k = deliverName d
396+ atomically $ deleteDatabaseRuntimeDep k db
397+
398+ atomically $ modifyTVar' databaseThreads (const still)
399+
400+
401+
367402getDatabaseValues :: Database -> IO [(Key , Status )]
368403getDatabaseValues = atomically
369404 . (fmap . fmap ) (second keyStatus)
0 commit comments