22
33module Cardano.DbSync.Ledger.Async where
44
5+ import Cardano.DbSync.Era.Shelley.Generic.Rewards as Generic
6+ import Cardano.DbSync.Ledger.Event
57import Cardano.DbSync.Ledger.Types
6- import Cardano.Ledger.BaseTypes (EpochNo )
8+ import Cardano.DbSync.Types
9+ import Cardano.Ledger.BaseTypes
710import Cardano.Ledger.Crypto (StandardCrypto )
811import qualified Cardano.Ledger.EpochBoundary as Ledger
12+ import qualified Cardano.Ledger.Rewards as Ledger
13+ import Cardano.Ledger.Shelley.RewardUpdate as Ledger
914import Control.Concurrent.Class.MonadSTM.Strict
1015import qualified Control.Concurrent.STM.TBQueue as TBQ
16+ import Control.Monad.Extra (whenJust )
17+ import Data.Map (Map )
18+ import Data.Set (Set )
19+
20+ --------------------------------------------------------------------------------
21+ -- EpochStake
22+ --------------------------------------------------------------------------------
1123
1224newEpochStakeChannels :: IO EpochStakeChannels
1325newEpochStakeChannels =
@@ -18,27 +30,94 @@ newEpochStakeChannels =
1830 <*> newTVarIO Nothing
1931
2032-- To be used by the main thread
21- ensureEpochDone :: EpochStakeChannels -> EpochNo -> Ledger. SnapShot StandardCrypto -> IO ()
22- ensureEpochDone sQueue epoch snapshot = atomically $ do
23- mLastEpochDone <- waitFinished sQueue
33+ ensureStakeDone :: EpochStakeChannels -> EpochNo -> Ledger. SnapShot StandardCrypto -> IO ()
34+ ensureStakeDone esc epoch snapshot = do
35+ mLastEpochDone <- atomically $ waitStakeFinished esc
2436 case mLastEpochDone of
25- Just lastEpochDone | lastEpochDone = = epoch -> pure ()
37+ Just lastEpochDone | lastEpochDone > = epoch -> pure ()
2638 _ -> do
2739 -- If last is not already there, put it to list and wait again
28- writeEpochStakeAction sQueue epoch snapshot True
29- retry
40+ atomically $ writeEpochStakeAction esc epoch snapshot True -- TODO: do outside STM
41+ _ <- atomically $ waitStakeFinished esc
42+ pure ()
3043
31- -- To be used by the main thread
32- waitFinished :: EpochStakeChannels -> STM IO (Maybe EpochNo )
33- waitFinished sQueue = do
34- stakeThreadState <- readTVar (epochResult sQueue )
44+ -- To be used by the main thread. Only blocks if it's 'Running', until it finishes.
45+ waitStakeFinished :: EpochStakeChannels -> STM IO (Maybe EpochNo )
46+ waitStakeFinished esc = do
47+ stakeThreadState <- readTVar (epochResult esc )
3548 case stakeThreadState of
3649 Just (lastEpoch, Done ) -> pure $ Just lastEpoch -- Normal case
3750 Just (_, Running ) -> retry -- Wait to finish current work.
3851 Nothing -> pure Nothing -- This will happen after a restart
3952
4053-- To be used by the main thread
4154writeEpochStakeAction :: EpochStakeChannels -> EpochNo -> Ledger. SnapShot StandardCrypto -> Bool -> STM IO ()
42- writeEpochStakeAction sQueue epoch snapShot checkFirst = do
43- TBQ. writeTBQueue (estakeQueue sQueue) $ EpochStakeDBAction epoch snapShot checkFirst
44- writeTVar (epochResult sQueue) $ Just (epoch, Running )
55+ writeEpochStakeAction esc epoch snapShot checkFirst = do
56+ TBQ. writeTBQueue (estakeQueue esc) $ EpochStakeDBAction epoch snapShot checkFirst
57+ writeTVar (epochResult esc) $ Just (epoch, Running )
58+
59+ --------------------------------------------------------------------------------
60+ -- Rewards
61+ --------------------------------------------------------------------------------
62+
63+ newRewardsChannels :: IO RewardsChannels
64+ newRewardsChannels =
65+ RewardsChannels
66+ <$> TBQ. newTBQueueIO 5
67+ <*> newTVarIO Nothing
68+
69+ asyncWriteRewards :: HasLedgerEnv -> CardanoLedgerState -> EpochNo -> Bool -> [LedgerEvent ] -> IO ()
70+ asyncWriteRewards env newState currentEpochNo isNewEpoch rewardEventsEB = do
71+ rewState <- atomically $ readTVar $ rewardsResult rc
72+ if isNewEpoch
73+ then do
74+ case rewState of
75+ Just (e', RewRunning ) | e' == currentEpochNo -> do
76+ waitRewardUntil rc (e', RewDone )
77+ _ -> do
78+ ensureRewardsDone rc currentEpochNo (findTotal rewardEventsEB)
79+ waitEBRewardsAction rc currentEpochNo rewardEventsEB
80+ else do
81+ case rewState of
82+ Just (e', _) | e' >= currentEpochNo -> pure ()
83+ _ ->
84+ whenJust (Generic. getRewardsUpdate (getTopLevelconfigHasLedger env) (clsState newState)) $ \ ru -> do
85+ atomically $ writeRewardsAction rc currentEpochNo False (Ledger. rs ru) -- (e-1) (e+1)
86+ where
87+ rc = leRewardsChans env
88+
89+ findTotal :: [LedgerEvent ] -> Maybe (Map StakeCred (Set (Ledger. Reward StandardCrypto )))
90+ findTotal [] = Nothing
91+ findTotal (LedgerTotalRewards _ mp : _) = Just mp
92+ findTotal (_ : rest) = findTotal rest
93+
94+ -- To be used by the main thread
95+ ensureRewardsDone :: RewardsChannels -> EpochNo -> Maybe (Map StakeCred (Set (Ledger. Reward StandardCrypto ))) -> IO ()
96+ ensureRewardsDone rc epoch mmp = do
97+ whenJust mmp $ \ mp -> do
98+ atomically $ writeRewardsAction rc epoch True mp
99+ waitRewardUntil rc (epoch, RewDone )
100+
101+ waitEBRewardsAction :: RewardsChannels -> EpochNo -> [LedgerEvent ] -> IO ()
102+ waitEBRewardsAction rc epoch les = do
103+ atomically $ do
104+ TBQ. writeTBQueue (rQueue rc) $ RewardsEpochBoundary epoch les
105+ writeTVar (rewardsResult rc) $ Just (epoch, RewEBRunning )
106+ waitRewardUntil rc (epoch, RewEBDone )
107+
108+ -- To be used by the main thread
109+ writeRewardsAction :: RewardsChannels -> EpochNo -> Bool -> Map StakeCred (Set (Ledger. Reward StandardCrypto )) -> STM IO ()
110+ writeRewardsAction rc epoch checkFirst mp = do
111+ TBQ. writeTBQueue (rQueue rc) $ RewardsDBAction epoch mp checkFirst
112+ writeTVar (rewardsResult rc) $ Just (epoch, RewRunning )
113+
114+ waitRewardUntil :: RewardsChannels -> (EpochNo , EpochRewardState ) -> IO ()
115+ waitRewardUntil rc st = waitRewardUntilPred rc (== st)
116+
117+ -- blocks until the reward result satisfies a specific predicate.
118+ waitRewardUntilPred :: RewardsChannels -> ((EpochNo , EpochRewardState ) -> Bool ) -> IO ()
119+ waitRewardUntilPred rc prd = atomically $ do
120+ rewardsThreadState <- readTVar (rewardsResult rc)
121+ case rewardsThreadState of
122+ Just st | prd st -> pure ()
123+ _ -> retry
0 commit comments