diff --git a/cmd/notifier/config/config.toml b/cmd/notifier/config/config.toml index 24ad5c00..fba4695c 100644 --- a/cmd/notifier/config/config.toml +++ b/cmd/notifier/config/config.toml @@ -3,6 +3,10 @@ # Requires a redis instance/cluster and should be used when multiple observers push from the same shard CheckDuplicates = true + # WithReadStateChanges signals if read state changes operation will be handled + # It depends also if read state changes are enabled from observer nodes + WithReadStateChanges = false + # ExternalMarshaller is used for handling incoming/outcoming api requests [General.ExternalMarshaller] Type = "json" @@ -114,3 +118,8 @@ [RabbitMQ.BlockEventsExchange] Name = "block_events" Type = "fanout" + + # The exchange which holds state accesses + [RabbitMQ.StateAccessesExchange] + Name = "state_accesses" + Type = "fanout" diff --git a/common/constants.go b/common/constants.go index 2908cf83..a35a1638 100644 --- a/common/constants.go +++ b/common/constants.go @@ -34,6 +34,9 @@ const ( // BlockScrs defines the subscription event type for block scrs BlockScrs string = "block_scrs" + + // BlockStateAccesses defines the subscription event type for block state accesses + BlockStateAccesses string = "block_state_accesses" ) const ( diff --git a/config/config.go b/config/config.go index b99f17ba..1cec21a2 100644 --- a/config/config.go +++ b/config/config.go @@ -20,9 +20,10 @@ type MainConfig struct { // GeneralConfig maps the general config section type GeneralConfig struct { - ExternalMarshaller MarshallerConfig - AddressConverter AddressConverterConfig - CheckDuplicates bool + ExternalMarshaller MarshallerConfig + AddressConverter AddressConverterConfig + CheckDuplicates bool + WithReadStateChanges bool } // MarshallerConfig maps the marshaller configuration @@ -80,6 +81,7 @@ type RabbitMQConfig struct { BlockTxsExchange RabbitMQExchangeConfig BlockScrsExchange RabbitMQExchangeConfig BlockEventsExchange RabbitMQExchangeConfig + StateAccessesExchange RabbitMQExchangeConfig } // RabbitMQExchangeConfig holds the configuration for a rabbitMQ exchange diff --git a/config/tomlConfig_test.go b/config/tomlConfig_test.go index ddf0dc7c..a0851853 100644 --- a/config/tomlConfig_test.go +++ b/config/tomlConfig_test.go @@ -17,6 +17,7 @@ func TestMainConfig(t *testing.T) { adrConverterPrefix := "erd" adrConverterLength := 32 checkDuplicates := true + withReadStateChanges := true connectorAPIHost := "5000" connectorAPIUsername := "guest" @@ -50,7 +51,8 @@ func TestMainConfig(t *testing.T) { Prefix: adrConverterPrefix, Length: adrConverterLength, }, - CheckDuplicates: checkDuplicates, + CheckDuplicates: checkDuplicates, + WithReadStateChanges: withReadStateChanges, }, WebSocketConnector: config.WebSocketConfig{ Enabled: true, @@ -95,6 +97,10 @@ func TestMainConfig(t *testing.T) { # Requires a redis instance/cluster and should be used when multiple observers push from the same shard CheckDuplicates = true + # WithReadStateChanges signals if read state changes operation will be handled + # It depends also if read state changes are enabled from observer nodes + WithReadStateChanges = true + # ExternalMarshaller is used for handling incoming/outcoming api requests [General.ExternalMarshaller] Type = "` + generalMarshallerType + `" diff --git a/data/block.go b/data/block.go index 785395a7..cf26ebe6 100644 --- a/data/block.go +++ b/data/block.go @@ -9,6 +9,7 @@ import ( "github.com/multiversx/mx-chain-core-go/data/receipt" "github.com/multiversx/mx-chain-core-go/data/rewardTx" "github.com/multiversx/mx-chain-core-go/data/smartContractResult" + "github.com/multiversx/mx-chain-core-go/data/stateChange" "github.com/multiversx/mx-chain-core-go/data/transaction" ) @@ -22,14 +23,15 @@ type SaveBlockData struct { // InterceptorBlockData holds the block data needed for processing type InterceptorBlockData struct { - Hash string - Body nodeData.BodyHandler - Header nodeData.HeaderHandler - Txs map[string]*transaction.Transaction - TxsWithOrder map[string]*outport.TxInfo - Scrs map[string]*smartContractResult.SmartContractResult - ScrsWithOrder map[string]*outport.SCRInfo - LogEvents []Event + Hash string + Body nodeData.BodyHandler + Header nodeData.HeaderHandler + Txs map[string]*transaction.Transaction + TxsWithOrder map[string]*outport.TxInfo + Scrs map[string]*smartContractResult.SmartContractResult + ScrsWithOrder map[string]*outport.SCRInfo + LogEvents []Event + StateAccessesPerAccounts map[string]*stateChange.StateAccesses } // ArgsSaveBlockData holds the block data that will be received on push events @@ -44,6 +46,7 @@ type ArgsSaveBlockData struct { AlteredAccounts map[string]*alteredAccount.AlteredAccount NumberOfShards uint32 HeaderTimeStampMs uint64 + StateAccesses map[string]*stateChange.StateAccesses } // OutportBlockDataOld holds the block data that will be received on push events diff --git a/data/outport.go b/data/outport.go index 383b017f..18ec51ad 100644 --- a/data/outport.go +++ b/data/outport.go @@ -7,6 +7,7 @@ import ( "github.com/multiversx/mx-chain-core-go/data/receipt" "github.com/multiversx/mx-chain-core-go/data/rewardTx" "github.com/multiversx/mx-chain-core-go/data/smartContractResult" + "github.com/multiversx/mx-chain-core-go/data/stateChange" "github.com/multiversx/mx-chain-core-go/data/transaction" ) @@ -73,6 +74,15 @@ type BlockEventsWithOrder struct { Events []Event `json:"events"` } +// BlockStateAccesses holds the block state accesses +type BlockStateAccesses struct { + Hash string `json:"hash"` + ShardID uint32 `json:"shardID"` + TimeStampMs uint64 `json:"timestampMs"` + Nonce uint64 `json:"nonce"` + StateAccessesPerAccounts map[string]*stateChange.StateAccesses `json:"stateAccessesPerAccounts"` +} + // NotifierTransaction defines a wrapper over transaction type NotifierTransaction struct { *transaction.Transaction diff --git a/disabled/disabledHub.go b/disabled/disabledHub.go index f819e711..c9fcb3d5 100644 --- a/disabled/disabledHub.go +++ b/disabled/disabledHub.go @@ -33,6 +33,10 @@ func (h *Hub) PublishScrs(blockScrs data.BlockScrs) { func (h *Hub) PublishBlockEventsWithOrder(blockTxs data.BlockEventsWithOrder) { } +// PublishStateAccesses does nothing +func (h *Hub) PublishStateAccesses(blockTxs data.BlockStateAccesses) { +} + // RegisterEvent does nothing func (h *Hub) RegisterEvent(_ dispatcher.EventDispatcher) { } diff --git a/dispatcher/hub/commonHub.go b/dispatcher/hub/commonHub.go index 027594c9..1278c074 100644 --- a/dispatcher/hub/commonHub.go +++ b/dispatcher/hub/commonHub.go @@ -96,6 +96,10 @@ func (ch *commonHub) handlePushBlockEvents(blockEvents data.BlockEvents, subscri // PublishRevert will publish revert event to dispatcher func (ch *commonHub) PublishRevert(revertBlock data.RevertBlock) { subscriptions := ch.subscriptionMapper.Subscriptions() + _, ok := subscriptions[common.RevertBlockEvents] + if !ok { + return + } dispatchersMap := make(map[uuid.UUID]data.RevertBlock) @@ -115,6 +119,10 @@ func (ch *commonHub) PublishRevert(revertBlock data.RevertBlock) { // PublishFinalized will publish finalized event to dispatcher func (ch *commonHub) PublishFinalized(finalizedBlock data.FinalizedBlock) { subscriptions := ch.subscriptionMapper.Subscriptions() + _, ok := subscriptions[common.FinalizedBlockEvents] + if !ok { + return + } dispatchersMap := make(map[uuid.UUID]data.FinalizedBlock) @@ -134,6 +142,10 @@ func (ch *commonHub) PublishFinalized(finalizedBlock data.FinalizedBlock) { // PublishTxs will publish txs event to dispatcher func (ch *commonHub) PublishTxs(blockTxs data.BlockTxs) { subscriptions := ch.subscriptionMapper.Subscriptions() + _, ok := subscriptions[common.BlockTxs] + if !ok { + return + } dispatchersMap := make(map[uuid.UUID]data.BlockTxs) @@ -153,6 +165,10 @@ func (ch *commonHub) PublishTxs(blockTxs data.BlockTxs) { // PublishBlockEventsWithOrder will publish block events with order to dispatcher func (ch *commonHub) PublishBlockEventsWithOrder(blockTxs data.BlockEventsWithOrder) { subscriptions := ch.subscriptionMapper.Subscriptions() + _, ok := subscriptions[common.BlockEvents] + if !ok { + return + } dispatchersMap := make(map[uuid.UUID]data.BlockEventsWithOrder) @@ -172,6 +188,10 @@ func (ch *commonHub) PublishBlockEventsWithOrder(blockTxs data.BlockEventsWithOr // PublishScrs will publish scrs events to dispatcher func (ch *commonHub) PublishScrs(blockScrs data.BlockScrs) { subscriptions := ch.subscriptionMapper.Subscriptions() + _, ok := subscriptions[common.BlockScrs] + if !ok { + return + } dispatchersMap := make(map[uuid.UUID]data.BlockScrs) @@ -188,6 +208,29 @@ func (ch *commonHub) PublishScrs(blockScrs data.BlockScrs) { } } +// PublishStateAccesses will publish state accesses to dispatcher +func (ch *commonHub) PublishStateAccesses(stateAccesses data.BlockStateAccesses) { + subscriptions := ch.subscriptionMapper.Subscriptions() + _, ok := subscriptions[common.BlockStateAccesses] + if !ok { + return + } + + dispatchersMap := make(map[uuid.UUID]data.BlockStateAccesses) + + for _, subscription := range subscriptions[common.BlockStateAccesses] { + dispatchersMap[subscription.DispatcherID] = stateAccesses + } + + ch.mutDispatchers.RLock() + defer ch.mutDispatchers.RUnlock() + for id, event := range dispatchersMap { + if d, ok := ch.dispatchers[id]; ok { + d.StateAccessesEvent(event) + } + } +} + func (ch *commonHub) registerDispatcher(d dispatcher.EventDispatcher) { ch.mutDispatchers.Lock() defer ch.mutDispatchers.Unlock() diff --git a/dispatcher/interface.go b/dispatcher/interface.go index 9a2c33a1..3ac97dda 100644 --- a/dispatcher/interface.go +++ b/dispatcher/interface.go @@ -19,6 +19,7 @@ type EventDispatcher interface { TxsEvent(event data.BlockTxs) BlockEvents(event data.BlockEventsWithOrder) ScrsEvent(event data.BlockScrs) + StateAccessesEvent(event data.BlockStateAccesses) } // Hub defines the behaviour of a component which should be able to receive events diff --git a/dispatcher/subscription.go b/dispatcher/subscription.go index a5c048ee..698ab1c1 100644 --- a/dispatcher/subscription.go +++ b/dispatcher/subscription.go @@ -145,7 +145,8 @@ func getEventType(subEntry data.SubscriptionEntry) string { subEntry.EventType == common.RevertBlockEvents || subEntry.EventType == common.BlockTxs || subEntry.EventType == common.BlockScrs || - subEntry.EventType == common.BlockEvents { + subEntry.EventType == common.BlockEvents || + subEntry.EventType == common.BlockStateAccesses { return subEntry.EventType } diff --git a/dispatcher/ws/wsDispatcher.go b/dispatcher/ws/wsDispatcher.go index 21c3d6d1..8db20447 100644 --- a/dispatcher/ws/wsDispatcher.go +++ b/dispatcher/ws/wsDispatcher.go @@ -193,6 +193,26 @@ func (wd *websocketDispatcher) ScrsEvent(event data.BlockScrs) { wd.send <- wsEventBytes } +// StateAccessesEvent receives a block state accesses event and process it before pushing to socket +func (wd *websocketDispatcher) StateAccessesEvent(event data.BlockStateAccesses) { + eventBytes, err := wd.marshaller.Marshal(event) + if err != nil { + log.Error("failure marshalling events", "err", err.Error()) + return + } + wsEvent := &data.WebSocketEvent{ + Type: common.BlockStateAccesses, + Data: eventBytes, + } + wsEventBytes, err := wd.marshaller.Marshal(wsEvent) + if err != nil { + log.Error("failure marshalling events", "err", err.Error()) + return + } + + wd.send <- wsEventBytes +} + // writePump listens on the send-channel and pushes data on the socket stream func (wd *websocketDispatcher) writePump() { ticker := time.NewTicker(pingPeriod) diff --git a/factory/processFactory.go b/factory/processFactory.go index 9fd41f67..73fc8b2e 100644 --- a/factory/processFactory.go +++ b/factory/processFactory.go @@ -24,7 +24,8 @@ func CreateEventsInterceptor(cfg config.GeneralConfig) (process.EventsIntercepto } argsEventsInterceptor := process.ArgsEventsInterceptor{ - PubKeyConverter: pubKeyConverter, + PubKeyConverter: pubKeyConverter, + WithReadStateChanges: cfg.WithReadStateChanges, } return process.NewEventsInterceptor(argsEventsInterceptor) diff --git a/go.mod b/go.mod index d00d6999..e3e11a4a 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/google/uuid v1.6.0 github.com/gorilla/websocket v1.5.3 github.com/multiversx/mx-chain-communication-go v1.2.0 - github.com/multiversx/mx-chain-core-go v1.4.0 + github.com/multiversx/mx-chain-core-go v1.4.1 github.com/multiversx/mx-chain-logger-go v1.1.0 github.com/pelletier/go-toml v1.9.3 github.com/prometheus/client_model v0.6.1 diff --git a/go.sum b/go.sum index 6cda5d90..f63f3c00 100644 --- a/go.sum +++ b/go.sum @@ -130,8 +130,8 @@ github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o= github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= github.com/multiversx/mx-chain-communication-go v1.2.0 h1:0wOoLldiRbvaOPxwICbnRCqCpLqPewg8M/FMbC/0OXY= github.com/multiversx/mx-chain-communication-go v1.2.0/go.mod h1:wS3aAwkmHbC9mlzQdvL6p7l8Rqw3vmzhj7WZW1dTveA= -github.com/multiversx/mx-chain-core-go v1.4.0 h1:p6FbfCzvMXF54kpS0B5mrjNWYpq4SEQqo0UvrMF7YVY= -github.com/multiversx/mx-chain-core-go v1.4.0/go.mod h1:IO+vspNan+gT0WOHnJ95uvWygiziHZvfXpff6KnxV7g= +github.com/multiversx/mx-chain-core-go v1.4.1 h1:ljs53jpdjtCohpaqm2n/dvTGrFlSgIpoZYH8RVt5cWo= +github.com/multiversx/mx-chain-core-go v1.4.1/go.mod h1:IO+vspNan+gT0WOHnJ95uvWygiziHZvfXpff6KnxV7g= github.com/multiversx/mx-chain-crypto-go v1.2.13-0.20250218161752-9482d9a22234 h1:NNI7kYxzsq+4mTPSUJo0cK1+iPxjUX+gRJDaBRwEQ7M= github.com/multiversx/mx-chain-crypto-go v1.2.13-0.20250218161752-9482d9a22234/go.mod h1:QZAw2bZcOxGQRgYACTrmP8pfTa3NyxENIL+00G6nM5E= github.com/multiversx/mx-chain-logger-go v1.1.0 h1:97x84A6L4RfCa6YOx1HpAFxZp1cf/WI0Qh112whgZNM= diff --git a/integrationTests/rabbitmq/testNotifierWithRabbitMQ_test.go b/integrationTests/rabbitmq/testNotifierWithRabbitMQ_test.go index 0c9f97e5..6be251f8 100644 --- a/integrationTests/rabbitmq/testNotifierWithRabbitMQ_test.go +++ b/integrationTests/rabbitmq/testNotifierWithRabbitMQ_test.go @@ -1,6 +1,7 @@ package rabbitmq import ( + "encoding/hex" "encoding/json" "sync" "testing" @@ -10,6 +11,7 @@ import ( "github.com/multiversx/mx-chain-core-go/data/block" "github.com/multiversx/mx-chain-core-go/data/outport" "github.com/multiversx/mx-chain-core-go/data/smartContractResult" + "github.com/multiversx/mx-chain-core-go/data/stateChange" "github.com/multiversx/mx-chain-core-go/data/transaction" logger "github.com/multiversx/mx-chain-logger-go" "github.com/multiversx/mx-chain-notifier-go/common" @@ -33,12 +35,16 @@ func TestNotifierWithRabbitMQ(t *testing.T) { func testNotifierWithRabbitMQ(t *testing.T, observerType string, payloadVersion uint32) { cfg := integrationTests.GetDefaultConfigs() cfg.MainConfig.General.CheckDuplicates = true + cfg.MainConfig.General.WithReadStateChanges = true notifier, err := integrationTests.NewTestNotifierWithRabbitMq(cfg.MainConfig) require.Nil(t, err) client, err := integrationTests.CreateObserverConnector(notifier.Facade, observerType, common.MessageQueuePublisherType, payloadVersion) require.Nil(t, err) + // wait for components to start + time.Sleep(time.Second * 5) + _ = notifier.Publisher.Run() defer notifier.Publisher.Close() @@ -56,7 +62,7 @@ func testNotifierWithRabbitMQ(t *testing.T, observerType string, payloadVersion integrationTests.WaitTimeout(t, wg, time.Second*2) assert.Equal(t, 3, len(notifier.RedisClient.GetEntries())) - assert.Equal(t, 6, len(notifier.RabbitMQClient.GetEntries())) + assert.Equal(t, 7, len(notifier.RabbitMQClient.GetEntries())) } func pushEventsRequest(wg *sync.WaitGroup, webServer integrationTests.ObserverConnector) { @@ -69,7 +75,7 @@ func pushEventsRequest(wg *sync.WaitGroup, webServer integrationTests.ObserverCo txPool := &outport.TransactionPool{ Transactions: map[string]*outport.TxInfo{ - "hash1": { + hex.EncodeToString([]byte("hash1")): { Transaction: &transaction.Transaction{ Nonce: 1, }, @@ -80,7 +86,7 @@ func pushEventsRequest(wg *sync.WaitGroup, webServer integrationTests.ObserverCo }, }, SmartContractResults: map[string]*outport.SCRInfo{ - "hash2": { + hex.EncodeToString([]byte("hash2")): { SmartContractResult: &smartContractResult.SmartContractResult{ Nonce: 2, }, @@ -101,6 +107,21 @@ func pushEventsRequest(wg *sync.WaitGroup, webServer integrationTests.ObserverCo }, } + stateAccesses := make(map[string]*stateChange.StateAccesses) + stateAccesses["txHash1"] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + &stateChange.StateAccess{ + MainTrieKey: []byte("mainTrieKey1"), + MainTrieVal: []byte("mainTrieVal1"), + }, + &stateChange.StateAccess{ + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal2"), + }, + }, + } + stateAccesses["txHash2"] = &stateChange.StateAccesses{} + saveBlockData := &outport.OutportBlock{ BlockData: &outport.BlockData{ HeaderBytes: headerBytes, @@ -114,6 +135,7 @@ func pushEventsRequest(wg *sync.WaitGroup, webServer integrationTests.ObserverCo }, TransactionPool: txPool, HeaderGasConsumption: &outport.HeaderGasConsumption{}, + StateAccesses: stateAccesses, } err := webServer.PushEventsRequest(saveBlockData) diff --git a/integrationTests/testNotifierProxy.go b/integrationTests/testNotifierProxy.go index 0a0c839b..9c68c930 100644 --- a/integrationTests/testNotifierProxy.go +++ b/integrationTests/testNotifierProxy.go @@ -234,6 +234,10 @@ func GetDefaultConfigs() config.Configs { Name: "blockevents", Type: "fanout", }, + StateAccessesExchange: config.RabbitMQExchangeConfig{ + Name: "stateaccesses", + Type: "fanout", + }, }, }, Flags: config.FlagsConfig{ diff --git a/integrationTests/websocket/testNotifierWithWebsockets_test.go b/integrationTests/websocket/testNotifierWithWebsockets_test.go index efafd13f..3c558792 100644 --- a/integrationTests/websocket/testNotifierWithWebsockets_test.go +++ b/integrationTests/websocket/testNotifierWithWebsockets_test.go @@ -11,6 +11,7 @@ import ( "github.com/multiversx/mx-chain-core-go/data/block" "github.com/multiversx/mx-chain-core-go/data/outport" "github.com/multiversx/mx-chain-core-go/data/smartContractResult" + "github.com/multiversx/mx-chain-core-go/data/stateChange" "github.com/multiversx/mx-chain-core-go/data/transaction" "github.com/multiversx/mx-chain-notifier-go/common" "github.com/multiversx/mx-chain-notifier-go/data" @@ -61,6 +62,22 @@ func TestNotifierWithWebsockets_PushEvents(t *testing.T) { }, } headerBytes, _ := json.Marshal(header) + + stateAccesses := make(map[string]*stateChange.StateAccesses) + stateAccesses["txHash1"] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + &stateChange.StateAccess{ + MainTrieKey: []byte("mainTrieKey1"), + MainTrieVal: []byte("mainTrieVal1"), + }, + &stateChange.StateAccess{ + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal2"), + }, + }, + } + stateAccesses["txHash2"] = &stateChange.StateAccesses{} + saveBlockData := &outport.OutportBlock{ TransactionPool: &outport.TransactionPool{ Logs: []*outport.LogData{ @@ -85,6 +102,7 @@ func TestNotifierWithWebsockets_PushEvents(t *testing.T) { }, }, HeaderGasConsumption: &outport.HeaderGasConsumption{}, + StateAccesses: stateAccesses, } wg := &sync.WaitGroup{} @@ -156,6 +174,10 @@ func TestNotifierWithWebsockets_BlockEvents(t *testing.T) { }, } headerBytes, _ := json.Marshal(header) + + stateAccesses := make(map[string]*stateChange.StateAccesses) + stateAccesses["txHash1"] = &stateChange.StateAccesses{} + saveBlockData := &outport.OutportBlock{ TransactionPool: &outport.TransactionPool{ Logs: []*outport.LogData{ @@ -181,6 +203,7 @@ func TestNotifierWithWebsockets_BlockEvents(t *testing.T) { TimestampMs: 1234000, }, HeaderGasConsumption: &outport.HeaderGasConsumption{}, + StateAccesses: stateAccesses, } wg := &sync.WaitGroup{} @@ -355,6 +378,10 @@ func TestNotifierWithWebsockets_TxsEvents(t *testing.T) { }, } headerBytes, _ := json.Marshal(header) + + stateAccesses := make(map[string]*stateChange.StateAccesses) + stateAccesses["txHash1"] = &stateChange.StateAccesses{} + saveBlockData := &outport.OutportBlock{ TransactionPool: &outport.TransactionPool{ Transactions: txs, @@ -368,6 +395,7 @@ func TestNotifierWithWebsockets_TxsEvents(t *testing.T) { }, }, HeaderGasConsumption: &outport.HeaderGasConsumption{}, + StateAccesses: stateAccesses, } expTxs := map[string]*transaction.Transaction{ @@ -438,6 +466,10 @@ func TestNotifierWithWebsockets_ScrsEvents(t *testing.T) { }, } headerBytes, _ := json.Marshal(header) + + stateAccesses := make(map[string]*stateChange.StateAccesses) + stateAccesses["txHash1"] = &stateChange.StateAccesses{} + blockEvents := &outport.OutportBlock{ TransactionPool: &outport.TransactionPool{ SmartContractResults: scrs, @@ -451,6 +483,7 @@ func TestNotifierWithWebsockets_ScrsEvents(t *testing.T) { }, }, HeaderGasConsumption: &outport.HeaderGasConsumption{}, + StateAccesses: stateAccesses, } expScrs := map[string]*smartContractResult.SmartContractResult{ @@ -637,6 +670,10 @@ func testNotifierWithWebsockets_AllEvents(t *testing.T, observerType string) { }, } headerBytes, _ = json.Marshal(header) + + stateAccesses := make(map[string]*stateChange.StateAccesses) + stateAccesses["txHash1"] = &stateChange.StateAccesses{} + blockEvents := &outport.OutportBlock{ TransactionPool: &outport.TransactionPool{ Transactions: txs, @@ -665,6 +702,7 @@ func testNotifierWithWebsockets_AllEvents(t *testing.T, observerType string) { TimestampMs: 1234000, }, HeaderGasConsumption: &outport.HeaderGasConsumption{}, + StateAccesses: stateAccesses, } numEvents := 6 diff --git a/mocks/dispatcherMock.go b/mocks/dispatcherMock.go index 48b4581a..6962dd1d 100644 --- a/mocks/dispatcherMock.go +++ b/mocks/dispatcherMock.go @@ -52,6 +52,10 @@ func (d *DispatcherMock) TxsEvent(event data.BlockTxs) { func (d *DispatcherMock) ScrsEvent(event data.BlockScrs) { } +// StateAccessesEvent - +func (d *DispatcherMock) StateAccessesEvent(event data.BlockStateAccesses) { +} + // Subscribe - func (d *DispatcherMock) Subscribe(event data.SubscribeEvent) { d.hub.Subscribe(event) diff --git a/mocks/dispatcherStub.go b/mocks/dispatcherStub.go index e851a342..816e1c7a 100644 --- a/mocks/dispatcherStub.go +++ b/mocks/dispatcherStub.go @@ -7,13 +7,14 @@ import ( // DispatcherStub implements dispatcher EventDispatcher interface type DispatcherStub struct { - GetIDCalled func() uuid.UUID - PushEventsCalled func(events []data.Event) - BlockEventsCalled func(event data.BlockEventsWithOrder) - RevertEventCalled func(event data.RevertBlock) - FinalizedEventCalled func(event data.FinalizedBlock) - TxsEventCalled func(event data.BlockTxs) - ScrsEventCalled func(event data.BlockScrs) + GetIDCalled func() uuid.UUID + PushEventsCalled func(events []data.Event) + BlockEventsCalled func(event data.BlockEventsWithOrder) + RevertEventCalled func(event data.RevertBlock) + FinalizedEventCalled func(event data.FinalizedBlock) + TxsEventCalled func(event data.BlockTxs) + ScrsEventCalled func(event data.BlockScrs) + StateAccessesEventCalled func(event data.BlockStateAccesses) } // GetID - @@ -66,3 +67,10 @@ func (d *DispatcherStub) ScrsEvent(event data.BlockScrs) { d.ScrsEventCalled(event) } } + +// StateAccessesEvent - +func (d *DispatcherStub) StateAccessesEvent(event data.BlockStateAccesses) { + if d.StateAccessesEventCalled != nil { + d.StateAccessesEventCalled(event) + } +} diff --git a/mocks/publisherHandlerStub.go b/mocks/publisherHandlerStub.go index d74f5ae0..7f1edd90 100644 --- a/mocks/publisherHandlerStub.go +++ b/mocks/publisherHandlerStub.go @@ -10,6 +10,7 @@ type PublisherHandlerStub struct { PublishTxsCalled func(blockTxs data.BlockTxs) PublishScrsCalled func(blockScrs data.BlockScrs) PublishBlockEventsWithOrderCalled func(blockTxs data.BlockEventsWithOrder) + PublishBlockStateAccessesCalled func(stateAccesses data.BlockStateAccesses) CloseCalled func() error } @@ -55,6 +56,13 @@ func (p *PublisherHandlerStub) PublishBlockEventsWithOrder(blockTxs data.BlockEv } } +// PublishStateAccesses - +func (p *PublisherHandlerStub) PublishStateAccesses(stateAccesses data.BlockStateAccesses) { + if p.PublishBlockStateAccessesCalled != nil { + p.PublishBlockStateAccessesCalled(stateAccesses) + } +} + // Close - func (p *PublisherHandlerStub) Close() error { if p.CloseCalled != nil { diff --git a/mocks/publisherStub.go b/mocks/publisherStub.go index fbea84ce..48f10340 100644 --- a/mocks/publisherStub.go +++ b/mocks/publisherStub.go @@ -11,6 +11,7 @@ type PublisherStub struct { BroadcastTxsCalled func(event data.BlockTxs) BroadcastScrsCalled func(event data.BlockScrs) BroadcastBlockEventsWithOrderCalled func(event data.BlockEventsWithOrder) + BroadcastStateAccessesCalled func(event data.BlockStateAccesses) CloseCalled func() error } @@ -65,6 +66,13 @@ func (ps *PublisherStub) BroadcastBlockEventsWithOrder(event data.BlockEventsWit } } +// BroadcastStateAccesses - +func (ps *PublisherStub) BroadcastStateAccesses(event data.BlockStateAccesses) { + if ps.BroadcastStateAccessesCalled != nil { + ps.BroadcastStateAccessesCalled(event) + } +} + // Close - func (ps *PublisherStub) Close() error { if ps.CloseCalled != nil { diff --git a/process/errors.go b/process/errors.go index 51ed1751..1c1739dd 100644 --- a/process/errors.go +++ b/process/errors.go @@ -31,3 +31,6 @@ var ErrNilPublisherHandler = errors.New("nil publisher handler provided") // ErrNilEventsInterceptor signals that a nil events interceptor was provided var ErrNilEventsInterceptor = errors.New("nil events interceptor") + +// ErrNilStateAccesses signals that a nil state accesses has been provided +var ErrNilStateAccesses = errors.New("nil state accesses provided") diff --git a/process/eventsHandler.go b/process/eventsHandler.go index bb72dae2..0c8856bb 100644 --- a/process/eventsHandler.go +++ b/process/eventsHandler.go @@ -130,6 +130,15 @@ func (eh *eventsHandler) HandleSaveBlockEvents(allEvents data.ArgsSaveBlockData) } eh.handleBlockEventsWithOrder(txsWithOrder) + stateAccesses := data.BlockStateAccesses{ + Hash: eventsData.Hash, + ShardID: eventsData.Header.GetShardID(), + TimeStampMs: headerTimeStampMs, + Nonce: eventsData.Header.GetNonce(), + StateAccessesPerAccounts: eventsData.StateAccessesPerAccounts, + } + eh.handleStateAccesses(stateAccesses) + return nil } @@ -306,6 +315,23 @@ func (eh *eventsHandler) handleBlockEventsWithOrder(blockTxs data.BlockEventsWit eh.metricsHandler.AddRequest(getRabbitOpID(common.BlockEvents), time.Since(t)) } +func (eh *eventsHandler) handleStateAccesses(stateAccesses data.BlockStateAccesses) { + if stateAccesses.Hash == "" { + log.Warn("received empty state accesses", + "will process", false, + ) + return + } + + log.Info("received state accesses", + "block hash", stateAccesses.Hash, + ) + + t := time.Now() + eh.publisher.BroadcastStateAccesses(stateAccesses) + eh.metricsHandler.AddRequest(getRabbitOpID(common.BlockStateAccesses), time.Since(t)) +} + func (eh *eventsHandler) tryCheckProcessedWithRetry(id, blockHash string) bool { var err error var setSuccessful bool diff --git a/process/eventsInterceptor.go b/process/eventsInterceptor.go index d85f55d0..778b2bc5 100644 --- a/process/eventsInterceptor.go +++ b/process/eventsInterceptor.go @@ -2,16 +2,24 @@ package process import ( "encoding/hex" + "sort" "github.com/multiversx/mx-chain-core-go/core" "github.com/multiversx/mx-chain-core-go/core/check" nodeData "github.com/multiversx/mx-chain-core-go/data" "github.com/multiversx/mx-chain-core-go/data/outport" "github.com/multiversx/mx-chain-core-go/data/smartContractResult" + "github.com/multiversx/mx-chain-core-go/data/stateChange" "github.com/multiversx/mx-chain-core-go/data/transaction" + logger "github.com/multiversx/mx-chain-logger-go" "github.com/multiversx/mx-chain-notifier-go/data" ) +type txWithOrder struct { + hash string + index uint32 +} + // logEvent defines a log event associated with corresponding tx hash type logEvent struct { EventHandler nodeData.EventHandler @@ -20,11 +28,13 @@ type logEvent struct { // ArgsEventsInterceptor defines the arguments needed for creating an events interceptor instance type ArgsEventsInterceptor struct { - PubKeyConverter core.PubkeyConverter + PubKeyConverter core.PubkeyConverter + WithReadStateChanges bool } type eventsInterceptor struct { - pubKeyConverter core.PubkeyConverter + pubKeyConverter core.PubkeyConverter + withReadStateChanges bool } // NewEventsInterceptor creates a new eventsInterceptor instance @@ -34,7 +44,8 @@ func NewEventsInterceptor(args ArgsEventsInterceptor) (*eventsInterceptor, error } return &eventsInterceptor{ - pubKeyConverter: args.PubKeyConverter, + pubKeyConverter: args.PubKeyConverter, + withReadStateChanges: args.WithReadStateChanges, }, nil } @@ -67,18 +78,130 @@ func (ei *eventsInterceptor) ProcessBlockEvents(eventsData *data.ArgsSaveBlockDa } scrsWithOrder := eventsData.TransactionsPool.SmartContractResults + stateAccessesPerAccounts := ei.getStateAccessesPerAccounts(eventsData) + return &data.InterceptorBlockData{ - Hash: hex.EncodeToString(eventsData.HeaderHash), - Body: eventsData.Body, - Header: eventsData.Header, - Txs: txs, - TxsWithOrder: txsWithOrder, - Scrs: scrs, - ScrsWithOrder: scrsWithOrder, - LogEvents: events, + Hash: hex.EncodeToString(eventsData.HeaderHash), + Body: eventsData.Body, + Header: eventsData.Header, + Txs: txs, + TxsWithOrder: txsWithOrder, + Scrs: scrs, + ScrsWithOrder: scrsWithOrder, + LogEvents: events, + StateAccessesPerAccounts: stateAccessesPerAccounts, }, nil } +func getTxsWithOrder(transactionsPool *outport.TransactionPool) []txWithOrder { + txsWithOrderMap := make(map[string]uint32) + + for txHash, txInfo := range transactionsPool.Transactions { + txsWithOrderMap[txHash] = txInfo.ExecutionOrder + } + for txHash, txInfo := range transactionsPool.SmartContractResults { + txsWithOrderMap[txHash] = txInfo.ExecutionOrder + } + for txHash, txInfo := range transactionsPool.Rewards { + txsWithOrderMap[txHash] = txInfo.ExecutionOrder + } + for txHash, txInfo := range transactionsPool.InvalidTxs { + txsWithOrderMap[txHash] = txInfo.ExecutionOrder + } + + txsWithOrder := make([]txWithOrder, 0, len(txsWithOrderMap)) + for txHash, index := range txsWithOrderMap { + txsWithOrder = append(txsWithOrder, txWithOrder{ + hash: txHash, + index: index, + }) + } + + sort.Slice(txsWithOrder, func(i, j int) bool { + return txsWithOrder[i].index < txsWithOrder[j].index + }) + + return txsWithOrder +} + +func (ei *eventsInterceptor) getStateAccessesPerAccounts(eventsData *data.ArgsSaveBlockData) map[string]*stateChange.StateAccesses { + if eventsData.StateAccesses == nil { + log.Warn("getStateAccessesPerAccounts failed: will return empty state accesses per accounts", + "block hash", eventsData.HeaderHash, + "error", ErrNilStateAccesses, + ) + + return make(map[string]*stateChange.StateAccesses) + } + + stateAccessesPerTxs := eventsData.StateAccesses + + logStateAccessesPerTxs(stateAccessesPerTxs) + + // txs hashes with order + txsWithOrder := getTxsWithOrder(eventsData.TransactionsPool) + + stateAccessesPerAccounts := make(map[string]*stateChange.StateAccesses) + for _, txInfo := range txsWithOrder { + txHash, err := hex.DecodeString(txInfo.hash) + if err != nil { + log.Error("failed to decode tx hash", "txHash", txInfo.hash) + continue + } + + stateAccessesPerTx, ok := stateAccessesPerTxs[string(txHash)] + if !ok { + log.Warn("did not find state accesses for tx", "txHash", txInfo.hash) + continue + } + + for _, stateAccess := range stateAccessesPerTx.StateAccess { + if stateAccess.Type == stateChange.Read && !ei.withReadStateChanges { + continue + } + + accKey := hex.EncodeToString(stateAccess.MainTrieKey) + _, ok := stateAccessesPerAccounts[accKey] + if !ok { + stateAccessesPerAccounts[accKey] = &stateChange.StateAccesses{ + StateAccess: make([]*stateChange.StateAccess, 0), + } + } + + stateAccessesPerAccounts[accKey].StateAccess = append(stateAccessesPerAccounts[accKey].StateAccess, stateAccess) + } + } + + log.Trace("getStateAccessesPerAccounts", + "num stateAccessesPerAccounts", len(stateAccessesPerAccounts), + ) + + return stateAccessesPerAccounts +} + +func logStateAccessesPerTxs(stateAccesses map[string]*stateChange.StateAccesses) { + if log.GetLevel() > logger.LogTrace { + return + } + + log.Trace("getStateAccessesPerAccounts", + "num stateAccessesPerTxs", len(stateAccesses), + ) + + for txHash, sts := range stateAccesses { + log.Trace("stateAccessesPerTx", + "txHash", txHash, + ) + + for _, st := range sts.StateAccess { + log.Trace("st", + "actionType", st.GetType(), + "operation", st.GetOperation(), + ) + } + } +} + func (ei *eventsInterceptor) getLogEventsFromTransactionsPool(logs []*outport.LogData) []data.Event { var logEvents []*logEvent for _, logData := range logs { diff --git a/process/eventsInterceptor_test.go b/process/eventsInterceptor_test.go index 61bb504e..986fb089 100644 --- a/process/eventsInterceptor_test.go +++ b/process/eventsInterceptor_test.go @@ -8,6 +8,7 @@ import ( "github.com/multiversx/mx-chain-core-go/data/block" "github.com/multiversx/mx-chain-core-go/data/outport" "github.com/multiversx/mx-chain-core-go/data/smartContractResult" + "github.com/multiversx/mx-chain-core-go/data/stateChange" "github.com/multiversx/mx-chain-core-go/data/transaction" "github.com/multiversx/mx-chain-notifier-go/data" "github.com/multiversx/mx-chain-notifier-go/mocks" @@ -17,7 +18,8 @@ import ( func createMockEventsInterceptorArgs() process.ArgsEventsInterceptor { return process.ArgsEventsInterceptor{ - PubKeyConverter: &mocks.PubkeyConverterMock{}, + PubKeyConverter: &mocks.PubkeyConverterMock{}, + WithReadStateChanges: false, } } @@ -101,6 +103,36 @@ func TestProcessBlockEvents(t *testing.T) { require.Equal(t, process.ErrNilBlockHeader, err) }) + t.Run("nil state accesses, should return empty map", func(t *testing.T) { + t.Parallel() + + eventsInterceptor, _ := process.NewEventsInterceptor(createMockEventsInterceptorArgs()) + + eventsData := &data.ArgsSaveBlockData{ + HeaderHash: []byte("headerHash"), + TransactionsPool: &outport.TransactionPool{}, + Body: &block.Body{}, + Header: &block.HeaderV2{}, + StateAccesses: nil, + } + events, err := eventsInterceptor.ProcessBlockEvents(eventsData) + require.Nil(t, err) + + expInterceptorData := &data.InterceptorBlockData{ + Hash: hex.EncodeToString([]byte("headerHash")), + Body: &block.Body{}, + Header: &block.HeaderV2{}, + Txs: map[string]*transaction.Transaction{}, + TxsWithOrder: map[string]*outport.TxInfo(nil), + Scrs: map[string]*smartContractResult.SmartContractResult{}, + ScrsWithOrder: map[string]*outport.SCRInfo(nil), + LogEvents: []data.Event{}, + StateAccessesPerAccounts: map[string]*stateChange.StateAccesses{}, + } + + require.Equal(t, expInterceptorData, events) + }) + t.Run("should work", func(t *testing.T) { t.Parallel() @@ -157,6 +189,7 @@ func TestProcessBlockEvents(t *testing.T) { SmartContractResults: scrs, Logs: logs, }, + StateAccesses: make(map[string]*stateChange.StateAccesses), } expTxs := map[string]*transaction.Transaction{ @@ -202,6 +235,7 @@ func TestProcessBlockEvents(t *testing.T) { Topics: make([][]byte, 0), }, }, + StateAccessesPerAccounts: make(map[string]*stateChange.StateAccesses), } events, err := eventsInterceptor.ProcessBlockEvents(&blockEvents) @@ -250,6 +284,7 @@ func TestProcessBlockEvents(t *testing.T) { TransactionsPool: &outport.TransactionPool{ Logs: logs, }, + StateAccesses: make(map[string]*stateChange.StateAccesses), } expEvents := &data.InterceptorBlockData{ @@ -266,6 +301,7 @@ func TestProcessBlockEvents(t *testing.T) { Topics: make([][]byte, 0), }, }, + StateAccessesPerAccounts: make(map[string]*stateChange.StateAccesses), } events, err := eventsInterceptor.ProcessBlockEvents(&blockEvents) @@ -331,3 +367,302 @@ func TestGetLogEventsFromTransactionsPool(t *testing.T) { require.Equal(t, txHash1, receivedEvents[1].TxHash) require.Equal(t, txHash2, receivedEvents[2].TxHash) } + +func TestEventsInterceptor_GetStateAccessesPerAccounts(t *testing.T) { + t.Parallel() + + txs := map[string]*outport.TxInfo{ + hex.EncodeToString([]byte("txHash1")): { + Transaction: &transaction.Transaction{ + Nonce: 2, + }, + ExecutionOrder: 1, + }, + } + scrs := map[string]*outport.SCRInfo{ + hex.EncodeToString([]byte("txHash2")): { + SmartContractResult: &smartContractResult.SmartContractResult{ + Nonce: 3, + }, + ExecutionOrder: 2, + }, + } + invalidTxs := map[string]*outport.TxInfo{ + hex.EncodeToString([]byte("txHash0")): { + Transaction: &transaction.Transaction{ + Nonce: 1, + }, + ExecutionOrder: 0, + }, + } + + stateAccessesRead := make(map[string]*stateChange.StateAccesses) + stateAccessesRead["txHash1"] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + &stateChange.StateAccess{ + Type: stateChange.Read, + MainTrieKey: []byte("mainTrieKey1"), + MainTrieVal: []byte("mainTrieVal1"), + }, + &stateChange.StateAccess{ + Type: stateChange.Read, + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal2"), + }, + }, + } + stateAccessesRead["txHash2"] = &stateChange.StateAccesses{} + stateAccessesRead["txHash0"] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + &stateChange.StateAccess{ + Type: stateChange.Read, + MainTrieKey: []byte("mainTrieKey3"), + MainTrieVal: []byte("mainTrieVal3"), + }, + &stateChange.StateAccess{ + Type: stateChange.Read, + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal4"), + }, + }, + } + + stateAccessesWrite := make(map[string]*stateChange.StateAccesses) + stateAccessesWrite["txHash1"] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + &stateChange.StateAccess{ + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey1"), + MainTrieVal: []byte("mainTrieVal1"), + }, + &stateChange.StateAccess{ + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal2"), + }, + }, + } + stateAccessesWrite["txHash2"] = &stateChange.StateAccesses{} + stateAccessesWrite["txHash0"] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + &stateChange.StateAccess{ + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey3"), + MainTrieVal: []byte("mainTrieVal3"), + }, + &stateChange.StateAccess{ + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal4"), + }, + }, + } + + stateAccessesReadWrite := make(map[string]*stateChange.StateAccesses) + stateAccessesReadWrite["txHash1"] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + &stateChange.StateAccess{ + Type: stateChange.Read, + MainTrieKey: []byte("mainTrieKey1"), + MainTrieVal: []byte("mainTrieVal1"), + }, + &stateChange.StateAccess{ + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal2"), + }, + }, + } + stateAccessesReadWrite["txHash2"] = &stateChange.StateAccesses{} + stateAccessesReadWrite["txHash0"] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + &stateChange.StateAccess{ + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey3"), + MainTrieVal: []byte("mainTrieVal3"), + }, + &stateChange.StateAccess{ + Type: stateChange.Read, + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal4"), + }, + }, + } + + blockHash := []byte("blockHash") + + t.Run("with write operations", func(t *testing.T) { + t.Parallel() + + blockEvents := &data.ArgsSaveBlockData{ + HeaderHash: blockHash, + TransactionsPool: &outport.TransactionPool{ + Transactions: txs, + SmartContractResults: scrs, + InvalidTxs: invalidTxs, + }, + StateAccesses: stateAccessesWrite, + } + + expStateAccessesPerAccounts := make(map[string]*stateChange.StateAccesses) + expStateAccessesPerAccounts[hex.EncodeToString([]byte("mainTrieKey1"))] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + &stateChange.StateAccess{ + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey1"), + MainTrieVal: []byte("mainTrieVal1"), + }, + }, + } + expStateAccessesPerAccounts[hex.EncodeToString([]byte("mainTrieKey2"))] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + &stateChange.StateAccess{ + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal4"), + }, + &stateChange.StateAccess{ + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal2"), + }, + }, + } + expStateAccessesPerAccounts[hex.EncodeToString([]byte("mainTrieKey3"))] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + &stateChange.StateAccess{ + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey3"), + MainTrieVal: []byte("mainTrieVal3"), + }, + }, + } + + args := createMockEventsInterceptorArgs() + en, _ := process.NewEventsInterceptor(args) + + stateAccessesPerAccounts := en.GetStateAccessesPerAccounts(blockEvents) + + require.Equal(t, expStateAccessesPerAccounts, stateAccessesPerAccounts) + }) + + t.Run("with read operations, but not enabled from config", func(t *testing.T) { + t.Parallel() + + blockEvents := &data.ArgsSaveBlockData{ + HeaderHash: blockHash, + TransactionsPool: &outport.TransactionPool{ + Transactions: txs, + SmartContractResults: scrs, + InvalidTxs: invalidTxs, + }, + StateAccesses: stateAccessesRead, + } + + args := createMockEventsInterceptorArgs() + en, _ := process.NewEventsInterceptor(args) + + expStateAccessesPerAccounts := make(map[string]*stateChange.StateAccesses) + + stateAccessesPerAccounts := en.GetStateAccessesPerAccounts(blockEvents) + + require.Equal(t, expStateAccessesPerAccounts, stateAccessesPerAccounts) + }) + + t.Run("with read (not enabled from config) and write operations", func(t *testing.T) { + t.Parallel() + + blockEvents := &data.ArgsSaveBlockData{ + HeaderHash: blockHash, + TransactionsPool: &outport.TransactionPool{ + Transactions: txs, + SmartContractResults: scrs, + InvalidTxs: invalidTxs, + }, + StateAccesses: stateAccessesReadWrite, + } + + expStateAccessesPerAccounts := make(map[string]*stateChange.StateAccesses) + expStateAccessesPerAccounts[hex.EncodeToString([]byte("mainTrieKey2"))] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + &stateChange.StateAccess{ + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal2"), + }, + }, + } + expStateAccessesPerAccounts[hex.EncodeToString([]byte("mainTrieKey3"))] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + &stateChange.StateAccess{ + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey3"), + MainTrieVal: []byte("mainTrieVal3"), + }, + }, + } + + args := createMockEventsInterceptorArgs() + en, _ := process.NewEventsInterceptor(args) + + stateAccessesPerAccounts := en.GetStateAccessesPerAccounts(blockEvents) + + require.Equal(t, expStateAccessesPerAccounts, stateAccessesPerAccounts) + }) + + t.Run("with read and write operations", func(t *testing.T) { + t.Parallel() + + blockEvents := &data.ArgsSaveBlockData{ + HeaderHash: blockHash, + TransactionsPool: &outport.TransactionPool{ + Transactions: txs, + SmartContractResults: scrs, + InvalidTxs: invalidTxs, + }, + StateAccesses: stateAccessesReadWrite, + } + + expStateAccessesPerAccounts := make(map[string]*stateChange.StateAccesses) + expStateAccessesPerAccounts[hex.EncodeToString([]byte("mainTrieKey1"))] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + &stateChange.StateAccess{ + Type: stateChange.Read, + MainTrieKey: []byte("mainTrieKey1"), + MainTrieVal: []byte("mainTrieVal1"), + }, + }, + } + expStateAccessesPerAccounts[hex.EncodeToString([]byte("mainTrieKey2"))] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + &stateChange.StateAccess{ + Type: stateChange.Read, + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal4"), + }, + &stateChange.StateAccess{ + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal2"), + }, + }, + } + expStateAccessesPerAccounts[hex.EncodeToString([]byte("mainTrieKey3"))] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + &stateChange.StateAccess{ + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey3"), + MainTrieVal: []byte("mainTrieVal3"), + }, + }, + } + + args := createMockEventsInterceptorArgs() + args.WithReadStateChanges = true + en, _ := process.NewEventsInterceptor(args) + + stateAccessesPerAccounts := en.GetStateAccessesPerAccounts(blockEvents) + + require.Equal(t, expStateAccessesPerAccounts, stateAccessesPerAccounts) + }) +} diff --git a/process/export_test.go b/process/export_test.go index 8e107de4..fa032b33 100644 --- a/process/export_test.go +++ b/process/export_test.go @@ -2,6 +2,7 @@ package process import ( "github.com/multiversx/mx-chain-core-go/data/outport" + "github.com/multiversx/mx-chain-core-go/data/stateChange" "github.com/multiversx/mx-chain-notifier-go/data" ) @@ -39,3 +40,8 @@ func (eh *eventsHandler) ShouldProcessSaveBlockEvents(blockHash string) bool { func (ei *eventsInterceptor) GetLogEventsFromTransactionsPool(logs []*outport.LogData) []data.Event { return ei.getLogEventsFromTransactionsPool(logs) } + +// GetStateAccessesPerAccounts - +func (ei *eventsInterceptor) GetStateAccessesPerAccounts(eventsData *data.ArgsSaveBlockData) map[string]*stateChange.StateAccesses { + return ei.getStateAccessesPerAccounts(eventsData) +} diff --git a/process/interface.go b/process/interface.go index 34c93fef..5e9c8636 100644 --- a/process/interface.go +++ b/process/interface.go @@ -24,6 +24,7 @@ type Publisher interface { BroadcastTxs(event data.BlockTxs) BroadcastBlockEventsWithOrder(event data.BlockEventsWithOrder) BroadcastScrs(event data.BlockScrs) + BroadcastStateAccesses(events data.BlockStateAccesses) Close() error IsInterfaceNil() bool } @@ -71,6 +72,7 @@ type PublisherHandler interface { PublishTxs(blockTxs data.BlockTxs) PublishScrs(blockScrs data.BlockScrs) PublishBlockEventsWithOrder(blockTxs data.BlockEventsWithOrder) + PublishStateAccesses(stateAccesses data.BlockStateAccesses) Close() error IsInterfaceNil() bool } diff --git a/process/preprocess/eventsPreProcessorV1.go b/process/preprocess/eventsPreProcessorV1.go index 077acb0e..9a4add06 100644 --- a/process/preprocess/eventsPreProcessorV1.go +++ b/process/preprocess/eventsPreProcessorV1.go @@ -65,6 +65,7 @@ func (d *eventsPreProcessorV1) SaveBlock(marshalledData []byte) error { TransactionsPool: outportBlock.TransactionPool, Header: header, HeaderTimeStampMs: outportBlock.BlockData.GetTimestampMs(), + StateAccesses: outportBlock.GetStateAccesses(), } err = d.facade.HandlePushEvents(*saveBlockData) diff --git a/process/publisher.go b/process/publisher.go index 4d6713ec..e6d7aa47 100644 --- a/process/publisher.go +++ b/process/publisher.go @@ -18,6 +18,7 @@ type publisher struct { broadcastTxs chan data.BlockTxs broadcastBlockEventsWithOrder chan data.BlockEventsWithOrder broadcastScrs chan data.BlockScrs + broadcastStateAccesses chan data.BlockStateAccesses cancelFunc func() closeChan chan struct{} @@ -38,6 +39,7 @@ func NewPublisher(handler PublisherHandler) (*publisher, error) { broadcastTxs: make(chan data.BlockTxs), broadcastScrs: make(chan data.BlockScrs), broadcastBlockEventsWithOrder: make(chan data.BlockEventsWithOrder), + broadcastStateAccesses: make(chan data.BlockStateAccesses), closeChan: make(chan struct{}), } @@ -79,6 +81,8 @@ func (p *publisher) run(ctx context.Context) { p.handler.PublishScrs(blockScrs) case blockEvents := <-p.broadcastBlockEventsWithOrder: p.handler.PublishBlockEventsWithOrder(blockEvents) + case blockStateAccesses := <-p.broadcastStateAccesses: + p.handler.PublishStateAccesses(blockStateAccesses) } } } @@ -131,6 +135,14 @@ func (p *publisher) BroadcastBlockEventsWithOrder(events data.BlockEventsWithOrd } } +// BroadcastStateAccesses will handle state accesses pushed by producers +func (p *publisher) BroadcastStateAccesses(events data.BlockStateAccesses) { + select { + case p.broadcastStateAccesses <- events: + case <-p.closeChan: + } +} + // Close will close the channels func (p *publisher) Close() error { p.mutState.RLock() diff --git a/rabbitmq/interface.go b/rabbitmq/interface.go index 79caf2c5..00dc6b57 100644 --- a/rabbitmq/interface.go +++ b/rabbitmq/interface.go @@ -27,6 +27,7 @@ type PublisherService interface { BroadcastTxs(event data.BlockTxs) BroadcastScrs(event data.BlockScrs) BroadcastBlockEventsWithOrder(event data.BlockEventsWithOrder) + BroadcastStateAccesses(events data.BlockStateAccesses) Close() error IsInterfaceNil() bool } diff --git a/rabbitmq/publisher.go b/rabbitmq/publisher.go index 01b167c9..9157f50f 100644 --- a/rabbitmq/publisher.go +++ b/rabbitmq/publisher.go @@ -1,6 +1,8 @@ package rabbitmq import ( + "fmt" + "github.com/multiversx/mx-chain-core-go/core/check" "github.com/multiversx/mx-chain-core-go/marshal" logger "github.com/multiversx/mx-chain-logger-go" @@ -59,40 +61,46 @@ func checkArgs(args ArgsRabbitMqPublisher) error { } if args.Config.EventsExchange.Name == "" { - return ErrInvalidRabbitMqExchangeName + return fmt.Errorf("%w for EventsExchange", ErrInvalidRabbitMqExchangeName) } if args.Config.EventsExchange.Type == "" { - return ErrInvalidRabbitMqExchangeType + return fmt.Errorf("%w for EventsExchange", ErrInvalidRabbitMqExchangeType) } if args.Config.RevertEventsExchange.Name == "" { - return ErrInvalidRabbitMqExchangeName + return fmt.Errorf("%w for RevertEventsExchange", ErrInvalidRabbitMqExchangeName) } if args.Config.RevertEventsExchange.Type == "" { - return ErrInvalidRabbitMqExchangeType + return fmt.Errorf("%w for RevertEventsExchange", ErrInvalidRabbitMqExchangeType) } if args.Config.FinalizedEventsExchange.Name == "" { - return ErrInvalidRabbitMqExchangeName + return fmt.Errorf("%w for FinalizedEventsExchange", ErrInvalidRabbitMqExchangeName) } if args.Config.FinalizedEventsExchange.Type == "" { - return ErrInvalidRabbitMqExchangeType + return fmt.Errorf("%w for FinalizedEventsExchange", ErrInvalidRabbitMqExchangeType) } if args.Config.BlockTxsExchange.Name == "" { - return ErrInvalidRabbitMqExchangeName + return fmt.Errorf("%w for BlockTxsExchange", ErrInvalidRabbitMqExchangeName) } if args.Config.BlockTxsExchange.Type == "" { - return ErrInvalidRabbitMqExchangeType + return fmt.Errorf("%w for BlockTxsExchange", ErrInvalidRabbitMqExchangeType) } if args.Config.BlockScrsExchange.Name == "" { - return ErrInvalidRabbitMqExchangeName + return fmt.Errorf("%w for BlockScrsExchange", ErrInvalidRabbitMqExchangeName) } if args.Config.BlockScrsExchange.Type == "" { - return ErrInvalidRabbitMqExchangeType + return fmt.Errorf("%w for BlockScrsExchange", ErrInvalidRabbitMqExchangeType) } if args.Config.BlockEventsExchange.Name == "" { - return ErrInvalidRabbitMqExchangeName + return fmt.Errorf("%w for BlockEventsExchange", ErrInvalidRabbitMqExchangeName) } if args.Config.BlockEventsExchange.Type == "" { - return ErrInvalidRabbitMqExchangeType + return fmt.Errorf("%w for BlockEventsExchange", ErrInvalidRabbitMqExchangeType) + } + if args.Config.StateAccessesExchange.Name == "" { + return fmt.Errorf("%w for StateAccessesExchange", ErrInvalidRabbitMqExchangeName) + } + if args.Config.StateAccessesExchange.Type == "" { + return fmt.Errorf("%w for StateAccessesExchange", ErrInvalidRabbitMqExchangeType) } return nil @@ -124,6 +132,10 @@ func (rp *rabbitMqPublisher) createExchanges() error { if err != nil { return err } + err = rp.createExchange(rp.cfg.StateAccessesExchange) + if err != nil { + return err + } return nil } @@ -223,6 +235,20 @@ func (rp *rabbitMqPublisher) PublishBlockEventsWithOrder(blockTxs data.BlockEven } } +// PublishStateAccesses will publish block state accesses to rabbitmq +func (rp *rabbitMqPublisher) PublishStateAccesses(stateAccesses data.BlockStateAccesses) { + stateAccessesBytes, err := rp.marshaller.Marshal(stateAccesses) + if err != nil { + log.Error("could not marshal block state accesses", "err", err.Error()) + return + } + + err = rp.publishFanout(rp.cfg.StateAccessesExchange.Name, stateAccessesBytes) + if err != nil { + log.Error("failed to publish block state accesses to rabbitMQ", "err", err.Error()) + } +} + func (rp *rabbitMqPublisher) publishFanout(exchangeName string, payload []byte) error { return rp.client.Publish( exchangeName, diff --git a/rabbitmq/publisher_test.go b/rabbitmq/publisher_test.go index c0155d1b..223346ea 100644 --- a/rabbitmq/publisher_test.go +++ b/rabbitmq/publisher_test.go @@ -43,6 +43,10 @@ func createMockArgsRabbitMqPublisher() rabbitmq.ArgsRabbitMqPublisher { Name: "blockeventswithorder", Type: "fanout", }, + StateAccessesExchange: config.RabbitMQExchangeConfig{ + Name: "stateAccesses", + Type: "fanout", + }, }, Marshaller: &mock.MarshalizerMock{}, } @@ -128,6 +132,28 @@ func TestRabbitMqPublisher(t *testing.T) { require.True(t, errors.Is(err, rabbitmq.ErrInvalidRabbitMqExchangeName)) }) + t.Run("invalid state accesses exchange name", func(t *testing.T) { + t.Parallel() + + args := createMockArgsRabbitMqPublisher() + args.Config.StateAccessesExchange.Name = "" + + client, err := rabbitmq.NewRabbitMqPublisher(args) + require.True(t, check.IfNil(client)) + require.True(t, errors.Is(err, rabbitmq.ErrInvalidRabbitMqExchangeName)) + }) + + t.Run("invalid state accesses exchange type", func(t *testing.T) { + t.Parallel() + + args := createMockArgsRabbitMqPublisher() + args.Config.StateAccessesExchange.Type = "" + + client, err := rabbitmq.NewRabbitMqPublisher(args) + require.True(t, check.IfNil(client)) + require.True(t, errors.Is(err, rabbitmq.ErrInvalidRabbitMqExchangeType)) + }) + t.Run("invalid exchange type", func(t *testing.T) { t.Parallel() @@ -308,6 +334,28 @@ func TestBroadcastBlockEventsWithOrder(t *testing.T) { require.True(t, wasCalled) } +func TestBroadcastBlockStateAccesses(t *testing.T) { + t.Parallel() + + wasCalled := false + client := &mocks.RabbitClientStub{ + PublishCalled: func(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error { + wasCalled = true + return nil + }, + } + + args := createMockArgsRabbitMqPublisher() + args.Client = client + + rabbitmq, err := rabbitmq.NewRabbitMqPublisher(args) + require.Nil(t, err) + + rabbitmq.PublishStateAccesses(data.BlockStateAccesses{}) + + require.True(t, wasCalled) +} + func TestClose(t *testing.T) { t.Parallel() diff --git a/testdata/testData.go b/testdata/testData.go index a89c89a1..9962d4d3 100644 --- a/testdata/testData.go +++ b/testdata/testData.go @@ -1,10 +1,13 @@ package testdata import ( + "encoding/hex" + "github.com/multiversx/mx-chain-core-go/core/check" "github.com/multiversx/mx-chain-core-go/data/block" "github.com/multiversx/mx-chain-core-go/data/outport" "github.com/multiversx/mx-chain-core-go/data/smartContractResult" + "github.com/multiversx/mx-chain-core-go/data/stateChange" "github.com/multiversx/mx-chain-core-go/data/transaction" "github.com/multiversx/mx-chain-core-go/marshal" "github.com/multiversx/mx-chain-notifier-go/common" @@ -115,6 +118,27 @@ func (bd *blockData) OutportBlockV1() *outport.OutportBlock { } headerBytes, _ := bd.marshaller.Marshal(header) + stateAccesses := make(map[string]*stateChange.StateAccesses) + stateAccesses["txHash1"] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + &stateChange.StateAccess{ + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey1"), + MainTrieVal: []byte("mainTrieVal1"), + TxHash: []byte("txHash1"), + AccountChanges: 8, + }, + &stateChange.StateAccess{ + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal2"), + TxHash: []byte("txHash1"), + AccountChanges: 4, + }, + }, + } + stateAccesses["txHash2"] = &stateChange.StateAccesses{} + return &outport.OutportBlock{ BlockData: &outport.BlockData{ HeaderBytes: headerBytes, @@ -133,7 +157,7 @@ func (bd *blockData) OutportBlockV1() *outport.OutportBlock { HeaderGasConsumption: &outport.HeaderGasConsumption{}, TransactionPool: &outport.TransactionPool{ Transactions: map[string]*outport.TxInfo{ - "txHash1": { + hex.EncodeToString([]byte("txHash1")): { Transaction: &transaction.Transaction{ Nonce: 1, GasPrice: 1, @@ -146,7 +170,7 @@ func (bd *blockData) OutportBlockV1() *outport.OutportBlock { }, }, SmartContractResults: map[string]*outport.SCRInfo{ - "scrHash1": { + hex.EncodeToString([]byte("scrHash1")): { SmartContractResult: &smartContractResult.SmartContractResult{ Nonce: 2, GasLimit: 2, @@ -169,6 +193,7 @@ func (bd *blockData) OutportBlockV1() *outport.OutportBlock { }, }, }, + StateAccesses: stateAccesses, NumberOfShards: 2, } } diff --git a/tools/wsConnector/main.go b/tools/wsConnector/main.go index 49eed11a..552ee3ee 100644 --- a/tools/wsConnector/main.go +++ b/tools/wsConnector/main.go @@ -2,6 +2,7 @@ package main import ( "fmt" + "time" wsData "github.com/multiversx/mx-chain-communication-go/websocket/data" wsFactory "github.com/multiversx/mx-chain-communication-go/websocket/factory" @@ -19,28 +20,38 @@ func main() { return } - blockData, err := testdata.NewBlockData(marshaller) - if err != nil { - fmt.Println(err.Error()) - return - } - - err = wsClient.PushEventsRequest(blockData.OutportBlockV1()) - if err != nil { - fmt.Println(err.Error()) - return - } - - err = wsClient.RevertEventsRequest(blockData.RevertBlockV1()) - if err != nil { - fmt.Println(err.Error()) - return - } - - err = wsClient.FinalizedEventsRequest(blockData.FinalizedBlockV1()) - if err != nil { - fmt.Println(err.Error()) - return + for { + blockData, err := testdata.NewBlockData(marshaller) + if err != nil { + fmt.Println(err.Error()) + time.Sleep(6 * time.Second) + continue + } + + err = wsClient.PushEventsRequest(blockData.OutportBlockV1()) + if err != nil { + fmt.Println(err.Error()) + time.Sleep(6 * time.Second) + continue + } + + err = wsClient.RevertEventsRequest(blockData.RevertBlockV1()) + if err != nil { + time.Sleep(6 * time.Second) + fmt.Println(err.Error()) + continue + } + + err = wsClient.FinalizedEventsRequest(blockData.FinalizedBlockV1()) + if err != nil { + fmt.Println(err.Error()) + time.Sleep(6 * time.Second) + continue + } + + fmt.Println("sent properly") + + time.Sleep(6 * time.Second) } } @@ -63,7 +74,7 @@ func newWSObsClient(marshaller marshal.Marshalizer) (*wsObsClient, error) { port := 22111 wsHost, err := wsFactory.CreateWebSocketHost(wsFactory.ArgsWebSocketHost{ WebSocketConfig: wsData.WebSocketConfig{ - URL: "localhost:" + fmt.Sprintf("%d", port), + URL: "ws://localhost:" + fmt.Sprintf("%d", port), WithAcknowledge: true, Mode: "client", RetryDurationInSec: 5, diff --git a/tools/wsPublisher/main.go b/tools/wsPublisher/main.go index 789a822d..93a50873 100644 --- a/tools/wsPublisher/main.go +++ b/tools/wsPublisher/main.go @@ -5,6 +5,7 @@ import ( "fmt" "net/url" "sync" + "time" "github.com/gorilla/websocket" "github.com/multiversx/mx-chain-notifier-go/common" @@ -36,6 +37,9 @@ func main() { { EventType: common.BlockTxs, }, + { + EventType: common.BlockStateAccesses, + }, }, } @@ -59,7 +63,7 @@ func main() { case common.BlockEvents: var event data.BlockEventsWithOrder _ = json.Unmarshal(reply.Data, &event) - fmt.Println(event) + fmt.Printf("Hash: %s, TimeStamp: %d\n", event.Hash, event.TimeStamp) case common.RevertBlockEvents: var event *data.RevertBlock _ = json.Unmarshal(reply.Data, &event) @@ -67,7 +71,7 @@ func main() { case common.FinalizedBlockEvents: var event *data.FinalizedBlock _ = json.Unmarshal(reply.Data, &event) - fmt.Println(event) + fmt.Printf("Hash: %s, TimeStamp: %d\n", event.Hash, time.Now().Unix()) case common.BlockTxs: var event *data.BlockTxs _ = json.Unmarshal(reply.Data, &event) @@ -76,6 +80,16 @@ func main() { var event data.BlockScrs _ = json.Unmarshal(reply.Data, &event) fmt.Println(event.Hash) + case common.BlockStateAccesses: + var event data.BlockStateAccesses + _ = json.Unmarshal(reply.Data, &event) + fmt.Printf("SA: Hash: %s, TimeStamp: %d, Len sa: %d\n", event.Hash, time.Now().Unix(), len(event.StateAccessesPerAccounts)) + for acc, sas := range event.StateAccessesPerAccounts { + fmt.Printf("Account: %s\n", acc) + for _, sa := range sas.StateAccess { + fmt.Printf("Txhash: %s, Type: %d, AccountChanges: %d\n", sa.TxHash, sa.Type, sa.AccountChanges) + } + } default: fmt.Println("invalid message type") }