Skip to content
Draft
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
08783fa
added rabbitmq exchange for state changes
ssd04 Aug 19, 2025
02b43dc
fixes after review
ssd04 Aug 20, 2025
794a811
fix errors
ssd04 Aug 22, 2025
97aa452
restructure state accesses per accounts
ssd04 Aug 22, 2025
b33326d
add unit tests
ssd04 Aug 22, 2025
e5701f5
fix typos
ssd04 Aug 22, 2025
99e9118
Merge pull request #112 from multiversx/state-accesses-exchange
ssd04 Aug 22, 2025
faa0123
update todo comment for code write operation
ssd04 Aug 22, 2025
627e5a5
extend integration test timeout
ssd04 Aug 22, 2025
65efec0
proper handling for nil state accesses
ssd04 Aug 25, 2025
4af3143
add more logging
ssd04 Aug 26, 2025
ab1a0bd
decode tx hash from tx info
ssd04 Aug 26, 2025
d198316
fix unit tests
ssd04 Aug 27, 2025
8ba8d95
wait for components to start in tests - fix flopping test
ssd04 Aug 27, 2025
dbade9c
add state accesses to test data
ssd04 Aug 27, 2025
afd6941
fixes after review
ssd04 Aug 28, 2025
f18ab24
add more fields for state accesses event
ssd04 Sep 2, 2025
8576a2c
add state accesses event extra fields
ssd04 Sep 3, 2025
1d80d43
update state accesses log
ssd04 Sep 8, 2025
9f5e680
Merge pull request #113 from multiversx/state-accesses-per-account
ssd04 Sep 11, 2025
0582d45
fix ws dispatcher subscription
ssd04 Sep 15, 2025
c698754
ws tools for state changes
ssd04 Sep 15, 2025
dd4e17c
Merge pull request #115 from multiversx/fix-ws-dispatcher-subscription
ssd04 Sep 16, 2025
3997fbc
Merge pull request #116 from multiversx/adapt-ws-tools-for-state-changes
ssd04 Sep 16, 2025
1d4b2eb
update core-go version
ssd04 Sep 16, 2025
d5b951c
update test data and tools
ssd04 Sep 16, 2025
09be306
Merge pull request #117 from multiversx/update-state-accesses-acount-…
ssd04 Sep 16, 2025
0c76eaf
update go mod
BeniaminDrasovean Sep 18, 2025
0dccb5a
reference new core version
bogdan-rosianu Sep 19, 2025
71bb537
update go mod
BeniaminDrasovean Sep 24, 2025
d378bbb
Merge pull request #118 from multiversx/dataTrieChange-refactor
BeniaminDrasovean Sep 29, 2025
8341535
use map for collecting txs with order
BeniaminDrasovean Oct 8, 2025
87e3578
Merge pull request #121 from multiversx/txsWithOrder-fix
BeniaminDrasovean Oct 9, 2025
1ae38fc
add config field for read state accesses enabled
ssd04 Oct 10, 2025
3569b3f
Merge pull request #122 from multiversx/read-state-accesses-flag
ssd04 Oct 10, 2025
888d628
use latest core-go tag reference
ssd04 Oct 10, 2025
dda58e7
Merge branch 'rc/barnard-patch-3' into state-change-deletion-support
ssd04 Oct 10, 2025
a47807c
Merge pull request #119 from multiversx/state-change-deletion-support
sstanculeanu Oct 10, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions cmd/notifier/config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
# Requires a redis instance/cluster and should be used when multiple observers push from the same shard
CheckDuplicates = true

# WithReadStateChanges signals if read state changes operation will be handled
# It depends also if read state changes are enabled from observer nodes
WithReadStateChanges = false

# ExternalMarshaller is used for handling incoming/outcoming api requests
[General.ExternalMarshaller]
Type = "json"
Expand Down Expand Up @@ -114,3 +118,8 @@
[RabbitMQ.BlockEventsExchange]
Name = "block_events"
Type = "fanout"

# The exchange which holds state accesses
[RabbitMQ.StateAccessesExchange]
Name = "state_accesses"
Type = "fanout"
3 changes: 3 additions & 0 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ const (

// BlockScrs defines the subscription event type for block scrs
BlockScrs string = "block_scrs"

// BlockStateAccesses defines the subscription event type for block state accesses
BlockStateAccesses string = "block_state_accesses"
)

