Skip to content

Commit 7319397

Browse files
committed
improve hls graph
1 parent 42bbfbe commit 7319397

File tree

3 files changed

+106
-143
lines changed

3 files changed

+106
-143
lines changed

hls-graph/src/Development/IDE/Graph/Internal/Action.hs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,10 @@ actionFork act k = do
8181

8282
isAsyncException :: SomeException -> Bool
8383
isAsyncException e
84+
| Just (_ :: SomeAsyncException) <- fromException e = True
8485
| Just (_ :: AsyncCancelled) <- fromException e = True
8586
| Just (_ :: AsyncException) <- fromException e = True
87+
| Just (_ :: AsyncParentKill) <- fromException e = True
8688
| Just (_ :: ExitCode) <- fromException e = True
8789
| otherwise = False
8890

hls-graph/src/Development/IDE/Graph/Internal/Database.hs

Lines changed: 89 additions & 125 deletions
Original file line numberDiff line numberDiff line change
@@ -8,24 +8,24 @@
88
{-# LANGUAGE RecordWildCards #-}
99
{-# LANGUAGE TypeFamilies #-}
1010

11-
module Development.IDE.Graph.Internal.Database (compute, newDatabase, incDatabase, build, getDirtySet, getKeysAndVisitAge) where
11+
module Development.IDE.Graph.Internal.Database (compute, newDatabase, incDatabase, build, getDirtySet, getKeysAndVisitAge, AsyncParentKill(..)) where
1212

1313
import Prelude hiding (unzip)
1414

1515
import Control.Concurrent.Async
1616
import Control.Concurrent.Extra
17-
import Control.Concurrent.STM.Stats (STM, atomically,
17+
import Control.Concurrent.STM.Stats (STM, TVar, atomically,
1818
atomicallyNamed,
1919
modifyTVar', newTVarIO,
20-
readTVarIO)
20+
readTVar, readTVarIO,
21+
retry)
2122
import Control.Exception
2223
import Control.Monad
2324
import Control.Monad.IO.Class (MonadIO (liftIO))
2425
import Control.Monad.Trans.Class (lift)
2526
import Control.Monad.Trans.Reader
2627
import qualified Control.Monad.Trans.State.Strict as State
2728
import Data.Dynamic
28-
import Data.Either
2929
import Data.Foldable (for_, traverse_)
3030
import Data.IORef.Extra
3131
import Data.Maybe
@@ -41,6 +41,8 @@ import qualified ListT
4141
import qualified StmContainers.Map as SMap
4242
import System.IO.Unsafe
4343
import System.Time.Extra (duration, sleep)
44+
import UnliftIO (MonadUnliftIO (withRunInIO))
45+
import qualified UnliftIO.Exception as UE
4446

4547
#if MIN_VERSION_base(4,19,0)
4648
import Data.Functor (unzip)
@@ -78,7 +80,7 @@ incDatabase db Nothing = do
7880
updateDirty :: Monad m => Focus.Focus KeyDetails m ()
7981
updateDirty = Focus.adjust $ \(KeyDetails status rdeps) ->
8082
let status'
81-
| Running _ _ _ x <- status = Dirty x
83+
| Running _ x <- status = Dirty x
8284
| Clean x <- status = Dirty (Just x)
8385
| otherwise = status
8486
in KeyDetails status' rdeps
@@ -88,11 +90,8 @@ build
8890
=> Database -> Stack -> f key -> IO (f Key, f value)
8991
-- build _ st k | traceShow ("build", st, k) False = undefined
9092
build db stack keys = do
91-
built <- runAIO $ do
92-
built <- builder db stack (fmap newKey keys)
93-
case built of
94-
Left clean -> return clean
95-
Right dirty -> liftIO dirty
93+
step <- readTVarIO $ databaseStep db
94+
!built <- runAIO step $ builder db stack (fmap newKey keys)
9695
let (ids, vs) = unzip built
9796
pure (ids, fmap (asV . resultValue) vs)
9897
where
@@ -102,44 +101,41 @@ build db stack keys = do
102101
-- | Build a list of keys and return their results.
103102
-- If none of the keys are dirty, we can return the results immediately.
104103
-- Otherwise, a blocking computation is returned *which must be evaluated asynchronously* to avoid deadlock.
105-
builder
106-
:: Traversable f => Database -> Stack -> f Key -> AIO (Either (f (Key, Result)) (IO (f (Key, Result))))
104+
builder :: (Traversable f) => Database -> Stack -> f Key -> AIO (f (Key, Result))
107105
-- builder _ st kk | traceShow ("builder", st,kk) False = undefined
108-
builder db@Database{..} stack keys = withRunInIO $ \(RunInIO run) -> do
109-
-- Things that I need to force before my results are ready
110-
toForce <- liftIO $ newTVarIO []
111-
current <- liftIO $ readTVarIO databaseStep
112-
results <- liftIO $ for keys $ \id ->
113-
-- Updating the status of all the dependencies atomically is not necessary.
114-
-- Therefore, run one transaction per dep. to avoid contention
115-
atomicallyNamed "builder" $ do
116-
-- Spawn the id if needed
117-
status <- SMap.lookup id databaseValues
118-
val <- case viewDirty current $ maybe (Dirty Nothing) keyStatus status of
119-
Clean r -> pure r
120-
Running _ force val _
121-
| memberStack id stack -> throw $ StackException stack
122-
| otherwise -> do
123-
modifyTVar' toForce (Wait force :)
124-
pure val
125-
Dirty s -> do
126-
let act = run (refresh db stack id s)
127-
(force, val) = splitIO (join act)
128-
SMap.focus (updateStatus $ Running current force val s) id databaseValues
129-
modifyTVar' toForce (Spawn force:)
130-
pure val
131-
132-
pure (id, val)
133-
134-
toForceList <- liftIO $ readTVarIO toForce
135-
let waitAll = run $ waitConcurrently_ toForceList
136-
case toForceList of
137-
[] -> return $ Left results
138-
_ -> return $ Right $ do
139-
waitAll
140-
pure results
141-
142-
106+
builder db stack keys = do
107+
keyWaits <- for keys $ \k -> builderOne db stack k
108+
!res <- for keyWaits $ \(k, waitR) -> do
109+
!v<- liftIO waitR
110+
return (k, v)
111+
return res
112+
113+
builderOne :: Database -> Stack -> Key -> AIO (Key, IO Result)
114+
builderOne db@Database {..} stack id = UE.mask $ \restore -> do
115+
current <- liftIO $ readTVarIO databaseStep
116+
(k, registerWaitResult) <- liftIO $ atomicallyNamed "builder" $ do
117+
-- Spawn the id if needed
118+
status <- SMap.lookup id databaseValues
119+
val <-
120+
let refreshRsult s = do
121+
let act =
122+
restore $ asyncWithCleanUp $
123+
refresh db stack id s
124+
`UE.onException` (UE.uninterruptibleMask_ $ liftIO (atomicallyNamed "builder - onException" (SMap.focus updateDirty id databaseValues)))
125+
126+
SMap.focus (updateStatus $ Running current s) id databaseValues
127+
return act
128+
in case viewDirty current $ maybe (Dirty Nothing) keyStatus status of
129+
Dirty mbr -> refreshRsult mbr
130+
Running step _mbr
131+
| step /= current -> error $ "Inconsistent database state: key " ++ show id ++ " is marked Running at step " ++ show step ++ " but current step is " ++ show current
132+
| memberStack id stack -> throw $ StackException stack
133+
| otherwise -> retry
134+
Clean r -> pure . pure . pure $ r
135+
-- force here might contains async exceptions from previous runs
136+
pure (id, val)
137+
waitR <- registerWaitResult
138+
return (k, waitR)
143139
-- | isDirty
144140
-- only dirty when it's build time is older than the changed time of one of its dependencies
145141
isDirty :: Foldable t => Result -> t (a, Result) -> Bool
@@ -155,41 +151,37 @@ isDirty me = any (\(_,dep) -> resultBuilt me < resultChanged dep)
155151
refreshDeps :: KeySet -> Database -> Stack -> Key -> Result -> [KeySet] -> AIO Result
156152
refreshDeps visited db stack key result = \case
157153
-- no more deps to refresh
158-
[] -> liftIO $ compute db stack key RunDependenciesSame (Just result)
154+
[] -> compute' db stack key RunDependenciesSame (Just result)
159155
(dep:deps) -> do
160156
let newVisited = dep <> visited
161157
res <- builder db stack (toListKeySet (dep `differenceKeySet` visited))
162-
case res of
163-
Left res -> if isDirty result res
158+
if isDirty result res
164159
-- restart the computation if any of the deps are dirty
165-
then liftIO $ compute db stack key RunDependenciesChanged (Just result)
160+
then compute' db stack key RunDependenciesChanged (Just result)
166161
-- else kick the rest of the deps
167162
else refreshDeps newVisited db stack key result deps
168-
Right iores -> do
169-
res <- liftIO iores
170-
if isDirty result res
171-
then liftIO $ compute db stack key RunDependenciesChanged (Just result)
172-
else refreshDeps newVisited db stack key result deps
173-
174-
-- | Refresh a key:
175-
refresh :: Database -> Stack -> Key -> Maybe Result -> AIO (IO Result)
163+
164+
165+
-- refresh :: Database -> Stack -> Key -> Maybe Result -> IO Result
176166
-- refresh _ st k _ | traceShow ("refresh", st, k) False = undefined
167+
refresh :: Database -> Stack -> Key -> Maybe Result -> AIO Result
177168
refresh db stack key result = case (addStack key stack, result) of
178169
(Left e, _) -> throw e
179-
(Right stack, Just me@Result{resultDeps = ResultDeps deps}) -> asyncWithCleanUp $ refreshDeps mempty db stack key me (reverse deps)
180-
(Right stack, _) ->
181-
asyncWithCleanUp $ liftIO $ compute db stack key RunDependenciesChanged result
170+
(Right stack, Just me@Result{resultDeps = ResultDeps deps}) -> refreshDeps mempty db stack key me (reverse deps)
171+
(Right stack, _) -> compute' db stack key RunDependenciesChanged result
182172

173+
compute' :: Database -> Stack -> Key -> RunMode -> Maybe Result -> AIO Result
174+
compute' db stack key mode result = liftIO $ compute db stack key mode result
183175
-- | Compute a key.
184176
compute :: Database -> Stack -> Key -> RunMode -> Maybe Result -> IO Result
185177
-- compute _ st k _ _ | traceShow ("compute", st, k) False = undefined
186178
compute db@Database{..} stack key mode result = do
187179
let act = runRule databaseRules key (fmap resultData result) mode
188-
deps <- newIORef UnknownDeps
180+
deps <- liftIO $ newIORef UnknownDeps
189181
(execution, RunResult{..}) <-
190-
duration $ runReaderT (fromAction act) $ SAction db deps stack
191-
curStep <- readTVarIO databaseStep
192-
deps <- readIORef deps
182+
liftIO $ duration $ runReaderT (fromAction act) $ SAction db deps stack
183+
curStep <- liftIO $ readTVarIO databaseStep
184+
deps <- liftIO $ readIORef deps
193185
let lastChanged = maybe curStep resultChanged result
194186
let lastBuild = maybe curStep resultBuilt result
195187
-- changed time is always older than or equal to build time
@@ -212,12 +204,12 @@ compute db@Database{..} stack key mode result = do
212204
-- If an async exception strikes before the deps have been recorded,
213205
-- we won't be able to accurately propagate dirtiness for this key
214206
-- on the next build.
215-
void $
207+
liftIO $ void $
216208
updateReverseDeps key db
217209
(getResultDepsDefault mempty previousDeps)
218210
deps
219211
_ -> pure ()
220-
atomicallyNamed "compute and run hook" $ do
212+
liftIO $ atomicallyNamed "compute and run hook" $ do
221213
runHook
222214
SMap.focus (updateStatus $ Clean res) key databaseValues
223215
pure res
@@ -247,18 +239,6 @@ getKeysAndVisitAge db = do
247239
getAge Result{resultVisited = Step s} = curr - s
248240
return keysWithVisitAge
249241
--------------------------------------------------------------------------------
250-
-- Lazy IO trick
251-
252-
data Box a = Box {fromBox :: a}
253-
254-
-- | Split an IO computation into an unsafe lazy value and a forcing computation
255-
splitIO :: IO a -> (IO (), a)
256-
splitIO act = do
257-
let act2 = Box <$> act
258-
let res = unsafePerformIO act2
259-
(void $ evaluate res, fromBox res)
260-
261-
--------------------------------------------------------------------------------
262242
-- Reverse dependencies
263243

264244
-- | Update the reverse dependencies of an Id
@@ -301,14 +281,29 @@ transitiveDirtySet database = flip State.execStateT mempty . traverse_ loop
301281

302282
-- | A simple monad to implement cancellation on top of 'Async',
303283
-- generalizing 'withAsync' to monadic scopes.
304-
newtype AIO a = AIO { unAIO :: ReaderT (IORef [Async ()]) IO a }
284+
newtype AIO a = AIO { unAIO :: ReaderT (TVar [Async ()]) IO a }
305285
deriving newtype (Applicative, Functor, Monad, MonadIO)
306286

287+
data AsyncParentKill = AsyncParentKill ThreadId Step
288+
deriving (Show, Eq)
289+
290+
instance Exception AsyncParentKill where
291+
toException = asyncExceptionToException
292+
fromException = asyncExceptionFromException
293+
307294
-- | Run the monadic computation, cancelling all the spawned asyncs if an exception arises
308-
runAIO :: AIO a -> IO a
309-
runAIO (AIO act) = do
310-
asyncs <- newIORef []
311-
runReaderT act asyncs `onException` cleanupAsync asyncs
295+
runAIO :: Step -> AIO a -> IO a
296+
runAIO s (AIO act) = do
297+
asyncsRef <- newTVarIO []
298+
-- Log the exact exception (including async exceptions) before cleanup,
299+
-- then rethrow to preserve previous semantics.
300+
runReaderT act asyncsRef `onException` do
301+
asyncs <- atomically $ do
302+
r <- readTVar asyncsRef
303+
modifyTVar' asyncsRef $ const []
304+
return r
305+
tid <- myThreadId
306+
cleanupAsync asyncs tid s
312307

313308
-- | Like 'async' but with built-in cancellation.
314309
-- Returns an IO action to wait on the result.
@@ -319,27 +314,25 @@ asyncWithCleanUp act = do
319314
-- mask to make sure we keep track of the spawned async
320315
liftIO $ uninterruptibleMask $ \restore -> do
321316
a <- async $ restore io
322-
atomicModifyIORef'_ st (void a :)
317+
atomically $ modifyTVar' st (void a :)
323318
return $ wait a
324319

325320
unliftAIO :: AIO a -> AIO (IO a)
326321
unliftAIO act = do
327322
st <- AIO ask
328323
return $ runReaderT (unAIO act) st
329324

330-
newtype RunInIO = RunInIO (forall a. AIO a -> IO a)
325+
instance MonadUnliftIO AIO where
326+
withRunInIO k = do
327+
st <- AIO ask
328+
liftIO $ k (\aio -> runReaderT (unAIO aio) st)
331329

332-
withRunInIO :: (RunInIO -> AIO b) -> AIO b
333-
withRunInIO k = do
334-
st <- AIO ask
335-
k $ RunInIO (\aio -> runReaderT (unAIO aio) st)
336-
337-
cleanupAsync :: IORef [Async a] -> IO ()
330+
cleanupAsync :: [Async a] -> ThreadId -> Step -> IO ()
338331
-- mask to make sure we interrupt all the asyncs
339-
cleanupAsync ref = uninterruptibleMask $ \unmask -> do
340-
asyncs <- atomicModifyIORef' ref ([],)
332+
cleanupAsync asyncs tid step = uninterruptibleMask $ \unmask -> do
341333
-- interrupt all the asyncs without waiting
342-
mapM_ (\a -> throwTo (asyncThreadId a) AsyncCancelled) asyncs
334+
-- mapM_ (\a -> throwTo (asyncThreadId a) AsyncCancelled) asyncs
335+
mapM_ (\a -> throwTo (asyncThreadId a) $ AsyncParentKill tid step) asyncs
343336
-- Wait until all the asyncs are done
344337
-- But if it takes more than 10 seconds, log to stderr
345338
unless (null asyncs) $ do
@@ -348,32 +341,3 @@ cleanupAsync ref = uninterruptibleMask $ \unmask -> do
348341
traceM "cleanupAsync: waiting for asyncs to finish"
349342
withAsync warnIfTakingTooLong $ \_ ->
350343
mapM_ waitCatch asyncs
351-
352-
data Wait
353-
= Wait {justWait :: !(IO ())}
354-
| Spawn {justWait :: !(IO ())}
355-
356-
fmapWait :: (IO () -> IO ()) -> Wait -> Wait
357-
fmapWait f (Wait io) = Wait (f io)
358-
fmapWait f (Spawn io) = Spawn (f io)
359-
360-
waitOrSpawn :: Wait -> IO (Either (IO ()) (Async ()))
361-
waitOrSpawn (Wait io) = pure $ Left io
362-
waitOrSpawn (Spawn io) = Right <$> async io
363-
364-
waitConcurrently_ :: [Wait] -> AIO ()
365-
waitConcurrently_ [] = pure ()
366-
waitConcurrently_ [one] = liftIO $ justWait one
367-
waitConcurrently_ many = do
368-
ref <- AIO ask
369-
-- spawn the async computations.
370-
-- mask to make sure we keep track of all the asyncs.
371-
(asyncs, syncs) <- liftIO $ uninterruptibleMask $ \unmask -> do
372-
waits <- liftIO $ traverse (waitOrSpawn . fmapWait unmask) many
373-
let (syncs, asyncs) = partitionEithers waits
374-
liftIO $ atomicModifyIORef'_ ref (asyncs ++)
375-
return (asyncs, syncs)
376-
-- work on the sync computations
377-
liftIO $ sequence_ syncs
378-
-- wait for the async computations before returning
379-
liftIO $ traverse_ wait asyncs

0 commit comments

Comments
 (0)