From 08783fa242c1946ac33a48d86a07c6ebc5978e9d Mon Sep 17 00:00:00 2001 From: ssd04 Date: Tue, 19 Aug 2025 10:36:58 +0300 Subject: [PATCH 01/28] added rabbitmq exchange for state changes --- cmd/notifier/config/config.toml | 5 ++ common/constants.go | 3 ++ config/config.go | 1 + data/block.go | 3 ++ data/outport.go | 7 +++ disabled/disabledHub.go | 4 ++ dispatcher/hub/commonHub.go | 19 ++++++++ dispatcher/interface.go | 1 + dispatcher/ws/wsDispatcher.go | 20 ++++++++ go.mod | 2 +- go.sum | 4 +- .../rabbitmq/testNotifierWithRabbitMQ_test.go | 18 ++++++- integrationTests/testNotifierProxy.go | 4 ++ mocks/dispatcherMock.go | 4 ++ mocks/dispatcherStub.go | 22 ++++++--- mocks/publisherHandlerStub.go | 8 ++++ mocks/publisherStub.go | 8 ++++ process/eventsHandler.go | 23 +++++++++ process/interface.go | 2 + process/preprocess/eventsPreProcessorV1.go | 1 + process/publisher.go | 12 +++++ rabbitmq/interface.go | 1 + rabbitmq/publisher.go | 24 ++++++++++ rabbitmq/publisher_test.go | 48 +++++++++++++++++++ 24 files changed, 233 insertions(+), 11 deletions(-) diff --git a/cmd/notifier/config/config.toml b/cmd/notifier/config/config.toml index 24ad5c00..07cfbb39 100644 --- a/cmd/notifier/config/config.toml +++ b/cmd/notifier/config/config.toml @@ -114,3 +114,8 @@ [RabbitMQ.BlockEventsExchange] Name = "block_events" Type = "fanout" + + # The exchange which holds state accesses + [RabbitMQ.StateAccessesExchange] + Name = "state_accesses" + Type = "fanout" diff --git a/common/constants.go b/common/constants.go index 2908cf83..a35a1638 100644 --- a/common/constants.go +++ b/common/constants.go @@ -34,6 +34,9 @@ const ( // BlockScrs defines the subscription event type for block scrs BlockScrs string = "block_scrs" + + // BlockStateAccesses defines the subscription event type for block state accesses + BlockStateAccesses string = "block_state_accesses" ) const ( diff --git a/config/config.go b/config/config.go index b99f17ba..1f339088 100644 --- a/config/config.go +++ b/config/config.go @@ -80,6 +80,7 @@ type RabbitMQConfig struct { BlockTxsExchange RabbitMQExchangeConfig BlockScrsExchange RabbitMQExchangeConfig BlockEventsExchange RabbitMQExchangeConfig + StateAccessesExchange RabbitMQExchangeConfig } // RabbitMQExchangeConfig holds the configuration for a rabbitMQ exchange diff --git a/data/block.go b/data/block.go index 785395a7..2ae362d9 100644 --- a/data/block.go +++ b/data/block.go @@ -9,6 +9,7 @@ import ( "github.com/multiversx/mx-chain-core-go/data/receipt" "github.com/multiversx/mx-chain-core-go/data/rewardTx" "github.com/multiversx/mx-chain-core-go/data/smartContractResult" + "github.com/multiversx/mx-chain-core-go/data/stateChange" "github.com/multiversx/mx-chain-core-go/data/transaction" ) @@ -30,6 +31,7 @@ type InterceptorBlockData struct { Scrs map[string]*smartContractResult.SmartContractResult ScrsWithOrder map[string]*outport.SCRInfo LogEvents []Event + StateAccesses map[string]*stateChange.StateAccesses } // ArgsSaveBlockData holds the block data that will be received on push events @@ -44,6 +46,7 @@ type ArgsSaveBlockData struct { AlteredAccounts map[string]*alteredAccount.AlteredAccount NumberOfShards uint32 HeaderTimeStampMs uint64 + StateAccesses map[string]*stateChange.StateAccesses } // OutportBlockDataOld holds the block data that will be received on push events diff --git a/data/outport.go b/data/outport.go index 383b017f..c24a64ff 100644 --- a/data/outport.go +++ b/data/outport.go @@ -7,6 +7,7 @@ import ( "github.com/multiversx/mx-chain-core-go/data/receipt" "github.com/multiversx/mx-chain-core-go/data/rewardTx" "github.com/multiversx/mx-chain-core-go/data/smartContractResult" + "github.com/multiversx/mx-chain-core-go/data/stateChange" "github.com/multiversx/mx-chain-core-go/data/transaction" ) @@ -73,6 +74,12 @@ type BlockEventsWithOrder struct { Events []Event `json:"events"` } +// BlockStateAccesses holds the block state accesses +type BlockStateAccesses struct { + Hash string `json:"hash"` + StateAccesses map[string]*stateChange.StateAccesses `json:"stateAccesses"` +} + // NotifierTransaction defines a wrapper over transaction type NotifierTransaction struct { *transaction.Transaction diff --git a/disabled/disabledHub.go b/disabled/disabledHub.go index f819e711..c9fcb3d5 100644 --- a/disabled/disabledHub.go +++ b/disabled/disabledHub.go @@ -33,6 +33,10 @@ func (h *Hub) PublishScrs(blockScrs data.BlockScrs) { func (h *Hub) PublishBlockEventsWithOrder(blockTxs data.BlockEventsWithOrder) { } +// PublishStateAccesses does nothing +func (h *Hub) PublishStateAccesses(blockTxs data.BlockStateAccesses) { +} + // RegisterEvent does nothing func (h *Hub) RegisterEvent(_ dispatcher.EventDispatcher) { } diff --git a/dispatcher/hub/commonHub.go b/dispatcher/hub/commonHub.go index 027594c9..8da3b2f7 100644 --- a/dispatcher/hub/commonHub.go +++ b/dispatcher/hub/commonHub.go @@ -188,6 +188,25 @@ func (ch *commonHub) PublishScrs(blockScrs data.BlockScrs) { } } +// PublishStateAccesses will publish state accesses to dispatcher +func (ch *commonHub) PublishStateAccesses(stateAccesses data.BlockStateAccesses) { + subscriptions := ch.subscriptionMapper.Subscriptions() + + dispatchersMap := make(map[uuid.UUID]data.BlockStateAccesses) + + for _, subscription := range subscriptions[common.BlockStateAccesses] { + dispatchersMap[subscription.DispatcherID] = stateAccesses + } + + ch.mutDispatchers.RLock() + defer ch.mutDispatchers.RUnlock() + for id, event := range dispatchersMap { + if d, ok := ch.dispatchers[id]; ok { + d.StateAccessesEvent(event) + } + } +} + func (ch *commonHub) registerDispatcher(d dispatcher.EventDispatcher) { ch.mutDispatchers.Lock() defer ch.mutDispatchers.Unlock() diff --git a/dispatcher/interface.go b/dispatcher/interface.go index 9a2c33a1..3ac97dda 100644 --- a/dispatcher/interface.go +++ b/dispatcher/interface.go @@ -19,6 +19,7 @@ type EventDispatcher interface { TxsEvent(event data.BlockTxs) BlockEvents(event data.BlockEventsWithOrder) ScrsEvent(event data.BlockScrs) + StateAccessesEvent(event data.BlockStateAccesses) } // Hub defines the behaviour of a component which should be able to receive events diff --git a/dispatcher/ws/wsDispatcher.go b/dispatcher/ws/wsDispatcher.go index 21c3d6d1..8db20447 100644 --- a/dispatcher/ws/wsDispatcher.go +++ b/dispatcher/ws/wsDispatcher.go @@ -193,6 +193,26 @@ func (wd *websocketDispatcher) ScrsEvent(event data.BlockScrs) { wd.send <- wsEventBytes } +// StateAccessesEvent receives a block state accesses event and process it before pushing to socket +func (wd *websocketDispatcher) StateAccessesEvent(event data.BlockStateAccesses) { + eventBytes, err := wd.marshaller.Marshal(event) + if err != nil { + log.Error("failure marshalling events", "err", err.Error()) + return + } + wsEvent := &data.WebSocketEvent{ + Type: common.BlockStateAccesses, + Data: eventBytes, + } + wsEventBytes, err := wd.marshaller.Marshal(wsEvent) + if err != nil { + log.Error("failure marshalling events", "err", err.Error()) + return + } + + wd.send <- wsEventBytes +} + // writePump listens on the send-channel and pushes data on the socket stream func (wd *websocketDispatcher) writePump() { ticker := time.NewTicker(pingPeriod) diff --git a/go.mod b/go.mod index d00d6999..bafbdefc 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/google/uuid v1.6.0 github.com/gorilla/websocket v1.5.3 github.com/multiversx/mx-chain-communication-go v1.2.0 - github.com/multiversx/mx-chain-core-go v1.4.0 + github.com/multiversx/mx-chain-core-go v1.4.1-0.20250715072713-eaece6359e1c github.com/multiversx/mx-chain-logger-go v1.1.0 github.com/pelletier/go-toml v1.9.3 github.com/prometheus/client_model v0.6.1 diff --git a/go.sum b/go.sum index 6cda5d90..4090f0a3 100644 --- a/go.sum +++ b/go.sum @@ -130,8 +130,8 @@ github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o= github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= github.com/multiversx/mx-chain-communication-go v1.2.0 h1:0wOoLldiRbvaOPxwICbnRCqCpLqPewg8M/FMbC/0OXY= github.com/multiversx/mx-chain-communication-go v1.2.0/go.mod h1:wS3aAwkmHbC9mlzQdvL6p7l8Rqw3vmzhj7WZW1dTveA= -github.com/multiversx/mx-chain-core-go v1.4.0 h1:p6FbfCzvMXF54kpS0B5mrjNWYpq4SEQqo0UvrMF7YVY= -github.com/multiversx/mx-chain-core-go v1.4.0/go.mod h1:IO+vspNan+gT0WOHnJ95uvWygiziHZvfXpff6KnxV7g= +github.com/multiversx/mx-chain-core-go v1.4.1-0.20250715072713-eaece6359e1c h1:MMRPxVcxpHfe9g1DIWiz7s+QUEA68Xb+oJALJukHbgA= +github.com/multiversx/mx-chain-core-go v1.4.1-0.20250715072713-eaece6359e1c/go.mod h1:IO+vspNan+gT0WOHnJ95uvWygiziHZvfXpff6KnxV7g= github.com/multiversx/mx-chain-crypto-go v1.2.13-0.20250218161752-9482d9a22234 h1:NNI7kYxzsq+4mTPSUJo0cK1+iPxjUX+gRJDaBRwEQ7M= github.com/multiversx/mx-chain-crypto-go v1.2.13-0.20250218161752-9482d9a22234/go.mod h1:QZAw2bZcOxGQRgYACTrmP8pfTa3NyxENIL+00G6nM5E= github.com/multiversx/mx-chain-logger-go v1.1.0 h1:97x84A6L4RfCa6YOx1HpAFxZp1cf/WI0Qh112whgZNM= diff --git a/integrationTests/rabbitmq/testNotifierWithRabbitMQ_test.go b/integrationTests/rabbitmq/testNotifierWithRabbitMQ_test.go index 0c9f97e5..9dd8af9c 100644 --- a/integrationTests/rabbitmq/testNotifierWithRabbitMQ_test.go +++ b/integrationTests/rabbitmq/testNotifierWithRabbitMQ_test.go @@ -10,6 +10,7 @@ import ( "github.com/multiversx/mx-chain-core-go/data/block" "github.com/multiversx/mx-chain-core-go/data/outport" "github.com/multiversx/mx-chain-core-go/data/smartContractResult" + "github.com/multiversx/mx-chain-core-go/data/stateChange" "github.com/multiversx/mx-chain-core-go/data/transaction" logger "github.com/multiversx/mx-chain-logger-go" "github.com/multiversx/mx-chain-notifier-go/common" @@ -56,7 +57,7 @@ func testNotifierWithRabbitMQ(t *testing.T, observerType string, payloadVersion integrationTests.WaitTimeout(t, wg, time.Second*2) assert.Equal(t, 3, len(notifier.RedisClient.GetEntries())) - assert.Equal(t, 6, len(notifier.RabbitMQClient.GetEntries())) + assert.Equal(t, 7, len(notifier.RabbitMQClient.GetEntries())) } func pushEventsRequest(wg *sync.WaitGroup, webServer integrationTests.ObserverConnector) { @@ -101,6 +102,21 @@ func pushEventsRequest(wg *sync.WaitGroup, webServer integrationTests.ObserverCo }, } + stateAccesses := make(map[string]*stateChange.StateAccesses) + stateAccesses["txHash1"] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + &stateChange.StateAccess{ + MainTrieKey: []byte("mainTrieKey1"), + MainTrieVal: []byte("mainTrieKey1"), + }, + &stateChange.StateAccess{ + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieKey2"), + }, + }, + } + stateAccesses["txHash2"] = &stateChange.StateAccesses{} + saveBlockData := &outport.OutportBlock{ BlockData: &outport.BlockData{ HeaderBytes: headerBytes, diff --git a/integrationTests/testNotifierProxy.go b/integrationTests/testNotifierProxy.go index 0a0c839b..9c68c930 100644 --- a/integrationTests/testNotifierProxy.go +++ b/integrationTests/testNotifierProxy.go @@ -234,6 +234,10 @@ func GetDefaultConfigs() config.Configs { Name: "blockevents", Type: "fanout", }, + StateAccessesExchange: config.RabbitMQExchangeConfig{ + Name: "stateaccesses", + Type: "fanout", + }, }, }, Flags: config.FlagsConfig{ diff --git a/mocks/dispatcherMock.go b/mocks/dispatcherMock.go index 48b4581a..6962dd1d 100644 --- a/mocks/dispatcherMock.go +++ b/mocks/dispatcherMock.go @@ -52,6 +52,10 @@ func (d *DispatcherMock) TxsEvent(event data.BlockTxs) { func (d *DispatcherMock) ScrsEvent(event data.BlockScrs) { } +// StateAccessesEvent - +func (d *DispatcherMock) StateAccessesEvent(event data.BlockStateAccesses) { +} + // Subscribe - func (d *DispatcherMock) Subscribe(event data.SubscribeEvent) { d.hub.Subscribe(event) diff --git a/mocks/dispatcherStub.go b/mocks/dispatcherStub.go index e851a342..816e1c7a 100644 --- a/mocks/dispatcherStub.go +++ b/mocks/dispatcherStub.go @@ -7,13 +7,14 @@ import ( // DispatcherStub implements dispatcher EventDispatcher interface type DispatcherStub struct { - GetIDCalled func() uuid.UUID - PushEventsCalled func(events []data.Event) - BlockEventsCalled func(event data.BlockEventsWithOrder) - RevertEventCalled func(event data.RevertBlock) - FinalizedEventCalled func(event data.FinalizedBlock) - TxsEventCalled func(event data.BlockTxs) - ScrsEventCalled func(event data.BlockScrs) + GetIDCalled func() uuid.UUID + PushEventsCalled func(events []data.Event) + BlockEventsCalled func(event data.BlockEventsWithOrder) + RevertEventCalled func(event data.RevertBlock) + FinalizedEventCalled func(event data.FinalizedBlock) + TxsEventCalled func(event data.BlockTxs) + ScrsEventCalled func(event data.BlockScrs) + StateAccessesEventCalled func(event data.BlockStateAccesses) } // GetID - @@ -66,3 +67,10 @@ func (d *DispatcherStub) ScrsEvent(event data.BlockScrs) { d.ScrsEventCalled(event) } } + +// StateAccessesEvent - +func (d *DispatcherStub) StateAccessesEvent(event data.BlockStateAccesses) { + if d.StateAccessesEventCalled != nil { + d.StateAccessesEventCalled(event) + } +} diff --git a/mocks/publisherHandlerStub.go b/mocks/publisherHandlerStub.go index d74f5ae0..7f1edd90 100644 --- a/mocks/publisherHandlerStub.go +++ b/mocks/publisherHandlerStub.go @@ -10,6 +10,7 @@ type PublisherHandlerStub struct { PublishTxsCalled func(blockTxs data.BlockTxs) PublishScrsCalled func(blockScrs data.BlockScrs) PublishBlockEventsWithOrderCalled func(blockTxs data.BlockEventsWithOrder) + PublishBlockStateAccessesCalled func(stateAccesses data.BlockStateAccesses) CloseCalled func() error } @@ -55,6 +56,13 @@ func (p *PublisherHandlerStub) PublishBlockEventsWithOrder(blockTxs data.BlockEv } } +// PublishStateAccesses - +func (p *PublisherHandlerStub) PublishStateAccesses(stateAccesses data.BlockStateAccesses) { + if p.PublishBlockStateAccessesCalled != nil { + p.PublishBlockStateAccessesCalled(stateAccesses) + } +} + // Close - func (p *PublisherHandlerStub) Close() error { if p.CloseCalled != nil { diff --git a/mocks/publisherStub.go b/mocks/publisherStub.go index fbea84ce..48f10340 100644 --- a/mocks/publisherStub.go +++ b/mocks/publisherStub.go @@ -11,6 +11,7 @@ type PublisherStub struct { BroadcastTxsCalled func(event data.BlockTxs) BroadcastScrsCalled func(event data.BlockScrs) BroadcastBlockEventsWithOrderCalled func(event data.BlockEventsWithOrder) + BroadcastStateAccessesCalled func(event data.BlockStateAccesses) CloseCalled func() error } @@ -65,6 +66,13 @@ func (ps *PublisherStub) BroadcastBlockEventsWithOrder(event data.BlockEventsWit } } +// BroadcastStateAccesses - +func (ps *PublisherStub) BroadcastStateAccesses(event data.BlockStateAccesses) { + if ps.BroadcastStateAccessesCalled != nil { + ps.BroadcastStateAccessesCalled(event) + } +} + // Close - func (ps *PublisherStub) Close() error { if ps.CloseCalled != nil { diff --git a/process/eventsHandler.go b/process/eventsHandler.go index bb72dae2..f0a8d9c4 100644 --- a/process/eventsHandler.go +++ b/process/eventsHandler.go @@ -130,6 +130,12 @@ func (eh *eventsHandler) HandleSaveBlockEvents(allEvents data.ArgsSaveBlockData) } eh.handleBlockEventsWithOrder(txsWithOrder) + stateAccesses := data.BlockStateAccesses{ + Hash: eventsData.Hash, + StateAccesses: eventsData.StateAccesses, + } + eh.handleStateAccesses(stateAccesses) + return nil } @@ -306,6 +312,23 @@ func (eh *eventsHandler) handleBlockEventsWithOrder(blockTxs data.BlockEventsWit eh.metricsHandler.AddRequest(getRabbitOpID(common.BlockEvents), time.Since(t)) } +func (eh *eventsHandler) handleStateAccesses(stateAccesses data.BlockStateAccesses) { + if stateAccesses.Hash == "" { + log.Warn("received empty state accesses", + "will process", false, + ) + return + } + + log.Info("received state accesses", + "block hash", stateAccesses.Hash, + ) + + t := time.Now() + eh.publisher.BroadcastStateAccesses(stateAccesses) + eh.metricsHandler.AddRequest(getRabbitOpID(common.BlockStateAccesses), time.Since(t)) +} + func (eh *eventsHandler) tryCheckProcessedWithRetry(id, blockHash string) bool { var err error var setSuccessful bool diff --git a/process/interface.go b/process/interface.go index 34c93fef..5e9c8636 100644 --- a/process/interface.go +++ b/process/interface.go @@ -24,6 +24,7 @@ type Publisher interface { BroadcastTxs(event data.BlockTxs) BroadcastBlockEventsWithOrder(event data.BlockEventsWithOrder) BroadcastScrs(event data.BlockScrs) + BroadcastStateAccesses(events data.BlockStateAccesses) Close() error IsInterfaceNil() bool } @@ -71,6 +72,7 @@ type PublisherHandler interface { PublishTxs(blockTxs data.BlockTxs) PublishScrs(blockScrs data.BlockScrs) PublishBlockEventsWithOrder(blockTxs data.BlockEventsWithOrder) + PublishStateAccesses(stateAccesses data.BlockStateAccesses) Close() error IsInterfaceNil() bool } diff --git a/process/preprocess/eventsPreProcessorV1.go b/process/preprocess/eventsPreProcessorV1.go index 077acb0e..9a4add06 100644 --- a/process/preprocess/eventsPreProcessorV1.go +++ b/process/preprocess/eventsPreProcessorV1.go @@ -65,6 +65,7 @@ func (d *eventsPreProcessorV1) SaveBlock(marshalledData []byte) error { TransactionsPool: outportBlock.TransactionPool, Header: header, HeaderTimeStampMs: outportBlock.BlockData.GetTimestampMs(), + StateAccesses: outportBlock.GetStateAccesses(), } err = d.facade.HandlePushEvents(*saveBlockData) diff --git a/process/publisher.go b/process/publisher.go index 4d6713ec..e6d7aa47 100644 --- a/process/publisher.go +++ b/process/publisher.go @@ -18,6 +18,7 @@ type publisher struct { broadcastTxs chan data.BlockTxs broadcastBlockEventsWithOrder chan data.BlockEventsWithOrder broadcastScrs chan data.BlockScrs + broadcastStateAccesses chan data.BlockStateAccesses cancelFunc func() closeChan chan struct{} @@ -38,6 +39,7 @@ func NewPublisher(handler PublisherHandler) (*publisher, error) { broadcastTxs: make(chan data.BlockTxs), broadcastScrs: make(chan data.BlockScrs), broadcastBlockEventsWithOrder: make(chan data.BlockEventsWithOrder), + broadcastStateAccesses: make(chan data.BlockStateAccesses), closeChan: make(chan struct{}), } @@ -79,6 +81,8 @@ func (p *publisher) run(ctx context.Context) { p.handler.PublishScrs(blockScrs) case blockEvents := <-p.broadcastBlockEventsWithOrder: p.handler.PublishBlockEventsWithOrder(blockEvents) + case blockStateAccesses := <-p.broadcastStateAccesses: + p.handler.PublishStateAccesses(blockStateAccesses) } } } @@ -131,6 +135,14 @@ func (p *publisher) BroadcastBlockEventsWithOrder(events data.BlockEventsWithOrd } } +// BroadcastStateAccesses will handle state accesses pushed by producers +func (p *publisher) BroadcastStateAccesses(events data.BlockStateAccesses) { + select { + case p.broadcastStateAccesses <- events: + case <-p.closeChan: + } +} + // Close will close the channels func (p *publisher) Close() error { p.mutState.RLock() diff --git a/rabbitmq/interface.go b/rabbitmq/interface.go index 79caf2c5..00dc6b57 100644 --- a/rabbitmq/interface.go +++ b/rabbitmq/interface.go @@ -27,6 +27,7 @@ type PublisherService interface { BroadcastTxs(event data.BlockTxs) BroadcastScrs(event data.BlockScrs) BroadcastBlockEventsWithOrder(event data.BlockEventsWithOrder) + BroadcastStateAccesses(events data.BlockStateAccesses) Close() error IsInterfaceNil() bool } diff --git a/rabbitmq/publisher.go b/rabbitmq/publisher.go index 01b167c9..49169d4d 100644 --- a/rabbitmq/publisher.go +++ b/rabbitmq/publisher.go @@ -94,6 +94,12 @@ func checkArgs(args ArgsRabbitMqPublisher) error { if args.Config.BlockEventsExchange.Type == "" { return ErrInvalidRabbitMqExchangeType } + if args.Config.StateAccessesExchange.Name == "" { + return ErrInvalidRabbitMqExchangeName + } + if args.Config.StateAccessesExchange.Type == "" { + return ErrInvalidRabbitMqExchangeType + } return nil } @@ -124,6 +130,10 @@ func (rp *rabbitMqPublisher) createExchanges() error { if err != nil { return err } + err = rp.createExchange(rp.cfg.StateAccessesExchange) + if err != nil { + return err + } return nil } @@ -223,6 +233,20 @@ func (rp *rabbitMqPublisher) PublishBlockEventsWithOrder(blockTxs data.BlockEven } } +// PublishStateAccesses will publish block state accesses to rabbitmq +func (rp *rabbitMqPublisher) PublishStateAccesses(stateAccesses data.BlockStateAccesses) { + stateAccessesBytes, err := rp.marshaller.Marshal(stateAccesses) + if err != nil { + log.Error("could not marshal block state accesses", "err", err.Error()) + return + } + + err = rp.publishFanout(rp.cfg.StateAccessesExchange.Name, stateAccessesBytes) + if err != nil { + log.Error("failed to publish block state accesses to rabbitMQ", "err", err.Error()) + } +} + func (rp *rabbitMqPublisher) publishFanout(exchangeName string, payload []byte) error { return rp.client.Publish( exchangeName, diff --git a/rabbitmq/publisher_test.go b/rabbitmq/publisher_test.go index c0155d1b..223346ea 100644 --- a/rabbitmq/publisher_test.go +++ b/rabbitmq/publisher_test.go @@ -43,6 +43,10 @@ func createMockArgsRabbitMqPublisher() rabbitmq.ArgsRabbitMqPublisher { Name: "blockeventswithorder", Type: "fanout", }, + StateAccessesExchange: config.RabbitMQExchangeConfig{ + Name: "stateAccesses", + Type: "fanout", + }, }, Marshaller: &mock.MarshalizerMock{}, } @@ -128,6 +132,28 @@ func TestRabbitMqPublisher(t *testing.T) { require.True(t, errors.Is(err, rabbitmq.ErrInvalidRabbitMqExchangeName)) }) + t.Run("invalid state accesses exchange name", func(t *testing.T) { + t.Parallel() + + args := createMockArgsRabbitMqPublisher() + args.Config.StateAccessesExchange.Name = "" + + client, err := rabbitmq.NewRabbitMqPublisher(args) + require.True(t, check.IfNil(client)) + require.True(t, errors.Is(err, rabbitmq.ErrInvalidRabbitMqExchangeName)) + }) + + t.Run("invalid state accesses exchange type", func(t *testing.T) { + t.Parallel() + + args := createMockArgsRabbitMqPublisher() + args.Config.StateAccessesExchange.Type = "" + + client, err := rabbitmq.NewRabbitMqPublisher(args) + require.True(t, check.IfNil(client)) + require.True(t, errors.Is(err, rabbitmq.ErrInvalidRabbitMqExchangeType)) + }) + t.Run("invalid exchange type", func(t *testing.T) { t.Parallel() @@ -308,6 +334,28 @@ func TestBroadcastBlockEventsWithOrder(t *testing.T) { require.True(t, wasCalled) } +func TestBroadcastBlockStateAccesses(t *testing.T) { + t.Parallel() + + wasCalled := false + client := &mocks.RabbitClientStub{ + PublishCalled: func(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error { + wasCalled = true + return nil + }, + } + + args := createMockArgsRabbitMqPublisher() + args.Client = client + + rabbitmq, err := rabbitmq.NewRabbitMqPublisher(args) + require.Nil(t, err) + + rabbitmq.PublishStateAccesses(data.BlockStateAccesses{}) + + require.True(t, wasCalled) +} + func TestClose(t *testing.T) { t.Parallel() From 02b43dc66ad4427b4d22ea5d57920f7adc5fb92a Mon Sep 17 00:00:00 2001 From: ssd04 Date: Wed, 20 Aug 2025 15:42:50 +0300 Subject: [PATCH 02/28] fixes after review --- dispatcher/hub/commonHub.go | 24 +++++++++++++++ .../rabbitmq/testNotifierWithRabbitMQ_test.go | 4 +-- rabbitmq/publisher.go | 30 ++++++++++--------- 3 files changed, 42 insertions(+), 16 deletions(-) diff --git a/dispatcher/hub/commonHub.go b/dispatcher/hub/commonHub.go index 8da3b2f7..1278c074 100644 --- a/dispatcher/hub/commonHub.go +++ b/dispatcher/hub/commonHub.go @@ -96,6 +96,10 @@ func (ch *commonHub) handlePushBlockEvents(blockEvents data.BlockEvents, subscri // PublishRevert will publish revert event to dispatcher func (ch *commonHub) PublishRevert(revertBlock data.RevertBlock) { subscriptions := ch.subscriptionMapper.Subscriptions() + _, ok := subscriptions[common.RevertBlockEvents] + if !ok { + return + } dispatchersMap := make(map[uuid.UUID]data.RevertBlock) @@ -115,6 +119,10 @@ func (ch *commonHub) PublishRevert(revertBlock data.RevertBlock) { // PublishFinalized will publish finalized event to dispatcher func (ch *commonHub) PublishFinalized(finalizedBlock data.FinalizedBlock) { subscriptions := ch.subscriptionMapper.Subscriptions() + _, ok := subscriptions[common.FinalizedBlockEvents] + if !ok { + return + } dispatchersMap := make(map[uuid.UUID]data.FinalizedBlock) @@ -134,6 +142,10 @@ func (ch *commonHub) PublishFinalized(finalizedBlock data.FinalizedBlock) { // PublishTxs will publish txs event to dispatcher func (ch *commonHub) PublishTxs(blockTxs data.BlockTxs) { subscriptions := ch.subscriptionMapper.Subscriptions() + _, ok := subscriptions[common.BlockTxs] + if !ok { + return + } dispatchersMap := make(map[uuid.UUID]data.BlockTxs) @@ -153,6 +165,10 @@ func (ch *commonHub) PublishTxs(blockTxs data.BlockTxs) { // PublishBlockEventsWithOrder will publish block events with order to dispatcher func (ch *commonHub) PublishBlockEventsWithOrder(blockTxs data.BlockEventsWithOrder) { subscriptions := ch.subscriptionMapper.Subscriptions() + _, ok := subscriptions[common.BlockEvents] + if !ok { + return + } dispatchersMap := make(map[uuid.UUID]data.BlockEventsWithOrder) @@ -172,6 +188,10 @@ func (ch *commonHub) PublishBlockEventsWithOrder(blockTxs data.BlockEventsWithOr // PublishScrs will publish scrs events to dispatcher func (ch *commonHub) PublishScrs(blockScrs data.BlockScrs) { subscriptions := ch.subscriptionMapper.Subscriptions() + _, ok := subscriptions[common.BlockScrs] + if !ok { + return + } dispatchersMap := make(map[uuid.UUID]data.BlockScrs) @@ -191,6 +211,10 @@ func (ch *commonHub) PublishScrs(blockScrs data.BlockScrs) { // PublishStateAccesses will publish state accesses to dispatcher func (ch *commonHub) PublishStateAccesses(stateAccesses data.BlockStateAccesses) { subscriptions := ch.subscriptionMapper.Subscriptions() + _, ok := subscriptions[common.BlockStateAccesses] + if !ok { + return + } dispatchersMap := make(map[uuid.UUID]data.BlockStateAccesses) diff --git a/integrationTests/rabbitmq/testNotifierWithRabbitMQ_test.go b/integrationTests/rabbitmq/testNotifierWithRabbitMQ_test.go index 9dd8af9c..87a77748 100644 --- a/integrationTests/rabbitmq/testNotifierWithRabbitMQ_test.go +++ b/integrationTests/rabbitmq/testNotifierWithRabbitMQ_test.go @@ -107,11 +107,11 @@ func pushEventsRequest(wg *sync.WaitGroup, webServer integrationTests.ObserverCo StateAccess: []*stateChange.StateAccess{ &stateChange.StateAccess{ MainTrieKey: []byte("mainTrieKey1"), - MainTrieVal: []byte("mainTrieKey1"), + MainTrieVal: []byte("mainTrieVal1"), }, &stateChange.StateAccess{ MainTrieKey: []byte("mainTrieKey2"), - MainTrieVal: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal2"), }, }, } diff --git a/rabbitmq/publisher.go b/rabbitmq/publisher.go index 49169d4d..7bdddff6 100644 --- a/rabbitmq/publisher.go +++ b/rabbitmq/publisher.go @@ -1,6 +1,8 @@ package rabbitmq import ( + "fmt" + "github.com/multiversx/mx-chain-core-go/core/check" "github.com/multiversx/mx-chain-core-go/marshal" logger "github.com/multiversx/mx-chain-logger-go" @@ -59,46 +61,46 @@ func checkArgs(args ArgsRabbitMqPublisher) error { } if args.Config.EventsExchange.Name == "" { - return ErrInvalidRabbitMqExchangeName + return fmt.Errorf("%w for %s", ErrInvalidRabbitMqExchangeName, args.Config.EventsExchange.Name) } if args.Config.EventsExchange.Type == "" { - return ErrInvalidRabbitMqExchangeType + return fmt.Errorf("%w for %s", ErrInvalidRabbitMqExchangeType, args.Config.EventsExchange.Type) } if args.Config.RevertEventsExchange.Name == "" { - return ErrInvalidRabbitMqExchangeName + return fmt.Errorf("%w for %s", ErrInvalidRabbitMqExchangeName, args.Config.RevertEventsExchange.Name) } if args.Config.RevertEventsExchange.Type == "" { - return ErrInvalidRabbitMqExchangeType + return fmt.Errorf("%w for %s", ErrInvalidRabbitMqExchangeType, args.Config.RevertEventsExchange.Type) } if args.Config.FinalizedEventsExchange.Name == "" { - return ErrInvalidRabbitMqExchangeName + return fmt.Errorf("%w for %s", ErrInvalidRabbitMqExchangeName, args.Config.FinalizedEventsExchange.Name) } if args.Config.FinalizedEventsExchange.Type == "" { - return ErrInvalidRabbitMqExchangeType + return fmt.Errorf("%w for %s", ErrInvalidRabbitMqExchangeType, args.Config.FinalizedEventsExchange.Type) } if args.Config.BlockTxsExchange.Name == "" { - return ErrInvalidRabbitMqExchangeName + return fmt.Errorf("%w for %s", ErrInvalidRabbitMqExchangeName, args.Config.BlockTxsExchange.Name) } if args.Config.BlockTxsExchange.Type == "" { - return ErrInvalidRabbitMqExchangeType + return fmt.Errorf("%w for %s", ErrInvalidRabbitMqExchangeType, args.Config.BlockTxsExchange.Type) } if args.Config.BlockScrsExchange.Name == "" { - return ErrInvalidRabbitMqExchangeName + return fmt.Errorf("%w for %s", ErrInvalidRabbitMqExchangeName, args.Config.BlockScrsExchange.Name) } if args.Config.BlockScrsExchange.Type == "" { - return ErrInvalidRabbitMqExchangeType + return fmt.Errorf("%w for %s", ErrInvalidRabbitMqExchangeType, args.Config.BlockScrsExchange.Type) } if args.Config.BlockEventsExchange.Name == "" { - return ErrInvalidRabbitMqExchangeName + return fmt.Errorf("%w for %s", ErrInvalidRabbitMqExchangeName, args.Config.BlockEventsExchange.Name) } if args.Config.BlockEventsExchange.Type == "" { - return ErrInvalidRabbitMqExchangeType + return fmt.Errorf("%w for %s", ErrInvalidRabbitMqExchangeType, args.Config.BlockEventsExchange.Type) } if args.Config.StateAccessesExchange.Name == "" { - return ErrInvalidRabbitMqExchangeName + return fmt.Errorf("%w for %s", ErrInvalidRabbitMqExchangeName, args.Config.StateAccessesExchange.Name) } if args.Config.StateAccessesExchange.Type == "" { - return ErrInvalidRabbitMqExchangeType + return fmt.Errorf("%w for %s", ErrInvalidRabbitMqExchangeType, args.Config.StateAccessesExchange.Type) } return nil From 794a8116a2a9ec62544218d493c224c93ef92df3 Mon Sep 17 00:00:00 2001 From: ssd04 Date: Fri, 22 Aug 2025 11:02:45 +0300 Subject: [PATCH 03/28] fix errors --- rabbitmq/publisher.go | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/rabbitmq/publisher.go b/rabbitmq/publisher.go index 7bdddff6..9157f50f 100644 --- a/rabbitmq/publisher.go +++ b/rabbitmq/publisher.go @@ -61,46 +61,46 @@ func checkArgs(args ArgsRabbitMqPublisher) error { } if args.Config.EventsExchange.Name == "" { - return fmt.Errorf("%w for %s", ErrInvalidRabbitMqExchangeName, args.Config.EventsExchange.Name) + return fmt.Errorf("%w for EventsExchange", ErrInvalidRabbitMqExchangeName) } if args.Config.EventsExchange.Type == "" { - return fmt.Errorf("%w for %s", ErrInvalidRabbitMqExchangeType, args.Config.EventsExchange.Type) + return fmt.Errorf("%w for EventsExchange", ErrInvalidRabbitMqExchangeType) } if args.Config.RevertEventsExchange.Name == "" { - return fmt.Errorf("%w for %s", ErrInvalidRabbitMqExchangeName, args.Config.RevertEventsExchange.Name) + return fmt.Errorf("%w for RevertEventsExchange", ErrInvalidRabbitMqExchangeName) } if args.Config.RevertEventsExchange.Type == "" { - return fmt.Errorf("%w for %s", ErrInvalidRabbitMqExchangeType, args.Config.RevertEventsExchange.Type) + return fmt.Errorf("%w for RevertEventsExchange", ErrInvalidRabbitMqExchangeType) } if args.Config.FinalizedEventsExchange.Name == "" { - return fmt.Errorf("%w for %s", ErrInvalidRabbitMqExchangeName, args.Config.FinalizedEventsExchange.Name) + return fmt.Errorf("%w for FinalizedEventsExchange", ErrInvalidRabbitMqExchangeName) } if args.Config.FinalizedEventsExchange.Type == "" { - return fmt.Errorf("%w for %s", ErrInvalidRabbitMqExchangeType, args.Config.FinalizedEventsExchange.Type) + return fmt.Errorf("%w for FinalizedEventsExchange", ErrInvalidRabbitMqExchangeType) } if args.Config.BlockTxsExchange.Name == "" { - return fmt.Errorf("%w for %s", ErrInvalidRabbitMqExchangeName, args.Config.BlockTxsExchange.Name) + return fmt.Errorf("%w for BlockTxsExchange", ErrInvalidRabbitMqExchangeName) } if args.Config.BlockTxsExchange.Type == "" { - return fmt.Errorf("%w for %s", ErrInvalidRabbitMqExchangeType, args.Config.BlockTxsExchange.Type) + return fmt.Errorf("%w for BlockTxsExchange", ErrInvalidRabbitMqExchangeType) } if args.Config.BlockScrsExchange.Name == "" { - return fmt.Errorf("%w for %s", ErrInvalidRabbitMqExchangeName, args.Config.BlockScrsExchange.Name) + return fmt.Errorf("%w for BlockScrsExchange", ErrInvalidRabbitMqExchangeName) } if args.Config.BlockScrsExchange.Type == "" { - return fmt.Errorf("%w for %s", ErrInvalidRabbitMqExchangeType, args.Config.BlockScrsExchange.Type) + return fmt.Errorf("%w for BlockScrsExchange", ErrInvalidRabbitMqExchangeType) } if args.Config.BlockEventsExchange.Name == "" { - return fmt.Errorf("%w for %s", ErrInvalidRabbitMqExchangeName, args.Config.BlockEventsExchange.Name) + return fmt.Errorf("%w for BlockEventsExchange", ErrInvalidRabbitMqExchangeName) } if args.Config.BlockEventsExchange.Type == "" { - return fmt.Errorf("%w for %s", ErrInvalidRabbitMqExchangeType, args.Config.BlockEventsExchange.Type) + return fmt.Errorf("%w for BlockEventsExchange", ErrInvalidRabbitMqExchangeType) } if args.Config.StateAccessesExchange.Name == "" { - return fmt.Errorf("%w for %s", ErrInvalidRabbitMqExchangeName, args.Config.StateAccessesExchange.Name) + return fmt.Errorf("%w for StateAccessesExchange", ErrInvalidRabbitMqExchangeName) } if args.Config.StateAccessesExchange.Type == "" { - return fmt.Errorf("%w for %s", ErrInvalidRabbitMqExchangeType, args.Config.StateAccessesExchange.Type) + return fmt.Errorf("%w for StateAccessesExchange", ErrInvalidRabbitMqExchangeType) } return nil From 97aa452f71bf4c88df9ae7a336a63074c2e39535 Mon Sep 17 00:00:00 2001 From: ssd04 Date: Fri, 22 Aug 2025 13:41:13 +0300 Subject: [PATCH 04/28] restructure state accesses per accounts --- data/block.go | 18 +-- data/outport.go | 4 +- .../rabbitmq/testNotifierWithRabbitMQ_test.go | 1 + .../testNotifierWithWebsockets_test.go | 38 +++++++ process/errors.go | 3 + process/eventsHandler.go | 4 +- process/eventsInterceptor.go | 105 ++++++++++++++++-- process/export_test.go | 6 + 8 files changed, 158 insertions(+), 21 deletions(-) diff --git a/data/block.go b/data/block.go index 2ae362d9..cf26ebe6 100644 --- a/data/block.go +++ b/data/block.go @@ -23,15 +23,15 @@ type SaveBlockData struct { // InterceptorBlockData holds the block data needed for processing type InterceptorBlockData struct { - Hash string - Body nodeData.BodyHandler - Header nodeData.HeaderHandler - Txs map[string]*transaction.Transaction - TxsWithOrder map[string]*outport.TxInfo - Scrs map[string]*smartContractResult.SmartContractResult - ScrsWithOrder map[string]*outport.SCRInfo - LogEvents []Event - StateAccesses map[string]*stateChange.StateAccesses + Hash string + Body nodeData.BodyHandler + Header nodeData.HeaderHandler + Txs map[string]*transaction.Transaction + TxsWithOrder map[string]*outport.TxInfo + Scrs map[string]*smartContractResult.SmartContractResult + ScrsWithOrder map[string]*outport.SCRInfo + LogEvents []Event + StateAccessesPerAccounts map[string]*stateChange.StateAccesses } // ArgsSaveBlockData holds the block data that will be received on push events diff --git a/data/outport.go b/data/outport.go index c24a64ff..e7b33504 100644 --- a/data/outport.go +++ b/data/outport.go @@ -76,8 +76,8 @@ type BlockEventsWithOrder struct { // BlockStateAccesses holds the block state accesses type BlockStateAccesses struct { - Hash string `json:"hash"` - StateAccesses map[string]*stateChange.StateAccesses `json:"stateAccesses"` + Hash string `json:"hash"` + StateAccessesPerAccounts map[string]*stateChange.StateAccesses `json:"stateAccessesPerAccounts"` } // NotifierTransaction defines a wrapper over transaction diff --git a/integrationTests/rabbitmq/testNotifierWithRabbitMQ_test.go b/integrationTests/rabbitmq/testNotifierWithRabbitMQ_test.go index 87a77748..45111b7d 100644 --- a/integrationTests/rabbitmq/testNotifierWithRabbitMQ_test.go +++ b/integrationTests/rabbitmq/testNotifierWithRabbitMQ_test.go @@ -130,6 +130,7 @@ func pushEventsRequest(wg *sync.WaitGroup, webServer integrationTests.ObserverCo }, TransactionPool: txPool, HeaderGasConsumption: &outport.HeaderGasConsumption{}, + StateAccesses: stateAccesses, } err := webServer.PushEventsRequest(saveBlockData) diff --git a/integrationTests/websocket/testNotifierWithWebsockets_test.go b/integrationTests/websocket/testNotifierWithWebsockets_test.go index efafd13f..3c558792 100644 --- a/integrationTests/websocket/testNotifierWithWebsockets_test.go +++ b/integrationTests/websocket/testNotifierWithWebsockets_test.go @@ -11,6 +11,7 @@ import ( "github.com/multiversx/mx-chain-core-go/data/block" "github.com/multiversx/mx-chain-core-go/data/outport" "github.com/multiversx/mx-chain-core-go/data/smartContractResult" + "github.com/multiversx/mx-chain-core-go/data/stateChange" "github.com/multiversx/mx-chain-core-go/data/transaction" "github.com/multiversx/mx-chain-notifier-go/common" "github.com/multiversx/mx-chain-notifier-go/data" @@ -61,6 +62,22 @@ func TestNotifierWithWebsockets_PushEvents(t *testing.T) { }, } headerBytes, _ := json.Marshal(header) + + stateAccesses := make(map[string]*stateChange.StateAccesses) + stateAccesses["txHash1"] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + &stateChange.StateAccess{ + MainTrieKey: []byte("mainTrieKey1"), + MainTrieVal: []byte("mainTrieVal1"), + }, + &stateChange.StateAccess{ + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal2"), + }, + }, + } + stateAccesses["txHash2"] = &stateChange.StateAccesses{} + saveBlockData := &outport.OutportBlock{ TransactionPool: &outport.TransactionPool{ Logs: []*outport.LogData{ @@ -85,6 +102,7 @@ func TestNotifierWithWebsockets_PushEvents(t *testing.T) { }, }, HeaderGasConsumption: &outport.HeaderGasConsumption{}, + StateAccesses: stateAccesses, } wg := &sync.WaitGroup{} @@ -156,6 +174,10 @@ func TestNotifierWithWebsockets_BlockEvents(t *testing.T) { }, } headerBytes, _ := json.Marshal(header) + + stateAccesses := make(map[string]*stateChange.StateAccesses) + stateAccesses["txHash1"] = &stateChange.StateAccesses{} + saveBlockData := &outport.OutportBlock{ TransactionPool: &outport.TransactionPool{ Logs: []*outport.LogData{ @@ -181,6 +203,7 @@ func TestNotifierWithWebsockets_BlockEvents(t *testing.T) { TimestampMs: 1234000, }, HeaderGasConsumption: &outport.HeaderGasConsumption{}, + StateAccesses: stateAccesses, } wg := &sync.WaitGroup{} @@ -355,6 +378,10 @@ func TestNotifierWithWebsockets_TxsEvents(t *testing.T) { }, } headerBytes, _ := json.Marshal(header) + + stateAccesses := make(map[string]*stateChange.StateAccesses) + stateAccesses["txHash1"] = &stateChange.StateAccesses{} + saveBlockData := &outport.OutportBlock{ TransactionPool: &outport.TransactionPool{ Transactions: txs, @@ -368,6 +395,7 @@ func TestNotifierWithWebsockets_TxsEvents(t *testing.T) { }, }, HeaderGasConsumption: &outport.HeaderGasConsumption{}, + StateAccesses: stateAccesses, } expTxs := map[string]*transaction.Transaction{ @@ -438,6 +466,10 @@ func TestNotifierWithWebsockets_ScrsEvents(t *testing.T) { }, } headerBytes, _ := json.Marshal(header) + + stateAccesses := make(map[string]*stateChange.StateAccesses) + stateAccesses["txHash1"] = &stateChange.StateAccesses{} + blockEvents := &outport.OutportBlock{ TransactionPool: &outport.TransactionPool{ SmartContractResults: scrs, @@ -451,6 +483,7 @@ func TestNotifierWithWebsockets_ScrsEvents(t *testing.T) { }, }, HeaderGasConsumption: &outport.HeaderGasConsumption{}, + StateAccesses: stateAccesses, } expScrs := map[string]*smartContractResult.SmartContractResult{ @@ -637,6 +670,10 @@ func testNotifierWithWebsockets_AllEvents(t *testing.T, observerType string) { }, } headerBytes, _ = json.Marshal(header) + + stateAccesses := make(map[string]*stateChange.StateAccesses) + stateAccesses["txHash1"] = &stateChange.StateAccesses{} + blockEvents := &outport.OutportBlock{ TransactionPool: &outport.TransactionPool{ Transactions: txs, @@ -665,6 +702,7 @@ func testNotifierWithWebsockets_AllEvents(t *testing.T, observerType string) { TimestampMs: 1234000, }, HeaderGasConsumption: &outport.HeaderGasConsumption{}, + StateAccesses: stateAccesses, } numEvents := 6 diff --git a/process/errors.go b/process/errors.go index 51ed1751..27df101e 100644 --- a/process/errors.go +++ b/process/errors.go @@ -31,3 +31,6 @@ var ErrNilPublisherHandler = errors.New("nil publisher handler provided") // ErrNilEventsInterceptor signals that a nil events interceptor was provided var ErrNilEventsInterceptor = errors.New("nil events interceptor") + +// ErrNilStateAccesses signals that a nil state accesses has been provided +var ErrNilStateAccesses = errors.New("nil state accessess provided") diff --git a/process/eventsHandler.go b/process/eventsHandler.go index f0a8d9c4..10018524 100644 --- a/process/eventsHandler.go +++ b/process/eventsHandler.go @@ -131,8 +131,8 @@ func (eh *eventsHandler) HandleSaveBlockEvents(allEvents data.ArgsSaveBlockData) eh.handleBlockEventsWithOrder(txsWithOrder) stateAccesses := data.BlockStateAccesses{ - Hash: eventsData.Hash, - StateAccesses: eventsData.StateAccesses, + Hash: eventsData.Hash, + StateAccessesPerAccounts: eventsData.StateAccessesPerAccounts, } eh.handleStateAccesses(stateAccesses) diff --git a/process/eventsInterceptor.go b/process/eventsInterceptor.go index d85f55d0..5ce054c2 100644 --- a/process/eventsInterceptor.go +++ b/process/eventsInterceptor.go @@ -2,16 +2,23 @@ package process import ( "encoding/hex" + "sort" "github.com/multiversx/mx-chain-core-go/core" "github.com/multiversx/mx-chain-core-go/core/check" nodeData "github.com/multiversx/mx-chain-core-go/data" "github.com/multiversx/mx-chain-core-go/data/outport" "github.com/multiversx/mx-chain-core-go/data/smartContractResult" + "github.com/multiversx/mx-chain-core-go/data/stateChange" "github.com/multiversx/mx-chain-core-go/data/transaction" "github.com/multiversx/mx-chain-notifier-go/data" ) +type txWithOrder struct { + hash string + index uint32 +} + // logEvent defines a log event associated with corresponding tx hash type logEvent struct { EventHandler nodeData.EventHandler @@ -52,6 +59,9 @@ func (ei *eventsInterceptor) ProcessBlockEvents(eventsData *data.ArgsSaveBlockDa if eventsData.Header == nil { return nil, ErrNilBlockHeader } + if eventsData.StateAccesses == nil { + return nil, ErrNilStateAccesses + } events := ei.getLogEventsFromTransactionsPool(eventsData.TransactionsPool.Logs) @@ -67,18 +77,97 @@ func (ei *eventsInterceptor) ProcessBlockEvents(eventsData *data.ArgsSaveBlockDa } scrsWithOrder := eventsData.TransactionsPool.SmartContractResults + stateAccessesPerAccounts := ei.getStateAccessesPerAccounts(eventsData) + return &data.InterceptorBlockData{ - Hash: hex.EncodeToString(eventsData.HeaderHash), - Body: eventsData.Body, - Header: eventsData.Header, - Txs: txs, - TxsWithOrder: txsWithOrder, - Scrs: scrs, - ScrsWithOrder: scrsWithOrder, - LogEvents: events, + Hash: hex.EncodeToString(eventsData.HeaderHash), + Body: eventsData.Body, + Header: eventsData.Header, + Txs: txs, + TxsWithOrder: txsWithOrder, + Scrs: scrs, + ScrsWithOrder: scrsWithOrder, + LogEvents: events, + StateAccessesPerAccounts: stateAccessesPerAccounts, }, nil } +func getTxsWithOrder(transactionsPool *outport.TransactionPool) []txWithOrder { + txsWithOrder := make([]txWithOrder, 0) + + for txHash, txInfo := range transactionsPool.Transactions { + txsWithOrder = append(txsWithOrder, txWithOrder{ + hash: txHash, + index: txInfo.ExecutionOrder, + }) + } + for txHash, txInfo := range transactionsPool.SmartContractResults { + txsWithOrder = append(txsWithOrder, txWithOrder{ + hash: txHash, + index: txInfo.ExecutionOrder, + }) + } + for txHash, txInfo := range transactionsPool.Rewards { + txsWithOrder = append(txsWithOrder, txWithOrder{ + hash: txHash, + index: txInfo.ExecutionOrder, + }) + } + for txHash, txInfo := range transactionsPool.InvalidTxs { + txsWithOrder = append(txsWithOrder, txWithOrder{ + hash: txHash, + index: txInfo.ExecutionOrder, + }) + } + + sort.Slice(txsWithOrder, func(i, j int) bool { + return txsWithOrder[i].index < txsWithOrder[j].index + }) + + return txsWithOrder +} + +func (ei *eventsInterceptor) getStateAccessesPerAccounts(eventsData *data.ArgsSaveBlockData) map[string]*stateChange.StateAccesses { + stateAccessesPerTxs := eventsData.StateAccesses + + // txs hashes with order + txsWithOrder := getTxsWithOrder(eventsData.TransactionsPool) + + stateAccessesPerAccounts := make(map[string]*stateChange.StateAccesses) + for _, txInfo := range txsWithOrder { + stateAccessesPerTx, ok := stateAccessesPerTxs[txInfo.hash] + if !ok { + log.Warn("did not find state accesses for tx", "txHash", txInfo.hash) + continue + } + + for _, stateAccess := range stateAccessesPerTx.StateAccess { + if stateAccess.Type == stateChange.Read { + // TODO: add a flag here to allow read state accessess + continue + } + + if stateAccess.Operation == stateChange.WriteCode { + // TODO: handle code update operations + continue + } + + accKey := hex.EncodeToString(stateAccess.MainTrieKey) + acc, ok := stateAccessesPerAccounts[accKey] + if !ok { + acc = &stateChange.StateAccesses{ + StateAccess: make([]*stateChange.StateAccess, 0), + } + stateAccessesPerAccounts[accKey] = acc + } + + stateAccessesPerAccounts[accKey].StateAccess = append(stateAccessesPerAccounts[accKey].StateAccess, stateAccess) + } + } + + return stateAccessesPerAccounts +} + func (ei *eventsInterceptor) getLogEventsFromTransactionsPool(logs []*outport.LogData) []data.Event { var logEvents []*logEvent for _, logData := range logs { diff --git a/process/export_test.go b/process/export_test.go index 8e107de4..fa032b33 100644 --- a/process/export_test.go +++ b/process/export_test.go @@ -2,6 +2,7 @@ package process import ( "github.com/multiversx/mx-chain-core-go/data/outport" + "github.com/multiversx/mx-chain-core-go/data/stateChange" "github.com/multiversx/mx-chain-notifier-go/data" ) @@ -39,3 +40,8 @@ func (eh *eventsHandler) ShouldProcessSaveBlockEvents(blockHash string) bool { func (ei *eventsInterceptor) GetLogEventsFromTransactionsPool(logs []*outport.LogData) []data.Event { return ei.getLogEventsFromTransactionsPool(logs) } + +// GetStateAccessesPerAccounts - +func (ei *eventsInterceptor) GetStateAccessesPerAccounts(eventsData *data.ArgsSaveBlockData) map[string]*stateChange.StateAccesses { + return ei.getStateAccessesPerAccounts(eventsData) +} From b33326df72013af2983589e9a6e76be94f46215d Mon Sep 17 00:00:00 2001 From: ssd04 Date: Fri, 22 Aug 2025 13:41:24 +0300 Subject: [PATCH 05/28] add unit tests --- process/eventsInterceptor_test.go | 259 ++++++++++++++++++++++++++++++ 1 file changed, 259 insertions(+) diff --git a/process/eventsInterceptor_test.go b/process/eventsInterceptor_test.go index 61bb504e..33aa1cc6 100644 --- a/process/eventsInterceptor_test.go +++ b/process/eventsInterceptor_test.go @@ -8,6 +8,7 @@ import ( "github.com/multiversx/mx-chain-core-go/data/block" "github.com/multiversx/mx-chain-core-go/data/outport" "github.com/multiversx/mx-chain-core-go/data/smartContractResult" + "github.com/multiversx/mx-chain-core-go/data/stateChange" "github.com/multiversx/mx-chain-core-go/data/transaction" "github.com/multiversx/mx-chain-notifier-go/data" "github.com/multiversx/mx-chain-notifier-go/mocks" @@ -101,6 +102,23 @@ func TestProcessBlockEvents(t *testing.T) { require.Equal(t, process.ErrNilBlockHeader, err) }) + t.Run("nil state accesses", func(t *testing.T) { + t.Parallel() + + eventsInterceptor, _ := process.NewEventsInterceptor(createMockEventsInterceptorArgs()) + + eventsData := &data.ArgsSaveBlockData{ + HeaderHash: []byte("headerHash"), + TransactionsPool: &outport.TransactionPool{}, + Body: &block.Body{}, + Header: &block.HeaderV2{}, + StateAccesses: nil, + } + events, err := eventsInterceptor.ProcessBlockEvents(eventsData) + require.Nil(t, events) + require.Equal(t, process.ErrNilStateAccesses, err) + }) + t.Run("should work", func(t *testing.T) { t.Parallel() @@ -157,6 +175,7 @@ func TestProcessBlockEvents(t *testing.T) { SmartContractResults: scrs, Logs: logs, }, + StateAccesses: make(map[string]*stateChange.StateAccesses), } expTxs := map[string]*transaction.Transaction{ @@ -202,6 +221,7 @@ func TestProcessBlockEvents(t *testing.T) { Topics: make([][]byte, 0), }, }, + StateAccessesPerAccounts: make(map[string]*stateChange.StateAccesses), } events, err := eventsInterceptor.ProcessBlockEvents(&blockEvents) @@ -250,6 +270,7 @@ func TestProcessBlockEvents(t *testing.T) { TransactionsPool: &outport.TransactionPool{ Logs: logs, }, + StateAccesses: make(map[string]*stateChange.StateAccesses), } expEvents := &data.InterceptorBlockData{ @@ -266,6 +287,7 @@ func TestProcessBlockEvents(t *testing.T) { Topics: make([][]byte, 0), }, }, + StateAccessesPerAccounts: make(map[string]*stateChange.StateAccesses), } events, err := eventsInterceptor.ProcessBlockEvents(&blockEvents) @@ -331,3 +353,240 @@ func TestGetLogEventsFromTransactionsPool(t *testing.T) { require.Equal(t, txHash1, receivedEvents[1].TxHash) require.Equal(t, txHash2, receivedEvents[2].TxHash) } + +func TestEventsInterceptor_GetStateAccessesPerAccounts(t *testing.T) { + t.Parallel() + + args := createMockEventsInterceptorArgs() + en, _ := process.NewEventsInterceptor(args) + + txs := map[string]*outport.TxInfo{ + "txHash1": { + Transaction: &transaction.Transaction{ + Nonce: 2, + }, + ExecutionOrder: 1, + }, + } + scrs := map[string]*outport.SCRInfo{ + "txHash2": { + SmartContractResult: &smartContractResult.SmartContractResult{ + Nonce: 3, + }, + ExecutionOrder: 2, + }, + } + invalidTxs := map[string]*outport.TxInfo{ + "txHash0": { + Transaction: &transaction.Transaction{ + Nonce: 1, + }, + ExecutionOrder: 0, + }, + } + + blockHash := []byte("blockHash") + + t.Run("with write operations", func(t *testing.T) { + t.Parallel() + + stateAccesses := make(map[string]*stateChange.StateAccesses) + stateAccesses["txHash1"] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + &stateChange.StateAccess{ + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey1"), + MainTrieVal: []byte("mainTrieVal1"), + }, + &stateChange.StateAccess{ + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal2"), + }, + }, + } + stateAccesses["txHash2"] = &stateChange.StateAccesses{} + stateAccesses["txHash0"] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + &stateChange.StateAccess{ + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey3"), + MainTrieVal: []byte("mainTrieVal3"), + }, + &stateChange.StateAccess{ + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal4"), + }, + }, + } + + blockEvents := &data.ArgsSaveBlockData{ + HeaderHash: blockHash, + TransactionsPool: &outport.TransactionPool{ + Transactions: txs, + SmartContractResults: scrs, + InvalidTxs: invalidTxs, + }, + StateAccesses: stateAccesses, + } + + expStateAccessesPerAccounts := make(map[string]*stateChange.StateAccesses) + expStateAccessesPerAccounts[hex.EncodeToString([]byte("mainTrieKey1"))] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + &stateChange.StateAccess{ + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey1"), + MainTrieVal: []byte("mainTrieVal1"), + }, + }, + } + expStateAccessesPerAccounts[hex.EncodeToString([]byte("mainTrieKey2"))] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + &stateChange.StateAccess{ + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal4"), + }, + &stateChange.StateAccess{ + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal2"), + }, + }, + } + expStateAccessesPerAccounts[hex.EncodeToString([]byte("mainTrieKey3"))] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + &stateChange.StateAccess{ + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey3"), + MainTrieVal: []byte("mainTrieVal3"), + }, + }, + } + + stateAccessesPerAccounts := en.GetStateAccessesPerAccounts(blockEvents) + + require.Equal(t, expStateAccessesPerAccounts, stateAccessesPerAccounts) + }) + + t.Run("with read operations", func(t *testing.T) { + t.Parallel() + + stateAccesses := make(map[string]*stateChange.StateAccesses) + stateAccesses["txHash1"] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + &stateChange.StateAccess{ + Type: stateChange.Read, + MainTrieKey: []byte("mainTrieKey1"), + MainTrieVal: []byte("mainTrieVal1"), + }, + &stateChange.StateAccess{ + Type: stateChange.Read, + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal2"), + }, + }, + } + stateAccesses["txHash2"] = &stateChange.StateAccesses{} + stateAccesses["txHash0"] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + &stateChange.StateAccess{ + Type: stateChange.Read, + MainTrieKey: []byte("mainTrieKey3"), + MainTrieVal: []byte("mainTrieVal3"), + }, + &stateChange.StateAccess{ + Type: stateChange.Read, + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal4"), + }, + }, + } + + blockEvents := &data.ArgsSaveBlockData{ + HeaderHash: blockHash, + TransactionsPool: &outport.TransactionPool{ + Transactions: txs, + SmartContractResults: scrs, + InvalidTxs: invalidTxs, + }, + StateAccesses: stateAccesses, + } + + expStateAccessesPerAccounts := make(map[string]*stateChange.StateAccesses) + + stateAccessesPerAccounts := en.GetStateAccessesPerAccounts(blockEvents) + + require.Equal(t, expStateAccessesPerAccounts, stateAccessesPerAccounts) + }) + + t.Run("with read and write operations", func(t *testing.T) { + t.Parallel() + + stateAccesses := make(map[string]*stateChange.StateAccesses) + stateAccesses["txHash1"] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + &stateChange.StateAccess{ + Type: stateChange.Read, + MainTrieKey: []byte("mainTrieKey1"), + MainTrieVal: []byte("mainTrieVal1"), + }, + &stateChange.StateAccess{ + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal2"), + }, + }, + } + stateAccesses["txHash2"] = &stateChange.StateAccesses{} + stateAccesses["txHash0"] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + &stateChange.StateAccess{ + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey3"), + MainTrieVal: []byte("mainTrieVal3"), + }, + &stateChange.StateAccess{ + Type: stateChange.Read, + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal4"), + }, + }, + } + + blockEvents := &data.ArgsSaveBlockData{ + HeaderHash: blockHash, + TransactionsPool: &outport.TransactionPool{ + Transactions: txs, + SmartContractResults: scrs, + InvalidTxs: invalidTxs, + }, + StateAccesses: stateAccesses, + } + + expStateAccessesPerAccounts := make(map[string]*stateChange.StateAccesses) + expStateAccessesPerAccounts[hex.EncodeToString([]byte("mainTrieKey2"))] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + &stateChange.StateAccess{ + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal2"), + }, + }, + } + expStateAccessesPerAccounts[hex.EncodeToString([]byte("mainTrieKey3"))] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + &stateChange.StateAccess{ + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey3"), + MainTrieVal: []byte("mainTrieVal3"), + }, + }, + } + + stateAccessesPerAccounts := en.GetStateAccessesPerAccounts(blockEvents) + + require.Equal(t, expStateAccessesPerAccounts, stateAccessesPerAccounts) + }) +} From e5701f544a2882de62239a836177251fa0355263 Mon Sep 17 00:00:00 2001 From: ssd04 Date: Fri, 22 Aug 2025 13:46:38 +0300 Subject: [PATCH 06/28] fix typos --- process/errors.go | 2 +- process/eventsInterceptor.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/process/errors.go b/process/errors.go index 27df101e..1c1739dd 100644 --- a/process/errors.go +++ b/process/errors.go @@ -33,4 +33,4 @@ var ErrNilPublisherHandler = errors.New("nil publisher handler provided") var ErrNilEventsInterceptor = errors.New("nil events interceptor") // ErrNilStateAccesses signals that a nil state accesses has been provided -var ErrNilStateAccesses = errors.New("nil state accessess provided") +var ErrNilStateAccesses = errors.New("nil state accesses provided") diff --git a/process/eventsInterceptor.go b/process/eventsInterceptor.go index 5ce054c2..ee60f56a 100644 --- a/process/eventsInterceptor.go +++ b/process/eventsInterceptor.go @@ -143,7 +143,7 @@ func (ei *eventsInterceptor) getStateAccessesPerAccounts(eventsData *data.ArgsSa for _, stateAccess := range stateAccessesPerTx.StateAccess { if stateAccess.Type == stateChange.Read { - // TODO: add a flag here to allow read state accessess + // TODO: add a flag here to allow read state accesses continue } From faa012318419aa05cd93435e48a2025e1e6492ca Mon Sep 17 00:00:00 2001 From: ssd04 Date: Fri, 22 Aug 2025 15:24:28 +0300 Subject: [PATCH 07/28] update todo comment for code write operation --- process/eventsInterceptor.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/process/eventsInterceptor.go b/process/eventsInterceptor.go index ee60f56a..abfe574b 100644 --- a/process/eventsInterceptor.go +++ b/process/eventsInterceptor.go @@ -147,10 +147,11 @@ func (ei *eventsInterceptor) getStateAccessesPerAccounts(eventsData *data.ArgsSa continue } - if stateAccess.Operation == stateChange.WriteCode { - // TODO: handle code update operations - continue - } + // TODO: make sure code update operations are handled properly + // at the moment they are handled as a separate entry + // if stateAccess.Operation == stateChange.WriteCode { + // continue + // } accKey := hex.EncodeToString(stateAccess.MainTrieKey) acc, ok := stateAccessesPerAccounts[accKey] From 627e5a53a5d8c160aec73d8dbf1d67e9d105d5bd Mon Sep 17 00:00:00 2001 From: ssd04 Date: Fri, 22 Aug 2025 15:33:29 +0300 Subject: [PATCH 08/28] extend integration test timeout --- integrationTests/rabbitmq/testNotifierWithRabbitMQ_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integrationTests/rabbitmq/testNotifierWithRabbitMQ_test.go b/integrationTests/rabbitmq/testNotifierWithRabbitMQ_test.go index 45111b7d..eeb166f8 100644 --- a/integrationTests/rabbitmq/testNotifierWithRabbitMQ_test.go +++ b/integrationTests/rabbitmq/testNotifierWithRabbitMQ_test.go @@ -54,7 +54,7 @@ func testNotifierWithRabbitMQ(t *testing.T, observerType string, payloadVersion go pushEventsRequest(wg, client) go pushRevertRequest(wg, client) - integrationTests.WaitTimeout(t, wg, time.Second*2) + integrationTests.WaitTimeout(t, wg, time.Second*5) assert.Equal(t, 3, len(notifier.RedisClient.GetEntries())) assert.Equal(t, 7, len(notifier.RabbitMQClient.GetEntries())) From 65efec067a80505d3816481fd4edf59061c5f2f7 Mon Sep 17 00:00:00 2001 From: ssd04 Date: Mon, 25 Aug 2025 15:53:51 +0300 Subject: [PATCH 09/28] proper handling for nil state accesses --- process/eventsInterceptor.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/process/eventsInterceptor.go b/process/eventsInterceptor.go index abfe574b..dccb4dfd 100644 --- a/process/eventsInterceptor.go +++ b/process/eventsInterceptor.go @@ -59,9 +59,6 @@ func (ei *eventsInterceptor) ProcessBlockEvents(eventsData *data.ArgsSaveBlockDa if eventsData.Header == nil { return nil, ErrNilBlockHeader } - if eventsData.StateAccesses == nil { - return nil, ErrNilStateAccesses - } events := ei.getLogEventsFromTransactionsPool(eventsData.TransactionsPool.Logs) @@ -128,6 +125,15 @@ func getTxsWithOrder(transactionsPool *outport.TransactionPool) []txWithOrder { } func (ei *eventsInterceptor) getStateAccessesPerAccounts(eventsData *data.ArgsSaveBlockData) map[string]*stateChange.StateAccesses { + + if eventsData.StateAccesses == nil { + log.Warn("getStateAccessesPerAccounts failed: will return empty state accesses per accounts", + "error", ErrNilStateAccesses, + ) + + return make(map[string]*stateChange.StateAccesses) + } + stateAccessesPerTxs := eventsData.StateAccesses // txs hashes with order From 4af31436f587072bc03c0bedafc57cfc7e38a378 Mon Sep 17 00:00:00 2001 From: ssd04 Date: Tue, 26 Aug 2025 12:10:36 +0300 Subject: [PATCH 10/28] add more logging --- process/eventsInterceptor.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/process/eventsInterceptor.go b/process/eventsInterceptor.go index dccb4dfd..d086db88 100644 --- a/process/eventsInterceptor.go +++ b/process/eventsInterceptor.go @@ -136,6 +136,10 @@ func (ei *eventsInterceptor) getStateAccessesPerAccounts(eventsData *data.ArgsSa stateAccessesPerTxs := eventsData.StateAccesses + log.Debug("getStateAccessesPerAccounts", + "num stateAccessesPerTxs", len(stateAccessesPerTxs), + ) + // txs hashes with order txsWithOrder := getTxsWithOrder(eventsData.TransactionsPool) @@ -172,6 +176,10 @@ func (ei *eventsInterceptor) getStateAccessesPerAccounts(eventsData *data.ArgsSa } } + log.Debug("getStateAccessesPerAccounts", + "num stateAccessesPerAccounts", len(stateAccessesPerAccounts), + ) + return stateAccessesPerAccounts } From ab1a0bd5cdf2f7124698a1deb1f6fdded0952431 Mon Sep 17 00:00:00 2001 From: ssd04 Date: Tue, 26 Aug 2025 12:45:00 +0300 Subject: [PATCH 11/28] decode tx hash from tx info --- process/eventsInterceptor.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/process/eventsInterceptor.go b/process/eventsInterceptor.go index d086db88..2479b0fd 100644 --- a/process/eventsInterceptor.go +++ b/process/eventsInterceptor.go @@ -145,7 +145,13 @@ func (ei *eventsInterceptor) getStateAccessesPerAccounts(eventsData *data.ArgsSa stateAccessesPerAccounts := make(map[string]*stateChange.StateAccesses) for _, txInfo := range txsWithOrder { - stateAccessesPerTx, ok := stateAccessesPerTxs[txInfo.hash] + txHash, err := hex.DecodeString(txInfo.hash) + if err != nil { + log.Error("failed to decode tx hash", "txHash", txInfo.hash) + continue + } + + stateAccessesPerTx, ok := stateAccessesPerTxs[string(txHash)] if !ok { log.Warn("did not find state accesses for tx", "txHash", txInfo.hash) continue From d1983161397300616bbdacef05ca5192d73f77a6 Mon Sep 17 00:00:00 2001 From: ssd04 Date: Wed, 27 Aug 2025 17:21:48 +0300 Subject: [PATCH 12/28] fix unit tests --- process/eventsInterceptor.go | 26 +++++++++++++++++++++----- process/eventsInterceptor_test.go | 25 +++++++++++++++++++------ 2 files changed, 40 insertions(+), 11 deletions(-) diff --git a/process/eventsInterceptor.go b/process/eventsInterceptor.go index 2479b0fd..41b147c9 100644 --- a/process/eventsInterceptor.go +++ b/process/eventsInterceptor.go @@ -125,9 +125,9 @@ func getTxsWithOrder(transactionsPool *outport.TransactionPool) []txWithOrder { } func (ei *eventsInterceptor) getStateAccessesPerAccounts(eventsData *data.ArgsSaveBlockData) map[string]*stateChange.StateAccesses { - if eventsData.StateAccesses == nil { log.Warn("getStateAccessesPerAccounts failed: will return empty state accesses per accounts", + "block hash", eventsData.HeaderHash, "error", ErrNilStateAccesses, ) @@ -136,9 +136,7 @@ func (ei *eventsInterceptor) getStateAccessesPerAccounts(eventsData *data.ArgsSa stateAccessesPerTxs := eventsData.StateAccesses - log.Debug("getStateAccessesPerAccounts", - "num stateAccessesPerTxs", len(stateAccessesPerTxs), - ) + logStateAccessesPerTxs(stateAccessesPerTxs) // txs hashes with order txsWithOrder := getTxsWithOrder(eventsData.TransactionsPool) @@ -182,13 +180,31 @@ func (ei *eventsInterceptor) getStateAccessesPerAccounts(eventsData *data.ArgsSa } } - log.Debug("getStateAccessesPerAccounts", + log.Trace("getStateAccessesPerAccounts", "num stateAccessesPerAccounts", len(stateAccessesPerAccounts), ) return stateAccessesPerAccounts } +func logStateAccessesPerTxs(stateAccesses map[string]*stateChange.StateAccesses) { + log.Trace("getStateAccessesPerAccounts", + "num stateAccessesPerTxs", len(stateAccesses), + ) + + for txHash, sts := range stateAccesses { + log.Trace("stateAccessesPerTx", + "txHash", txHash, + ) + + for _, st := range sts.StateAccess { + log.Trace("st", + "txHash", st.GetTxHash(), + ) + } + } +} + func (ei *eventsInterceptor) getLogEventsFromTransactionsPool(logs []*outport.LogData) []data.Event { var logEvents []*logEvent for _, logData := range logs { diff --git a/process/eventsInterceptor_test.go b/process/eventsInterceptor_test.go index 33aa1cc6..325809aa 100644 --- a/process/eventsInterceptor_test.go +++ b/process/eventsInterceptor_test.go @@ -102,7 +102,7 @@ func TestProcessBlockEvents(t *testing.T) { require.Equal(t, process.ErrNilBlockHeader, err) }) - t.Run("nil state accesses", func(t *testing.T) { + t.Run("nil state accesses, should return empty map", func(t *testing.T) { t.Parallel() eventsInterceptor, _ := process.NewEventsInterceptor(createMockEventsInterceptorArgs()) @@ -115,8 +115,21 @@ func TestProcessBlockEvents(t *testing.T) { StateAccesses: nil, } events, err := eventsInterceptor.ProcessBlockEvents(eventsData) - require.Nil(t, events) - require.Equal(t, process.ErrNilStateAccesses, err) + require.Nil(t, err) + + expInterceptorData := &data.InterceptorBlockData{ + Hash: hex.EncodeToString([]byte("headerHash")), + Body: &block.Body{}, + Header: &block.HeaderV2{}, + Txs: map[string]*transaction.Transaction{}, + TxsWithOrder: map[string]*outport.TxInfo(nil), + Scrs: map[string]*smartContractResult.SmartContractResult{}, + ScrsWithOrder: map[string]*outport.SCRInfo(nil), + LogEvents: []data.Event{}, + StateAccessesPerAccounts: map[string]*stateChange.StateAccesses{}, + } + + require.Equal(t, expInterceptorData, events) }) t.Run("should work", func(t *testing.T) { @@ -361,7 +374,7 @@ func TestEventsInterceptor_GetStateAccessesPerAccounts(t *testing.T) { en, _ := process.NewEventsInterceptor(args) txs := map[string]*outport.TxInfo{ - "txHash1": { + hex.EncodeToString([]byte("txHash1")): { Transaction: &transaction.Transaction{ Nonce: 2, }, @@ -369,7 +382,7 @@ func TestEventsInterceptor_GetStateAccessesPerAccounts(t *testing.T) { }, } scrs := map[string]*outport.SCRInfo{ - "txHash2": { + hex.EncodeToString([]byte("txHash2")): { SmartContractResult: &smartContractResult.SmartContractResult{ Nonce: 3, }, @@ -377,7 +390,7 @@ func TestEventsInterceptor_GetStateAccessesPerAccounts(t *testing.T) { }, } invalidTxs := map[string]*outport.TxInfo{ - "txHash0": { + hex.EncodeToString([]byte("txHash0")): { Transaction: &transaction.Transaction{ Nonce: 1, }, From 8ba8d95ce5f418feeede2aee0e7baf8a6b52f8d0 Mon Sep 17 00:00:00 2001 From: ssd04 Date: Wed, 27 Aug 2025 17:32:34 +0300 Subject: [PATCH 13/28] wait for components to start in tests - fix flopping test --- .../rabbitmq/testNotifierWithRabbitMQ_test.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/integrationTests/rabbitmq/testNotifierWithRabbitMQ_test.go b/integrationTests/rabbitmq/testNotifierWithRabbitMQ_test.go index eeb166f8..4afcb41b 100644 --- a/integrationTests/rabbitmq/testNotifierWithRabbitMQ_test.go +++ b/integrationTests/rabbitmq/testNotifierWithRabbitMQ_test.go @@ -1,6 +1,7 @@ package rabbitmq import ( + "encoding/hex" "encoding/json" "sync" "testing" @@ -40,6 +41,9 @@ func testNotifierWithRabbitMQ(t *testing.T, observerType string, payloadVersion client, err := integrationTests.CreateObserverConnector(notifier.Facade, observerType, common.MessageQueuePublisherType, payloadVersion) require.Nil(t, err) + // wait for components to start + time.Sleep(time.Second * 5) + _ = notifier.Publisher.Run() defer notifier.Publisher.Close() @@ -54,7 +58,7 @@ func testNotifierWithRabbitMQ(t *testing.T, observerType string, payloadVersion go pushEventsRequest(wg, client) go pushRevertRequest(wg, client) - integrationTests.WaitTimeout(t, wg, time.Second*5) + integrationTests.WaitTimeout(t, wg, time.Second*2) assert.Equal(t, 3, len(notifier.RedisClient.GetEntries())) assert.Equal(t, 7, len(notifier.RabbitMQClient.GetEntries())) @@ -70,7 +74,7 @@ func pushEventsRequest(wg *sync.WaitGroup, webServer integrationTests.ObserverCo txPool := &outport.TransactionPool{ Transactions: map[string]*outport.TxInfo{ - "hash1": { + hex.EncodeToString([]byte("hash1")): { Transaction: &transaction.Transaction{ Nonce: 1, }, @@ -81,7 +85,7 @@ func pushEventsRequest(wg *sync.WaitGroup, webServer integrationTests.ObserverCo }, }, SmartContractResults: map[string]*outport.SCRInfo{ - "hash2": { + hex.EncodeToString([]byte("hash2")): { SmartContractResult: &smartContractResult.SmartContractResult{ Nonce: 2, }, From dbade9cd9290714873d547d60a4b45f2c1ad3a3f Mon Sep 17 00:00:00 2001 From: ssd04 Date: Wed, 27 Aug 2025 17:32:53 +0300 Subject: [PATCH 14/28] add state accesses to test data --- testdata/testData.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/testdata/testData.go b/testdata/testData.go index a89c89a1..b7c1397d 100644 --- a/testdata/testData.go +++ b/testdata/testData.go @@ -5,6 +5,7 @@ import ( "github.com/multiversx/mx-chain-core-go/data/block" "github.com/multiversx/mx-chain-core-go/data/outport" "github.com/multiversx/mx-chain-core-go/data/smartContractResult" + "github.com/multiversx/mx-chain-core-go/data/stateChange" "github.com/multiversx/mx-chain-core-go/data/transaction" "github.com/multiversx/mx-chain-core-go/marshal" "github.com/multiversx/mx-chain-notifier-go/common" @@ -115,6 +116,21 @@ func (bd *blockData) OutportBlockV1() *outport.OutportBlock { } headerBytes, _ := bd.marshaller.Marshal(header) + stateAccesses := make(map[string]*stateChange.StateAccesses) + stateAccesses["txHash1"] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + &stateChange.StateAccess{ + MainTrieKey: []byte("mainTrieKey1"), + MainTrieVal: []byte("mainTrieVal1"), + }, + &stateChange.StateAccess{ + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal2"), + }, + }, + } + stateAccesses["txHash2"] = &stateChange.StateAccesses{} + return &outport.OutportBlock{ BlockData: &outport.BlockData{ HeaderBytes: headerBytes, @@ -169,6 +185,7 @@ func (bd *blockData) OutportBlockV1() *outport.OutportBlock { }, }, }, + StateAccesses: stateAccesses, NumberOfShards: 2, } } From afd6941a6bd1047dd88e20dfa768584266de2acf Mon Sep 17 00:00:00 2001 From: ssd04 Date: Thu, 28 Aug 2025 16:10:13 +0300 Subject: [PATCH 15/28] fixes after review --- process/eventsInterceptor.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/process/eventsInterceptor.go b/process/eventsInterceptor.go index 41b147c9..7147b5b6 100644 --- a/process/eventsInterceptor.go +++ b/process/eventsInterceptor.go @@ -11,6 +11,7 @@ import ( "github.com/multiversx/mx-chain-core-go/data/smartContractResult" "github.com/multiversx/mx-chain-core-go/data/stateChange" "github.com/multiversx/mx-chain-core-go/data/transaction" + logger "github.com/multiversx/mx-chain-logger-go" "github.com/multiversx/mx-chain-notifier-go/data" ) @@ -163,17 +164,13 @@ func (ei *eventsInterceptor) getStateAccessesPerAccounts(eventsData *data.ArgsSa // TODO: make sure code update operations are handled properly // at the moment they are handled as a separate entry - // if stateAccess.Operation == stateChange.WriteCode { - // continue - // } accKey := hex.EncodeToString(stateAccess.MainTrieKey) - acc, ok := stateAccessesPerAccounts[accKey] + _, ok := stateAccessesPerAccounts[accKey] if !ok { - acc = &stateChange.StateAccesses{ + stateAccessesPerAccounts[accKey] = &stateChange.StateAccesses{ StateAccess: make([]*stateChange.StateAccess, 0), } - stateAccessesPerAccounts[accKey] = acc } stateAccessesPerAccounts[accKey].StateAccess = append(stateAccessesPerAccounts[accKey].StateAccess, stateAccess) @@ -188,6 +185,10 @@ func (ei *eventsInterceptor) getStateAccessesPerAccounts(eventsData *data.ArgsSa } func logStateAccessesPerTxs(stateAccesses map[string]*stateChange.StateAccesses) { + if log.GetLevel() > logger.LogTrace { + return + } + log.Trace("getStateAccessesPerAccounts", "num stateAccessesPerTxs", len(stateAccesses), ) From f18ab24220bfabcb572f3b0d3e22a767c87af5d5 Mon Sep 17 00:00:00 2001 From: ssd04 Date: Tue, 2 Sep 2025 17:44:09 +0300 Subject: [PATCH 16/28] add more fields for state accesses event --- process/eventsHandler.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/process/eventsHandler.go b/process/eventsHandler.go index 10018524..0c8856bb 100644 --- a/process/eventsHandler.go +++ b/process/eventsHandler.go @@ -132,6 +132,9 @@ func (eh *eventsHandler) HandleSaveBlockEvents(allEvents data.ArgsSaveBlockData) stateAccesses := data.BlockStateAccesses{ Hash: eventsData.Hash, + ShardID: eventsData.Header.GetShardID(), + TimeStampMs: headerTimeStampMs, + Nonce: eventsData.Header.GetNonce(), StateAccessesPerAccounts: eventsData.StateAccessesPerAccounts, } eh.handleStateAccesses(stateAccesses) From 8576a2c63d993523253fa3d5373bd6609d1597a9 Mon Sep 17 00:00:00 2001 From: ssd04 Date: Wed, 3 Sep 2025 11:59:58 +0300 Subject: [PATCH 17/28] add state accesses event extra fields --- data/outport.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/data/outport.go b/data/outport.go index e7b33504..18ec51ad 100644 --- a/data/outport.go +++ b/data/outport.go @@ -77,6 +77,9 @@ type BlockEventsWithOrder struct { // BlockStateAccesses holds the block state accesses type BlockStateAccesses struct { Hash string `json:"hash"` + ShardID uint32 `json:"shardID"` + TimeStampMs uint64 `json:"timestampMs"` + Nonce uint64 `json:"nonce"` StateAccessesPerAccounts map[string]*stateChange.StateAccesses `json:"stateAccessesPerAccounts"` } From 1d80d43173b2e9a82d54ffc9e53b8714e40b34aa Mon Sep 17 00:00:00 2001 From: ssd04 Date: Mon, 8 Sep 2025 11:56:51 +0300 Subject: [PATCH 18/28] update state accesses log --- process/eventsInterceptor.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/process/eventsInterceptor.go b/process/eventsInterceptor.go index 7147b5b6..9043fc92 100644 --- a/process/eventsInterceptor.go +++ b/process/eventsInterceptor.go @@ -200,7 +200,8 @@ func logStateAccessesPerTxs(stateAccesses map[string]*stateChange.StateAccesses) for _, st := range sts.StateAccess { log.Trace("st", - "txHash", st.GetTxHash(), + "actionType", st.GetType(), + "operation", st.GetOperation(), ) } } From 0582d4531060f63d79a6230fbf087134d87c6e45 Mon Sep 17 00:00:00 2001 From: ssd04 Date: Mon, 15 Sep 2025 17:30:54 +0300 Subject: [PATCH 19/28] fix ws dispatcher subscription --- dispatcher/subscription.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dispatcher/subscription.go b/dispatcher/subscription.go index a5c048ee..698ab1c1 100644 --- a/dispatcher/subscription.go +++ b/dispatcher/subscription.go @@ -145,7 +145,8 @@ func getEventType(subEntry data.SubscriptionEntry) string { subEntry.EventType == common.RevertBlockEvents || subEntry.EventType == common.BlockTxs || subEntry.EventType == common.BlockScrs || - subEntry.EventType == common.BlockEvents { + subEntry.EventType == common.BlockEvents || + subEntry.EventType == common.BlockStateAccesses { return subEntry.EventType } From c698754585d3d6a0b28e65700e56f256d1ddceda Mon Sep 17 00:00:00 2001 From: ssd04 Date: Mon, 15 Sep 2025 17:33:01 +0300 Subject: [PATCH 20/28] ws tools for state changes --- testdata/testData.go | 10 +++++-- tools/wsConnector/main.go | 57 +++++++++++++++++++++++---------------- tools/wsPublisher/main.go | 12 +++++++-- 3 files changed, 52 insertions(+), 27 deletions(-) diff --git a/testdata/testData.go b/testdata/testData.go index b7c1397d..d11e9bdd 100644 --- a/testdata/testData.go +++ b/testdata/testData.go @@ -1,6 +1,8 @@ package testdata import ( + "encoding/hex" + "github.com/multiversx/mx-chain-core-go/core/check" "github.com/multiversx/mx-chain-core-go/data/block" "github.com/multiversx/mx-chain-core-go/data/outport" @@ -120,12 +122,16 @@ func (bd *blockData) OutportBlockV1() *outport.OutportBlock { stateAccesses["txHash1"] = &stateChange.StateAccesses{ StateAccess: []*stateChange.StateAccess{ &stateChange.StateAccess{ + Type: stateChange.Write, MainTrieKey: []byte("mainTrieKey1"), MainTrieVal: []byte("mainTrieVal1"), + TxHash: []byte("txHash1"), }, &stateChange.StateAccess{ + Type: stateChange.Write, MainTrieKey: []byte("mainTrieKey2"), MainTrieVal: []byte("mainTrieVal2"), + TxHash: []byte("txHash1"), }, }, } @@ -149,7 +155,7 @@ func (bd *blockData) OutportBlockV1() *outport.OutportBlock { HeaderGasConsumption: &outport.HeaderGasConsumption{}, TransactionPool: &outport.TransactionPool{ Transactions: map[string]*outport.TxInfo{ - "txHash1": { + hex.EncodeToString([]byte("txHash1")): { Transaction: &transaction.Transaction{ Nonce: 1, GasPrice: 1, @@ -162,7 +168,7 @@ func (bd *blockData) OutportBlockV1() *outport.OutportBlock { }, }, SmartContractResults: map[string]*outport.SCRInfo{ - "scrHash1": { + hex.EncodeToString([]byte("scrHash1")): { SmartContractResult: &smartContractResult.SmartContractResult{ Nonce: 2, GasLimit: 2, diff --git a/tools/wsConnector/main.go b/tools/wsConnector/main.go index 49eed11a..552ee3ee 100644 --- a/tools/wsConnector/main.go +++ b/tools/wsConnector/main.go @@ -2,6 +2,7 @@ package main import ( "fmt" + "time" wsData "github.com/multiversx/mx-chain-communication-go/websocket/data" wsFactory "github.com/multiversx/mx-chain-communication-go/websocket/factory" @@ -19,28 +20,38 @@ func main() { return } - blockData, err := testdata.NewBlockData(marshaller) - if err != nil { - fmt.Println(err.Error()) - return - } - - err = wsClient.PushEventsRequest(blockData.OutportBlockV1()) - if err != nil { - fmt.Println(err.Error()) - return - } - - err = wsClient.RevertEventsRequest(blockData.RevertBlockV1()) - if err != nil { - fmt.Println(err.Error()) - return - } - - err = wsClient.FinalizedEventsRequest(blockData.FinalizedBlockV1()) - if err != nil { - fmt.Println(err.Error()) - return + for { + blockData, err := testdata.NewBlockData(marshaller) + if err != nil { + fmt.Println(err.Error()) + time.Sleep(6 * time.Second) + continue + } + + err = wsClient.PushEventsRequest(blockData.OutportBlockV1()) + if err != nil { + fmt.Println(err.Error()) + time.Sleep(6 * time.Second) + continue + } + + err = wsClient.RevertEventsRequest(blockData.RevertBlockV1()) + if err != nil { + time.Sleep(6 * time.Second) + fmt.Println(err.Error()) + continue + } + + err = wsClient.FinalizedEventsRequest(blockData.FinalizedBlockV1()) + if err != nil { + fmt.Println(err.Error()) + time.Sleep(6 * time.Second) + continue + } + + fmt.Println("sent properly") + + time.Sleep(6 * time.Second) } } @@ -63,7 +74,7 @@ func newWSObsClient(marshaller marshal.Marshalizer) (*wsObsClient, error) { port := 22111 wsHost, err := wsFactory.CreateWebSocketHost(wsFactory.ArgsWebSocketHost{ WebSocketConfig: wsData.WebSocketConfig{ - URL: "localhost:" + fmt.Sprintf("%d", port), + URL: "ws://localhost:" + fmt.Sprintf("%d", port), WithAcknowledge: true, Mode: "client", RetryDurationInSec: 5, diff --git a/tools/wsPublisher/main.go b/tools/wsPublisher/main.go index 789a822d..686b07fb 100644 --- a/tools/wsPublisher/main.go +++ b/tools/wsPublisher/main.go @@ -5,6 +5,7 @@ import ( "fmt" "net/url" "sync" + "time" "github.com/gorilla/websocket" "github.com/multiversx/mx-chain-notifier-go/common" @@ -36,6 +37,9 @@ func main() { { EventType: common.BlockTxs, }, + { + EventType: common.BlockStateAccesses, + }, }, } @@ -59,7 +63,7 @@ func main() { case common.BlockEvents: var event data.BlockEventsWithOrder _ = json.Unmarshal(reply.Data, &event) - fmt.Println(event) + fmt.Printf("Hash: %s, TimeStamp: %d", event.Hash, event.TimeStamp) case common.RevertBlockEvents: var event *data.RevertBlock _ = json.Unmarshal(reply.Data, &event) @@ -67,7 +71,7 @@ func main() { case common.FinalizedBlockEvents: var event *data.FinalizedBlock _ = json.Unmarshal(reply.Data, &event) - fmt.Println(event) + fmt.Printf("Hash: %s, TimeStamp: %d", event.Hash, time.Now().Unix()) case common.BlockTxs: var event *data.BlockTxs _ = json.Unmarshal(reply.Data, &event) @@ -76,6 +80,10 @@ func main() { var event data.BlockScrs _ = json.Unmarshal(reply.Data, &event) fmt.Println(event.Hash) + case common.BlockStateAccesses: + var event data.BlockStateAccesses + _ = json.Unmarshal(reply.Data, &event) + fmt.Printf("Hash: %s, TimeStamp: %d, Len sa: %d", event.Hash, time.Now().Unix(), len(event.StateAccessesPerAccounts)) default: fmt.Println("invalid message type") } From 1d4b2ebba986cf36135919b28ac34fc8e88a1b9e Mon Sep 17 00:00:00 2001 From: ssd04 Date: Tue, 16 Sep 2025 13:59:13 +0300 Subject: [PATCH 21/28] update core-go version --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index bafbdefc..3c2bcf5a 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/google/uuid v1.6.0 github.com/gorilla/websocket v1.5.3 github.com/multiversx/mx-chain-communication-go v1.2.0 - github.com/multiversx/mx-chain-core-go v1.4.1-0.20250715072713-eaece6359e1c + github.com/multiversx/mx-chain-core-go v1.4.1-0.20250909090314-60b4de5d3d1b github.com/multiversx/mx-chain-logger-go v1.1.0 github.com/pelletier/go-toml v1.9.3 github.com/prometheus/client_model v0.6.1 diff --git a/go.sum b/go.sum index 4090f0a3..6aee8d9f 100644 --- a/go.sum +++ b/go.sum @@ -130,8 +130,8 @@ github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o= github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= github.com/multiversx/mx-chain-communication-go v1.2.0 h1:0wOoLldiRbvaOPxwICbnRCqCpLqPewg8M/FMbC/0OXY= github.com/multiversx/mx-chain-communication-go v1.2.0/go.mod h1:wS3aAwkmHbC9mlzQdvL6p7l8Rqw3vmzhj7WZW1dTveA= -github.com/multiversx/mx-chain-core-go v1.4.1-0.20250715072713-eaece6359e1c h1:MMRPxVcxpHfe9g1DIWiz7s+QUEA68Xb+oJALJukHbgA= -github.com/multiversx/mx-chain-core-go v1.4.1-0.20250715072713-eaece6359e1c/go.mod h1:IO+vspNan+gT0WOHnJ95uvWygiziHZvfXpff6KnxV7g= +github.com/multiversx/mx-chain-core-go v1.4.1-0.20250909090314-60b4de5d3d1b h1:+tcmfcnOgrUcw6qr/hFiulEz3Efl4d4yoDmvfhm0jDE= +github.com/multiversx/mx-chain-core-go v1.4.1-0.20250909090314-60b4de5d3d1b/go.mod h1:IO+vspNan+gT0WOHnJ95uvWygiziHZvfXpff6KnxV7g= github.com/multiversx/mx-chain-crypto-go v1.2.13-0.20250218161752-9482d9a22234 h1:NNI7kYxzsq+4mTPSUJo0cK1+iPxjUX+gRJDaBRwEQ7M= github.com/multiversx/mx-chain-crypto-go v1.2.13-0.20250218161752-9482d9a22234/go.mod h1:QZAw2bZcOxGQRgYACTrmP8pfTa3NyxENIL+00G6nM5E= github.com/multiversx/mx-chain-logger-go v1.1.0 h1:97x84A6L4RfCa6YOx1HpAFxZp1cf/WI0Qh112whgZNM= From d5b951cd92003309e03244b947d93844091e2402 Mon Sep 17 00:00:00 2001 From: ssd04 Date: Tue, 16 Sep 2025 14:05:51 +0300 Subject: [PATCH 22/28] update test data and tools --- testdata/testData.go | 18 ++++++++++-------- tools/wsPublisher/main.go | 12 +++++++++--- 2 files changed, 19 insertions(+), 11 deletions(-) diff --git a/testdata/testData.go b/testdata/testData.go index d11e9bdd..9962d4d3 100644 --- a/testdata/testData.go +++ b/testdata/testData.go @@ -122,16 +122,18 @@ func (bd *blockData) OutportBlockV1() *outport.OutportBlock { stateAccesses["txHash1"] = &stateChange.StateAccesses{ StateAccess: []*stateChange.StateAccess{ &stateChange.StateAccess{ - Type: stateChange.Write, - MainTrieKey: []byte("mainTrieKey1"), - MainTrieVal: []byte("mainTrieVal1"), - TxHash: []byte("txHash1"), + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey1"), + MainTrieVal: []byte("mainTrieVal1"), + TxHash: []byte("txHash1"), + AccountChanges: 8, }, &stateChange.StateAccess{ - Type: stateChange.Write, - MainTrieKey: []byte("mainTrieKey2"), - MainTrieVal: []byte("mainTrieVal2"), - TxHash: []byte("txHash1"), + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal2"), + TxHash: []byte("txHash1"), + AccountChanges: 4, }, }, } diff --git a/tools/wsPublisher/main.go b/tools/wsPublisher/main.go index 686b07fb..93a50873 100644 --- a/tools/wsPublisher/main.go +++ b/tools/wsPublisher/main.go @@ -63,7 +63,7 @@ func main() { case common.BlockEvents: var event data.BlockEventsWithOrder _ = json.Unmarshal(reply.Data, &event) - fmt.Printf("Hash: %s, TimeStamp: %d", event.Hash, event.TimeStamp) + fmt.Printf("Hash: %s, TimeStamp: %d\n", event.Hash, event.TimeStamp) case common.RevertBlockEvents: var event *data.RevertBlock _ = json.Unmarshal(reply.Data, &event) @@ -71,7 +71,7 @@ func main() { case common.FinalizedBlockEvents: var event *data.FinalizedBlock _ = json.Unmarshal(reply.Data, &event) - fmt.Printf("Hash: %s, TimeStamp: %d", event.Hash, time.Now().Unix()) + fmt.Printf("Hash: %s, TimeStamp: %d\n", event.Hash, time.Now().Unix()) case common.BlockTxs: var event *data.BlockTxs _ = json.Unmarshal(reply.Data, &event) @@ -83,7 +83,13 @@ func main() { case common.BlockStateAccesses: var event data.BlockStateAccesses _ = json.Unmarshal(reply.Data, &event) - fmt.Printf("Hash: %s, TimeStamp: %d, Len sa: %d", event.Hash, time.Now().Unix(), len(event.StateAccessesPerAccounts)) + fmt.Printf("SA: Hash: %s, TimeStamp: %d, Len sa: %d\n", event.Hash, time.Now().Unix(), len(event.StateAccessesPerAccounts)) + for acc, sas := range event.StateAccessesPerAccounts { + fmt.Printf("Account: %s\n", acc) + for _, sa := range sas.StateAccess { + fmt.Printf("Txhash: %s, Type: %d, AccountChanges: %d\n", sa.TxHash, sa.Type, sa.AccountChanges) + } + } default: fmt.Println("invalid message type") } From 0c76eaf8be3c578a165d13f9892791cda3eee948 Mon Sep 17 00:00:00 2001 From: BeniaminDrasovean Date: Thu, 18 Sep 2025 17:32:37 +0300 Subject: [PATCH 23/28] update go mod --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 3c2bcf5a..e61cada3 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/google/uuid v1.6.0 github.com/gorilla/websocket v1.5.3 github.com/multiversx/mx-chain-communication-go v1.2.0 - github.com/multiversx/mx-chain-core-go v1.4.1-0.20250909090314-60b4de5d3d1b + github.com/multiversx/mx-chain-core-go v1.4.1-0.20250918132809-9fc6a09d4292 github.com/multiversx/mx-chain-logger-go v1.1.0 github.com/pelletier/go-toml v1.9.3 github.com/prometheus/client_model v0.6.1 diff --git a/go.sum b/go.sum index 6aee8d9f..00683071 100644 --- a/go.sum +++ b/go.sum @@ -130,8 +130,8 @@ github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o= github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= github.com/multiversx/mx-chain-communication-go v1.2.0 h1:0wOoLldiRbvaOPxwICbnRCqCpLqPewg8M/FMbC/0OXY= github.com/multiversx/mx-chain-communication-go v1.2.0/go.mod h1:wS3aAwkmHbC9mlzQdvL6p7l8Rqw3vmzhj7WZW1dTveA= -github.com/multiversx/mx-chain-core-go v1.4.1-0.20250909090314-60b4de5d3d1b h1:+tcmfcnOgrUcw6qr/hFiulEz3Efl4d4yoDmvfhm0jDE= -github.com/multiversx/mx-chain-core-go v1.4.1-0.20250909090314-60b4de5d3d1b/go.mod h1:IO+vspNan+gT0WOHnJ95uvWygiziHZvfXpff6KnxV7g= +github.com/multiversx/mx-chain-core-go v1.4.1-0.20250918132809-9fc6a09d4292 h1:oHLQk03eJU5XaNGvOPCXyHTjgH7PGyp+Cznlnv/vod0= +github.com/multiversx/mx-chain-core-go v1.4.1-0.20250918132809-9fc6a09d4292/go.mod h1:IO+vspNan+gT0WOHnJ95uvWygiziHZvfXpff6KnxV7g= github.com/multiversx/mx-chain-crypto-go v1.2.13-0.20250218161752-9482d9a22234 h1:NNI7kYxzsq+4mTPSUJo0cK1+iPxjUX+gRJDaBRwEQ7M= github.com/multiversx/mx-chain-crypto-go v1.2.13-0.20250218161752-9482d9a22234/go.mod h1:QZAw2bZcOxGQRgYACTrmP8pfTa3NyxENIL+00G6nM5E= github.com/multiversx/mx-chain-logger-go v1.1.0 h1:97x84A6L4RfCa6YOx1HpAFxZp1cf/WI0Qh112whgZNM= From 0dccb5a7c241bbd4edbdf430a52faea9a1d96227 Mon Sep 17 00:00:00 2001 From: bogdan-rosianu Date: Fri, 19 Sep 2025 14:52:32 +0300 Subject: [PATCH 24/28] reference new core version --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 3c2bcf5a..e61cada3 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/google/uuid v1.6.0 github.com/gorilla/websocket v1.5.3 github.com/multiversx/mx-chain-communication-go v1.2.0 - github.com/multiversx/mx-chain-core-go v1.4.1-0.20250909090314-60b4de5d3d1b + github.com/multiversx/mx-chain-core-go v1.4.1-0.20250918132809-9fc6a09d4292 github.com/multiversx/mx-chain-logger-go v1.1.0 github.com/pelletier/go-toml v1.9.3 github.com/prometheus/client_model v0.6.1 diff --git a/go.sum b/go.sum index 6aee8d9f..00683071 100644 --- a/go.sum +++ b/go.sum @@ -130,8 +130,8 @@ github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o= github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= github.com/multiversx/mx-chain-communication-go v1.2.0 h1:0wOoLldiRbvaOPxwICbnRCqCpLqPewg8M/FMbC/0OXY= github.com/multiversx/mx-chain-communication-go v1.2.0/go.mod h1:wS3aAwkmHbC9mlzQdvL6p7l8Rqw3vmzhj7WZW1dTveA= -github.com/multiversx/mx-chain-core-go v1.4.1-0.20250909090314-60b4de5d3d1b h1:+tcmfcnOgrUcw6qr/hFiulEz3Efl4d4yoDmvfhm0jDE= -github.com/multiversx/mx-chain-core-go v1.4.1-0.20250909090314-60b4de5d3d1b/go.mod h1:IO+vspNan+gT0WOHnJ95uvWygiziHZvfXpff6KnxV7g= +github.com/multiversx/mx-chain-core-go v1.4.1-0.20250918132809-9fc6a09d4292 h1:oHLQk03eJU5XaNGvOPCXyHTjgH7PGyp+Cznlnv/vod0= +github.com/multiversx/mx-chain-core-go v1.4.1-0.20250918132809-9fc6a09d4292/go.mod h1:IO+vspNan+gT0WOHnJ95uvWygiziHZvfXpff6KnxV7g= github.com/multiversx/mx-chain-crypto-go v1.2.13-0.20250218161752-9482d9a22234 h1:NNI7kYxzsq+4mTPSUJo0cK1+iPxjUX+gRJDaBRwEQ7M= github.com/multiversx/mx-chain-crypto-go v1.2.13-0.20250218161752-9482d9a22234/go.mod h1:QZAw2bZcOxGQRgYACTrmP8pfTa3NyxENIL+00G6nM5E= github.com/multiversx/mx-chain-logger-go v1.1.0 h1:97x84A6L4RfCa6YOx1HpAFxZp1cf/WI0Qh112whgZNM= From 71bb5372c35726798b1913a2200de5dd387569ed Mon Sep 17 00:00:00 2001 From: BeniaminDrasovean Date: Wed, 24 Sep 2025 11:58:07 +0300 Subject: [PATCH 25/28] update go mod --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index e61cada3..eedde3f8 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/google/uuid v1.6.0 github.com/gorilla/websocket v1.5.3 github.com/multiversx/mx-chain-communication-go v1.2.0 - github.com/multiversx/mx-chain-core-go v1.4.1-0.20250918132809-9fc6a09d4292 + github.com/multiversx/mx-chain-core-go v1.4.1-0.20250924085417-30f7c94f7d5d github.com/multiversx/mx-chain-logger-go v1.1.0 github.com/pelletier/go-toml v1.9.3 github.com/prometheus/client_model v0.6.1 diff --git a/go.sum b/go.sum index 00683071..1dde752c 100644 --- a/go.sum +++ b/go.sum @@ -130,8 +130,8 @@ github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o= github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= github.com/multiversx/mx-chain-communication-go v1.2.0 h1:0wOoLldiRbvaOPxwICbnRCqCpLqPewg8M/FMbC/0OXY= github.com/multiversx/mx-chain-communication-go v1.2.0/go.mod h1:wS3aAwkmHbC9mlzQdvL6p7l8Rqw3vmzhj7WZW1dTveA= -github.com/multiversx/mx-chain-core-go v1.4.1-0.20250918132809-9fc6a09d4292 h1:oHLQk03eJU5XaNGvOPCXyHTjgH7PGyp+Cznlnv/vod0= -github.com/multiversx/mx-chain-core-go v1.4.1-0.20250918132809-9fc6a09d4292/go.mod h1:IO+vspNan+gT0WOHnJ95uvWygiziHZvfXpff6KnxV7g= +github.com/multiversx/mx-chain-core-go v1.4.1-0.20250924085417-30f7c94f7d5d h1:DLZo0iqbsu7iMPWu2948XvkWE9jCly5A6IFFgeGf7MQ= +github.com/multiversx/mx-chain-core-go v1.4.1-0.20250924085417-30f7c94f7d5d/go.mod h1:IO+vspNan+gT0WOHnJ95uvWygiziHZvfXpff6KnxV7g= github.com/multiversx/mx-chain-crypto-go v1.2.13-0.20250218161752-9482d9a22234 h1:NNI7kYxzsq+4mTPSUJo0cK1+iPxjUX+gRJDaBRwEQ7M= github.com/multiversx/mx-chain-crypto-go v1.2.13-0.20250218161752-9482d9a22234/go.mod h1:QZAw2bZcOxGQRgYACTrmP8pfTa3NyxENIL+00G6nM5E= github.com/multiversx/mx-chain-logger-go v1.1.0 h1:97x84A6L4RfCa6YOx1HpAFxZp1cf/WI0Qh112whgZNM= From 8341535240b7c6a44fc61d69b7e2f3fb383eb9fb Mon Sep 17 00:00:00 2001 From: BeniaminDrasovean Date: Wed, 8 Oct 2025 16:03:26 +0300 Subject: [PATCH 26/28] use map for collecting txs with order --- process/eventsInterceptor.go | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/process/eventsInterceptor.go b/process/eventsInterceptor.go index 9043fc92..bd046595 100644 --- a/process/eventsInterceptor.go +++ b/process/eventsInterceptor.go @@ -91,30 +91,26 @@ func (ei *eventsInterceptor) ProcessBlockEvents(eventsData *data.ArgsSaveBlockDa } func getTxsWithOrder(transactionsPool *outport.TransactionPool) []txWithOrder { - txsWithOrder := make([]txWithOrder, 0) + txsWithOrderMap := make(map[string]uint32) for txHash, txInfo := range transactionsPool.Transactions { - txsWithOrder = append(txsWithOrder, txWithOrder{ - hash: txHash, - index: txInfo.ExecutionOrder, - }) + txsWithOrderMap[txHash] = txInfo.ExecutionOrder } for txHash, txInfo := range transactionsPool.SmartContractResults { - txsWithOrder = append(txsWithOrder, txWithOrder{ - hash: txHash, - index: txInfo.ExecutionOrder, - }) + txsWithOrderMap[txHash] = txInfo.ExecutionOrder } for txHash, txInfo := range transactionsPool.Rewards { - txsWithOrder = append(txsWithOrder, txWithOrder{ - hash: txHash, - index: txInfo.ExecutionOrder, - }) + txsWithOrderMap[txHash] = txInfo.ExecutionOrder } for txHash, txInfo := range transactionsPool.InvalidTxs { + txsWithOrderMap[txHash] = txInfo.ExecutionOrder + } + + txsWithOrder := make([]txWithOrder, 0, len(txsWithOrderMap)) + for txHash, index := range txsWithOrderMap { txsWithOrder = append(txsWithOrder, txWithOrder{ hash: txHash, - index: txInfo.ExecutionOrder, + index: index, }) } From 1ae38fc98f1c284842b9cc6d6a7c6dc09832972a Mon Sep 17 00:00:00 2001 From: ssd04 Date: Fri, 10 Oct 2025 13:48:33 +0300 Subject: [PATCH 27/28] add config field for read state accesses enabled --- cmd/notifier/config/config.toml | 4 + config/config.go | 7 +- config/tomlConfig_test.go | 8 +- factory/processFactory.go | 3 +- .../rabbitmq/testNotifierWithRabbitMQ_test.go | 1 + process/eventsInterceptor.go | 15 +- process/eventsInterceptor_test.go | 233 +++++++++++------- 7 files changed, 173 insertions(+), 98 deletions(-) diff --git a/cmd/notifier/config/config.toml b/cmd/notifier/config/config.toml index 07cfbb39..fba4695c 100644 --- a/cmd/notifier/config/config.toml +++ b/cmd/notifier/config/config.toml @@ -3,6 +3,10 @@ # Requires a redis instance/cluster and should be used when multiple observers push from the same shard CheckDuplicates = true + # WithReadStateChanges signals if read state changes operation will be handled + # It depends also if read state changes are enabled from observer nodes + WithReadStateChanges = false + # ExternalMarshaller is used for handling incoming/outcoming api requests [General.ExternalMarshaller] Type = "json" diff --git a/config/config.go b/config/config.go index 1f339088..1cec21a2 100644 --- a/config/config.go +++ b/config/config.go @@ -20,9 +20,10 @@ type MainConfig struct { // GeneralConfig maps the general config section type GeneralConfig struct { - ExternalMarshaller MarshallerConfig - AddressConverter AddressConverterConfig - CheckDuplicates bool + ExternalMarshaller MarshallerConfig + AddressConverter AddressConverterConfig + CheckDuplicates bool + WithReadStateChanges bool } // MarshallerConfig maps the marshaller configuration diff --git a/config/tomlConfig_test.go b/config/tomlConfig_test.go index ddf0dc7c..a0851853 100644 --- a/config/tomlConfig_test.go +++ b/config/tomlConfig_test.go @@ -17,6 +17,7 @@ func TestMainConfig(t *testing.T) { adrConverterPrefix := "erd" adrConverterLength := 32 checkDuplicates := true + withReadStateChanges := true connectorAPIHost := "5000" connectorAPIUsername := "guest" @@ -50,7 +51,8 @@ func TestMainConfig(t *testing.T) { Prefix: adrConverterPrefix, Length: adrConverterLength, }, - CheckDuplicates: checkDuplicates, + CheckDuplicates: checkDuplicates, + WithReadStateChanges: withReadStateChanges, }, WebSocketConnector: config.WebSocketConfig{ Enabled: true, @@ -95,6 +97,10 @@ func TestMainConfig(t *testing.T) { # Requires a redis instance/cluster and should be used when multiple observers push from the same shard CheckDuplicates = true + # WithReadStateChanges signals if read state changes operation will be handled + # It depends also if read state changes are enabled from observer nodes + WithReadStateChanges = true + # ExternalMarshaller is used for handling incoming/outcoming api requests [General.ExternalMarshaller] Type = "` + generalMarshallerType + `" diff --git a/factory/processFactory.go b/factory/processFactory.go index 9fd41f67..73fc8b2e 100644 --- a/factory/processFactory.go +++ b/factory/processFactory.go @@ -24,7 +24,8 @@ func CreateEventsInterceptor(cfg config.GeneralConfig) (process.EventsIntercepto } argsEventsInterceptor := process.ArgsEventsInterceptor{ - PubKeyConverter: pubKeyConverter, + PubKeyConverter: pubKeyConverter, + WithReadStateChanges: cfg.WithReadStateChanges, } return process.NewEventsInterceptor(argsEventsInterceptor) diff --git a/integrationTests/rabbitmq/testNotifierWithRabbitMQ_test.go b/integrationTests/rabbitmq/testNotifierWithRabbitMQ_test.go index 4afcb41b..6be251f8 100644 --- a/integrationTests/rabbitmq/testNotifierWithRabbitMQ_test.go +++ b/integrationTests/rabbitmq/testNotifierWithRabbitMQ_test.go @@ -35,6 +35,7 @@ func TestNotifierWithRabbitMQ(t *testing.T) { func testNotifierWithRabbitMQ(t *testing.T, observerType string, payloadVersion uint32) { cfg := integrationTests.GetDefaultConfigs() cfg.MainConfig.General.CheckDuplicates = true + cfg.MainConfig.General.WithReadStateChanges = true notifier, err := integrationTests.NewTestNotifierWithRabbitMq(cfg.MainConfig) require.Nil(t, err) diff --git a/process/eventsInterceptor.go b/process/eventsInterceptor.go index bd046595..778b2bc5 100644 --- a/process/eventsInterceptor.go +++ b/process/eventsInterceptor.go @@ -28,11 +28,13 @@ type logEvent struct { // ArgsEventsInterceptor defines the arguments needed for creating an events interceptor instance type ArgsEventsInterceptor struct { - PubKeyConverter core.PubkeyConverter + PubKeyConverter core.PubkeyConverter + WithReadStateChanges bool } type eventsInterceptor struct { - pubKeyConverter core.PubkeyConverter + pubKeyConverter core.PubkeyConverter + withReadStateChanges bool } // NewEventsInterceptor creates a new eventsInterceptor instance @@ -42,7 +44,8 @@ func NewEventsInterceptor(args ArgsEventsInterceptor) (*eventsInterceptor, error } return &eventsInterceptor{ - pubKeyConverter: args.PubKeyConverter, + pubKeyConverter: args.PubKeyConverter, + withReadStateChanges: args.WithReadStateChanges, }, nil } @@ -153,14 +156,10 @@ func (ei *eventsInterceptor) getStateAccessesPerAccounts(eventsData *data.ArgsSa } for _, stateAccess := range stateAccessesPerTx.StateAccess { - if stateAccess.Type == stateChange.Read { - // TODO: add a flag here to allow read state accesses + if stateAccess.Type == stateChange.Read && !ei.withReadStateChanges { continue } - // TODO: make sure code update operations are handled properly - // at the moment they are handled as a separate entry - accKey := hex.EncodeToString(stateAccess.MainTrieKey) _, ok := stateAccessesPerAccounts[accKey] if !ok { diff --git a/process/eventsInterceptor_test.go b/process/eventsInterceptor_test.go index 325809aa..986fb089 100644 --- a/process/eventsInterceptor_test.go +++ b/process/eventsInterceptor_test.go @@ -18,7 +18,8 @@ import ( func createMockEventsInterceptorArgs() process.ArgsEventsInterceptor { return process.ArgsEventsInterceptor{ - PubKeyConverter: &mocks.PubkeyConverterMock{}, + PubKeyConverter: &mocks.PubkeyConverterMock{}, + WithReadStateChanges: false, } } @@ -370,9 +371,6 @@ func TestGetLogEventsFromTransactionsPool(t *testing.T) { func TestEventsInterceptor_GetStateAccessesPerAccounts(t *testing.T) { t.Parallel() - args := createMockEventsInterceptorArgs() - en, _ := process.NewEventsInterceptor(args) - txs := map[string]*outport.TxInfo{ hex.EncodeToString([]byte("txHash1")): { Transaction: &transaction.Transaction{ @@ -398,41 +396,103 @@ func TestEventsInterceptor_GetStateAccessesPerAccounts(t *testing.T) { }, } - blockHash := []byte("blockHash") + stateAccessesRead := make(map[string]*stateChange.StateAccesses) + stateAccessesRead["txHash1"] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + &stateChange.StateAccess{ + Type: stateChange.Read, + MainTrieKey: []byte("mainTrieKey1"), + MainTrieVal: []byte("mainTrieVal1"), + }, + &stateChange.StateAccess{ + Type: stateChange.Read, + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal2"), + }, + }, + } + stateAccessesRead["txHash2"] = &stateChange.StateAccesses{} + stateAccessesRead["txHash0"] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + &stateChange.StateAccess{ + Type: stateChange.Read, + MainTrieKey: []byte("mainTrieKey3"), + MainTrieVal: []byte("mainTrieVal3"), + }, + &stateChange.StateAccess{ + Type: stateChange.Read, + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal4"), + }, + }, + } - t.Run("with write operations", func(t *testing.T) { - t.Parallel() + stateAccessesWrite := make(map[string]*stateChange.StateAccesses) + stateAccessesWrite["txHash1"] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + &stateChange.StateAccess{ + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey1"), + MainTrieVal: []byte("mainTrieVal1"), + }, + &stateChange.StateAccess{ + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal2"), + }, + }, + } + stateAccessesWrite["txHash2"] = &stateChange.StateAccesses{} + stateAccessesWrite["txHash0"] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + &stateChange.StateAccess{ + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey3"), + MainTrieVal: []byte("mainTrieVal3"), + }, + &stateChange.StateAccess{ + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal4"), + }, + }, + } - stateAccesses := make(map[string]*stateChange.StateAccesses) - stateAccesses["txHash1"] = &stateChange.StateAccesses{ - StateAccess: []*stateChange.StateAccess{ - &stateChange.StateAccess{ - Type: stateChange.Write, - MainTrieKey: []byte("mainTrieKey1"), - MainTrieVal: []byte("mainTrieVal1"), - }, - &stateChange.StateAccess{ - Type: stateChange.Write, - MainTrieKey: []byte("mainTrieKey2"), - MainTrieVal: []byte("mainTrieVal2"), - }, + stateAccessesReadWrite := make(map[string]*stateChange.StateAccesses) + stateAccessesReadWrite["txHash1"] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + &stateChange.StateAccess{ + Type: stateChange.Read, + MainTrieKey: []byte("mainTrieKey1"), + MainTrieVal: []byte("mainTrieVal1"), }, - } - stateAccesses["txHash2"] = &stateChange.StateAccesses{} - stateAccesses["txHash0"] = &stateChange.StateAccesses{ - StateAccess: []*stateChange.StateAccess{ - &stateChange.StateAccess{ - Type: stateChange.Write, - MainTrieKey: []byte("mainTrieKey3"), - MainTrieVal: []byte("mainTrieVal3"), - }, - &stateChange.StateAccess{ - Type: stateChange.Write, - MainTrieKey: []byte("mainTrieKey2"), - MainTrieVal: []byte("mainTrieVal4"), - }, + &stateChange.StateAccess{ + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal2"), }, - } + }, + } + stateAccessesReadWrite["txHash2"] = &stateChange.StateAccesses{} + stateAccessesReadWrite["txHash0"] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + &stateChange.StateAccess{ + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey3"), + MainTrieVal: []byte("mainTrieVal3"), + }, + &stateChange.StateAccess{ + Type: stateChange.Read, + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal4"), + }, + }, + } + + blockHash := []byte("blockHash") + + t.Run("with write operations", func(t *testing.T) { + t.Parallel() blockEvents := &data.ArgsSaveBlockData{ HeaderHash: blockHash, @@ -441,7 +501,7 @@ func TestEventsInterceptor_GetStateAccessesPerAccounts(t *testing.T) { SmartContractResults: scrs, InvalidTxs: invalidTxs, }, - StateAccesses: stateAccesses, + StateAccesses: stateAccessesWrite, } expStateAccessesPerAccounts := make(map[string]*stateChange.StateAccesses) @@ -478,45 +538,17 @@ func TestEventsInterceptor_GetStateAccessesPerAccounts(t *testing.T) { }, } + args := createMockEventsInterceptorArgs() + en, _ := process.NewEventsInterceptor(args) + stateAccessesPerAccounts := en.GetStateAccessesPerAccounts(blockEvents) require.Equal(t, expStateAccessesPerAccounts, stateAccessesPerAccounts) }) - t.Run("with read operations", func(t *testing.T) { + t.Run("with read operations, but not enabled from config", func(t *testing.T) { t.Parallel() - stateAccesses := make(map[string]*stateChange.StateAccesses) - stateAccesses["txHash1"] = &stateChange.StateAccesses{ - StateAccess: []*stateChange.StateAccess{ - &stateChange.StateAccess{ - Type: stateChange.Read, - MainTrieKey: []byte("mainTrieKey1"), - MainTrieVal: []byte("mainTrieVal1"), - }, - &stateChange.StateAccess{ - Type: stateChange.Read, - MainTrieKey: []byte("mainTrieKey2"), - MainTrieVal: []byte("mainTrieVal2"), - }, - }, - } - stateAccesses["txHash2"] = &stateChange.StateAccesses{} - stateAccesses["txHash0"] = &stateChange.StateAccesses{ - StateAccess: []*stateChange.StateAccess{ - &stateChange.StateAccess{ - Type: stateChange.Read, - MainTrieKey: []byte("mainTrieKey3"), - MainTrieVal: []byte("mainTrieVal3"), - }, - &stateChange.StateAccess{ - Type: stateChange.Read, - MainTrieKey: []byte("mainTrieKey2"), - MainTrieVal: []byte("mainTrieVal4"), - }, - }, - } - blockEvents := &data.ArgsSaveBlockData{ HeaderHash: blockHash, TransactionsPool: &outport.TransactionPool{ @@ -524,9 +556,12 @@ func TestEventsInterceptor_GetStateAccessesPerAccounts(t *testing.T) { SmartContractResults: scrs, InvalidTxs: invalidTxs, }, - StateAccesses: stateAccesses, + StateAccesses: stateAccessesRead, } + args := createMockEventsInterceptorArgs() + en, _ := process.NewEventsInterceptor(args) + expStateAccessesPerAccounts := make(map[string]*stateChange.StateAccesses) stateAccessesPerAccounts := en.GetStateAccessesPerAccounts(blockEvents) @@ -534,17 +569,22 @@ func TestEventsInterceptor_GetStateAccessesPerAccounts(t *testing.T) { require.Equal(t, expStateAccessesPerAccounts, stateAccessesPerAccounts) }) - t.Run("with read and write operations", func(t *testing.T) { + t.Run("with read (not enabled from config) and write operations", func(t *testing.T) { t.Parallel() - stateAccesses := make(map[string]*stateChange.StateAccesses) - stateAccesses["txHash1"] = &stateChange.StateAccesses{ + blockEvents := &data.ArgsSaveBlockData{ + HeaderHash: blockHash, + TransactionsPool: &outport.TransactionPool{ + Transactions: txs, + SmartContractResults: scrs, + InvalidTxs: invalidTxs, + }, + StateAccesses: stateAccessesReadWrite, + } + + expStateAccessesPerAccounts := make(map[string]*stateChange.StateAccesses) + expStateAccessesPerAccounts[hex.EncodeToString([]byte("mainTrieKey2"))] = &stateChange.StateAccesses{ StateAccess: []*stateChange.StateAccess{ - &stateChange.StateAccess{ - Type: stateChange.Read, - MainTrieKey: []byte("mainTrieKey1"), - MainTrieVal: []byte("mainTrieVal1"), - }, &stateChange.StateAccess{ Type: stateChange.Write, MainTrieKey: []byte("mainTrieKey2"), @@ -552,22 +592,27 @@ func TestEventsInterceptor_GetStateAccessesPerAccounts(t *testing.T) { }, }, } - stateAccesses["txHash2"] = &stateChange.StateAccesses{} - stateAccesses["txHash0"] = &stateChange.StateAccesses{ + expStateAccessesPerAccounts[hex.EncodeToString([]byte("mainTrieKey3"))] = &stateChange.StateAccesses{ StateAccess: []*stateChange.StateAccess{ &stateChange.StateAccess{ Type: stateChange.Write, MainTrieKey: []byte("mainTrieKey3"), MainTrieVal: []byte("mainTrieVal3"), }, - &stateChange.StateAccess{ - Type: stateChange.Read, - MainTrieKey: []byte("mainTrieKey2"), - MainTrieVal: []byte("mainTrieVal4"), - }, }, } + args := createMockEventsInterceptorArgs() + en, _ := process.NewEventsInterceptor(args) + + stateAccessesPerAccounts := en.GetStateAccessesPerAccounts(blockEvents) + + require.Equal(t, expStateAccessesPerAccounts, stateAccessesPerAccounts) + }) + + t.Run("with read and write operations", func(t *testing.T) { + t.Parallel() + blockEvents := &data.ArgsSaveBlockData{ HeaderHash: blockHash, TransactionsPool: &outport.TransactionPool{ @@ -575,12 +620,26 @@ func TestEventsInterceptor_GetStateAccessesPerAccounts(t *testing.T) { SmartContractResults: scrs, InvalidTxs: invalidTxs, }, - StateAccesses: stateAccesses, + StateAccesses: stateAccessesReadWrite, } expStateAccessesPerAccounts := make(map[string]*stateChange.StateAccesses) + expStateAccessesPerAccounts[hex.EncodeToString([]byte("mainTrieKey1"))] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + &stateChange.StateAccess{ + Type: stateChange.Read, + MainTrieKey: []byte("mainTrieKey1"), + MainTrieVal: []byte("mainTrieVal1"), + }, + }, + } expStateAccessesPerAccounts[hex.EncodeToString([]byte("mainTrieKey2"))] = &stateChange.StateAccesses{ StateAccess: []*stateChange.StateAccess{ + &stateChange.StateAccess{ + Type: stateChange.Read, + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal4"), + }, &stateChange.StateAccess{ Type: stateChange.Write, MainTrieKey: []byte("mainTrieKey2"), @@ -598,6 +657,10 @@ func TestEventsInterceptor_GetStateAccessesPerAccounts(t *testing.T) { }, } + args := createMockEventsInterceptorArgs() + args.WithReadStateChanges = true + en, _ := process.NewEventsInterceptor(args) + stateAccessesPerAccounts := en.GetStateAccessesPerAccounts(blockEvents) require.Equal(t, expStateAccessesPerAccounts, stateAccessesPerAccounts) From 888d6281ec3d9df51b581aa9a5fbccfd5fa5b13e Mon Sep 17 00:00:00 2001 From: ssd04 Date: Fri, 10 Oct 2025 14:25:33 +0300 Subject: [PATCH 28/28] use latest core-go tag reference --- go.mod | 2 +- go.sum | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index e61cada3..e3e11a4a 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/google/uuid v1.6.0 github.com/gorilla/websocket v1.5.3 github.com/multiversx/mx-chain-communication-go v1.2.0 - github.com/multiversx/mx-chain-core-go v1.4.1-0.20250918132809-9fc6a09d4292 + github.com/multiversx/mx-chain-core-go v1.4.1 github.com/multiversx/mx-chain-logger-go v1.1.0 github.com/pelletier/go-toml v1.9.3 github.com/prometheus/client_model v0.6.1 diff --git a/go.sum b/go.sum index 00683071..54c4b786 100644 --- a/go.sum +++ b/go.sum @@ -132,6 +132,8 @@ github.com/multiversx/mx-chain-communication-go v1.2.0 h1:0wOoLldiRbvaOPxwICbnRC github.com/multiversx/mx-chain-communication-go v1.2.0/go.mod h1:wS3aAwkmHbC9mlzQdvL6p7l8Rqw3vmzhj7WZW1dTveA= github.com/multiversx/mx-chain-core-go v1.4.1-0.20250918132809-9fc6a09d4292 h1:oHLQk03eJU5XaNGvOPCXyHTjgH7PGyp+Cznlnv/vod0= github.com/multiversx/mx-chain-core-go v1.4.1-0.20250918132809-9fc6a09d4292/go.mod h1:IO+vspNan+gT0WOHnJ95uvWygiziHZvfXpff6KnxV7g= +github.com/multiversx/mx-chain-core-go v1.4.1 h1:ljs53jpdjtCohpaqm2n/dvTGrFlSgIpoZYH8RVt5cWo= +github.com/multiversx/mx-chain-core-go v1.4.1/go.mod h1:IO+vspNan+gT0WOHnJ95uvWygiziHZvfXpff6KnxV7g= github.com/multiversx/mx-chain-crypto-go v1.2.13-0.20250218161752-9482d9a22234 h1:NNI7kYxzsq+4mTPSUJo0cK1+iPxjUX+gRJDaBRwEQ7M= github.com/multiversx/mx-chain-crypto-go v1.2.13-0.20250218161752-9482d9a22234/go.mod h1:QZAw2bZcOxGQRgYACTrmP8pfTa3NyxENIL+00G6nM5E= github.com/multiversx/mx-chain-logger-go v1.1.0 h1:97x84A6L4RfCa6YOx1HpAFxZp1cf/WI0Qh112whgZNM=