const (
Expand Down
8 changes: 5 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ type MainConfig struct {

// GeneralConfig maps the general config section
type GeneralConfig struct {
ExternalMarshaller MarshallerConfig
AddressConverter AddressConverterConfig
CheckDuplicates bool
ExternalMarshaller MarshallerConfig
AddressConverter AddressConverterConfig
CheckDuplicates bool
WithReadStateChanges bool
}

// MarshallerConfig maps the marshaller configuration
Expand Down Expand Up @@ -80,6 +81,7 @@ type RabbitMQConfig struct {
BlockTxsExchange RabbitMQExchangeConfig
BlockScrsExchange RabbitMQExchangeConfig
BlockEventsExchange RabbitMQExchangeConfig
StateAccessesExchange RabbitMQExchangeConfig
}

// RabbitMQExchangeConfig holds the configuration for a rabbitMQ exchange
Expand Down
8 changes: 7 additions & 1 deletion config/tomlConfig_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ func TestMainConfig(t *testing.T) {
adrConverterPrefix := "erd"
adrConverterLength := 32
checkDuplicates := true
withReadStateChanges := true

connectorAPIHost := "5000"
connectorAPIUsername := "guest"
Expand Down Expand Up @@ -50,7 +51,8 @@ func TestMainConfig(t *testing.T) {
Prefix: adrConverterPrefix,
Length: adrConverterLength,
},
CheckDuplicates: checkDuplicates,
CheckDuplicates: checkDuplicates,
WithReadStateChanges: withReadStateChanges,
},
WebSocketConnector: config.WebSocketConfig{
Enabled: true,
Expand Down Expand Up @@ -95,6 +97,10 @@ func TestMainConfig(t *testing.T) {
# Requires a redis instance/cluster and should be used when multiple observers push from the same shard
CheckDuplicates = true

# WithReadStateChanges signals if read state changes operation will be handled
# It depends also if read state changes are enabled from observer nodes
WithReadStateChanges = true

# ExternalMarshaller is used for handling incoming/outcoming api requests
[General.ExternalMarshaller]
Type = "` + generalMarshallerType + `"
Expand Down
19 changes: 11 additions & 8 deletions data/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/multiversx/mx-chain-core-go/data/receipt"
"github.com/multiversx/mx-chain-core-go/data/rewardTx"
"github.com/multiversx/mx-chain-core-go/data/smartContractResult"
"github.com/multiversx/mx-chain-core-go/data/stateChange"
"github.com/multiversx/mx-chain-core-go/data/transaction"
)

Expand All @@ -22,14 +23,15 @@ type SaveBlockData struct {

// InterceptorBlockData holds the block data needed for processing
type InterceptorBlockData struct {
Hash string
Body nodeData.BodyHandler
Header nodeData.HeaderHandler
Txs map[string]*transaction.Transaction
TxsWithOrder map[string]*outport.TxInfo
Scrs map[string]*smartContractResult.SmartContractResult
ScrsWithOrder map[string]*outport.SCRInfo
LogEvents []Event
Hash string
Body nodeData.BodyHandler
Header nodeData.HeaderHandler
Txs map[string]*transaction.Transaction
TxsWithOrder map[string]*outport.TxInfo
Scrs map[string]*smartContractResult.SmartContractResult
ScrsWithOrder map[string]*outport.SCRInfo
LogEvents []Event
StateAccessesPerAccounts map[string]*stateChange.StateAccesses
}

// ArgsSaveBlockData holds the block data that will be received on push events
Expand All @@ -44,6 +46,7 @@ type ArgsSaveBlockData struct {
AlteredAccounts map[string]*alteredAccount.AlteredAccount
NumberOfShards uint32
HeaderTimeStampMs uint64
StateAccesses map[string]*stateChange.StateAccesses
}

// OutportBlockDataOld holds the block data that will be received on push events
Expand Down
10 changes: 10 additions & 0 deletions data/outport.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/multiversx/mx-chain-core-go/data/receipt"
"github.com/multiversx/mx-chain-core-go/data/rewardTx"
"github.com/multiversx/mx-chain-core-go/data/smartContractResult"
"github.com/multiversx/mx-chain-core-go/data/stateChange"
"github.com/multiversx/mx-chain-core-go/data/transaction"
)

Expand Down Expand Up @@ -73,6 +74,15 @@ type BlockEventsWithOrder struct {
Events []Event `json:"events"`
}

// BlockStateAccesses holds the block state accesses
type BlockStateAccesses struct {
Hash string `json:"hash"`
ShardID uint32 `json:"shardID"`
TimeStampMs uint64 `json:"timestampMs"`
Nonce uint64 `json:"nonce"`
StateAccessesPerAccounts map[string]*stateChange.StateAccesses `json:"stateAccessesPerAccounts"`
}

// NotifierTransaction defines a wrapper over transaction
type NotifierTransaction struct {
*transaction.Transaction
Expand Down
4 changes: 4 additions & 0 deletions disabled/disabledHub.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ func (h *Hub) PublishScrs(blockScrs data.BlockScrs) {
func (h *Hub) PublishBlockEventsWithOrder(blockTxs data.BlockEventsWithOrder) {
}

// PublishStateAccesses does nothing
func (h *Hub) PublishStateAccesses(blockTxs data.BlockStateAccesses) {
}

// RegisterEvent does nothing
func (h *Hub) RegisterEvent(_ dispatcher.EventDispatcher) {
}
Expand Down
43 changes: 43 additions & 0 deletions dispatcher/hub/commonHub.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ func (ch *commonHub) handlePushBlockEvents(blockEvents data.BlockEvents, subscri
// PublishRevert will publish revert event to dispatcher
func (ch *commonHub) PublishRevert(revertBlock data.RevertBlock) {
subscriptions := ch.subscriptionMapper.Subscriptions()
_, ok := subscriptions[common.RevertBlockEvents]
if !ok {
return
}

dispatchersMap := make(map[uuid.UUID]data.RevertBlock)

Expand All @@ -115,6 +119,10 @@ func (ch *commonHub) PublishRevert(revertBlock data.RevertBlock) {
// PublishFinalized will publish finalized event to dispatcher
func (ch *commonHub) PublishFinalized(finalizedBlock data.FinalizedBlock) {
subscriptions := ch.subscriptionMapper.Subscriptions()
_, ok := subscriptions[common.FinalizedBlockEvents]
if !ok {
return
}

dispatchersMap := make(map[uuid.UUID]data.FinalizedBlock)

Expand All @@ -134,6 +142,10 @@ func (ch *commonHub) PublishFinalized(finalizedBlock data.FinalizedBlock) {
// PublishTxs will publish txs event to dispatcher
func (ch *commonHub) PublishTxs(blockTxs data.BlockTxs) {
subscriptions := ch.subscriptionMapper.Subscriptions()
_, ok := subscriptions[common.BlockTxs]
if !ok {
return
}

dispatchersMap := make(map[uuid.UUID]data.BlockTxs)

Expand All @@ -153,6 +165,10 @@ func (ch *commonHub) PublishTxs(blockTxs data.BlockTxs) {
// PublishBlockEventsWithOrder will publish block events with order to dispatcher
func (ch *commonHub) PublishBlockEventsWithOrder(blockTxs data.BlockEventsWithOrder) {
subscriptions := ch.subscriptionMapper.Subscriptions()
_, ok := subscriptions[common.BlockEvents]
if !ok {
return
}

dispatchersMap := make(map[uuid.UUID]data.BlockEventsWithOrder)

Expand All @@ -172,6 +188,10 @@ func (ch *commonHub) PublishBlockEventsWithOrder(blockTxs data.BlockEventsWithOr
// PublishScrs will publish scrs events to dispatcher
func (ch *commonHub) PublishScrs(blockScrs data.BlockScrs) {
subscriptions := ch.subscriptionMapper.Subscriptions()
_, ok := subscriptions[common.BlockScrs]
if !ok {
return
}

dispatchersMap := make(map[uuid.UUID]data.BlockScrs)

Expand All @@ -188,6 +208,29 @@ func (ch *commonHub) PublishScrs(blockScrs data.BlockScrs) {
}
}

// PublishStateAccesses will publish state accesses to dispatcher
func (ch *commonHub) PublishStateAccesses(stateAccesses data.BlockStateAccesses) {
subscriptions := ch.subscriptionMapper.Subscriptions()
_, ok := subscriptions[common.BlockStateAccesses]
if !ok {
return
}

dispatchersMap := make(map[uuid.UUID]data.BlockStateAccesses)

for _, subscription := range subscriptions[common.BlockStateAccesses] {
dispatchersMap[subscription.DispatcherID] = stateAccesses
}

ch.mutDispatchers.RLock()
defer ch.mutDispatchers.RUnlock()
for id, event := range dispatchersMap {
if d, ok := ch.dispatchers[id]; ok {
d.StateAccessesEvent(event)
}
}
}

func (ch *commonHub) registerDispatcher(d dispatcher.EventDispatcher) {
ch.mutDispatchers.Lock()
defer ch.mutDispatchers.Unlock()
Expand Down
1 change: 1 addition & 0 deletions dispatcher/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type EventDispatcher interface {
TxsEvent(event data.BlockTxs)
BlockEvents(event data.BlockEventsWithOrder)
ScrsEvent(event data.BlockScrs)
StateAccessesEvent(event data.BlockStateAccesses)
}

// Hub defines the behaviour of a component which should be able to receive events
Expand Down
3 changes: 2 additions & 1 deletion dispatcher/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ func getEventType(subEntry data.SubscriptionEntry) string {
subEntry.EventType == common.RevertBlockEvents ||
subEntry.EventType == common.BlockTxs ||
subEntry.EventType == common.BlockScrs ||
subEntry.EventType == common.BlockEvents {
subEntry.EventType == common.BlockEvents ||
subEntry.EventType == common.BlockStateAccesses {
return subEntry.EventType
}

Expand Down
20 changes: 20 additions & 0 deletions dispatcher/ws/wsDispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,26 @@ func (wd *websocketDispatcher) ScrsEvent(event data.BlockScrs) {
wd.send <- wsEventBytes
}

// StateAccessesEvent receives a block state accesses event and process it before pushing to socket
func (wd *websocketDispatcher) StateAccessesEvent(event data.BlockStateAccesses) {
eventBytes, err := wd.marshaller.Marshal(event)
if err != nil {
log.Error("failure marshalling events", "err", err.Error())
return
}
wsEvent := &data.WebSocketEvent{
Type: common.BlockStateAccesses,
Data: eventBytes,
}
wsEventBytes, err := wd.marshaller.Marshal(wsEvent)
if err != nil {
log.Error("failure marshalling events", "err", err.Error())
return
}

wd.send <- wsEventBytes
}

// writePump listens on the send-channel and pushes data on the socket stream
func (wd *websocketDispatcher) writePump() {
ticker := time.NewTicker(pingPeriod)
Expand Down
3 changes: 2 additions & 1 deletion factory/processFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ func CreateEventsInterceptor(cfg config.GeneralConfig) (process.EventsIntercepto
}

argsEventsInterceptor := process.ArgsEventsInterceptor{
PubKeyConverter: pubKeyConverter,
PubKeyConverter: pubKeyConverter,
WithReadStateChanges: cfg.WithReadStateChanges,
}

return process.NewEventsInterceptor(argsEventsInterceptor)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/google/uuid v1.6.0
github.com/gorilla/websocket v1.5.3
github.com/multiversx/mx-chain-communication-go v1.2.0
github.com/multiversx/mx-chain-core-go v1.4.0
github.com/multiversx/mx-chain-core-go v1.4.1-0.20250924085417-30f7c94f7d5d
github.com/multiversx/mx-chain-logger-go v1.1.0
github.com/pelletier/go-toml v1.9.3
github.com/prometheus/client_model v0.6.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o=
github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc=
github.com/multiversx/mx-chain-communication-go v1.2.0 h1:0wOoLldiRbvaOPxwICbnRCqCpLqPewg8M/FMbC/0OXY=
github.com/multiversx/mx-chain-communication-go v1.2.0/go.mod h1:wS3aAwkmHbC9mlzQdvL6p7l8Rqw3vmzhj7WZW1dTveA=
github.com/multiversx/mx-chain-core-go v1.4.0 h1:p6FbfCzvMXF54kpS0B5mrjNWYpq4SEQqo0UvrMF7YVY=
github.com/multiversx/mx-chain-core-go v1.4.0/go.mod h1:IO+vspNan+gT0WOHnJ95uvWygiziHZvfXpff6KnxV7g=
github.com/multiversx/mx-chain-core-go v1.4.1-0.20250924085417-30f7c94f7d5d h1:DLZo0iqbsu7iMPWu2948XvkWE9jCly5A6IFFgeGf7MQ=
github.com/multiversx/mx-chain-core-go v1.4.1-0.20250924085417-30f7c94f7d5d/go.mod h1:IO+vspNan+gT0WOHnJ95uvWygiziHZvfXpff6KnxV7g=
github.com/multiversx/mx-chain-crypto-go v1.2.13-0.20250218161752-9482d9a22234 h1:NNI7kYxzsq+4mTPSUJo0cK1+iPxjUX+gRJDaBRwEQ7M=
github.com/multiversx/mx-chain-crypto-go v1.2.13-0.20250218161752-9482d9a22234/go.mod h1:QZAw2bZcOxGQRgYACTrmP8pfTa3NyxENIL+00G6nM5E=
github.com/multiversx/mx-chain-logger-go v1.1.0 h1:97x84A6L4RfCa6YOx1HpAFxZp1cf/WI0Qh112whgZNM=
Expand Down
Loading
Loading