From 4b54e7a2b4898042927dddf739c62aa7cb54a036 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Fri, 9 Jan 2026 15:25:56 +0100 Subject: [PATCH 1/2] feat(syncer): verify force inclusion for p2p blocks --- block/internal/syncing/syncer.go | 38 +++++++++++-------- .../syncing/syncer_forced_inclusion_test.go | 26 ++++++------- 2 files changed, 36 insertions(+), 28 deletions(-) diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index bd391b8f4..711613465 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -110,6 +110,7 @@ type Syncer struct { gracePeriodMultiplier *atomic.Pointer[float64] blockFullnessEMA *atomic.Pointer[float64] gracePeriodConfig forcedInclusionGracePeriodConfig + p2pHeightHints map[uint64]uint64 // map[height]daHeight // Lifecycle ctx context.Context @@ -177,6 +178,7 @@ func NewSyncer( gracePeriodMultiplier: gracePeriodMultiplier, blockFullnessEMA: blockFullnessEMA, gracePeriodConfig: newForcedInclusionGracePeriodConfig(), + p2pHeightHints: make(map[uint64]uint64), } } @@ -541,6 +543,8 @@ func (s *Syncer) processHeightEvent(event *common.DAHeightEvent) { Uint64("da_height_hint", daHeightHint). Msg("P2P event with DA height hint, triggering targeted DA retrieval") + s.p2pHeightHints[height] = daHeightHint + // Trigger targeted DA retrieval in background via worker pool s.asyncDARetriever.RequestRetrieval(daHeightHint) } @@ -581,7 +585,7 @@ func (s *Syncer) processHeightEvent(event *common.DAHeightEvent) { } // only save to p2p stores if the event came from DA - if event.Source == common.SourceDA { // TODO(@julienrbrt): To be reverted once DA Hints are merged (https://github.com/evstack/ev-node/pull/2891) + if event.Source == common.SourceDA { g, ctx := errgroup.WithContext(s.ctx) g.Go(func() error { // broadcast header locally only — prevents spamming the p2p network with old height notifications, @@ -636,13 +640,17 @@ func (s *Syncer) trySyncNextBlock(event *common.DAHeightEvent) error { } // Verify forced inclusion transactions if configured - if event.Source == common.SourceDA { - if err := s.verifyForcedInclusionTxs(currentState, data); err != nil { - s.logger.Error().Err(err).Uint64("height", nextHeight).Msg("forced inclusion verification failed") - if errors.Is(err, errMaliciousProposer) { - s.cache.RemoveHeaderDAIncluded(headerHash) - return err - } + currentDaHeight, ok := s.p2pHeightHints[nextHeight] + if !ok { + currentDaHeight = currentState.DAHeight + } else { + delete(s.p2pHeightHints, nextHeight) + } + if err := s.verifyForcedInclusionTxs(currentDaHeight, data); err != nil { + s.logger.Error().Err(err).Uint64("height", nextHeight).Msg("forced inclusion verification failed") + if errors.Is(err, errMaliciousProposer) { + s.cache.RemoveHeaderDAIncluded(headerHash) + return err } } @@ -855,7 +863,7 @@ func (s *Syncer) getEffectiveGracePeriod() uint64 { // Note: Due to block size constraints (MaxBytes), sequencers may defer forced inclusion transactions // to future blocks (smoothing). This is legitimate behavior within an epoch. // However, ALL forced inclusion txs from an epoch MUST be included before the next epoch begins or grace boundary (whichever comes later). -func (s *Syncer) verifyForcedInclusionTxs(currentState types.State, data *types.Data) error { +func (s *Syncer) verifyForcedInclusionTxs(daHeight uint64, data *types.Data) error { if s.fiRetriever == nil { return nil } @@ -865,7 +873,7 @@ func (s *Syncer) verifyForcedInclusionTxs(currentState types.State, data *types. 
s.updateDynamicGracePeriod(blockFullness) // Retrieve forced inclusion transactions from DA for current epoch - forcedIncludedTxsEvent, err := s.fiRetriever.RetrieveForcedIncludedTxs(s.ctx, currentState.DAHeight) + forcedIncludedTxsEvent, err := s.fiRetriever.RetrieveForcedIncludedTxs(s.ctx, daHeight) if err != nil { if errors.Is(err, da.ErrForceInclusionNotConfigured) { s.logger.Debug().Msg("forced inclusion namespace not configured, skipping verification") @@ -928,10 +936,10 @@ func (s *Syncer) verifyForcedInclusionTxs(currentState types.State, data *types. effectiveGracePeriod := s.getEffectiveGracePeriod() graceBoundary := pending.EpochEnd + (effectiveGracePeriod * s.genesis.DAEpochForcedInclusion) - if currentState.DAHeight > graceBoundary { + if daHeight > graceBoundary { maliciousTxs = append(maliciousTxs, pending) s.logger.Warn(). - Uint64("current_da_height", currentState.DAHeight). + Uint64("current_da_height", daHeight). Uint64("epoch_end", pending.EpochEnd). Uint64("grace_boundary", graceBoundary). Uint64("base_grace_periods", s.gracePeriodConfig.basePeriod). @@ -941,7 +949,7 @@ func (s *Syncer) verifyForcedInclusionTxs(currentState types.State, data *types. Msg("forced inclusion transaction past grace boundary - marking as malicious") } else { remainingPending = append(remainingPending, pending) - if currentState.DAHeight > pending.EpochEnd { + if daHeight > pending.EpochEnd { txsInGracePeriod++ } } @@ -965,7 +973,7 @@ func (s *Syncer) verifyForcedInclusionTxs(currentState types.State, data *types. effectiveGracePeriod := s.getEffectiveGracePeriod() s.logger.Error(). Uint64("height", data.Height()). - Uint64("current_da_height", currentState.DAHeight). + Uint64("current_da_height", daHeight). Int("malicious_count", len(maliciousTxs)). Uint64("base_grace_periods", s.gracePeriodConfig.basePeriod). Uint64("effective_grace_periods", effectiveGracePeriod). @@ -985,7 +993,7 @@ func (s *Syncer) verifyForcedInclusionTxs(currentState types.State, data *types. s.logger.Info(). Uint64("height", data.Height()). - Uint64("da_height", currentState.DAHeight). + Uint64("da_height", daHeight). Uint64("epoch_start", forcedIncludedTxsEvent.StartDaHeight). Uint64("epoch_end", forcedIncludedTxsEvent.EndDaHeight). Int("included_count", includedCount). 
diff --git a/block/internal/syncing/syncer_forced_inclusion_test.go b/block/internal/syncing/syncer_forced_inclusion_test.go index 0fe087efc..0978788bb 100644 --- a/block/internal/syncing/syncer_forced_inclusion_test.go +++ b/block/internal/syncing/syncer_forced_inclusion_test.go @@ -409,7 +409,7 @@ func TestVerifyForcedInclusionTxs_AllTransactionsIncluded(t *testing.T) { currentState.DAHeight = 0 // Verify - should pass since all forced txs are included - err = s.verifyForcedInclusionTxs(currentState, data) + err = s.verifyForcedInclusionTxs(currentState.DAHeight, data) require.NoError(t, err) } @@ -484,7 +484,7 @@ func TestVerifyForcedInclusionTxs_MissingTransactions(t *testing.T) { currentState.DAHeight = 0 // Verify - should pass since forced tx blob may be legitimately deferred within the epoch - err = s.verifyForcedInclusionTxs(currentState, data) + err = s.verifyForcedInclusionTxs(currentState.DAHeight, data) require.NoError(t, err) // Mock DA for next epoch to return no forced inclusion transactions @@ -497,7 +497,7 @@ func TestVerifyForcedInclusionTxs_MissingTransactions(t *testing.T) { data2 := makeData(gen.ChainID, 2, 1) data2.Txs[0] = []byte("regular_tx_3") - err = s.verifyForcedInclusionTxs(currentState, data2) + err = s.verifyForcedInclusionTxs(currentState.DAHeight, data2) require.NoError(t, err) // Should pass since DAHeight=1 equals grace boundary, not past it // Mock DA for height 2 to return no forced inclusion transactions @@ -510,7 +510,7 @@ func TestVerifyForcedInclusionTxs_MissingTransactions(t *testing.T) { data3 := makeData(gen.ChainID, 3, 1) data3.Txs[0] = types.Tx([]byte("regular_tx_4")) - err = s.verifyForcedInclusionTxs(currentState, data3) + err = s.verifyForcedInclusionTxs(currentState.DAHeight, data3) require.Error(t, err) require.Contains(t, err.Error(), "sequencer is malicious") require.Contains(t, err.Error(), "past grace boundary") @@ -588,7 +588,7 @@ func TestVerifyForcedInclusionTxs_PartiallyIncluded(t *testing.T) { currentState.DAHeight = 0 // Verify - should pass since dataBin2 may be legitimately deferred within the epoch - err = s.verifyForcedInclusionTxs(currentState, data) + err = s.verifyForcedInclusionTxs(currentState.DAHeight, data) require.NoError(t, err) // Mock DA for next epoch to return no forced inclusion transactions @@ -602,7 +602,7 @@ func TestVerifyForcedInclusionTxs_PartiallyIncluded(t *testing.T) { data2.Txs[0] = types.Tx([]byte("regular_tx_3")) // Verify - should pass since we're at the grace boundary, not past it - err = s.verifyForcedInclusionTxs(currentState, data2) + err = s.verifyForcedInclusionTxs(currentState.DAHeight, data2) require.NoError(t, err) // Mock DA for height 2 (when we move to DAHeight 2) @@ -617,7 +617,7 @@ func TestVerifyForcedInclusionTxs_PartiallyIncluded(t *testing.T) { data3 := makeData(gen.ChainID, 3, 1) data3.Txs[0] = types.Tx([]byte("regular_tx_4")) - err = s.verifyForcedInclusionTxs(currentState, data3) + err = s.verifyForcedInclusionTxs(currentState.DAHeight, data3) require.Error(t, err) require.Contains(t, err.Error(), "sequencer is malicious") require.Contains(t, err.Error(), "past grace boundary") @@ -687,7 +687,7 @@ func TestVerifyForcedInclusionTxs_NoForcedTransactions(t *testing.T) { currentState.DAHeight = 0 // Verify - should pass since no forced txs to verify - err = s.verifyForcedInclusionTxs(currentState, data) + err = s.verifyForcedInclusionTxs(currentState.DAHeight, data) require.NoError(t, err) } @@ -748,7 +748,7 @@ func TestVerifyForcedInclusionTxs_NamespaceNotConfigured(t *testing.T) 
{ currentState.DAHeight = 0 // Verify - should pass since namespace not configured - err = s.verifyForcedInclusionTxs(currentState, data) + err = s.verifyForcedInclusionTxs(currentState.DAHeight, data) require.NoError(t, err) } @@ -834,7 +834,7 @@ func TestVerifyForcedInclusionTxs_DeferralWithinEpoch(t *testing.T) { currentState.DAHeight = 104 // Verify - should pass since dataBin2 can be deferred within epoch - err = s.verifyForcedInclusionTxs(currentState, data1) + err = s.verifyForcedInclusionTxs(currentState.DAHeight, data1) require.NoError(t, err) // Verify that dataBin2 is now tracked as pending @@ -863,7 +863,7 @@ func TestVerifyForcedInclusionTxs_DeferralWithinEpoch(t *testing.T) { data2.Txs[1] = types.Tx(dataBin2) // The deferred one we're waiting for // Verify - should pass since dataBin2 is now included and clears pending - err = s.verifyForcedInclusionTxs(currentState, data2) + err = s.verifyForcedInclusionTxs(currentState.DAHeight, data2) require.NoError(t, err) // Verify that pending queue is now empty (dataBin2 was included) @@ -957,7 +957,7 @@ func TestVerifyForcedInclusionTxs_MaliciousAfterEpochEnd(t *testing.T) { currentState.DAHeight = 102 // Verify - should pass, tx can be deferred within epoch - err = s.verifyForcedInclusionTxs(currentState, data1) + err = s.verifyForcedInclusionTxs(currentState.DAHeight, data1) require.NoError(t, err) } @@ -1050,6 +1050,6 @@ func TestVerifyForcedInclusionTxs_SmoothingExceedsEpoch(t *testing.T) { currentState := s.getLastState() currentState.DAHeight = 102 // At epoch end - err = s.verifyForcedInclusionTxs(currentState, data1) + err = s.verifyForcedInclusionTxs(currentState.DAHeight, data1) require.NoError(t, err, "smoothing within epoch should be allowed") } From 1f5a65431cb0cec75797fd8d283f86c8a6602544 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Fri, 9 Jan 2026 15:41:07 +0100 Subject: [PATCH 2/2] add test --- .../syncing/syncer_forced_inclusion_test.go | 232 ++++++++++++++++++ 1 file changed, 232 insertions(+) diff --git a/block/internal/syncing/syncer_forced_inclusion_test.go b/block/internal/syncing/syncer_forced_inclusion_test.go index 0978788bb..0ed97964d 100644 --- a/block/internal/syncing/syncer_forced_inclusion_test.go +++ b/block/internal/syncing/syncer_forced_inclusion_test.go @@ -1053,3 +1053,235 @@ func TestVerifyForcedInclusionTxs_SmoothingExceedsEpoch(t *testing.T) { err = s.verifyForcedInclusionTxs(currentState.DAHeight, data1) require.NoError(t, err, "smoothing within epoch should be allowed") } + +func TestVerifyForcedInclusionTxs_P2PBlocks(t *testing.T) { + t.Run("P2P block with all forced txs included passes verification", func(t *testing.T) { + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + st := store.New(ds) + + cm, err := cache.NewCacheManager(config.DefaultConfig(), zerolog.Nop()) + require.NoError(t, err) + + addr, pub, signer := buildSyncTestSigner(t) + gen := genesis.Genesis{ + ChainID: "tchain", + InitialHeight: 1, + StartTime: time.Now().Add(-time.Second), + ProposerAddress: addr, + DAStartHeight: 0, + DAEpochForcedInclusion: 1, + } + + cfg := config.DefaultConfig() + cfg.DA.ForcedInclusionNamespace = "nsForcedInclusion" + + mockExec := testmocks.NewMockExecutor(t) + mockExec.EXPECT().InitChain(mock.Anything, mock.Anything, uint64(1), "tchain"). 
+ Return([]byte("app0"), uint64(1024), nil).Once() + + client := testmocks.NewMockClient(t) + client.On("GetHeaderNamespace").Return([]byte(cfg.DA.Namespace)).Maybe() + client.On("GetDataNamespace").Return([]byte(cfg.DA.DataNamespace)).Maybe() + client.On("GetForcedInclusionNamespace").Return([]byte(cfg.DA.ForcedInclusionNamespace)).Maybe() + client.On("HasForcedInclusionNamespace").Return(true).Maybe() + + errChan := make(chan error, 1) + s := NewSyncer( + st, + mockExec, + client, + cm, + common.NopMetrics(), + cfg, + gen, + common.NewMockBroadcaster[*types.P2PSignedHeader](t), + common.NewMockBroadcaster[*types.P2PData](t), + zerolog.Nop(), + common.DefaultBlockOptions(), + errChan, + ) + + require.NoError(t, s.initializeState()) + s.ctx = context.Background() + + // Initialize DA retriever and forced inclusion retriever + s.daRetriever = NewDARetriever(client, cm, gen, zerolog.Nop()) + s.fiRetriever = da.NewForcedInclusionRetriever(client, zerolog.Nop(), gen.DAStartHeight, gen.DAEpochForcedInclusion) + + // Mock async DA retriever to avoid dealing with actual DA fetching in test + mockDARetriever := NewMockDARetriever(t) + s.asyncDARetriever = NewAsyncDARetriever(mockDARetriever, s.heightInCh, zerolog.Nop()) + + // Mock DA to return forced inclusion transactions at epoch 0 + forcedTxData, _ := makeSignedDataBytes(t, gen.ChainID, 10, addr, pub, signer, 2) + client.On("Retrieve", mock.Anything, uint64(0), []byte("nsForcedInclusion")).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, IDs: [][]byte{[]byte("fi1")}, Timestamp: time.Now()}, + Data: [][]byte{forcedTxData}, + }).Once() + + // Create block data that INCLUDES the forced transaction + lastState := s.getLastState() + data := makeData(gen.ChainID, 1, 1) + data.Txs[0] = types.Tx(forcedTxData) + _, hdr := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, lastState.AppHash, data, nil) + + // Mock ExecuteTxs for successful block execution + mockExec.EXPECT().ExecuteTxs(mock.Anything, mock.Anything, uint64(1), mock.Anything, lastState.AppHash). + Return([]byte("app1"), uint64(1024), nil).Once() + + // Create P2P event with DA height hint matching the epoch (DA height 0) + evt := common.DAHeightEvent{ + Header: hdr, + Data: data, + Source: common.SourceP2P, + DaHeightHints: [2]uint64{0, 0}, + } + + // Process the P2P block - should succeed with forced inclusion verification + s.processHeightEvent(&evt) + + // Verify no errors occurred + select { + case err := <-errChan: + t.Fatalf("unexpected error: %v", err) + default: + } + + // Verify block was processed + h, err := st.Height(context.Background()) + require.NoError(t, err) + require.Equal(t, uint64(1), h, "block should have been synced") + }) + + t.Run("P2P block missing forced txs triggers malicious detection after grace period", func(t *testing.T) { + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + st := store.New(ds) + + cm, err := cache.NewCacheManager(config.DefaultConfig(), zerolog.Nop()) + require.NoError(t, err) + + addr, pub, signer := buildSyncTestSigner(t) + gen := genesis.Genesis{ + ChainID: "tchain", + InitialHeight: 1, + StartTime: time.Now().Add(-time.Second), + ProposerAddress: addr, + DAStartHeight: 0, + DAEpochForcedInclusion: 1, + } + + cfg := config.DefaultConfig() + cfg.DA.ForcedInclusionNamespace = "nsForcedInclusion" + + mockExec := testmocks.NewMockExecutor(t) + mockExec.EXPECT().InitChain(mock.Anything, mock.Anything, uint64(1), "tchain"). 
+ Return([]byte("app0"), uint64(1024), nil).Once() + + client := testmocks.NewMockClient(t) + client.On("GetHeaderNamespace").Return([]byte(cfg.DA.Namespace)).Maybe() + client.On("GetDataNamespace").Return([]byte(cfg.DA.DataNamespace)).Maybe() + client.On("GetForcedInclusionNamespace").Return([]byte(cfg.DA.ForcedInclusionNamespace)).Maybe() + client.On("HasForcedInclusionNamespace").Return(true).Maybe() + + errChan := make(chan error, 1) + s := NewSyncer( + st, + mockExec, + client, + cm, + common.NopMetrics(), + cfg, + gen, + common.NewMockBroadcaster[*types.P2PSignedHeader](t), + common.NewMockBroadcaster[*types.P2PData](t), + zerolog.Nop(), + common.DefaultBlockOptions(), + errChan, + ) + + require.NoError(t, s.initializeState()) + s.ctx = context.Background() + + // Initialize DA retriever and forced inclusion retriever + s.daRetriever = NewDARetriever(client, cm, gen, zerolog.Nop()) + s.fiRetriever = da.NewForcedInclusionRetriever(client, zerolog.Nop(), gen.DAStartHeight, gen.DAEpochForcedInclusion) + + // Mock async DA retriever to avoid dealing with actual DA fetching in test + mockDARetriever := NewMockDARetriever(t) + s.asyncDARetriever = NewAsyncDARetriever(mockDARetriever, s.heightInCh, zerolog.Nop()) + // Don't start it to avoid async complications + + // Process first block successfully (within grace period) + // Mock DA to return forced inclusion transactions at epoch 0 + forcedTxData, _ := makeSignedDataBytes(t, gen.ChainID, 10, addr, pub, signer, 2) + client.On("Retrieve", mock.Anything, uint64(0), []byte("nsForcedInclusion")).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, IDs: [][]byte{[]byte("fi1")}, Timestamp: time.Now()}, + Data: [][]byte{forcedTxData}, + }).Once() + + lastState := s.getLastState() + data1 := makeData(gen.ChainID, 1, 1) + data1.Txs[0] = types.Tx([]byte("regular_tx")) + _, hdr1 := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, lastState.AppHash, data1, nil) + + mockExec.EXPECT().ExecuteTxs(mock.Anything, mock.Anything, uint64(1), mock.Anything, lastState.AppHash). 
+ Return([]byte("app1"), uint64(1024), nil).Once() + + evt1 := common.DAHeightEvent{ + Header: hdr1, + Data: data1, + Source: common.SourceP2P, + DaHeightHints: [2]uint64{0, 0}, + } + + // First block processes fine (forced tx can be deferred within grace period) + s.processHeightEvent(&evt1) + + select { + case err := <-errChan: + t.Fatalf("unexpected error on first block: %v", err) + default: + } + + // Verify block was processed + h, err := st.Height(context.Background()) + require.NoError(t, err) + require.Equal(t, uint64(1), h) + + // Now process second block past grace boundary + // Mock DA for epoch 2 (past grace boundary) + client.On("Retrieve", mock.Anything, uint64(2), []byte("nsForcedInclusion")).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusNotFound, Timestamp: time.Now()}, + }).Once() + + // Set the DA height hint to be past grace boundary (epoch 0 end + grace period of 1 epoch = boundary at 1, so 2 is past) + lastState = s.getLastState() + data2 := makeData(gen.ChainID, 2, 1) + data2.Txs[0] = types.Tx([]byte("regular_tx_2")) + _, hdr2 := makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, lastState.AppHash, data2, hdr1.Hash()) + + evt2 := common.DAHeightEvent{ + Header: hdr2, + Data: data2, + Source: common.SourceP2P, + DaHeightHints: [2]uint64{2, 2}, // DA height 2 is past grace boundary + } + + // Second block should fail with malicious sequencer error + s.processHeightEvent(&evt2) + + // Verify critical error was sent + select { + case err := <-errChan: + require.Error(t, err) + require.Contains(t, err.Error(), "sequencer malicious", "should detect malicious sequencer") + case <-time.After(100 * time.Millisecond): + t.Fatal("expected malicious sequencer error to be sent to error channel") + } + + // Verify block 2 was NOT synced + h, err = st.Height(context.Background()) + require.NoError(t, err) + require.Equal(t, uint64(1), h, "block 2 should not have been synced due to malicious sequencer") + }) +}