Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,11 @@ packages:
dir: ./test/mocks
pkgname: mocks
filename: da.go
github.com/evstack/ev-node/pkg/da/types:
interfaces:
Verifier:
config:
dir: ./test/mocks
pkgname: mocks
filename: da_verifier.go
filename: da.go
github.com/evstack/ev-node/pkg/da/jsonrpc:
interfaces:
BlobModule:
Expand Down
8 changes: 1 addition & 7 deletions apps/evm/cmd/rollback.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,9 @@ import (
"errors"
"fmt"

ds "github.com/ipfs/go-datastore"
kt "github.com/ipfs/go-datastore/keytransform"
"github.com/spf13/cobra"

goheaderstore "github.com/celestiaorg/go-header/store"
"github.com/evstack/ev-node/node"
rollcmd "github.com/evstack/ev-node/pkg/cmd"
"github.com/evstack/ev-node/pkg/store"
"github.com/evstack/ev-node/types"
Expand Down Expand Up @@ -50,10 +47,7 @@ func NewRollbackCmd() *cobra.Command {
}()

// prefixed evolve db
evolveDB := kt.Wrap(rawEvolveDB, &kt.PrefixTransform{
Prefix: ds.NewKey(node.EvPrefix),
})

evolveDB := store.NewEvNodeKVStore(rawEvolveDB)
evolveStore := store.New(evolveDB)
if height == 0 {
currentHeight, err := evolveStore.Height(goCtx)
Expand Down
11 changes: 3 additions & 8 deletions apps/evm/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ var RunCmd = &cobra.Command{
}

// Create sequencer based on configuration
sequencer, err := createSequencer(context.Background(), logger, datastore, nodeConfig, genesis, daClient)
sequencer, err := createSequencer(logger, datastore, nodeConfig, genesis, daClient)
if err != nil {
return err
}
Expand Down Expand Up @@ -154,22 +154,20 @@ func init() {
// If BasedSequencer is enabled, it creates a based sequencer that fetches transactions from DA.
// Otherwise, it creates a single (traditional) sequencer.
func createSequencer(
ctx context.Context,
logger zerolog.Logger,
datastore datastore.Batching,
nodeConfig config.Config,
genesis genesis.Genesis,
daClient block.FullDAClient,
) (coresequencer.Sequencer, error) {
fiRetriever := block.NewForcedInclusionRetriever(daClient, genesis, logger)

if nodeConfig.Node.BasedSequencer {
// Based sequencer mode - fetch transactions only from DA
if !nodeConfig.Node.Aggregator {
return nil, fmt.Errorf("based sequencer mode requires aggregator mode to be enabled")
}

basedSeq, err := based.NewBasedSequencer(ctx, fiRetriever, datastore, genesis, logger)
fiRetriever := block.NewForcedInclusionRetriever(daClient, logger, genesis.DAStartHeight, genesis.DAEpochForcedInclusion)
basedSeq, err := based.NewBasedSequencer(fiRetriever, datastore, genesis, logger)
if err != nil {
return nil, fmt.Errorf("failed to create based sequencer: %w", err)
}
Expand All @@ -183,15 +181,12 @@ func createSequencer(
}

sequencer, err := single.NewSequencer(
ctx,
logger,
datastore,
daClient,
[]byte(genesis.ChainID),
nodeConfig.Node.BlockTime.Duration,
nodeConfig.Node.Aggregator,
1000,
fiRetriever,
genesis,
)
if err != nil {
Expand Down
7 changes: 2 additions & 5 deletions apps/grpc/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,15 +120,15 @@ func createSequencer(
}

daClient := block.NewDAClient(blobClient, nodeConfig, logger)
fiRetriever := block.NewForcedInclusionRetriever(daClient, genesis, logger)

if nodeConfig.Node.BasedSequencer {
// Based sequencer mode - fetch transactions only from DA
if !nodeConfig.Node.Aggregator {
return nil, fmt.Errorf("based sequencer mode requires aggregator mode to be enabled")
}

basedSeq, err := based.NewBasedSequencer(ctx, fiRetriever, datastore, genesis, logger)
fiRetriever := block.NewForcedInclusionRetriever(daClient, logger, genesis.DAStartHeight, genesis.DAEpochForcedInclusion)
basedSeq, err := based.NewBasedSequencer(fiRetriever, datastore, genesis, logger)
if err != nil {
return nil, fmt.Errorf("failed to create based sequencer: %w", err)
}
Expand All @@ -142,15 +142,12 @@ func createSequencer(
}

sequencer, err := single.NewSequencer(
ctx,
logger,
datastore,
daClient,
[]byte(genesis.ChainID),
nodeConfig.Node.BlockTime.Duration,
nodeConfig.Node.Aggregator,
1000,
fiRetriever,
genesis,
)
if err != nil {
Expand Down
8 changes: 1 addition & 7 deletions apps/testapp/cmd/rollback.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,11 @@ import (
"fmt"

kvexecutor "github.com/evstack/ev-node/apps/testapp/kv"
"github.com/evstack/ev-node/node"
rollcmd "github.com/evstack/ev-node/pkg/cmd"
"github.com/evstack/ev-node/pkg/store"
"github.com/evstack/ev-node/types"

goheaderstore "github.com/celestiaorg/go-header/store"
ds "github.com/ipfs/go-datastore"
kt "github.com/ipfs/go-datastore/keytransform"
"github.com/spf13/cobra"
)

Expand Down Expand Up @@ -51,10 +48,7 @@ func NewRollbackCmd() *cobra.Command {
}()

// prefixed evolve db
evolveDB := kt.Wrap(rawEvolveDB, &kt.PrefixTransform{
Prefix: ds.NewKey(node.EvPrefix),
})

evolveDB := store.NewEvNodeKVStore(rawEvolveDB)
evolveStore := store.New(evolveDB)
if height == 0 {
currentHeight, err := evolveStore.Height(goCtx)
Expand Down
7 changes: 2 additions & 5 deletions apps/testapp/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,15 @@ func createSequencer(
}

daClient := block.NewDAClient(blobClient, nodeConfig, logger)
fiRetriever := block.NewForcedInclusionRetriever(daClient, genesis, logger)

if nodeConfig.Node.BasedSequencer {
// Based sequencer mode - fetch transactions only from DA
if !nodeConfig.Node.Aggregator {
return nil, fmt.Errorf("based sequencer mode requires aggregator mode to be enabled")
}

basedSeq, err := based.NewBasedSequencer(ctx, fiRetriever, datastore, genesis, logger)
fiRetriever := block.NewForcedInclusionRetriever(daClient, logger, genesis.DAStartHeight, genesis.DAEpochForcedInclusion)
basedSeq, err := based.NewBasedSequencer(fiRetriever, datastore, genesis, logger)
if err != nil {
return nil, fmt.Errorf("failed to create based sequencer: %w", err)
}
Expand All @@ -143,15 +143,12 @@ func createSequencer(
}

sequencer, err := single.NewSequencer(
ctx,
logger,
datastore,
daClient,
[]byte(genesis.ChainID),
nodeConfig.Node.BlockTime.Duration,
nodeConfig.Node.Aggregator,
1000,
fiRetriever,
genesis,
)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions block/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ func NewSyncComponents(
config,
genesis,
daSubmitter,
nil, // No sequencer for sync nodes
nil, // No signer for sync nodes
logger,
errorCh,
Expand Down Expand Up @@ -250,6 +251,7 @@ func NewAggregatorComponents(
config,
genesis,
daSubmitter,
sequencer,
signer, // Signer for aggregator nodes to submit to DA
logger,
errorCh,
Expand Down
26 changes: 15 additions & 11 deletions block/internal/da/forced_inclusion_retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/rs/zerolog"

datypes "github.com/evstack/ev-node/pkg/da/types"
"github.com/evstack/ev-node/pkg/genesis"
"github.com/evstack/ev-node/types"
)

Expand All @@ -18,10 +17,10 @@ var ErrForceInclusionNotConfigured = errors.New("forced inclusion namespace not

// ForcedInclusionRetriever handles retrieval of forced inclusion transactions from DA.
type ForcedInclusionRetriever struct {
client Client
genesis genesis.Genesis
logger zerolog.Logger
daEpochSize uint64
client Client
logger zerolog.Logger
daEpochSize uint64
daStartHeight uint64
}

// ForcedInclusionEvent contains forced inclusion transactions retrieved from DA.
Expand All @@ -35,25 +34,30 @@ type ForcedInclusionEvent struct {
// NewForcedInclusionRetriever creates a new forced inclusion retriever.
func NewForcedInclusionRetriever(
client Client,
genesis genesis.Genesis,
logger zerolog.Logger,
daStartHeight, daEpochSize uint64,
) *ForcedInclusionRetriever {
return &ForcedInclusionRetriever{
client: client,
genesis: genesis,
logger: logger.With().Str("component", "forced_inclusion_retriever").Logger(),
daEpochSize: genesis.DAEpochForcedInclusion,
client: client,
logger: logger.With().Str("component", "forced_inclusion_retriever").Logger(),
daStartHeight: daStartHeight,
daEpochSize: daEpochSize,
}
}

// RetrieveForcedIncludedTxs retrieves forced inclusion transactions at the given DA height.
// It respects epoch boundaries and only fetches at epoch start.
func (r *ForcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Context, daHeight uint64) (*ForcedInclusionEvent, error) {
// when daStartHeight is not set or no namespace is configured, we retrieve nothing.
if !r.client.HasForcedInclusionNamespace() {
return nil, ErrForceInclusionNotConfigured
}

epochStart, epochEnd, currentEpochNumber := types.CalculateEpochBoundaries(daHeight, r.genesis.DAStartHeight, r.daEpochSize)
if daHeight < r.daStartHeight {
return nil, fmt.Errorf("DA height %d is before the configured start height %d", daHeight, r.daStartHeight)
}

epochStart, epochEnd, currentEpochNumber := types.CalculateEpochBoundaries(daHeight, r.daStartHeight, r.daEpochSize)

if daHeight != epochEnd {
r.logger.Debug().
Expand Down
16 changes: 8 additions & 8 deletions block/internal/da/forced_inclusion_retriever_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestNewForcedInclusionRetriever(t *testing.T) {
DAEpochForcedInclusion: 10,
}

retriever := NewForcedInclusionRetriever(client, gen, zerolog.Nop())
retriever := NewForcedInclusionRetriever(client, zerolog.Nop(), gen.DAStartHeight, gen.DAEpochForcedInclusion)
assert.Assert(t, retriever != nil)
}

Expand All @@ -37,7 +37,7 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_NoNamespace(t *testi
DAEpochForcedInclusion: 10,
}

retriever := NewForcedInclusionRetriever(client, gen, zerolog.Nop())
retriever := NewForcedInclusionRetriever(client, zerolog.Nop(), gen.DAStartHeight, gen.DAEpochForcedInclusion)
ctx := context.Background()

_, err := retriever.RetrieveForcedIncludedTxs(ctx, 100)
Expand All @@ -56,7 +56,7 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_NotAtEpochStart(t *t
DAEpochForcedInclusion: 10,
}

retriever := NewForcedInclusionRetriever(client, gen, zerolog.Nop())
retriever := NewForcedInclusionRetriever(client, zerolog.Nop(), gen.DAStartHeight, gen.DAEpochForcedInclusion)
ctx := context.Background()

// Height 105 is not an epoch start (100, 110, 120, etc. are epoch starts)
Expand Down Expand Up @@ -89,7 +89,7 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_EpochStartSuccess(t
DAEpochForcedInclusion: 1, // Single height epoch
}

retriever := NewForcedInclusionRetriever(client, gen, zerolog.Nop())
retriever := NewForcedInclusionRetriever(client, zerolog.Nop(), gen.DAStartHeight, gen.DAEpochForcedInclusion)
ctx := context.Background()

// Height 100 is an epoch start
Expand All @@ -116,7 +116,7 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_EpochStartNotAvailab
DAEpochForcedInclusion: 10,
}

retriever := NewForcedInclusionRetriever(client, gen, zerolog.Nop())
retriever := NewForcedInclusionRetriever(client, zerolog.Nop(), gen.DAStartHeight, gen.DAEpochForcedInclusion)
ctx := context.Background()

// Epoch boundaries: [100, 109] - retrieval happens at epoch end (109)
Expand All @@ -139,7 +139,7 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_NoBlobsAtHeight(t *t
DAEpochForcedInclusion: 1, // Single height epoch
}

retriever := NewForcedInclusionRetriever(client, gen, zerolog.Nop())
retriever := NewForcedInclusionRetriever(client, zerolog.Nop(), gen.DAStartHeight, gen.DAEpochForcedInclusion)
ctx := context.Background()

event, err := retriever.RetrieveForcedIncludedTxs(ctx, 100)
Expand Down Expand Up @@ -177,7 +177,7 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_MultiHeightEpoch(t *
DAEpochForcedInclusion: 3, // Epoch: 100-102
}

retriever := NewForcedInclusionRetriever(client, gen, zerolog.Nop())
retriever := NewForcedInclusionRetriever(client, zerolog.Nop(), gen.DAStartHeight, gen.DAEpochForcedInclusion)
ctx := context.Background()

// Epoch boundaries: [100, 102] - retrieval happens at epoch end (102)
Expand All @@ -201,7 +201,7 @@ func TestForcedInclusionRetriever_processForcedInclusionBlobs(t *testing.T) {
DAEpochForcedInclusion: 10,
}

retriever := NewForcedInclusionRetriever(client, gen, zerolog.Nop())
retriever := NewForcedInclusionRetriever(client, zerolog.Nop(), gen.DAStartHeight, gen.DAEpochForcedInclusion)

tests := []struct {
name string
Expand Down
3 changes: 2 additions & 1 deletion block/internal/executing/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ func (e *Executor) initializeState() error {
LastBlockHeight: e.genesis.InitialHeight - 1,
LastBlockTime: e.genesis.StartTime,
AppHash: stateRoot,
DAHeight: e.genesis.DAStartHeight,
// DA start height is usually 0 at InitChain unless it is a re-genesis or a based sequencer.
DAHeight: e.genesis.DAStartHeight,
}
}

Expand Down
18 changes: 14 additions & 4 deletions block/internal/submitting/submitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/evstack/ev-node/block/internal/cache"
"github.com/evstack/ev-node/block/internal/common"
coreexecutor "github.com/evstack/ev-node/core/execution"
coresequencer "github.com/evstack/ev-node/core/sequencer"
"github.com/evstack/ev-node/pkg/config"
"github.com/evstack/ev-node/pkg/genesis"
"github.com/evstack/ev-node/pkg/signer"
Expand All @@ -31,10 +32,11 @@ type daSubmitterAPI interface {
// Submitter handles DA submission and inclusion processing for both sync and aggregator nodes
type Submitter struct {
// Core components
store store.Store
exec coreexecutor.Executor
config config.Config
genesis genesis.Genesis
store store.Store
exec coreexecutor.Executor
sequencer coresequencer.Sequencer
config config.Config
genesis genesis.Genesis

// Shared components
cache cache.Manager
Expand Down Expand Up @@ -74,6 +76,7 @@ func NewSubmitter(
config config.Config,
genesis genesis.Genesis,
daSubmitter daSubmitterAPI,
sequencer coresequencer.Sequencer, // Can be nil for sync nodes
signer signer.Signer, // Can be nil for sync nodes
logger zerolog.Logger,
errorCh chan<- error,
Expand All @@ -86,6 +89,7 @@ func NewSubmitter(
config: config,
genesis: genesis,
daSubmitter: daSubmitter,
sequencer: sequencer,
signer: signer,
daIncludedHeight: &atomic.Uint64{},
errorCh: errorCh,
Expand Down Expand Up @@ -364,6 +368,12 @@ func (s *Submitter) setSequencerHeightToDAHeight(ctx context.Context, height uin
if err := s.store.SetMetadata(ctx, store.GenesisDAHeightKey, genesisDAIncludedHeightBytes); err != nil {
return err
}

// the sequencer will process DA epochs from this height.
if s.sequencer != nil {
s.sequencer.SetDAHeight(genesisDAIncludedHeight)
s.logger.Debug().Uint64("genesis_da_height", genesisDAIncludedHeight).Msg("initialized sequencer DA height from persisted genesis DA height")
}
}

return nil
Expand Down
Loading