From db9505b163aa36c7cfd9942603e7f27e0137360c Mon Sep 17 00:00:00 2001 From: Bromel777 Date: Sat, 13 Jan 2024 19:28:21 +0100 Subject: [PATCH 1/3] update --- amm-executor/resources/config.dhall | 18 ++++--- amm-executor/src/Spectrum/Executor.hs | 2 +- .../src/Spectrum/Executor/Backlog/Config.hs | 3 +- .../src/Spectrum/Executor/Backlog/Service.hs | 2 +- .../Executor/OrdersExecutor/Process.hs | 16 ++++++- .../Executor/OrdersExecutor/Service.hs | 47 ++++++++++++------- amm-executor/test/Gen/ConstantsGen.hs | 1 + dcConfigs/dcSpectrumConfig.dhall | 12 +++-- 8 files changed, 69 insertions(+), 32 deletions(-) diff --git a/amm-executor/resources/config.dhall b/amm-executor/resources/config.dhall index a205916e..a74826a8 100755 --- a/amm-executor/resources/config.dhall +++ b/amm-executor/resources/config.dhall @@ -1,4 +1,4 @@ -let FeePolicy = < Strict | Balance > +let FeePolicy = < Strict | Balance | SplitBetween : List Text > let CollateralPolicy = < Ignore | Cover > let Network = < Mainnet | Preview > @@ -88,9 +88,9 @@ in {- The backlog is an essential component of the application responsible for executing orders. The configuration provided below specifies the main parameters related to order execution: - * orderLifetime - This parameter defines the duration in picoseconds during which an + * orderLifetime - This parameter defines the duration in milliseconds during which an order will be considered ready for execution, starting from the current time. - * orderExecTime - This parameter determines the duration in picoseconds for rechecking + * orderExecTime - This parameter determines the duration in milliseconds for rechecking the execution status of an order. If an order was not executed within this timeframe, the backlog will attempt to execute it again. * suspendedPropability - This parameter sets the probability level for executing orders @@ -103,9 +103,10 @@ in of 95%. -} backlogConfig = - { orderLifetime = 4500 - , orderExecTime = 1500 - , suspendedPropability = 0 + { orderLifetime = 45000000 + , orderExecTime = 15000000 + , suspendedPropability = 90000 + , unsafeQueueOrderLifetime = 60000000000000 }, {- @@ -192,11 +193,14 @@ in utxoStoreConfig = { utxoStorePath = "./path/to/utxoStore" , createIfMissing = True - }, + } , unsafeEval = { unsafeTxFee = +320000 , exUnits = 165000000 , exMem = 530000 } +, httpSubmit = + { submitUri = "http://localhost:8090/api/submit/tx" + } } \ No newline at end of file diff --git a/amm-executor/src/Spectrum/Executor.hs b/amm-executor/src/Spectrum/Executor.hs index c94ae5dd..c2894caf 100755 --- a/amm-executor/src/Spectrum/Executor.hs +++ b/amm-executor/src/Spectrum/Executor.hs @@ -272,7 +272,7 @@ wireApp = App { unApp = interceptSigTerm >> do poolActionsV1 = mkPoolActions unsafeEval (PaymentPubKeyHash executorPkh) validatorsV1 poolActionsV2 = mkPoolActions unsafeEval (PaymentPubKeyHash executorPkh) validatorsV2 refInputs <- liftIO $ mkRefInputs txsInsRefs explorer - executorService <- mkOrdersExecutorService backlogService transactions explorer resolver poolActionsV1 poolActionsV2 refInputs + executorService <- mkOrdersExecutorService backlogService transactions backlogConfig explorer resolver poolActionsV1 poolActionsV2 refInputs executor <- mkOrdersExecutor backlogService executorService pendingOrdersLogging <- forComponent mkLogging "Bots.PendingOrdersHandler" mempoolOrdersLogging <- forComponent mkLogging "Bots.MempoolOrdersHandler" diff --git a/amm-executor/src/Spectrum/Executor/Backlog/Config.hs b/amm-executor/src/Spectrum/Executor/Backlog/Config.hs index 7653324c..b7ce63ae 100755 --- a/amm-executor/src/Spectrum/Executor/Backlog/Config.hs +++ b/amm-executor/src/Spectrum/Executor/Backlog/Config.hs @@ -17,7 +17,8 @@ import GHC.Natural data BacklogServiceConfig = BacklogServiceConfig { orderLifetime :: !NominalDiffTime , orderExecTime :: !NominalDiffTime - , suspendedPropability :: !Natural + , suspendedPropability :: !Natural + , unsafeQueueOrderLifetime :: !NominalDiffTime } deriving (Generic, FromDhall, Show) instance FromDhall NominalDiffTime where diff --git a/amm-executor/src/Spectrum/Executor/Backlog/Service.hs b/amm-executor/src/Spectrum/Executor/Backlog/Service.hs index 2740142e..da3378a1 100755 --- a/amm-executor/src/Spectrum/Executor/Backlog/Service.hs +++ b/amm-executor/src/Spectrum/Executor/Backlog/Service.hs @@ -28,7 +28,7 @@ import Spectrum.Executor.Backlog.Data.BacklogOrder import Spectrum.Executor.Backlog.Persistence.BacklogStore ( BacklogStore(BacklogStore, exists, get, dropOrder, put, getAll) ) import Spectrum.Executor.Backlog.Config - ( BacklogServiceConfig (BacklogServiceConfig, orderLifetime, orderExecTime, suspendedPropability) ) + ( BacklogServiceConfig (..) ) import Spectrum.Executor.Data.OrderState ( OrderState (..), OrderInState (PendingOrder, SuspendedOrder, InProgressOrder) ) import Spectrum.Executor.Types diff --git a/amm-executor/src/Spectrum/Executor/OrdersExecutor/Process.hs b/amm-executor/src/Spectrum/Executor/OrdersExecutor/Process.hs index aeb0dbdc..6d8be20d 100755 --- a/amm-executor/src/Spectrum/Executor/OrdersExecutor/Process.hs +++ b/amm-executor/src/Spectrum/Executor/OrdersExecutor/Process.hs @@ -12,9 +12,11 @@ import RIO ( (&), MonadReader, MonadUnliftIO, MonadIO (liftIO), QSem, (<&>) ) import qualified RIO.List as List import Streamly.Prelude as S - ( repeatM, mapM, MonadAsync, IsStream, before, drain ) + ( repeatM, mapM, MonadAsync, IsStream, before, drain, fromEffect ) import Control.Monad.Catch ( MonadThrow, SomeException, MonadCatch, catches, Handler (Handler) ) +import RIO + ( atomicModifyIORef' ) import System.Logging.Hlog ( MakeLogging (MakeLogging, forComponent), Logging (Logging, debugM, debugM) ) @@ -99,7 +101,17 @@ run' -> OrdersExecutorService m -> s m () run' Logging{..} BacklogService{..} OrdersExecutorService{..} = - S.repeatM (liftIO (threadDelay 5000000) >> tryAcquire) & S.mapM (\case + S.repeatM (liftIO (threadDelay 5000000) >> atomicModifyIORef' getQueue (\queue -> + case queue of + [] -> ([], Nothing) + [xs] -> ([], Just xs) + (x:xs) -> (xs, Just x) + )) & S.mapM (\case + Just order -> + pure $ Just order + Nothing -> + tryAcquire + ) & S.mapM (\case Just orderWithCreationTime@(OrderWithCreationTime order _) -> infoM ("Going to execute order for pool" ++ show (orderId order)) >> execute orderWithCreationTime diff --git a/amm-executor/src/Spectrum/Executor/OrdersExecutor/Service.hs b/amm-executor/src/Spectrum/Executor/OrdersExecutor/Service.hs index d0249733..426c3c42 100644 --- a/amm-executor/src/Spectrum/Executor/OrdersExecutor/Service.hs +++ b/amm-executor/src/Spectrum/Executor/OrdersExecutor/Service.hs @@ -7,7 +7,7 @@ import Prelude hiding (drop) import RIO.Time ( UTCTime, getCurrentTime, diffUTCTime, nominalDiffTimeToSeconds ) import RIO - ( MonadReader, MonadUnliftIO, (<&>), MonadIO, Text ) + ( MonadReader, MonadUnliftIO, (<&>), MonadIO, Text, IORef, atomicModifyIORef', newIORef ) import qualified RIO.List as List import Control.Monad.Catch ( MonadThrow, SomeException, MonadCatch, catches, Handler (Handler) ) @@ -68,33 +68,39 @@ import Data.Text (pack) import Data.Aeson (encode) import qualified System.Logging.Hlog as Trace import Spectrum.Executor.OrdersExecutor.RefInputs (RefInputs(..)) +import Spectrum.Executor.Backlog.Config (BacklogServiceConfig(..)) data OrdersExecutorService m = OrdersExecutorService { execute :: OrderWithCreationTime -> m () , executeUnsafe :: OrderWithCreationTime -> m () + , getQueue :: IORef [OrderWithCreationTime] } mkOrdersExecutorService :: forall f m env era. ( MonadUnliftIO m , MonadReader env f + , MonadUnliftIO f , HasType (MakeLogging f m) env , HasType TxRefs env, MonadCatch m) => BacklogService m -> Transactions m era + -> BacklogServiceConfig -> Explorer m -> PoolResolver m -> PoolActions 'V1 -> PoolActions 'V2 -> RefInputs -> f (OrdersExecutorService m) -mkOrdersExecutorService backlog transactions explorer resolver poolActionsV1 poolActionsV2 refInputs = do - MakeLogging{..} <- askContext - txRefsCfg <- askContext - logging <- forComponent "Bots.OrdersExecutorService" +mkOrdersExecutorService backlog transactions config explorer resolver poolActionsV1 poolActionsV2 refInputs = do + MakeLogging{..} <- askContext + txRefsCfg <- askContext + logging <- forComponent "Bots.OrdersExecutorService" + unsafeOrderQueue <- newIORef [] pure $ OrdersExecutorService - { execute = execute' logging txRefsCfg backlog transactions explorer resolver poolActionsV1 poolActionsV2 - , executeUnsafe = executeUnsafe' logging refInputs backlog transactions explorer resolver poolActionsV1 poolActionsV2 + { execute = execute' logging txRefsCfg backlog transactions config explorer resolver poolActionsV1 poolActionsV2 unsafeOrderQueue + , executeUnsafe = executeUnsafe' logging refInputs backlog transactions config explorer resolver poolActionsV1 poolActionsV2 unsafeOrderQueue + , getQueue = unsafeOrderQueue } execute' @@ -103,13 +109,15 @@ execute' -> TxRefs -> BacklogService m -> Transactions m era + -> BacklogServiceConfig -> Explorer m -> PoolResolver m -> PoolActions 'V1 -> PoolActions 'V2 + -> IORef [OrderWithCreationTime] -> OrderWithCreationTime -> m () -execute' l@Logging{..} txRefs backlog@BacklogService{suspend, drop} txs explorer resolver poolActionsV1 poolActionsV2 (OrderWithCreationTime order orderTime) = do +execute' l@Logging{..} txRefs backlog@BacklogService{suspend, drop} txs config explorer resolver poolActionsV1 poolActionsV2 priorityMap (OrderWithCreationTime order orderTime) = do executionStartTime <- getCurrentTime executeOrder' backlog l txRefs txs explorer resolver poolActionsV1 poolActionsV2 order executionStartTime `catches` [ Handler (\ (execErr :: OrderExecErr) -> case execErr of @@ -120,7 +128,7 @@ execute' l@Logging{..} txRefs backlog@BacklogService{suspend, drop} txs explorer dropError -> drop (orderId order) >> infoM ("Err " ++ show dropError ++ " occured for " ++ show (orderId order) ++ ". Going to drop") ) - , Handler (\ (dropError :: SomeException) -> processOrderExecutionException l backlog dropError order orderTime) + , Handler (\ (dropError :: SomeException) -> processOrderExecutionException l backlog config dropError order orderTime priorityMap executionStartTime) ] executionEndTime <- getCurrentTime let timeDiff = fromEnum $ nominalDiffTimeToSeconds $ diffUTCTime executionEndTime executionStartTime @@ -134,13 +142,15 @@ executeUnsafe' -> RefInputs -> BacklogService m -> Transactions m era + -> BacklogServiceConfig -> Explorer m -> PoolResolver m -> PoolActions 'V1 -> PoolActions 'V2 + -> IORef [OrderWithCreationTime] -> OrderWithCreationTime -> m () -executeUnsafe' l@Logging{..} refInputs backlog@BacklogService{suspend, drop} txs explorer resolver poolActionsV1 poolActionsV2 (OrderWithCreationTime order orderTime) = do +executeUnsafe' l@Logging{..} refInputs backlog@BacklogService{suspend, drop} txs config explorer resolver poolActionsV1 poolActionsV2 priorityMap (OrderWithCreationTime order orderTime) = do executionStartTime <- getCurrentTime executeOrderUnsafe' backlog l refInputs txs explorer resolver poolActionsV1 poolActionsV2 order executionStartTime `catches` [ Handler (\ (execErr :: OrderExecErr) -> case execErr of @@ -151,7 +161,7 @@ executeUnsafe' l@Logging{..} refInputs backlog@BacklogService{suspend, drop} txs dropError -> drop (orderId order) >> infoM ("(Unsafe) Err " ++ show dropError ++ " occured for " ++ show (orderId order) ++ ". Going to drop") ) - , Handler (\ (dropError :: SomeException) -> processOrderExecutionException l backlog dropError order orderTime) + , Handler (\ (dropError :: SomeException) -> processOrderExecutionException l backlog config dropError order orderTime priorityMap executionStartTime) ] executionEndTime <- getCurrentTime let timeDiff = fromEnum $ nominalDiffTimeToSeconds $ diffUTCTime executionEndTime executionStartTime @@ -159,13 +169,18 @@ executeUnsafe' l@Logging{..} refInputs backlog@BacklogService{suspend, drop} txs infoM $ "(Unsafe) Time of end order processing is " ++ show executionEndTime infoM $ "(Unsafe) Time of processing order is " ++ show (timeDiff `div` 1000000000) ++ " mills" -processOrderExecutionException :: Monad m => Logging m -> BacklogService m -> SomeException -> Order -> UTCTime -> m () -processOrderExecutionException Logging{..} BacklogService{suspend, drop} executionError order@(OnChain FullTxOut{..} _) orderTime = do +processOrderExecutionException :: MonadIO m => Logging m -> BacklogService m -> BacklogServiceConfig -> SomeException -> Order -> UTCTime -> IORef [OrderWithCreationTime] -> UTCTime -> m () +processOrderExecutionException Logging{..} BacklogService{suspend, drop} BacklogServiceConfig{..} executionError order@(OnChain FullTxOut{..} _) orderTime priorityQueue executionStartTime = do let errMsgText = pack (show executionError) if isInfixOf "BadInputsUTxO" errMsgText && not (pack (show (txOutRefId fullTxOutRef)) `isInfixOf` errMsgText) then - suspend (SuspendedOrder order orderTime) >> - infoM ("Got BadInputsUTxO error during order (" ++ show (orderId order) ++ ") execution without orderId (" ++ show (txOutRefId fullTxOutRef) ++ "). " ++ show errMsgText ++ ". Going to suspend order") + if (diffUTCTime executionStartTime orderTime < unsafeQueueOrderLifetime) + then + atomicModifyIORef' priorityQueue (\prevQueue -> (OrderWithCreationTime order orderTime : prevQueue, ())) >> + infoM ("Got BadInputsUTxO error during order (" ++ show (orderId order) ++ ") execution without orderId (" ++ show (txOutRefId fullTxOutRef) ++ "). " ++ show errMsgText ++ ". Going to put it to priority unsafe order queue") + else + suspend (SuspendedOrder order orderTime) >> + infoM ("Got BadInputsUTxO error during order (" ++ show (orderId order) ++ ") execution without orderId (" ++ show (txOutRefId fullTxOutRef) ++ "). " ++ show errMsgText ++ ". Going to suspend order") else drop (orderId order) >> infoM ("Got error (" ++ show (processDropErrorMsg errMsgText)++ ") during order (" ++ show (orderId order) ++ ") " ++ ". Going to drop order") processDropErrorMsg :: Text -> Text @@ -293,7 +308,7 @@ runOrder -> Logging m -> m (TxCandidate, Predicted Core.Pool) runOrder TxRefs{..} Explorer{..} (Pool (OnChain poolOut pool) version) (OnChain orderOut Core.AnyOrder{..}) PoolActions{..} Logging{..} = do - let + let poolOutRef = case version of V1 -> Interop.fromCardanoTxIn poolV1Ref V2 -> Interop.fromCardanoTxIn poolV2Ref diff --git a/amm-executor/test/Gen/ConstantsGen.hs b/amm-executor/test/Gen/ConstantsGen.hs index 1b8ea1de..c1c6f4db 100755 --- a/amm-executor/test/Gen/ConstantsGen.hs +++ b/amm-executor/test/Gen/ConstantsGen.hs @@ -59,6 +59,7 @@ cfgForOnlyPendingOrders = BacklogServiceConfig { orderLifetime = 900 :: NominalDiffTime , orderExecTime = 600 :: NominalDiffTime , suspendedPropability = 0 + , unsafeQueueOrderLifetime = 600 :: NominalDiffTime } data ValidatorInfo = ValidatorInfo diff --git a/dcConfigs/dcSpectrumConfig.dhall b/dcConfigs/dcSpectrumConfig.dhall index 68e1f017..3f16f303 100755 --- a/dcConfigs/dcSpectrumConfig.dhall +++ b/dcConfigs/dcSpectrumConfig.dhall @@ -1,4 +1,4 @@ -let FeePolicy = < Strict | Balance > +let FeePolicy = < Strict | Balance | SplitBetween : List Text > let CollateralPolicy = < Ignore | Cover > let Network = < Mainnet | Preview > @@ -31,9 +31,10 @@ in , createIfMissing = True } , backlogConfig = - { orderLifetime = 4500 - , orderExecTime = 1500 - , suspendedPropability = 0 + { orderLifetime = 45000000 + , orderExecTime = 15000000 + , suspendedPropability = 90000 + , unsafeQueueOrderLifetime = 60000000000000 } , backlogStoreConfig = { storePath = "./data/backlogStore" @@ -80,4 +81,7 @@ in , exUnits = 165000000 , exMem = 530000 } +, httpSubmit = + { submitUri = "http://localhost:8090/api/submit/tx" + } } \ No newline at end of file From 5d72ee0185080fa5bf9daccda5dd49c4fbc660b1 Mon Sep 17 00:00:00 2001 From: Bromel777 Date: Sat, 13 Jan 2024 19:42:38 +0100 Subject: [PATCH 2/3] update configs --- amm-executor/src/Spectrum/Executor/OrdersExecutor/Service.hs | 2 +- dcConfigs/dcSpectrumConfig.dhall | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/amm-executor/src/Spectrum/Executor/OrdersExecutor/Service.hs b/amm-executor/src/Spectrum/Executor/OrdersExecutor/Service.hs index 426c3c42..d335a61a 100644 --- a/amm-executor/src/Spectrum/Executor/OrdersExecutor/Service.hs +++ b/amm-executor/src/Spectrum/Executor/OrdersExecutor/Service.hs @@ -95,7 +95,7 @@ mkOrdersExecutorService mkOrdersExecutorService backlog transactions config explorer resolver poolActionsV1 poolActionsV2 refInputs = do MakeLogging{..} <- askContext txRefsCfg <- askContext - logging <- forComponent "Bots.OrdersExecutorService" + logging <- forComponent "Bots.OrdersExecutorService123" unsafeOrderQueue <- newIORef [] pure $ OrdersExecutorService { execute = execute' logging txRefsCfg backlog transactions config explorer resolver poolActionsV1 poolActionsV2 unsafeOrderQueue diff --git a/dcConfigs/dcSpectrumConfig.dhall b/dcConfigs/dcSpectrumConfig.dhall index 3f16f303..9e60e913 100755 --- a/dcConfigs/dcSpectrumConfig.dhall +++ b/dcConfigs/dcSpectrumConfig.dhall @@ -34,7 +34,7 @@ in { orderLifetime = 45000000 , orderExecTime = 15000000 , suspendedPropability = 90000 - , unsafeQueueOrderLifetime = 60000000000000 + , unsafeQueueOrderLifetime = 60000 } , backlogStoreConfig = { storePath = "./data/backlogStore" From 3a60e95dec60c2b7f6595fccf89fdfc046abf730 Mon Sep 17 00:00:00 2001 From: Bromel777 Date: Sat, 13 Jan 2024 20:27:39 +0100 Subject: [PATCH 3/3] fix config --- amm-executor/resources/config.dhall | 2 +- dcConfigs/dcSpectrumConfig.dhall | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/amm-executor/resources/config.dhall b/amm-executor/resources/config.dhall index a74826a8..aa1a76aa 100755 --- a/amm-executor/resources/config.dhall +++ b/amm-executor/resources/config.dhall @@ -106,7 +106,7 @@ in { orderLifetime = 45000000 , orderExecTime = 15000000 , suspendedPropability = 90000 - , unsafeQueueOrderLifetime = 60000000000000 + , unsafeQueueOrderLifetime = 60000 }, {- diff --git a/dcConfigs/dcSpectrumConfig.dhall b/dcConfigs/dcSpectrumConfig.dhall index 9e60e913..849288d9 100755 --- a/dcConfigs/dcSpectrumConfig.dhall +++ b/dcConfigs/dcSpectrumConfig.dhall @@ -33,7 +33,7 @@ in , backlogConfig = { orderLifetime = 45000000 , orderExecTime = 15000000 - , suspendedPropability = 90000 + , suspendedPropability = 50 , unsafeQueueOrderLifetime = 60000 } , backlogStoreConfig =