diff --git a/cardano-db-sync/src/Cardano/DbSync/Api.hs b/cardano-db-sync/src/Cardano/DbSync/Api.hs index e87118d1d..525732152 100644 --- a/cardano-db-sync/src/Cardano/DbSync/Api.hs +++ b/cardano-db-sync/src/Cardano/DbSync/Api.hs @@ -320,12 +320,12 @@ mkSyncEnv metricSetters trce dbEnv syncOptions protoInfo nw nwMagic systemStart newEmptyCache CacheCapacity { cacheCapacityAddress = 50000 - , cacheCapacityStake = 50000 + , cacheCapacityStake = 150000 , cacheCapacityDatum = 125000 , cacheCapacityMultiAsset = 125000 , cacheCapacityTx = 50000 , cacheOptimisePools = 50000 - , cacheOptimiseStake = 50000 + , cacheOptimiseStake = 150000 } else pure useNoCache consistentLevelVar <- newTVarIO Unchecked diff --git a/cardano-db-sync/src/Cardano/DbSync/Api/Ledger.hs b/cardano-db-sync/src/Cardano/DbSync/Api/Ledger.hs index 46a7cc87a..01171962c 100644 --- a/cardano-db-sync/src/Cardano/DbSync/Api/Ledger.hs +++ b/cardano-db-sync/src/Cardano/DbSync/Api/Ledger.hs @@ -29,6 +29,7 @@ import Numeric import Ouroboros.Consensus.Cardano.Block hiding (CardanoBlock) import Ouroboros.Consensus.Ledger.Extended (ExtLedgerState, ledgerState) import qualified Ouroboros.Consensus.Shelley.Ledger.Ledger as Consensus +import System.Mem (performMinorGC) import qualified Cardano.Db as DB import Cardano.DbSync.Api @@ -132,6 +133,7 @@ storePage syncEnv percQuantum (n, ls) = do txOutIds <- lift $ DB.insertBulkTxOut False $ etoTxOut . fst <$> txOuts let maTxOuts = concatMap (mkmaTxOuts txOutVariantType) $ zip txOutIds (snd <$> txOuts) void . lift $ DB.insertBulkMaTxOutPiped [maTxOuts] + liftIO performMinorGC where txOutVariantType = getTxOutVariantType syncEnv trce = getTrace syncEnv diff --git a/cardano-db-sync/src/Cardano/DbSync/Cache.hs b/cardano-db-sync/src/Cardano/DbSync/Cache.hs index 8560ad01b..4fc8faa65 100644 --- a/cardano-db-sync/src/Cardano/DbSync/Cache.hs +++ b/cardano-db-sync/src/Cardano/DbSync/Cache.hs @@ -192,9 +192,16 @@ queryStakeAddrWithCacheRetBs syncEnv cacheUA ra@(Ledger.RewardAccount _ cred) = case queryRes of Nothing -> pure queryRes Just stakeAddrsId -> do - let !stakeCache' = case cacheUA of + let stable = scStableCache stakeCache + maxSize = 150000 + trimSize = 145000 -- Trim to 145k when hitting 150k (less aggressive, better hit rate) + trimmedStable = + if Map.size stable >= maxSize + then Map.fromList $ take trimSize $ Map.toList stable + else stable + !stakeCache' = case cacheUA of UpdateCache -> stakeCache {scLruCache = LRU.insert cred stakeAddrsId (scLruCache stakeCache)} - UpdateCacheStrong -> stakeCache {scStableCache = Map.insert cred stakeAddrsId (scStableCache stakeCache)} + UpdateCacheStrong -> stakeCache {scStableCache = Map.insert cred stakeAddrsId trimmedStable} _otherwise -> stakeCache liftIO $ atomically $ diff --git a/cardano-db-sync/src/Cardano/DbSync/Era/Universal/Block.hs b/cardano-db-sync/src/Cardano/DbSync/Era/Universal/Block.hs index cfda9a4b1..749cca44f 100644 --- a/cardano-db-sync/src/Cardano/DbSync/Era/Universal/Block.hs +++ b/cardano-db-sync/src/Cardano/DbSync/Era/Universal/Block.hs @@ -64,8 +64,8 @@ insertBlockUniversal :: insertBlockUniversal syncEnv shouldLog withinTwoMins withinHalfHour blk details isMember applyResult = do -- if we're syncing within 2 mins of the tip, we clean certain caches for tip following. when (isSyncedWithintwoMinutes details) $ cleanCachesForTip cache - -- Optimise caches every 100k blocks to prevent unbounded growth - when (unBlockNo (Generic.blkBlockNo blk) `mod` 100000 == 0) $ optimiseCaches cache + -- Optimise caches every 50k blocks to prevent unbounded growth + when (unBlockNo (Generic.blkBlockNo blk) `mod` 50000 == 0) $ optimiseCaches cache do pbid <- case Generic.blkPreviousHash blk of Nothing -> liftDbLookup mkSyncNodeCallStack $ DB.queryGenesis $ renderErrorMessage (Generic.blkEra blk) -- this is for networks that fork from Byron on epoch 0. diff --git a/cardano-db-sync/src/Cardano/DbSync/Era/Universal/Epoch.hs b/cardano-db-sync/src/Cardano/DbSync/Era/Universal/Epoch.hs index 8b7c30051..253ba1e5a 100644 --- a/cardano-db-sync/src/Cardano/DbSync/Era/Universal/Epoch.hs +++ b/cardano-db-sync/src/Cardano/DbSync/Era/Universal/Epoch.hs @@ -39,6 +39,7 @@ import Cardano.Ledger.Conway.PParams (DRepVotingThresholds (..)) import Cardano.Ledger.Conway.Rules (RatifyState (..)) import Cardano.Prelude import Cardano.Slotting.Slot (EpochNo (..), SlotNo) +import System.Mem (performMinorGC) import qualified Cardano.Db as DB import Cardano.DbSync.Api @@ -222,6 +223,8 @@ insertEpochStake syncEnv nw epochNo stakeChunk = do -- minimising the bulk inserts into hundred thousand chunks to improve performance with pipeline lift $ DB.insertBulkEpochStakePiped dbConstraintEpochStake chunckDbStakes + + liftIO performMinorGC where mkStake :: (StakeCred, (Shelley.Coin, PoolKeyHash)) -> @@ -252,6 +255,8 @@ insertRewards syncEnv nw earnedEpoch spendableEpoch rewardsChunk = do let chunckDbRewards = DB.chunkForBulkQuery (Proxy @DB.Reward) Nothing dbRewards -- minimising the bulk inserts into hundred thousand chunks to improve performance with pipeline lift $ DB.insertBulkRewardsPiped dbConstraintRewards chunckDbRewards + + liftIO performMinorGC where mkRewards :: (StakeCred, Set Generic.Reward) -> diff --git a/cardano-db-sync/src/Cardano/DbSync/Era/Universal/Insert/GovAction.hs b/cardano-db-sync/src/Cardano/DbSync/Era/Universal/Insert/GovAction.hs index 2cdd944ec..63d01aae0 100644 --- a/cardano-db-sync/src/Cardano/DbSync/Era/Universal/Insert/GovAction.hs +++ b/cardano-db-sync/src/Cardano/DbSync/Era/Universal/Insert/GovAction.hs @@ -66,6 +66,7 @@ import qualified Data.ByteString.Lazy.Char8 as LBS import qualified Data.Map.Strict as Map import qualified Data.Text.Encoding as Text import Ouroboros.Consensus.Cardano.Block (ConwayEra) +import System.Mem (performMinorGC) insertGovActionProposal :: SyncEnv -> @@ -383,6 +384,7 @@ insertDrepDistr e pSnapshot = do allDrepDistrs <- mapM processChunk drepChunks -- Insert all chunks in a single pipeline operation lift $ DB.insertBulkDrepDistrPiped allDrepDistrs + liftIO performMinorGC where processChunk = mapM mkEntry diff --git a/cardano-db-sync/src/Cardano/DbSync/Ledger/State.hs b/cardano-db-sync/src/Cardano/DbSync/Ledger/State.hs index 02dfaaabf..69973f998 100644 --- a/cardano-db-sync/src/Cardano/DbSync/Ledger/State.hs +++ b/cardano-db-sync/src/Cardano/DbSync/Ledger/State.hs @@ -65,6 +65,7 @@ import Cardano.Slotting.Slot ( at, fromWithOrigin, ) +import Codec.CBOR.Write (toBuilder) import Control.Concurrent.Class.MonadSTM.Strict ( atomically, newTVarIO, @@ -74,6 +75,7 @@ import Control.Concurrent.Class.MonadSTM.Strict ( import Control.Concurrent.STM.TBQueue (TBQueue, newTBQueueIO, readTBQueue, writeTBQueue) import qualified Control.Exception as Exception import qualified Data.ByteString.Base16 as Base16 +import qualified Data.ByteString.Builder as Builder import qualified Data.ByteString.Char8 as BS import qualified Data.ByteString.Lazy.Char8 as LBS import qualified Data.ByteString.Short as SBS @@ -121,6 +123,7 @@ import Ouroboros.Network.Block (HeaderHash, Point (..)) import qualified Ouroboros.Network.Point as Point import System.Directory (doesFileExist, listDirectory, removeFile) import System.FilePath (dropExtension, takeExtension, ()) +import qualified System.IO as IO import System.Mem (performMajorGC) import Prelude (String, id) @@ -376,18 +379,17 @@ ledgerStateWriteLoop tracer swQueue codecConfig = writeLedgerStateFile :: FilePath -> CardanoLedgerState -> IO () writeLedgerStateFile file ledger = do startTime <- getCurrentTime - -- TODO: write the builder directly. - -- BB.writeFile file $ toBuilder $ - LBS.writeFile file $ - Serialize.serialize $ - encodeCardanoLedgerState - ( Consensus.encodeExtLedgerState - (encodeDisk codecConfig) - (encodeDisk codecConfig) - (encodeDisk codecConfig) - . forgetLedgerTables - ) - ledger + -- Use streaming builder to avoid loading entire state into memory + IO.withBinaryFile file IO.WriteMode $ \h -> do + let encoding = + encodeCardanoLedgerState + ( Consensus.encodeExtLedgerState + (encodeDisk codecConfig) + (encodeDisk codecConfig) + (encodeDisk codecConfig) + ) + ledger + Builder.hPutBuilder h (toBuilder encoding) endTime <- getCurrentTime logInfo tracer $ mconcat @@ -395,7 +397,6 @@ ledgerStateWriteLoop tracer swQueue codecConfig = , Text.pack file , " in " , textShow (diffUTCTime endTime startTime) - , "." ] mkLedgerStateFilename :: LedgerStateDir -> ExtLedgerState CardanoBlock mk -> Maybe EpochNo -> WithOrigin FilePath @@ -641,12 +642,13 @@ loadLedgerStateFromFile tracer config delete point lsf = do safeReadFile :: FilePath -> IO (Either Text CardanoLedgerState) safeReadFile fp = do startTime <- getCurrentTime - mbs <- Exception.try $ BS.readFile fp + -- Use lazy ByteString to enable streaming read + mbs <- Exception.try $ LBS.readFile fp case mbs of Left (err :: IOException) -> pure $ Left (Text.pack $ displayException err) - Right bs -> do + Right lbs -> do mediumTime <- getCurrentTime - case decode bs of + case decode lbs of Left err -> pure $ Left $ textShow err Right ls -> do endTime <- getCurrentTime @@ -656,7 +658,7 @@ loadLedgerStateFromFile tracer config delete point lsf = do , renderPoint point , ". It took " , textShow (diffUTCTime mediumTime startTime) - , " to read from disk and " + , " to read from disk (streaming) and " , textShow (diffUTCTime endTime mediumTime) , " to parse." ] @@ -665,12 +667,11 @@ loadLedgerStateFromFile tracer config delete point lsf = do codecConfig :: CodecConfig CardanoBlock codecConfig = configCodec config - decode :: ByteString -> Either DecoderError CardanoLedgerState - decode = do + decode :: LBS.ByteString -> Either DecoderError CardanoLedgerState + decode = Serialize.decodeFullDecoder "Ledger state file" decodeState - . LBS.fromStrict decodeState :: (forall s. Decoder s CardanoLedgerState) decodeState = diff --git a/cardano-db-sync/src/Cardano/DbSync/Ledger/Types.hs b/cardano-db-sync/src/Cardano/DbSync/Ledger/Types.hs index eade333f8..c232ea9fc 100644 --- a/cardano-db-sync/src/Cardano/DbSync/Ledger/Types.hs +++ b/cardano-db-sync/src/Cardano/DbSync/Ledger/Types.hs @@ -62,7 +62,7 @@ import Prelude (fail, id) -------------------------------------------------------------------------- data HasLedgerEnv = HasLedgerEnv - { leTrace :: Trace IO Text + { leTrace :: !(Trace IO Text) , leUseLedger :: !Bool , leHasRewards :: !Bool , leProtocolInfo :: !(Consensus.ProtocolInfo CardanoBlock) @@ -195,8 +195,8 @@ updatedCommittee membersToRemove membersToAdd newQuorum committee = newCommitteeMembers newQuorum -newtype LedgerDB = LedgerDB - { ledgerDbCheckpoints :: AnchoredSeq (WithOrigin SlotNo) CardanoLedgerState CardanoLedgerState +data LedgerDB = LedgerDB + { ledgerDbCheckpoints :: !(AnchoredSeq (WithOrigin SlotNo) CardanoLedgerState CardanoLedgerState) } instance Anchorable (WithOrigin SlotNo) CardanoLedgerState CardanoLedgerState where diff --git a/cardano-db-sync/src/Cardano/DbSync/LocalStateQuery.hs b/cardano-db-sync/src/Cardano/DbSync/LocalStateQuery.hs index 60213d335..7da383e4d 100644 --- a/cardano-db-sync/src/Cardano/DbSync/LocalStateQuery.hs +++ b/cardano-db-sync/src/Cardano/DbSync/LocalStateQuery.hs @@ -55,7 +55,7 @@ import qualified Ouroboros.Network.Protocol.LocalStateQuery.Client as StateQuery import Ouroboros.Network.Protocol.LocalStateQuery.Type (AcquireFailure, Target (..)) data NoLedgerEnv = NoLedgerEnv - { nleTracer :: Trace IO Text + { nleTracer :: !(Trace IO Text) , nleSystemStart :: !SystemStart , nleQueryVar :: StateQueryTMVar CardanoBlock CardanoInterpreter , nleHistoryInterpreterVar :: StrictTVar IO (Strict.Maybe CardanoInterpreter)