From b7b801a24de952274e8ae608b55b9e0b07ba968a Mon Sep 17 00:00:00 2001 From: Draco Date: Tue, 25 Nov 2025 14:14:51 -0500 Subject: [PATCH 1/2] feat(evm): add database with separate block storage --- vms/evm/database/blockdb/database.go | 528 +++++++++++++++++++++ vms/evm/database/blockdb/database_test.go | 526 ++++++++++++++++++++ vms/evm/database/blockdb/helpers_test.go | 172 +++++++ vms/evm/database/blockdb/migration_test.go | 298 ++++++++++++ vms/evm/database/blockdb/migrator.go | 512 ++++++++++++++++++++ 5 files changed, 2036 insertions(+) create mode 100644 vms/evm/database/blockdb/database.go create mode 100644 vms/evm/database/blockdb/database_test.go create mode 100644 vms/evm/database/blockdb/helpers_test.go create mode 100644 vms/evm/database/blockdb/migration_test.go create mode 100644 vms/evm/database/blockdb/migrator.go diff --git a/vms/evm/database/blockdb/database.go b/vms/evm/database/blockdb/database.go new file mode 100644 index 000000000000..d8cb30ff4d53 --- /dev/null +++ b/vms/evm/database/blockdb/database.go @@ -0,0 +1,528 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package blockdb + +import ( + "bytes" + "encoding/binary" + "errors" + "fmt" + "path/filepath" + + "github.com/ava-labs/libevm/common" + "github.com/ava-labs/libevm/ethdb" + "github.com/ava-labs/libevm/rlp" + "github.com/prometheus/client_golang/prometheus" + "go.uber.org/zap" + + "github.com/ava-labs/avalanchego/database" + "github.com/ava-labs/avalanchego/database/heightindexdb/meterdb" + "github.com/ava-labs/avalanchego/database/prefixdb" + "github.com/ava-labs/avalanchego/utils/logging" + + heightindexdb "github.com/ava-labs/avalanchego/x/blockdb" +) + +var ( + _ ethdb.Database = (*Database)(nil) + _ ethdb.Batch = (*batch)(nil) + + migratorDBPrefix = []byte("migrator") + + // blockDBMinHeightKey stores the minimum block height of the + // height-indexed block databases. + // It is set at initialization and cannot be changed without + // recreating the databases. + blockDBMinHeightKey = []byte("blockdb_min_height") + + errUnexpectedKey = errors.New("unexpected database key") + errNotInitialized = errors.New("database not initialized") + errAlreadyInitialized = errors.New("database already initialized") + errInvalidEncodedLength = errors.New("invalid encoded length") +) + +// Key prefixes for block data in [ethdb.Database]. +// This is copied from libevm because they are not exported. +// Since the prefixes should never be changed, we can avoid libevm changes by +// duplicating them here. +var ( + evmHeaderPrefix = []byte("h") + evmBlockBodyPrefix = []byte("b") + evmReceiptsPrefix = []byte("r") +) + +const ( + hashDataElements = 2 + blockNumberSize = 8 + blockHashSize = 32 + + headerDBName = "headerdb" + bodyDBName = "bodydb" + receiptsDBName = "receiptsdb" +) + +// Database wraps an [ethdb.Database] and routes block headers, bodies, and receipts +// to separate [database.HeightIndex] databases for blocks at or above the minimum height. +// All other data uses the underlying [ethdb.Database] directly. +type Database struct { + ethdb.Database + + // Databases + stateDB database.Database + headerDB database.HeightIndex + bodyDB database.HeightIndex + receiptsDB database.HeightIndex + + // Configuration + config heightindexdb.DatabaseConfig + dbPath string + minHeight uint64 + + migrator *migrator + heightDBsReady bool + + reg prometheus.Registerer + logger logging.Logger +} + +// New creates a new [Database] over the provided [ethdb.Database]. +// +// If allowDeferredInit is true and no minimum block height is known, +// New defers initializing the height-indexed block databases until +// [Database.InitBlockDBs] is called. +// +// The bool result is true if the block databases were initialized immediately, +// and false if initialization was deferred. +func New( + stateDB database.Database, + evmDB ethdb.Database, + dbPath string, + allowDeferredInit bool, + config heightindexdb.DatabaseConfig, + logger logging.Logger, + reg prometheus.Registerer, +) (*Database, bool, error) { + db := &Database{ + stateDB: stateDB, + Database: evmDB, + dbPath: dbPath, + config: config, + reg: reg, + logger: logger, + } + + minHeight, ok, err := databaseMinHeight(db.stateDB) + if err != nil { + return nil, false, err + } + + // Databases already exist, load with existing min height. + if ok { + if err := db.InitBlockDBs(minHeight); err != nil { + return nil, false, err + } + return db, true, nil + } + + // Initialize using the minimum block height of existing blocks to migrate. + minHeight, ok, err = minBlockHeightToMigrate(evmDB) + if err != nil { + return nil, false, err + } + if ok { + if err := db.InitBlockDBs(minHeight); err != nil { + return nil, false, err + } + return db, true, nil + } + + // Initialize with min height 1 if deferred initialization is not allowed. + if !allowDeferredInit { + if err := db.InitBlockDBs(1); err != nil { + return nil, false, err + } + return db, true, nil + } + + db.logger.Info( + "Deferring block database initialization until minimum height is known", + ) + return db, false, nil +} + +// InitBlockDBs initializes [database.HeightIndex] databases with the specified +// minimum height. +// Once initialized, the minimum height cannot be changed without recreating +// the databases. +// +// Returns an error if already initialized. +func (db *Database) InitBlockDBs(minHeight uint64) error { + if db.heightDBsReady { + return errAlreadyInitialized + } + + if err := db.stateDB.Put(blockDBMinHeightKey, encodeBlockNumber(minHeight)); err != nil { + return err + } + headerDB, err := db.newMeteredHeightDB(headerDBName, minHeight) + if err != nil { + return err + } + bodyDB, err := db.newMeteredHeightDB(bodyDBName, minHeight) + if err != nil { + return errors.Join(err, headerDB.Close()) + } + receiptsDB, err := db.newMeteredHeightDB(receiptsDBName, minHeight) + if err != nil { + return errors.Join(err, headerDB.Close(), bodyDB.Close()) + } + db.headerDB = headerDB + db.bodyDB = bodyDB + db.receiptsDB = receiptsDB + + if err := db.initMigrator(); err != nil { + return errors.Join( + fmt.Errorf("failed to initialize migrator: %w", err), + headerDB.Close(), + bodyDB.Close(), + receiptsDB.Close(), + ) + } + + db.heightDBsReady = true + db.minHeight = minHeight + + db.logger.Info( + "Initialized height-indexed block databases", + zap.Uint64("minHeight", db.minHeight), + ) + + return nil +} + +// StartMigration begins the background migration of block data from the +// [ethdb.Database] to the height-indexed block databases. +// +// Returns an error if the databases are not initialized. +// No error if already running. +func (db *Database) StartMigration() error { + if !db.heightDBsReady { + return errNotInitialized + } + db.migrator.start() + return nil +} + +func (db *Database) Put(key []byte, value []byte) error { + if !db.useHeightIndexedDB(key) { + return db.Database.Put(key, value) + } + + heightDB, err := db.heightDBForKey(key) + if err != nil { + return err + } + num, hash, err := parseBlockKey(key) + if err != nil { + return err + } + return writeHashAndData(heightDB, num, hash, value) +} + +func (db *Database) Get(key []byte) ([]byte, error) { + if !db.useHeightIndexedDB(key) { + return db.Database.Get(key) + } + + heightDB, err := db.heightDBForKey(key) + if err != nil { + return nil, err + } + return readHashAndData(heightDB, db.Database, key, db.migrator) +} + +func (db *Database) Has(key []byte) (bool, error) { + if !db.useHeightIndexedDB(key) { + return db.Database.Has(key) + } + + _, err := db.Get(key) + if err != nil { + if errors.Is(err, database.ErrNotFound) { + return false, nil + } + return false, err + } + return true, nil +} + +// Delete removes the key from the underlying database for non-block data. +// Block data deletion is a no-op because [database.HeightIndex] does not support deletion. +func (db *Database) Delete(key []byte) error { + if !db.useHeightIndexedDB(key) { + return db.Database.Delete(key) + } + + num, hash, err := parseBlockKey(key) + if err != nil { + return err + } + db.logger.Warn( + "Deleting block data is a no-op", + zap.Uint64("height", num), + zap.Stringer("hash", hash), + ) + return nil +} + +func (db *Database) Close() error { + if db.migrator != nil { + db.migrator.stop() + } + if !db.heightDBsReady { + return db.Database.Close() + } + + // Don't close stateDB since the caller should be managing it. + return errors.Join( + db.headerDB.Close(), + db.bodyDB.Close(), + db.receiptsDB.Close(), + db.Database.Close(), + ) +} + +func (db *Database) initMigrator() error { + if db.migrator != nil { + return nil + } + mdb := prefixdb.New(migratorDBPrefix, db.stateDB) + migrator, err := newMigrator( + mdb, + db.headerDB, + db.bodyDB, + db.receiptsDB, + db.Database, + db.logger, + ) + if err != nil { + return err + } + db.migrator = migrator + return nil +} + +func (db *Database) newMeteredHeightDB( + namespace string, + minHeight uint64, +) (database.HeightIndex, error) { + path := filepath.Join(db.dbPath, namespace) + config := db.config.WithDir(path).WithMinimumHeight(minHeight) + ndb, err := heightindexdb.New(config, db.logger) + if err != nil { + return nil, fmt.Errorf("failed to create %s database at %s: %w", namespace, path, err) + } + + mdb, err := meterdb.New(db.reg, namespace, ndb) + if err != nil { + return nil, errors.Join( + fmt.Errorf("failed to create metered %s database: %w", namespace, err), + ndb.Close(), + ) + } + + return mdb, nil +} + +func (db *Database) heightDBForKey(key []byte) (database.HeightIndex, error) { + switch { + case isHeaderKey(key): + return db.headerDB, nil + case isBodyKey(key): + return db.bodyDB, nil + case isReceiptsKey(key): + return db.receiptsDB, nil + default: + return nil, errUnexpectedKey + } +} + +func (db *Database) useHeightIndexedDB(key []byte) bool { + if !db.heightDBsReady { + return false + } + + var n int + switch { + case isBodyKey(key): + n = len(evmBlockBodyPrefix) + case isHeaderKey(key): + n = len(evmHeaderPrefix) + case isReceiptsKey(key): + n = len(evmReceiptsPrefix) + default: + return false + } + num := binary.BigEndian.Uint64(key[n : n+blockNumberSize]) + return num >= db.minHeight +} + +type batch struct { + ethdb.Batch + db *Database +} + +func (db *Database) NewBatch() ethdb.Batch { + return &batch{ + db: db, + Batch: db.Database.NewBatch(), + } +} + +func (db *Database) NewBatchWithSize(size int) ethdb.Batch { + return &batch{ + db: db, + Batch: db.Database.NewBatchWithSize(size), + } +} + +func (b *batch) Put(key []byte, value []byte) error { + if b.db.useHeightIndexedDB(key) { + return b.db.Put(key, value) + } + return b.Batch.Put(key, value) +} + +func (b *batch) Delete(key []byte) error { + if b.db.useHeightIndexedDB(key) { + return b.db.Delete(key) + } + return b.Batch.Delete(key) +} + +func parseBlockKey(key []byte) (num uint64, hash common.Hash, err error) { + var n int + switch { + case isBodyKey(key): + n = len(evmBlockBodyPrefix) + case isHeaderKey(key): + n = len(evmHeaderPrefix) + case isReceiptsKey(key): + n = len(evmReceiptsPrefix) + default: + return 0, common.Hash{}, errUnexpectedKey + } + num = binary.BigEndian.Uint64(key[n : n+blockNumberSize]) + bytes := key[n+blockNumberSize:] + if len(bytes) != blockHashSize { + return 0, common.Hash{}, fmt.Errorf("invalid hash length: %d", len(bytes)) + } + hash = common.BytesToHash(bytes) + return num, hash, nil +} + +func encodeBlockNumber(number uint64) []byte { + enc := make([]byte, blockNumberSize) + binary.BigEndian.PutUint64(enc, number) + return enc +} + +func isBodyKey(key []byte) bool { + if len(key) != len(evmBlockBodyPrefix)+blockNumberSize+blockHashSize { + return false + } + return bytes.HasPrefix(key, evmBlockBodyPrefix) +} + +func isHeaderKey(key []byte) bool { + if len(key) != len(evmHeaderPrefix)+blockNumberSize+blockHashSize { + return false + } + return bytes.HasPrefix(key, evmHeaderPrefix) +} + +func isReceiptsKey(key []byte) bool { + if len(key) != len(evmReceiptsPrefix)+blockNumberSize+blockHashSize { + return false + } + return bytes.HasPrefix(key, evmReceiptsPrefix) +} + +func databaseMinHeight(db database.KeyValueReader) (uint64, bool, error) { + minBytes, err := db.Get(blockDBMinHeightKey) + if err != nil { + if errors.Is(err, database.ErrNotFound) { + return 0, false, nil + } + return 0, false, err + } + if len(minBytes) != blockNumberSize { + return 0, false, fmt.Errorf("%w: min height expected %d bytes, got %d", errInvalidEncodedLength, blockNumberSize, len(minBytes)) + } + return binary.BigEndian.Uint64(minBytes), true, nil +} + +func writeHashAndData( + db database.HeightIndex, + height uint64, + hash common.Hash, + data []byte, +) error { + encoded, err := rlp.EncodeToBytes([][]byte{hash.Bytes(), data}) + if err != nil { + return err + } + return db.Put(height, encoded) +} + +// readHashAndData reads data from [database.HeightIndex] and falls back +// to the [ethdb.Database] if the data is not found and migration is not complete. +func readHashAndData( + heightDB database.HeightIndex, + evmDB ethdb.KeyValueReader, + key []byte, + migrator *migrator, +) ([]byte, error) { + num, hash, err := parseBlockKey(key) + if err != nil { + return nil, err + } + data, err := heightDB.Get(num) + if err != nil { + if errors.Is(err, database.ErrNotFound) && !migrator.isCompleted() { + return evmDB.Get(key) + } + return nil, err + } + + var elems [][]byte + if err := rlp.DecodeBytes(data, &elems); err != nil { + return nil, err + } + if len(elems) != hashDataElements { + err := fmt.Errorf( + "invalid hash+data format: expected %d elements, got %d", + hashDataElements, + len(elems), + ) + return nil, err + } + if common.BytesToHash(elems[0]) != hash { + // Hash mismatch means we are trying to read a different block at this height. + return nil, database.ErrNotFound + } + + return elems[1], nil +} + +// IsEnabled checks if blockdb has ever been initialized. +// It returns true if the minimum block height key exists, indicating the +// block database has been created and initialized with a minimum height. +func IsEnabled(db database.KeyValueReader) (bool, error) { + has, err := db.Has(blockDBMinHeightKey) + if err != nil { + return false, err + } + return has, nil +} diff --git a/vms/evm/database/blockdb/database_test.go b/vms/evm/database/blockdb/database_test.go new file mode 100644 index 000000000000..550b56f0d9b6 --- /dev/null +++ b/vms/evm/database/blockdb/database_test.go @@ -0,0 +1,526 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package blockdb + +import ( + "os" + "slices" + "testing" + + "github.com/ava-labs/coreth/params" + "github.com/ava-labs/coreth/plugin/evm/customtypes" + "github.com/ava-labs/libevm/core/rawdb" + "github.com/ava-labs/libevm/core/types" + "github.com/ava-labs/libevm/ethdb" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + + "github.com/ava-labs/avalanchego/database" + "github.com/ava-labs/avalanchego/database/leveldb" + "github.com/ava-labs/avalanchego/utils/logging" + + evmdb "github.com/ava-labs/avalanchego/vms/evm/database" + heightindexdb "github.com/ava-labs/avalanchego/x/blockdb" +) + +func TestMain(m *testing.M) { + customtypes.Register() + params.RegisterExtras() + os.Exit(m.Run()) +} + +func TestDatabaseWriteAndReadBlock(t *testing.T) { + dataDir := t.TempDir() + db, _ := newDatabasesFromDir(t, dataDir) + blocks, receipts := createBlocks(t, 10) + writeBlocks(db, blocks, receipts) + + for _, block := range blocks { + actualBlock := rawdb.ReadBlock(db, block.Hash(), block.NumberU64()) + requireRLPEqual(t, block, actualBlock) + } +} + +func TestDatabaseWriteAndReadReceipts(t *testing.T) { + dataDir := t.TempDir() + db, _ := newDatabasesFromDir(t, dataDir) + blocks, receipts := createBlocks(t, 10) + writeBlocks(db, blocks, receipts) + + for i, block := range blocks { + require.True(t, rawdb.HasReceipts(db, block.Hash(), block.NumberU64())) + actualReceipts := rawdb.ReadReceipts( + db, block.Hash(), block.NumberU64(), block.Time(), params.TestChainConfig, + ) + requireRLPEqual(t, receipts[i], actualReceipts) + } +} + +func TestDatabaseReadLogs(t *testing.T) { + dataDir := t.TempDir() + db, _ := newDatabasesFromDir(t, dataDir) + blocks, receipts := createBlocks(t, 10) + writeBlocks(db, blocks, receipts) + + for i, block := range blocks { + actualLogs := rawdb.ReadLogs(db, block.Hash(), block.NumberU64()) + recs := receipts[i] + requireRLPEqual(t, logsFromReceipts(recs), actualLogs) + } +} + +func TestDatabaseDeleteBlocksNoOp(t *testing.T) { + // Verifies that block header, body and receipts cannot be deleted (no-op), + // but hash to height mapping should be deleted. + tests := []struct { + name string + useBatch bool + batchSize int + }{ + {name: "delete block data is a no-op"}, + {name: "batch delete", useBatch: true}, + {name: "batch delete with size", useBatch: true, batchSize: 1024}, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + dataDir := t.TempDir() + db, _ := newDatabasesFromDir(t, dataDir) + allBlocks, allReceipts := createBlocks(t, 4) + blocks := allBlocks[1:] // skip genesis block + receipts := allReceipts[1:] + writeBlocks(db, blocks, receipts) + + // perform delete operations on all blocks + if tc.useBatch { + var batch ethdb.Batch + if tc.batchSize > 0 { + batch = db.NewBatchWithSize(tc.batchSize) + } else { + batch = db.NewBatch() + } + + for _, block := range blocks { + rawdb.DeleteBlock(batch, block.Hash(), block.NumberU64()) + } + require.NoError(t, batch.Write()) + } else { + for _, block := range blocks { + rawdb.DeleteBlock(db, block.Hash(), block.NumberU64()) + } + } + + for i, block := range blocks { + actualBlock := rawdb.ReadBlock(db, block.Hash(), block.NumberU64()) + requireRLPEqual(t, block, actualBlock) + require.True(t, rawdb.HasReceipts(db, block.Hash(), block.NumberU64())) + expReceipts := receipts[i] + logs := rawdb.ReadLogs(db, block.Hash(), block.NumberU64()) + requireRLPEqual(t, logsFromReceipts(expReceipts), logs) + + // hash -> number mapping should be deleted + num := rawdb.ReadHeaderNumber(db, block.Hash()) + require.Nil(t, num) + } + }) + } +} + +func TestDatabaseWriteToHeightIndexedDB(t *testing.T) { + dataDir := t.TempDir() + db, evmDB := newDatabasesFromDir(t, dataDir) + blocks, receipts := createBlocks(t, 2) + writeBlocks(db, blocks, receipts) + block := blocks[1] + + // verify no block data in evmDB + require.False(t, rawdb.HasHeader(evmDB, block.Hash(), block.NumberU64())) + require.False(t, rawdb.HasBody(evmDB, block.Hash(), block.NumberU64())) + require.False(t, rawdb.HasReceipts(evmDB, block.Hash(), block.NumberU64())) + + // verify block data in height-indexed databases + ok, err := db.headerDB.Has(block.NumberU64()) + require.NoError(t, err) + require.True(t, ok) + ok, err = db.bodyDB.Has(block.NumberU64()) + require.NoError(t, err) + require.True(t, ok) + ok, err = db.receiptsDB.Has(block.NumberU64()) + require.NoError(t, err) + require.True(t, ok) +} + +func TestDatabaseNewBatch(t *testing.T) { + dataDir := t.TempDir() + db, _ := newDatabasesFromDir(t, dataDir) + blocks, receipts := createBlocks(t, 2) + block := blocks[1] + batch := db.NewBatch() + writeBlocks(batch, blocks, receipts) + + // after adding blocks to batch, blocks and receipts should be available immediately + require.True(t, rawdb.HasBody(db, block.Hash(), block.NumberU64())) + require.True(t, rawdb.HasHeader(db, block.Hash(), block.NumberU64())) + require.True(t, rawdb.HasReceipts(db, block.Hash(), block.NumberU64())) + + // header number should not be available until batch is written + require.Nil(t, rawdb.ReadHeaderNumber(db, block.Hash())) + require.NoError(t, batch.Write()) + num := rawdb.ReadHeaderNumber(db, block.Hash()) + require.Equal(t, block.NumberU64(), *num) +} + +func TestDatabaseNewBatchWithSize(t *testing.T) { + dataDir := t.TempDir() + db, _ := newDatabasesFromDir(t, dataDir) + blocks, receipts := createBlocks(t, 2) + batch := db.NewBatchWithSize(2048) + writeBlocks(batch, blocks, receipts) + require.NoError(t, batch.Write()) + + for _, block := range blocks { + require.True(t, rawdb.HasHeader(db, block.Hash(), block.NumberU64())) + require.True(t, rawdb.HasBody(db, block.Hash(), block.NumberU64())) + require.True(t, rawdb.HasReceipts(db, block.Hash(), block.NumberU64())) + } +} + +func TestDatabaseWriteSameBlockTwice(t *testing.T) { + dataDir := t.TempDir() + db, _ := newDatabasesFromDir(t, dataDir) + blocks, _ := createBlocks(t, 2) + block := blocks[1] + + // write same block twice + rawdb.WriteBlock(db, block) + rawdb.WriteBlock(db, block) + + // we should be able to read the block after duplicate writes + actualBlock := rawdb.ReadBlock(db, block.Hash(), block.NumberU64()) + requireRLPEqual(t, block, actualBlock) +} + +func TestDatabaseWriteDifferentBlocksAtSameHeight(t *testing.T) { + dataDir := t.TempDir() + db, _ := newDatabasesFromDir(t, dataDir) + blocks, receipts := createBlocks(t, 2) + b1 := blocks[1] + r1 := receipts[1] + + // create a second block with the same height but different tx + to := addrFromTest(t, "different-to") + blocks2, receipts2 := createBlocksToAddr(t, 2, to) + b2 := blocks2[1] + r2 := receipts2[1] + + // ensure both blocks have the same height but different hashes + require.Equal(t, b1.NumberU64(), b2.NumberU64()) + require.NotEqual(t, b1.Hash(), b2.Hash()) + + writeBlocks(db, []*types.Block{b1, b2}, []types.Receipts{r1, r2}) + + // reading by the first block's hash should not return anything + require.Nil(t, rawdb.ReadHeader(db, b1.Hash(), b1.NumberU64())) + require.Nil(t, rawdb.ReadBody(db, b1.Hash(), b1.NumberU64())) + require.Nil(t, rawdb.ReadReceipts(db, b1.Hash(), b1.NumberU64(), b1.Time(), params.TestChainConfig)) + + // reading by the second block's hash returns second block data + requireRLPEqual(t, b2, rawdb.ReadBlock(db, b2.Hash(), b2.NumberU64())) + actualReceipts := rawdb.ReadReceipts(db, b2.Hash(), b2.NumberU64(), b2.Time(), params.TestChainConfig) + requireRLPEqual(t, r2, actualReceipts) +} + +func TestDatabaseReopen(t *testing.T) { + dataDir := t.TempDir() + db, _ := newDatabasesFromDir(t, dataDir) + blocks, receipts := createBlocks(t, 2) + writeBlocks(db, blocks, receipts) + b1 := blocks[1] + r1 := receipts[1] + + // close db and verify we can no longer read block data + require.NoError(t, db.Close()) + block := rawdb.ReadBlock(db, b1.Hash(), b1.NumberU64()) + require.Nil(t, block) + recs := rawdb.ReadReceipts(db, b1.Hash(), b1.NumberU64(), b1.Time(), params.TestChainConfig) + require.Nil(t, recs) + _, err := db.headerDB.Get(b1.NumberU64()) + require.ErrorIs(t, err, database.ErrClosed) + + // reopen the database and data can be read again + db, _ = newDatabasesFromDir(t, dataDir) + block = rawdb.ReadBlock(db, b1.Hash(), b1.NumberU64()) + requireRLPEqual(t, b1, block) + actualReceipts := rawdb.ReadReceipts(db, b1.Hash(), b1.NumberU64(), b1.Time(), params.TestChainConfig) + requireRLPEqual(t, r1, actualReceipts) +} + +func TestDatabaseInitialization(t *testing.T) { + blocks, _ := createBlocks(t, 10) + + tests := []struct { + name string + deferInit bool + evmDBBlocks []*types.Block + dbMinHeight uint64 + wantDBReady bool + wantMinHeight uint64 + }{ + { + name: "empty evmDB and no deferred init", + wantDBReady: true, + wantMinHeight: 1, + }, + { + name: "empty evmDB and deferred init", + deferInit: true, + wantDBReady: false, // db should not be ready due to deferred init + }, + { + name: "non genesis blocks to migrate", + evmDBBlocks: blocks[5:10], + wantDBReady: true, + wantMinHeight: 5, + }, + { + name: "blocks to migrate - including genesis", + evmDBBlocks: slices.Concat([]*types.Block{blocks[0]}, blocks[5:10]), + wantDBReady: true, + wantMinHeight: 5, + }, + { + name: "blocks to migrate and deferred init", + deferInit: true, + evmDBBlocks: blocks[5:10], + wantDBReady: true, + wantMinHeight: 5, + }, + { + name: "existing db created with min height", + evmDBBlocks: blocks[5:8], + dbMinHeight: 2, + wantDBReady: true, + wantMinHeight: 2, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + dataDir := t.TempDir() + base, err := leveldb.New(dataDir, nil, logging.NoLog{}, prometheus.NewRegistry()) + require.NoError(t, err) + evmDB := rawdb.NewDatabase(evmdb.New(base)) + + // create block databases with existing min height if needed + if tc.dbMinHeight > 0 { + db := Database{ + stateDB: base, + Database: evmDB, + dbPath: dataDir, + config: heightindexdb.DefaultConfig(), + reg: prometheus.NewRegistry(), + logger: logging.NoLog{}, + } + require.NoError(t, db.InitBlockDBs(tc.dbMinHeight)) + require.NoError(t, db.headerDB.Close()) + require.NoError(t, db.bodyDB.Close()) + require.NoError(t, db.receiptsDB.Close()) + minHeight, ok, err := databaseMinHeight(base) + require.NoError(t, err) + require.True(t, ok) + require.Equal(t, tc.dbMinHeight, minHeight) + } + + writeBlocks(evmDB, tc.evmDBBlocks, []types.Receipts{}) + db, _, err := New( + base, + evmDB, + dataDir, + tc.deferInit, + heightindexdb.DefaultConfig(), + logging.NoLog{}, + prometheus.NewRegistry(), + ) + require.NoError(t, err) + require.Equal(t, tc.wantDBReady, db.heightDBsReady, "database ready mismatch") + require.Equal(t, tc.wantMinHeight, db.minHeight, "database min height mismatch") + }) + } +} + +func TestDatabaseGenesisBlockHandling(t *testing.T) { + // Verifies that genesis blocks (block 0) only exist in evmDB and not + // in the height-indexed databases. + dataDir := t.TempDir() + db, evmDB := newDatabasesFromDir(t, dataDir) + blocks, receipts := createBlocks(t, 1) // first block is genesis + writeBlocks(db, blocks, receipts) + + // Validate genesis block can be retrieved and is stored in evmDB. + hash := rawdb.ReadCanonicalHash(evmDB, 0) + block := rawdb.ReadBlock(db, hash, 0) + requireRLPEqual(t, blocks[0], block) + _, err := db.headerDB.Get(0) + require.ErrorIs(t, err, database.ErrNotFound) + _, err = db.receiptsDB.Get(0) + require.ErrorIs(t, err, database.ErrNotFound) + require.Equal(t, uint64(1), db.minHeight) +} + +func TestDatabaseInitBlockDBs(t *testing.T) { + dataDir := t.TempDir() + base, err := leveldb.New(dataDir, nil, logging.NoLog{}, prometheus.NewRegistry()) + require.NoError(t, err) + evmDB := rawdb.NewDatabase(evmdb.New(base)) + + db, initialized, err := New( + base, + evmDB, + dataDir, + true, + heightindexdb.DefaultConfig(), + logging.NoLog{}, + prometheus.NewRegistry(), + ) + require.NoError(t, err) + require.False(t, initialized) + + require.NoError(t, db.InitBlockDBs(10)) + require.Equal(t, uint64(10), db.minHeight) +} + +func TestDatabaseMinHeightWrites(t *testing.T) { + // Verifies writes are gated by minHeight: below threshold go to evmDB, + // at/above threshold go to height-index DBs. + dataDir := t.TempDir() + base, err := leveldb.New(dataDir, nil, logging.NoLog{}, prometheus.NewRegistry()) + require.NoError(t, err) + evmDB := rawdb.NewDatabase(evmdb.New(base)) + + db, _, err := New( + base, + evmDB, + dataDir, + true, + heightindexdb.DefaultConfig(), + logging.NoLog{}, + prometheus.NewRegistry(), + ) + require.NoError(t, err) + require.NoError(t, db.InitBlockDBs(10)) + blocks, receipts := createBlocks(t, 11) + + // write block 9 (below minHeight) and block 10 (at minHeight) + writeBlocks(db, blocks[9:11], receipts[9:11]) + + // below threshold should not be in height DBs but in kvDB + has, err := db.headerDB.Has(9) + require.NoError(t, err) + require.False(t, has) + has, err = db.bodyDB.Has(9) + require.NoError(t, err) + require.False(t, has) + has, err = db.receiptsDB.Has(9) + require.NoError(t, err) + require.False(t, has) + require.True(t, rawdb.HasHeader(evmDB, blocks[9].Hash(), 9)) + require.True(t, rawdb.HasBody(evmDB, blocks[9].Hash(), 9)) + require.True(t, rawdb.HasReceipts(evmDB, blocks[9].Hash(), 9)) + + // at/above threshold should be in height DBs + _, err = db.bodyDB.Get(10) + require.NoError(t, err) + _, err = db.headerDB.Get(10) + require.NoError(t, err) + _, err = db.receiptsDB.Get(10) + require.NoError(t, err) + require.Nil(t, rawdb.ReadBlock(evmDB, blocks[10].Hash(), 10)) + require.False(t, rawdb.HasReceipts(evmDB, blocks[10].Hash(), 10)) +} + +func TestDatabaseMinHeightReturnsErrorOnInvalidEncoding(t *testing.T) { + dataDir := t.TempDir() + db, err := leveldb.New(dataDir, nil, logging.NoLog{}, prometheus.NewRegistry()) + require.NoError(t, err) + + // write an incorrectly sized value for blockDBMinHeightKey + require.NoError(t, db.Put(blockDBMinHeightKey, []byte{0x01})) + + _, ok, err := databaseMinHeight(db) + require.ErrorIs(t, err, errInvalidEncodedLength) + require.False(t, ok) +} + +func TestDatabaseHasReturnsFalseOnHashMismatch(t *testing.T) { + dataDir := t.TempDir() + db, _ := newDatabasesFromDir(t, dataDir) + blocks, receipts := createBlocks(t, 3) + writeBlocks(db, blocks[1:3], receipts[1:3]) + + // fetch block 2 with block 1's hash + require.False(t, rawdb.HasHeader(db, blocks[1].Hash(), blocks[2].NumberU64())) + require.False(t, rawdb.HasBody(db, blocks[1].Hash(), blocks[2].NumberU64())) + require.False(t, rawdb.HasReceipts(db, blocks[1].Hash(), blocks[2].NumberU64())) +} + +func TestDatabaseAlreadyInitializedError(t *testing.T) { + dataDir := t.TempDir() + db, _ := newDatabasesFromDir(t, dataDir) + + err := db.InitBlockDBs(5) + require.ErrorIs(t, err, errAlreadyInitialized) + require.Equal(t, uint64(1), db.minHeight) +} + +func TestDatabaseGetNotFoundOnHashMismatch(t *testing.T) { + dataDir := t.TempDir() + db, _ := newDatabasesFromDir(t, dataDir) + blocks, receipts := createBlocks(t, 3) + writeBlocks(db, blocks, receipts) + + // get block 1 with block 0's hash + _, err := db.Get(blockHeaderKey(1, blocks[0].Hash())) + require.ErrorIs(t, err, database.ErrNotFound) + _, err = db.Get(blockBodyKey(1, blocks[0].Hash())) + require.ErrorIs(t, err, database.ErrNotFound) + _, err = db.Get(receiptsKey(1, blocks[0].Hash())) + require.ErrorIs(t, err, database.ErrNotFound) +} + +func TestIsEnabled(t *testing.T) { + // Verifies database min height is set on first init. + dataDir := t.TempDir() + base, err := leveldb.New(dataDir, nil, logging.NoLog{}, prometheus.NewRegistry()) + require.NoError(t, err) + evmDB := rawdb.NewDatabase(evmdb.New(base)) + + // initially not enabled + enabled, err := IsEnabled(base) + require.NoError(t, err) + require.False(t, enabled) + + // create db but don't initialize + db, initialized, err := New( + base, + evmDB, + dataDir, + true, + heightindexdb.DefaultConfig(), + logging.NoLog{}, + prometheus.NewRegistry(), + ) + require.NoError(t, err) + require.False(t, initialized) + + // not enabled since InitBlockDBs was not called + enabled, err = IsEnabled(base) + require.NoError(t, err) + require.False(t, enabled) + + // now enabled + require.NoError(t, db.InitBlockDBs(10)) + enabled, err = IsEnabled(base) + require.NoError(t, err) + require.True(t, enabled) +} diff --git a/vms/evm/database/blockdb/helpers_test.go b/vms/evm/database/blockdb/helpers_test.go new file mode 100644 index 000000000000..971dbf79f367 --- /dev/null +++ b/vms/evm/database/blockdb/helpers_test.go @@ -0,0 +1,172 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package blockdb + +import ( + "fmt" + "math/big" + "slices" + "testing" + "time" + + "github.com/ava-labs/coreth/consensus/dummy" + "github.com/ava-labs/coreth/core" + "github.com/ava-labs/coreth/params" + "github.com/ava-labs/libevm/common" + "github.com/ava-labs/libevm/core/rawdb" + "github.com/ava-labs/libevm/core/types" + "github.com/ava-labs/libevm/crypto" + "github.com/ava-labs/libevm/ethdb" + "github.com/ava-labs/libevm/rlp" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + + "github.com/ava-labs/avalanchego/database" + "github.com/ava-labs/avalanchego/database/leveldb" + "github.com/ava-labs/avalanchego/utils/logging" + + evmdb "github.com/ava-labs/avalanchego/vms/evm/database" + heightindexdb "github.com/ava-labs/avalanchego/x/blockdb" +) + +// blockingDatabase wraps a HeightIndex and blocks indefinitely on Put +// if shouldBlock returns true. +type blockingDatabase struct { + database.HeightIndex + shouldBlock func() bool +} + +func (b *blockingDatabase) Put(num uint64, encodedBlock []byte) error { + if b.shouldBlock == nil || b.shouldBlock() { + <-make(chan struct{}) + } + return b.HeightIndex.Put(num, encodedBlock) +} + +func newDatabasesFromDir(t *testing.T, dataDir string) (*Database, ethdb.Database) { + t.Helper() + + base, err := leveldb.New(dataDir, nil, logging.NoLog{}, prometheus.NewRegistry()) + require.NoError(t, err) + evmDB := rawdb.NewDatabase(evmdb.New(base)) + db, _, err := New( + base, + evmDB, + dataDir, + false, + heightindexdb.DefaultConfig(), + logging.NoLog{}, + prometheus.NewRegistry(), + ) + require.NoError(t, err) + return db, evmDB +} + +// addrFromTest returns a deterministic address derived from the test name and supplied salt. +func addrFromTest(t *testing.T, salt string) common.Address { + t.Helper() + h := crypto.Keccak256Hash([]byte(t.Name() + ":" + salt)) + return common.BytesToAddress(h.Bytes()[12:]) +} + +// createBlocksToAddr generates blocks with a single funded sender and a tx to the provided recipient. +func createBlocksToAddr(t *testing.T, numBlocks int, to common.Address) ([]*types.Block, []types.Receipts) { + t.Helper() + + key1, _ := crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + addr1 := crypto.PubkeyToAddress(key1.PublicKey) + gspec := &core.Genesis{ + Config: params.TestChainConfig, + Alloc: types.GenesisAlloc{addr1: {Balance: big.NewInt(params.Ether)}}, + } + engine := dummy.NewFaker() + signer := types.LatestSigner(params.TestChainConfig) + gap := uint64(10) + db, blocks, receipts, err := core.GenerateChainWithGenesis( + gspec, engine, numBlocks-1, gap, func(_ int, gen *core.BlockGen) { + tx, _ := types.SignTx(types.NewTx(&types.DynamicFeeTx{ + ChainID: params.TestChainConfig.ChainID, + Nonce: gen.TxNonce(addr1), + To: &to, + Gas: 500_000, + GasTipCap: big.NewInt(1), + GasFeeCap: big.NewInt(1), + }), signer, key1) + gen.AddTx(tx) + }) + require.NoError(t, err) + + // add genesis block since generated blocks and receipts don't include it + genHash := rawdb.ReadCanonicalHash(db, 0) + genBlock := rawdb.ReadBlock(db, genHash, 0) + genReceipts := rawdb.ReadReceipts(db, genHash, 0, 0, params.TestChainConfig) + blocks = slices.Concat([]*types.Block{genBlock}, blocks) + receipts = slices.Concat([]types.Receipts{genReceipts}, receipts) + + return blocks, receipts +} + +func createBlocks(t *testing.T, numBlocks int) ([]*types.Block, []types.Receipts) { + t.Helper() + to := addrFromTest(t, "default-to") + return createBlocksToAddr(t, numBlocks, to) +} + +func writeBlocks(db ethdb.KeyValueWriter, blocks []*types.Block, receipts []types.Receipts) { + for i, block := range blocks { + rawdb.WriteBlock(db, block) + if i < len(receipts) { + rawdb.WriteReceipts(db, block.Hash(), block.NumberU64(), receipts[i]) + } + rawdb.WriteCanonicalHash(db, block.Hash(), block.NumberU64()) + } +} + +func requireRLPEqual(t *testing.T, expected, actual interface{}) { + t.Helper() + expectedBytes, err := rlp.EncodeToBytes(expected) + require.NoError(t, err) + actualBytes, err := rlp.EncodeToBytes(actual) + require.NoError(t, err) + require.Equal(t, expectedBytes, actualBytes) +} + +func logsFromReceipts(receipts types.Receipts) [][]*types.Log { + logs := make([][]*types.Log, len(receipts)) + for i := range receipts { + logs[i] = receipts[i].Logs + } + return logs +} + +func startPartialMigration(t *testing.T, db *Database, blocksToMigrate uint64) { + t.Helper() + + n := uint64(0) + db.migrator.headerDB = &blockingDatabase{ + HeightIndex: db.headerDB, + shouldBlock: func() bool { + n++ + return n > blocksToMigrate + }, + } + startMigration(t, db, false) + require.Eventually(t, func() bool { + return db.migrator.processed.Load() >= blocksToMigrate + }, 5*time.Second, 100*time.Millisecond) +} + +func startMigration(t *testing.T, db *Database, waitForCompletion bool) { + t.Helper() + + db.migrator.completed.Store(false) + require.NoError(t, db.StartMigration()) + + if waitForCompletion { + timeout := 5 * time.Second + msg := fmt.Sprintf("Migration did not complete within timeout: %v", timeout) + require.True(t, db.migrator.waitMigratorDone(timeout), msg) + require.True(t, db.migrator.isCompleted()) + } +} diff --git a/vms/evm/database/blockdb/migration_test.go b/vms/evm/database/blockdb/migration_test.go new file mode 100644 index 000000000000..810a3bf9eb5d --- /dev/null +++ b/vms/evm/database/blockdb/migration_test.go @@ -0,0 +1,298 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package blockdb + +import ( + "slices" + "testing" + + "github.com/ava-labs/libevm/core/rawdb" + "github.com/ava-labs/libevm/core/types" + "github.com/ava-labs/libevm/params" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + + "github.com/ava-labs/avalanchego/database/leveldb" + "github.com/ava-labs/avalanchego/utils/logging" + + evmdb "github.com/ava-labs/avalanchego/vms/evm/database" + heightindexdb "github.com/ava-labs/avalanchego/x/blockdb" +) + +func TestMigrationCompletion(t *testing.T) { + tests := []struct { + name string + want bool + dataToMigrate bool + migrate bool + }{ + { + name: "completed when no data to migrate", + want: true, + }, + { + name: "not completed if data to migrate", + dataToMigrate: true, + }, + { + name: "completed after migration", + dataToMigrate: true, + migrate: true, + want: true, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + dataDir := t.TempDir() + base, err := leveldb.New(dataDir, nil, logging.NoLog{}, prometheus.NewRegistry()) + require.NoError(t, err) + evmDB := rawdb.NewDatabase(evmdb.New(base)) + + if tc.dataToMigrate { + blocks, receipts := createBlocks(t, 5) + writeBlocks(evmDB, blocks, receipts) + } + + db, _, err := New( + base, + evmDB, + dataDir, + false, + heightindexdb.DefaultConfig(), + logging.NoLog{}, + prometheus.NewRegistry(), + ) + require.NoError(t, err) + if tc.migrate { + startMigration(t, db, true) + } + require.Equal(t, tc.want, db.migrator.isCompleted()) + }) + } +} + +func TestMigrationInProcess(t *testing.T) { + // Verifies blocks are readable during migration for both migrated + // and un-migrated blocks. + // The test generates 21 blocks, migrates 20 but pauses after 5, + // writes block 21, and verifies migrated and un-migrated blocks are readable. + dataDir := t.TempDir() + db, evmDB := newDatabasesFromDir(t, dataDir) + blocks, receipts := createBlocks(t, 21) + + // add blocks 0-19 to KVDB to migrate + writeBlocks(evmDB, blocks[0:20], receipts[0:20]) + + // migrate blocks 1-6 + startPartialMigration(t, db, 6) + + // write block 20 to simulate new block being added during migration + writeBlocks(db, blocks[20:21], receipts[20:21]) + + // verify all 21 blocks are readable via the db + for i, block := range blocks { + num := block.NumberU64() + expReceipts := receipts[i] + + // We should be able to fetch block, receipts and logs. + actualBlock := rawdb.ReadBlock(db, block.Hash(), num) + requireRLPEqual(t, block, actualBlock) + actualReceipts := rawdb.ReadReceipts(db, block.Hash(), num, block.Time(), params.TestChainConfig) + requireRLPEqual(t, expReceipts, actualReceipts) + actualLogs := rawdb.ReadLogs(db, block.Hash(), num) + requireRLPEqual(t, logsFromReceipts(expReceipts), actualLogs) + + // header number should also be readable + actualNum := rawdb.ReadHeaderNumber(db, block.Hash()) + require.NotNil(t, actualNum) + require.Equal(t, num, *actualNum) + + // Block 1-6 and 20 should be migrated, others should not. + has, err := db.headerDB.Has(num) + require.NoError(t, err) + migrated := (num >= 1 && num <= 6) || num == 20 + require.Equal(t, migrated, has) + } +} + +func TestMigrationStart(t *testing.T) { + tests := []struct { + name string + toMigrateHeights []uint64 + migratedHeights []uint64 + }{ + { + name: "migrate blocks 0-4", + toMigrateHeights: []uint64{0, 1, 2, 3, 4}, + }, + { + name: "migrate blocks 20-24", + toMigrateHeights: []uint64{20, 21, 22, 23, 24}, + }, + { + name: "migrate non consecutive blocks", + toMigrateHeights: []uint64{20, 21, 22, 29, 30, 40}, + }, + { + name: "migrated 0-5 and to migrate 6-10", + toMigrateHeights: []uint64{6, 7, 8, 9, 10}, + migratedHeights: []uint64{0, 1, 2, 3, 4, 5}, + }, + { + name: "all blocks migrated", + migratedHeights: []uint64{0, 1, 2, 3, 4, 5}, + }, + { + name: "no blocks to migrate or migrated", + }, + { + name: "non consecutive blocks migrated and blocks to migrate", + toMigrateHeights: []uint64{2, 3, 7, 8, 10}, + migratedHeights: []uint64{0, 1, 4, 5, 9}, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + dataDir := t.TempDir() + db, evmDB := newDatabasesFromDir(t, dataDir) + allHeights := slices.Concat(tc.toMigrateHeights, tc.migratedHeights) + var maxHeight uint64 + if len(allHeights) > 0 { + maxHeight = slices.Max(allHeights) + } + blocks, receipts := createBlocks(t, int(maxHeight)+1) + + // set initial db state + for _, height := range tc.toMigrateHeights { + writeBlocks(evmDB, []*types.Block{blocks[height]}, []types.Receipts{receipts[height]}) + } + for _, height := range tc.migratedHeights { + writeBlocks(db, []*types.Block{blocks[height]}, []types.Receipts{receipts[height]}) + } + + // Verify all blocks and receipts are accessible after migration. + startMigration(t, db, true) + for _, height := range allHeights { + expBlock := blocks[height] + expReceipts := receipts[height] + block := rawdb.ReadBlock(db, expBlock.Hash(), height) + requireRLPEqual(t, expBlock, block) + receipts := rawdb.ReadReceipts(db, expBlock.Hash(), height, expBlock.Time(), params.TestChainConfig) + requireRLPEqual(t, expReceipts, receipts) + logs := rawdb.ReadLogs(db, expBlock.Hash(), height) + requireRLPEqual(t, logsFromReceipts(expReceipts), logs) + + // Verify evmDB no longer has any blocks or receipts (except for genesis). + hasData := height == 0 + require.Equal(t, hasData, rawdb.HasHeader(evmDB, expBlock.Hash(), height)) + require.Equal(t, hasData, rawdb.HasBody(evmDB, expBlock.Hash(), height)) + require.Equal(t, hasData, rawdb.HasReceipts(evmDB, expBlock.Hash(), height)) + } + }) + } +} + +func TestMigrationResume(t *testing.T) { + // Verifies migration can be stopped mid-run and resumed. + dataDir := t.TempDir() + db, evmDB := newDatabasesFromDir(t, dataDir) + blocks, receipts := createBlocks(t, 10) + writeBlocks(evmDB, blocks, receipts) + + // block migration after 3 blocks + startPartialMigration(t, db, 3) + require.False(t, db.migrator.isCompleted()) + + for i := 0; i < 10; i++ { + migrated := i >= 1 && i <= 3 // blocks 1-3 are migrated + has, err := db.bodyDB.Has(uint64(i)) + require.NoError(t, err) + require.Equal(t, migrated, has) + } + + // stop migration and start again + require.NoError(t, db.Database.Close()) + db, _ = newDatabasesFromDir(t, dataDir) + require.False(t, db.migrator.isCompleted()) + startMigration(t, db, true) + + // verify all blocks are accessible after migration + for i, block := range blocks { + num := block.NumberU64() + hash := block.Hash() + actualBlock := rawdb.ReadBlock(db, hash, num) + requireRLPEqual(t, block, actualBlock) + actualReceipts := rawdb.ReadReceipts(db, hash, num, block.Time(), params.TestChainConfig) + requireRLPEqual(t, receipts[i], actualReceipts) + } +} + +func TestMigrationSkipsGenesis(t *testing.T) { + dataDir := t.TempDir() + base, err := leveldb.New(dataDir, nil, logging.NoLog{}, prometheus.NewRegistry()) + require.NoError(t, err) + evmDB := rawdb.NewDatabase(evmdb.New(base)) + blocks, receipts := createBlocks(t, 10) + writeBlocks(evmDB, blocks[0:1], receipts[0:1]) + writeBlocks(evmDB, blocks[5:10], receipts[5:10]) + + db, _, err := New( + base, + evmDB, + dataDir, + false, + heightindexdb.DefaultConfig(), + logging.NoLog{}, + prometheus.NewRegistry(), + ) + require.NoError(t, err) + require.True(t, db.heightDBsReady) + require.Equal(t, uint64(5), db.minHeight) + + // migrate and verify genesis block is not migrated + startMigration(t, db, true) + require.True(t, db.migrator.isCompleted()) + genHash := rawdb.ReadCanonicalHash(evmDB, 0) + require.True(t, rawdb.HasHeader(evmDB, genHash, 0)) + has, err := db.bodyDB.Has(0) + require.NoError(t, err) + require.False(t, has) +} + +func TestMigrationWithoutReceipts(t *testing.T) { + dataDir := t.TempDir() + base, err := leveldb.New(dataDir, nil, logging.NoLog{}, prometheus.NewRegistry()) + require.NoError(t, err) + evmDB := rawdb.NewDatabase(evmdb.New(base)) + blocks, _ := createBlocks(t, 5) + + // write blocks without receipts to evmDB + for _, block := range blocks { + rawdb.WriteBlock(evmDB, block) + rawdb.WriteCanonicalHash(evmDB, block.Hash(), block.NumberU64()) + } + + db, initialized, err := New( + base, + evmDB, + dataDir, + false, + heightindexdb.DefaultConfig(), + logging.NoLog{}, + prometheus.NewRegistry(), + ) + require.NoError(t, err) + require.True(t, initialized) + startMigration(t, db, true) + require.True(t, db.migrator.isCompleted()) + + // verify all blocks are accessible and receipts are nil + for _, block := range blocks { + actualBlock := rawdb.ReadBlock(db, block.Hash(), block.NumberU64()) + requireRLPEqual(t, block, actualBlock) + recs := rawdb.ReadReceipts(db, block.Hash(), block.NumberU64(), block.Time(), params.TestChainConfig) + require.Nil(t, recs) + } +} diff --git a/vms/evm/database/blockdb/migrator.go b/vms/evm/database/blockdb/migrator.go new file mode 100644 index 000000000000..a44deb27bc14 --- /dev/null +++ b/vms/evm/database/blockdb/migrator.go @@ -0,0 +1,512 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package blockdb + +import ( + "context" + "encoding/binary" + "errors" + "fmt" + "slices" + "sync" + "sync/atomic" + "time" + + "github.com/ava-labs/libevm/common" + "github.com/ava-labs/libevm/core/rawdb" + "github.com/ava-labs/libevm/ethdb" + "github.com/ava-labs/libevm/rlp" + "go.uber.org/zap" + + "github.com/ava-labs/avalanchego/database" + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/utils/timer" +) + +const ( + // logProgressInterval controls how often migration progress is logged. + logProgressInterval = 30 * time.Second + // compactionInterval is the number of blocks to process before compacting the database. + compactionInterval = 250_000 + // stopTimeout is the maximum time to wait for migration to stop gracefully. + // 5 seconds allows cleanup operations to complete without blocking shutdown indefinitely. + stopTimeout = 5 * time.Second +) + +// targetBlockHeightKey stores the head height captured at first run for ETA. +var targetBlockHeightKey = []byte("migration_target_block_height") + +type migrator struct { + // Databases + evmDB ethdb.Database + headerDB database.HeightIndex + bodyDB database.HeightIndex + receiptsDB database.HeightIndex + + // Concurrency control + mu sync.Mutex // protects cancel and done + cancel context.CancelFunc + done chan struct{} + + // Migration state + completed atomic.Bool + processed atomic.Uint64 + endHeight uint64 + + logger logging.Logger +} + +func newMigrator( + db database.Database, + headerDB database.HeightIndex, + bodyDB database.HeightIndex, + receiptsDB database.HeightIndex, + evmDB ethdb.Database, + logger logging.Logger, +) (*migrator, error) { + m := &migrator{ + headerDB: headerDB, + bodyDB: bodyDB, + receiptsDB: receiptsDB, + evmDB: evmDB, + logger: logger, + } + + _, ok, err := minBlockHeightToMigrate(evmDB) + if err != nil { + return nil, err + } + if !ok { + m.completed.Store(true) + m.logger.Info("No block data to migrate; migration already complete") + return m, nil + } + + // load saved end block height + endHeight, ok, err := targetBlockHeight(db) + if err != nil { + return nil, err + } + if !ok { + // load and save head block number as end block height + if num, ok := headBlockNumber(evmDB); ok { + endHeight = num + if err := writeTargetBlockHeight(db, endHeight); err != nil { + return nil, err + } + m.logger.Info( + "Migration target height set", + zap.Uint64("targetHeight", endHeight), + ) + } + } + m.endHeight = endHeight + + return m, nil +} + +func (m *migrator) isCompleted() bool { + return m.completed.Load() +} + +func (m *migrator) stop() { + // Snapshot cancel/done under lock to avoid data race with endRun. + // We must release the lock before waiting on done to prevent deadlock. + m.mu.Lock() + cancel := m.cancel + done := m.done + m.mu.Unlock() + + if cancel == nil { + return // no active migration + } + + cancel() + select { + case <-done: + // worker finished cleanup + case <-time.After(stopTimeout): + m.logger.Warn("Migration shutdown timeout exceeded") + } +} + +// start begins the migration process in a background goroutine. +// Returns immediately if migration is already completed or running. +func (m *migrator) start() { + if m.isCompleted() { + return + } + ctx, ok := m.beginRun() + if !ok { + m.logger.Warn("Migration already running") + return + } + + go func() { + defer func() { + close(m.done) + m.endRun() + }() + if err := m.run(ctx); err != nil { + if !errors.Is(err, context.Canceled) { + m.logger.Error("Migration failed", zap.Error(err)) + } + } + }() +} + +func (m *migrator) beginRun() (context.Context, bool) { + m.mu.Lock() + defer m.mu.Unlock() + + if m.cancel != nil { + return nil, false // migration already running + } + ctx, cancel := context.WithCancel(context.Background()) + m.cancel = cancel + m.done = make(chan struct{}) + m.processed.Store(0) + return ctx, true +} + +func (m *migrator) endRun() { + m.mu.Lock() + defer m.mu.Unlock() + + m.cancel = nil + m.done = nil +} + +func (m *migrator) run(ctx context.Context) error { + var ( + // Progress tracking + etaTarget uint64 // target # of blocks to process + etaTracker = timer.NewEtaTracker(10, 1) + start = time.Now() + nextLog = start.Add(logProgressInterval) + + // Batch to accumulate delete operations before writing + batch = m.evmDB.NewBatch() + lastCompact uint64 // blocks processed at last compaction + + // Compaction tracking + canCompact bool + startBlockNum uint64 + endBlockNum uint64 + + // Iterate over block bodies instead of headers since there are keys + // under the header prefix that we are not migrating (e.g., hash mappings). + iter = m.evmDB.NewIterator(evmBlockBodyPrefix, nil) + ) + + defer func() { + iter.Release() + + if batch.ValueSize() > 0 { + if err := batch.Write(); err != nil { + m.logger.Error("Failed to write final delete batch", zap.Error(err)) + } + } + + // Compact final range if we processed any blocks after last interval compaction. + if canCompact { + m.compactBlockRange(startBlockNum, endBlockNum) + } + + duration := time.Since(start) + m.logger.Info( + "Block data migration ended", + zap.Uint64("targetHeight", m.endHeight), + zap.Uint64("blocksProcessed", m.processed.Load()), + zap.Uint64("lastProcessedHeight", endBlockNum), + zap.Duration("duration", duration), + zap.Bool("completed", m.isCompleted()), + ) + }() + + m.logger.Info( + "Block data migration started", + zap.Uint64("targetHeight", m.endHeight), + ) + + // Iterate over all block bodies in ascending order by block number. + for iter.Next() { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + key := iter.Key() + if !isMigratableKey(m.evmDB, key) { + continue + } + + blockNum, hash, err := parseBlockKey(key) + if err != nil { + return err + } + + if etaTarget == 0 && m.endHeight > 0 && blockNum < m.endHeight { + etaTarget = m.endHeight - blockNum + etaTracker.AddSample(0, etaTarget, start) + } + + // track the range of blocks for compaction + if !canCompact { + startBlockNum = blockNum + canCompact = true + } + endBlockNum = blockNum + + if err := m.migrateHeader(blockNum, hash); err != nil { + return fmt.Errorf("failed to migrate header data: %w", err) + } + if err := m.migrateBody(blockNum, hash, iter.Value()); err != nil { + return fmt.Errorf("failed to migrate body data: %w", err) + } + if err := m.migrateReceipts(blockNum, hash); err != nil { + return fmt.Errorf("failed to migrate receipts data: %w", err) + } + if err := deleteBlock(batch, blockNum, hash); err != nil { + return fmt.Errorf("failed to add block deletes to batch: %w", err) + } + processed := m.processed.Add(1) + + if batch.ValueSize() > ethdb.IdealBatchSize { + if err := batch.Write(); err != nil { + return fmt.Errorf("failed to write delete batch: %w", err) + } + batch.Reset() + } + + // compact every compactionInterval blocks + if canCompact && processed-lastCompact >= compactionInterval { + // write any remaining deletes in batch before compaction + if batch.ValueSize() > 0 { + if err := batch.Write(); err != nil { + return fmt.Errorf("failed to write delete batch before compaction: %w", err) + } + batch.Reset() + } + + iter.Release() + m.compactBlockRange(startBlockNum, endBlockNum) + startKey := encodeBlockNumber(blockNum + 1) + newIter := m.evmDB.NewIterator(evmBlockBodyPrefix, startKey) + iter = newIter + lastCompact = processed + canCompact = false + } + + // log progress every logProgressInterval + if now := time.Now(); now.After(nextLog) { + fields := []zap.Field{ + zap.Uint64("blocksProcessed", processed), + zap.Uint64("lastProcessedHeight", blockNum), + zap.Duration("timeElapsed", time.Since(start)), + } + if etaTarget > 0 { + eta, pct := etaTracker.AddSample(processed, etaTarget, now) + if eta != nil { + fields = append(fields, + zap.Duration("eta", *eta), + zap.String("progress", fmt.Sprintf("%.2f%%", pct)), + ) + } + } + + m.logger.Info("Block data migration progress", fields...) + nextLog = now.Add(logProgressInterval) + } + } + + if iter.Error() != nil { + return fmt.Errorf("failed to iterate over evmDB: %w", iter.Error()) + } + + m.completed.Store(true) + return nil +} + +func (m *migrator) compactBlockRange(startNum, endNum uint64) { + start := time.Now() + + compactRange(m.evmDB, blockHeaderKey, startNum, endNum, m.logger) + compactRange(m.evmDB, blockBodyKey, startNum, endNum, m.logger) + compactRange(m.evmDB, receiptsKey, startNum, endNum, m.logger) + + m.logger.Info("Compaction of block range completed", + zap.Uint64("startHeight", startNum), + zap.Uint64("endHeight", endNum), + zap.Duration("duration", time.Since(start))) +} + +func (m *migrator) migrateHeader(num uint64, hash common.Hash) error { + header := rawdb.ReadHeader(m.evmDB, hash, num) + if header == nil { + return fmt.Errorf("header not found for block %d hash %s", num, hash) + } + hBytes, err := rlp.EncodeToBytes(header) + if err != nil { + return fmt.Errorf("failed to encode block header: %w", err) + } + if err := writeHashAndData(m.headerDB, num, hash, hBytes); err != nil { + return fmt.Errorf("failed to write header to headerDB: %w", err) + } + return nil +} + +func (m *migrator) migrateBody(num uint64, hash common.Hash, body []byte) error { + if err := writeHashAndData(m.bodyDB, num, hash, body); err != nil { + return fmt.Errorf("failed to write body to bodyDB: %w", err) + } + return nil +} + +func (m *migrator) migrateReceipts(num uint64, hash common.Hash) error { + receipts := rawdb.ReadReceiptsRLP(m.evmDB, hash, num) + if receipts == nil { + return nil + } + + if err := writeHashAndData(m.receiptsDB, num, hash, receipts); err != nil { + return fmt.Errorf("failed to write receipts to receiptsDB: %w", err) + } + return nil +} + +// waitMigratorDone waits until the current migration run completes. +// If timeout <= 0, it waits indefinitely. +// Returns true if completed, false on timeout. +func (m *migrator) waitMigratorDone(timeout time.Duration) bool { + // Snapshot done to avoid race with goroutine cleanup + m.mu.Lock() + done := m.done + m.mu.Unlock() + + if done == nil { + return true + } + if timeout <= 0 { + <-done + return true + } + t := time.NewTimer(timeout) + defer t.Stop() + select { + case <-done: + return true + case <-t.C: + return false + } +} + +func deleteBlock(db ethdb.KeyValueWriter, num uint64, hash common.Hash) error { + // rawdb.DeleteHeader is not used to avoid deleting number/hash mappings. + headerKey := blockHeaderKey(num, hash) + if err := db.Delete(headerKey); err != nil { + return fmt.Errorf("failed to delete header from evmDB: %w", err) + } + rawdb.DeleteBody(db, hash, num) + rawdb.DeleteReceipts(db, hash, num) + return nil +} + +func targetBlockHeight(db database.KeyValueReader) (uint64, bool, error) { + has, err := db.Has(targetBlockHeightKey) + if err != nil { + return 0, false, err + } + if !has { + return 0, false, nil + } + numBytes, err := db.Get(targetBlockHeightKey) + if err != nil { + return 0, false, err + } + if len(numBytes) != blockNumberSize { + return 0, false, fmt.Errorf("invalid block number encoding length: %d", len(numBytes)) + } + height := binary.BigEndian.Uint64(numBytes) + return height, true, nil +} + +func headBlockNumber(db ethdb.KeyValueReader) (uint64, bool) { + hash := rawdb.ReadHeadHeaderHash(db) + num := rawdb.ReadHeaderNumber(db, hash) + if num == nil || *num == 0 { + return 0, false + } + return *num, true +} + +func writeTargetBlockHeight(db database.KeyValueWriter, endHeight uint64) error { + return db.Put(targetBlockHeightKey, encodeBlockNumber(endHeight)) +} + +func isMigratableKey(db ethdb.Reader, key []byte) bool { + if !isBodyKey(key) { + return false + } + num, hash, err := parseBlockKey(key) + if err != nil { + return false + } + + // Skip genesis since all vms have it and to benefit from being able to have a + // minimum height greater than 0 when state sync is enabled. + if num == 0 { + return false + } + + canonHash := rawdb.ReadCanonicalHash(db, num) + return canonHash == hash +} + +func blockHeaderKey(num uint64, hash common.Hash) []byte { + return slices.Concat(evmHeaderPrefix, encodeBlockNumber(num), hash.Bytes()) +} + +func blockBodyKey(num uint64, hash common.Hash) []byte { + return slices.Concat(evmBlockBodyPrefix, encodeBlockNumber(num), hash.Bytes()) +} + +func receiptsKey(num uint64, hash common.Hash) []byte { + return slices.Concat(evmReceiptsPrefix, encodeBlockNumber(num), hash.Bytes()) +} + +func minBlockHeightToMigrate(db ethdb.Database) (uint64, bool, error) { + iter := db.NewIterator(evmBlockBodyPrefix, nil) + defer iter.Release() + + for iter.Next() { + key := iter.Key() + if !isMigratableKey(db, key) { + continue + } + num, _, err := parseBlockKey(key) + if err != nil { + return 0, false, err + } + return num, true, nil + } + return 0, false, iter.Error() +} + +func compactRange( + db ethdb.Compacter, + keyFunc func(uint64, common.Hash) []byte, + startNum, endNum uint64, + logger logging.Logger, +) { + startKey := keyFunc(startNum, common.Hash{}) + endKey := keyFunc(endNum+1, common.Hash{}) + if err := db.Compact(startKey, endKey); err != nil { + logger.Error("Failed to compact data in range", + zap.Uint64("startHeight", startNum), + zap.Uint64("endHeight", endNum), + zap.Error(err)) + } +} From e4d947308e77ef08229944b57ea526698d750ec1 Mon Sep 17 00:00:00 2001 From: Draco Date: Sun, 30 Nov 2025 15:26:09 -0500 Subject: [PATCH 2/2] fix: updates from feedback --- vms/evm/database/blockdb/database.go | 531 +++++++++------------- vms/evm/database/blockdb/database_test.go | 6 +- vms/evm/database/blockdb/helpers_test.go | 8 +- vms/evm/database/blockdb/migrator.go | 495 ++++++++++---------- 4 files changed, 491 insertions(+), 549 deletions(-) diff --git a/vms/evm/database/blockdb/database.go b/vms/evm/database/blockdb/database.go index d8cb30ff4d53..443f52246f85 100644 --- a/vms/evm/database/blockdb/database.go +++ b/vms/evm/database/blockdb/database.go @@ -4,11 +4,11 @@ package blockdb import ( - "bytes" "encoding/binary" "errors" "fmt" "path/filepath" + "slices" "github.com/ava-labs/libevm/common" "github.com/ava-labs/libevm/ethdb" @@ -18,49 +18,19 @@ import ( "github.com/ava-labs/avalanchego/database" "github.com/ava-labs/avalanchego/database/heightindexdb/meterdb" - "github.com/ava-labs/avalanchego/database/prefixdb" "github.com/ava-labs/avalanchego/utils/logging" heightindexdb "github.com/ava-labs/avalanchego/x/blockdb" ) var ( - _ ethdb.Database = (*Database)(nil) - _ ethdb.Batch = (*batch)(nil) - - migratorDBPrefix = []byte("migrator") - - // blockDBMinHeightKey stores the minimum block height of the - // height-indexed block databases. - // It is set at initialization and cannot be changed without - // recreating the databases. - blockDBMinHeightKey = []byte("blockdb_min_height") - errUnexpectedKey = errors.New("unexpected database key") errNotInitialized = errors.New("database not initialized") errAlreadyInitialized = errors.New("database already initialized") errInvalidEncodedLength = errors.New("invalid encoded length") ) -// Key prefixes for block data in [ethdb.Database]. -// This is copied from libevm because they are not exported. -// Since the prefixes should never be changed, we can avoid libevm changes by -// duplicating them here. -var ( - evmHeaderPrefix = []byte("h") - evmBlockBodyPrefix = []byte("b") - evmReceiptsPrefix = []byte("r") -) - -const ( - hashDataElements = 2 - blockNumberSize = 8 - blockHashSize = 32 - - headerDBName = "headerdb" - bodyDBName = "bodydb" - receiptsDBName = "receiptsdb" -) +var _ ethdb.Database = (*Database)(nil) // Database wraps an [ethdb.Database] and routes block headers, bodies, and receipts // to separate [database.HeightIndex] databases for blocks at or above the minimum height. @@ -69,7 +39,7 @@ type Database struct { ethdb.Database // Databases - stateDB database.Database + metaDB database.Database headerDB database.HeightIndex bodyDB database.HeightIndex receiptsDB database.HeightIndex @@ -86,6 +56,67 @@ type Database struct { logger logging.Logger } +const blockNumberSize = 8 + +func encodeBlockNumber(number uint64) []byte { + enc := make([]byte, blockNumberSize) + binary.BigEndian.PutUint64(enc, number) + return enc +} + +// blockDBMinHeightKey stores the minimum block height of the +// height-indexed block databases. +// It is set at initialization and cannot be changed without +// recreating the databases. +var blockDBMinHeightKey = []byte("blockdb_min_height") + +func databaseMinHeight(db database.KeyValueReader) (uint64, bool, error) { + minBytes, err := db.Get(blockDBMinHeightKey) + if err != nil { + if errors.Is(err, database.ErrNotFound) { + return 0, false, nil + } + return 0, false, err + } + if len(minBytes) != blockNumberSize { + return 0, false, fmt.Errorf("%w: min height expected %d bytes, got %d", errInvalidEncodedLength, blockNumberSize, len(minBytes)) + } + return binary.BigEndian.Uint64(minBytes), true, nil +} + +// IsEnabled checks if blockdb has ever been initialized. +// It returns true if the minimum block height key exists, indicating the +// block databases have been created and initialized with a minimum height. +func IsEnabled(db database.KeyValueReader) (bool, error) { + has, err := db.Has(blockDBMinHeightKey) + if err != nil { + return false, err + } + return has, nil +} + +func (db *Database) newMeteredHeightDB( + namespace string, + minHeight uint64, +) (database.HeightIndex, error) { + path := filepath.Join(db.dbPath, namespace) + config := db.config.WithDir(path).WithMinimumHeight(minHeight) + ndb, err := heightindexdb.New(config, db.logger) + if err != nil { + return nil, fmt.Errorf("failed to create %s database at %s: %w", namespace, path, err) + } + + mdb, err := meterdb.New(db.reg, namespace, ndb) + if err != nil { + return nil, errors.Join( + fmt.Errorf("failed to create metered %s database: %w", namespace, err), + ndb.Close(), + ) + } + + return mdb, nil +} + // New creates a new [Database] over the provided [ethdb.Database]. // // If allowDeferredInit is true and no minimum block height is known, @@ -95,7 +126,7 @@ type Database struct { // The bool result is true if the block databases were initialized immediately, // and false if initialization was deferred. func New( - stateDB database.Database, + metaDB database.Database, evmDB ethdb.Database, dbPath string, allowDeferredInit bool, @@ -104,7 +135,7 @@ func New( reg prometheus.Registerer, ) (*Database, bool, error) { db := &Database{ - stateDB: stateDB, + metaDB: metaDB, Database: evmDB, dbPath: dbPath, config: config, @@ -112,34 +143,29 @@ func New( logger: logger, } - minHeight, ok, err := databaseMinHeight(db.stateDB) - if err != nil { - return nil, false, err - } - - // Databases already exist, load with existing min height. - if ok { - if err := db.InitBlockDBs(minHeight); err != nil { + minHeightFn := [](func() (uint64, bool, error)){ + func() (uint64, bool, error) { + // Load existing database min height. + return databaseMinHeight(db.metaDB) + }, + func() (uint64, bool, error) { + // Use the minimum block height of existing blocks to migrate. + return minBlockHeightToMigrate(evmDB) + }, + func() (uint64, bool, error) { + // Use min height 1 unless deferring initialization. + return 1, !allowDeferredInit, nil + }, + } + for _, fn := range minHeightFn { + h, ok, err := fn() + if err != nil { return nil, false, err } - return db, true, nil - } - - // Initialize using the minimum block height of existing blocks to migrate. - minHeight, ok, err = minBlockHeightToMigrate(evmDB) - if err != nil { - return nil, false, err - } - if ok { - if err := db.InitBlockDBs(minHeight); err != nil { - return nil, false, err + if !ok { + continue } - return db, true, nil - } - - // Initialize with min height 1 if deferred initialization is not allowed. - if !allowDeferredInit { - if err := db.InitBlockDBs(1); err != nil { + if err := db.InitBlockDBs(h); err != nil { return nil, false, err } return db, true, nil @@ -162,18 +188,18 @@ func (db *Database) InitBlockDBs(minHeight uint64) error { return errAlreadyInitialized } - if err := db.stateDB.Put(blockDBMinHeightKey, encodeBlockNumber(minHeight)); err != nil { + if err := db.metaDB.Put(blockDBMinHeightKey, encodeBlockNumber(minHeight)); err != nil { return err } - headerDB, err := db.newMeteredHeightDB(headerDBName, minHeight) + headerDB, err := db.newMeteredHeightDB("headerdb", minHeight) if err != nil { return err } - bodyDB, err := db.newMeteredHeightDB(bodyDBName, minHeight) + bodyDB, err := db.newMeteredHeightDB("bodydb", minHeight) if err != nil { return errors.Join(err, headerDB.Close()) } - receiptsDB, err := db.newMeteredHeightDB(receiptsDBName, minHeight) + receiptsDB, err := db.newMeteredHeightDB("receiptsdb", minHeight) if err != nil { return errors.Join(err, headerDB.Close(), bodyDB.Close()) } @@ -201,54 +227,143 @@ func (db *Database) InitBlockDBs(minHeight uint64) error { return nil } -// StartMigration begins the background migration of block data from the -// [ethdb.Database] to the height-indexed block databases. -// -// Returns an error if the databases are not initialized. -// No error if already running. -func (db *Database) StartMigration() error { - if !db.heightDBsReady { - return errNotInitialized +// Key prefixes for block data in [ethdb.Database]. +// This is copied from libevm because they are not exported. +// Since the prefixes should never be changed, we can avoid libevm changes by +// duplicating them here. +const ( + evmHeaderPrefix = 'h' + evmBlockBodyPrefix = 'b' + evmReceiptsPrefix = 'r' +) + +var blockPrefixes = []byte{evmBlockBodyPrefix, evmHeaderPrefix, evmReceiptsPrefix} + +func parseBlockKey(key []byte) (num uint64, hash common.Hash, ok bool) { + // Block keys should have 1 byte prefix + blockNumberSize + 32 bytes for the hash + if len(key) != 1+blockNumberSize+32 { + return 0, common.Hash{}, false } - db.migrator.start() - return nil + if !slices.Contains(blockPrefixes, key[0]) { + return 0, common.Hash{}, false + } + num = binary.BigEndian.Uint64(key[1 : 1+blockNumberSize]) + bytes := key[1+blockNumberSize:] + hash = common.BytesToHash(bytes) + return num, hash, true } -func (db *Database) Put(key []byte, value []byte) error { - if !db.useHeightIndexedDB(key) { - return db.Database.Put(key, value) - } +type parsedBlockKey struct { + key []byte + db database.HeightIndex + num uint64 + hash common.Hash +} - heightDB, err := db.heightDBForKey(key) +func (p *parsedBlockKey) writeHashAndData(data []byte) error { + return writeHashAndData(p.db, p.num, p.hash, data) +} + +func writeHashAndData( + db database.HeightIndex, + height uint64, + hash common.Hash, + data []byte, +) error { + encoded, err := rlp.EncodeToBytes([][]byte{hash.Bytes(), data}) if err != nil { return err } - num, hash, err := parseBlockKey(key) + return db.Put(height, encoded) +} + +// parseKey parses a block key into a parsedBlockKey. +// It returns false if no block databases for the key prefix exist. +func (db *Database) parseKey(key []byte) (*parsedBlockKey, bool) { + if !db.heightDBsReady { + return nil, false + } + + var hdb database.HeightIndex + switch key[0] { + case evmBlockBodyPrefix: + hdb = db.bodyDB + case evmHeaderPrefix: + hdb = db.headerDB + case evmReceiptsPrefix: + hdb = db.receiptsDB + default: + return nil, false + } + + num, hash, ok := parseBlockKey(key) + if !ok { + return nil, false + } + + if num < db.minHeight { + return nil, false + } + + return &parsedBlockKey{ + key: key, + db: hdb, + num: num, + hash: hash, + }, true +} + +// readBlock reads data from [database.HeightIndex] and falls back +// to the [ethdb.Database] if the data is not found and migration is not complete. +func (db *Database) readBlock(p *parsedBlockKey) ([]byte, error) { + data, err := p.db.Get(p.num) if err != nil { - return err + if errors.Is(err, database.ErrNotFound) && !db.migrator.isCompleted() { + return db.Database.Get(p.key) + } + return nil, err } - return writeHashAndData(heightDB, num, hash, value) + + var elems [][]byte + if err := rlp.DecodeBytes(data, &elems); err != nil { + return nil, err + } + if len(elems) != 2 { + err := fmt.Errorf( + "invalid hash+data format: expected 2 elements, got %d", len(elems), + ) + return nil, err + } + + // Hash mismatch means we are trying to read a different block at this height. + if common.BytesToHash(elems[0]) != p.hash { + return nil, database.ErrNotFound + } + + return elems[1], nil } func (db *Database) Get(key []byte) ([]byte, error) { - if !db.useHeightIndexedDB(key) { - return db.Database.Get(key) + if p, ok := db.parseKey(key); ok { + return db.readBlock(p) } + return db.Database.Get(key) +} - heightDB, err := db.heightDBForKey(key) - if err != nil { - return nil, err +func (db *Database) Put(key []byte, value []byte) error { + if p, ok := db.parseKey(key); ok { + return p.writeHashAndData(value) } - return readHashAndData(heightDB, db.Database, key, db.migrator) + return db.Database.Put(key, value) } func (db *Database) Has(key []byte) (bool, error) { - if !db.useHeightIndexedDB(key) { + p, ok := db.parseKey(key) + if !ok { return db.Database.Has(key) } - _, err := db.Get(key) - if err != nil { + if _, err := db.readBlock(p); err != nil { if errors.Is(err, database.ErrNotFound) { return false, nil } @@ -260,31 +375,24 @@ func (db *Database) Has(key []byte) (bool, error) { // Delete removes the key from the underlying database for non-block data. // Block data deletion is a no-op because [database.HeightIndex] does not support deletion. func (db *Database) Delete(key []byte) error { - if !db.useHeightIndexedDB(key) { - return db.Database.Delete(key) - } - - num, hash, err := parseBlockKey(key) - if err != nil { - return err + if p, ok := db.parseKey(key); ok { + db.logger.Warn( + "Deleting block data is a no-op", + zap.Uint64("height", p.num), + zap.Stringer("hash", p.hash), + ) + return nil } - db.logger.Warn( - "Deleting block data is a no-op", - zap.Uint64("height", num), - zap.Stringer("hash", hash), - ) - return nil + return db.Database.Delete(key) } func (db *Database) Close() error { - if db.migrator != nil { - db.migrator.stop() - } + db.migrator.stop() if !db.heightDBsReady { return db.Database.Close() } - // Don't close stateDB since the caller should be managing it. + // Don't close metaDB since the caller should be managing it. return errors.Join( db.headerDB.Close(), db.bodyDB.Close(), @@ -293,80 +401,7 @@ func (db *Database) Close() error { ) } -func (db *Database) initMigrator() error { - if db.migrator != nil { - return nil - } - mdb := prefixdb.New(migratorDBPrefix, db.stateDB) - migrator, err := newMigrator( - mdb, - db.headerDB, - db.bodyDB, - db.receiptsDB, - db.Database, - db.logger, - ) - if err != nil { - return err - } - db.migrator = migrator - return nil -} - -func (db *Database) newMeteredHeightDB( - namespace string, - minHeight uint64, -) (database.HeightIndex, error) { - path := filepath.Join(db.dbPath, namespace) - config := db.config.WithDir(path).WithMinimumHeight(minHeight) - ndb, err := heightindexdb.New(config, db.logger) - if err != nil { - return nil, fmt.Errorf("failed to create %s database at %s: %w", namespace, path, err) - } - - mdb, err := meterdb.New(db.reg, namespace, ndb) - if err != nil { - return nil, errors.Join( - fmt.Errorf("failed to create metered %s database: %w", namespace, err), - ndb.Close(), - ) - } - - return mdb, nil -} - -func (db *Database) heightDBForKey(key []byte) (database.HeightIndex, error) { - switch { - case isHeaderKey(key): - return db.headerDB, nil - case isBodyKey(key): - return db.bodyDB, nil - case isReceiptsKey(key): - return db.receiptsDB, nil - default: - return nil, errUnexpectedKey - } -} - -func (db *Database) useHeightIndexedDB(key []byte) bool { - if !db.heightDBsReady { - return false - } - - var n int - switch { - case isBodyKey(key): - n = len(evmBlockBodyPrefix) - case isHeaderKey(key): - n = len(evmHeaderPrefix) - case isReceiptsKey(key): - n = len(evmReceiptsPrefix) - default: - return false - } - num := binary.BigEndian.Uint64(key[n : n+blockNumberSize]) - return num >= db.minHeight -} +var _ ethdb.Batch = (*batch)(nil) type batch struct { ethdb.Batch @@ -388,141 +423,15 @@ func (db *Database) NewBatchWithSize(size int) ethdb.Batch { } func (b *batch) Put(key []byte, value []byte) error { - if b.db.useHeightIndexedDB(key) { - return b.db.Put(key, value) + if p, ok := b.db.parseKey(key); ok { + return p.writeHashAndData(value) } return b.Batch.Put(key, value) } func (b *batch) Delete(key []byte) error { - if b.db.useHeightIndexedDB(key) { + if _, ok := b.db.parseKey(key); ok { return b.db.Delete(key) } return b.Batch.Delete(key) } - -func parseBlockKey(key []byte) (num uint64, hash common.Hash, err error) { - var n int - switch { - case isBodyKey(key): - n = len(evmBlockBodyPrefix) - case isHeaderKey(key): - n = len(evmHeaderPrefix) - case isReceiptsKey(key): - n = len(evmReceiptsPrefix) - default: - return 0, common.Hash{}, errUnexpectedKey - } - num = binary.BigEndian.Uint64(key[n : n+blockNumberSize]) - bytes := key[n+blockNumberSize:] - if len(bytes) != blockHashSize { - return 0, common.Hash{}, fmt.Errorf("invalid hash length: %d", len(bytes)) - } - hash = common.BytesToHash(bytes) - return num, hash, nil -} - -func encodeBlockNumber(number uint64) []byte { - enc := make([]byte, blockNumberSize) - binary.BigEndian.PutUint64(enc, number) - return enc -} - -func isBodyKey(key []byte) bool { - if len(key) != len(evmBlockBodyPrefix)+blockNumberSize+blockHashSize { - return false - } - return bytes.HasPrefix(key, evmBlockBodyPrefix) -} - -func isHeaderKey(key []byte) bool { - if len(key) != len(evmHeaderPrefix)+blockNumberSize+blockHashSize { - return false - } - return bytes.HasPrefix(key, evmHeaderPrefix) -} - -func isReceiptsKey(key []byte) bool { - if len(key) != len(evmReceiptsPrefix)+blockNumberSize+blockHashSize { - return false - } - return bytes.HasPrefix(key, evmReceiptsPrefix) -} - -func databaseMinHeight(db database.KeyValueReader) (uint64, bool, error) { - minBytes, err := db.Get(blockDBMinHeightKey) - if err != nil { - if errors.Is(err, database.ErrNotFound) { - return 0, false, nil - } - return 0, false, err - } - if len(minBytes) != blockNumberSize { - return 0, false, fmt.Errorf("%w: min height expected %d bytes, got %d", errInvalidEncodedLength, blockNumberSize, len(minBytes)) - } - return binary.BigEndian.Uint64(minBytes), true, nil -} - -func writeHashAndData( - db database.HeightIndex, - height uint64, - hash common.Hash, - data []byte, -) error { - encoded, err := rlp.EncodeToBytes([][]byte{hash.Bytes(), data}) - if err != nil { - return err - } - return db.Put(height, encoded) -} - -// readHashAndData reads data from [database.HeightIndex] and falls back -// to the [ethdb.Database] if the data is not found and migration is not complete. -func readHashAndData( - heightDB database.HeightIndex, - evmDB ethdb.KeyValueReader, - key []byte, - migrator *migrator, -) ([]byte, error) { - num, hash, err := parseBlockKey(key) - if err != nil { - return nil, err - } - data, err := heightDB.Get(num) - if err != nil { - if errors.Is(err, database.ErrNotFound) && !migrator.isCompleted() { - return evmDB.Get(key) - } - return nil, err - } - - var elems [][]byte - if err := rlp.DecodeBytes(data, &elems); err != nil { - return nil, err - } - if len(elems) != hashDataElements { - err := fmt.Errorf( - "invalid hash+data format: expected %d elements, got %d", - hashDataElements, - len(elems), - ) - return nil, err - } - if common.BytesToHash(elems[0]) != hash { - // Hash mismatch means we are trying to read a different block at this height. - return nil, database.ErrNotFound - } - - return elems[1], nil -} - -// IsEnabled checks if blockdb has ever been initialized. -// It returns true if the minimum block height key exists, indicating the -// block database has been created and initialized with a minimum height. -func IsEnabled(db database.KeyValueReader) (bool, error) { - has, err := db.Has(blockDBMinHeightKey) - if err != nil { - return false, err - } - return has, nil -} diff --git a/vms/evm/database/blockdb/database_test.go b/vms/evm/database/blockdb/database_test.go index 550b56f0d9b6..d6b93ad0b40a 100644 --- a/vms/evm/database/blockdb/database_test.go +++ b/vms/evm/database/blockdb/database_test.go @@ -8,8 +8,6 @@ import ( "slices" "testing" - "github.com/ava-labs/coreth/params" - "github.com/ava-labs/coreth/plugin/evm/customtypes" "github.com/ava-labs/libevm/core/rawdb" "github.com/ava-labs/libevm/core/types" "github.com/ava-labs/libevm/ethdb" @@ -18,6 +16,8 @@ import ( "github.com/ava-labs/avalanchego/database" "github.com/ava-labs/avalanchego/database/leveldb" + "github.com/ava-labs/avalanchego/graft/coreth/params" + "github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/customtypes" "github.com/ava-labs/avalanchego/utils/logging" evmdb "github.com/ava-labs/avalanchego/vms/evm/database" @@ -315,7 +315,7 @@ func TestDatabaseInitialization(t *testing.T) { // create block databases with existing min height if needed if tc.dbMinHeight > 0 { db := Database{ - stateDB: base, + metaDB: base, Database: evmDB, dbPath: dataDir, config: heightindexdb.DefaultConfig(), diff --git a/vms/evm/database/blockdb/helpers_test.go b/vms/evm/database/blockdb/helpers_test.go index 971dbf79f367..5763e68f12bd 100644 --- a/vms/evm/database/blockdb/helpers_test.go +++ b/vms/evm/database/blockdb/helpers_test.go @@ -10,9 +10,6 @@ import ( "testing" "time" - "github.com/ava-labs/coreth/consensus/dummy" - "github.com/ava-labs/coreth/core" - "github.com/ava-labs/coreth/params" "github.com/ava-labs/libevm/common" "github.com/ava-labs/libevm/core/rawdb" "github.com/ava-labs/libevm/core/types" @@ -24,6 +21,9 @@ import ( "github.com/ava-labs/avalanchego/database" "github.com/ava-labs/avalanchego/database/leveldb" + "github.com/ava-labs/avalanchego/graft/coreth/consensus/dummy" + "github.com/ava-labs/avalanchego/graft/coreth/core" + "github.com/ava-labs/avalanchego/graft/coreth/params" "github.com/ava-labs/avalanchego/utils/logging" evmdb "github.com/ava-labs/avalanchego/vms/evm/database" @@ -161,7 +161,7 @@ func startMigration(t *testing.T, db *Database, waitForCompletion bool) { t.Helper() db.migrator.completed.Store(false) - require.NoError(t, db.StartMigration()) + require.NoError(t, db.StartMigration(t.Context())) if waitForCompletion { timeout := 5 * time.Second diff --git a/vms/evm/database/blockdb/migrator.go b/vms/evm/database/blockdb/migrator.go index a44deb27bc14..494cfc160eb6 100644 --- a/vms/evm/database/blockdb/migrator.go +++ b/vms/evm/database/blockdb/migrator.go @@ -20,23 +20,11 @@ import ( "go.uber.org/zap" "github.com/ava-labs/avalanchego/database" + "github.com/ava-labs/avalanchego/database/prefixdb" "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/utils/timer" ) -const ( - // logProgressInterval controls how often migration progress is logged. - logProgressInterval = 30 * time.Second - // compactionInterval is the number of blocks to process before compacting the database. - compactionInterval = 250_000 - // stopTimeout is the maximum time to wait for migration to stop gracefully. - // 5 seconds allows cleanup operations to complete without blocking shutdown indefinitely. - stopTimeout = 5 * time.Second -) - -// targetBlockHeightKey stores the head height captured at first run for ETA. -var targetBlockHeightKey = []byte("migration_target_block_height") - type migrator struct { // Databases evmDB ethdb.Database @@ -57,6 +45,110 @@ type migrator struct { logger logging.Logger } +var migratorDBPrefix = []byte("migrator") + +func (db *Database) initMigrator() error { + mdb := prefixdb.New(migratorDBPrefix, db.metaDB) + migrator, err := newMigrator( + mdb, + db.headerDB, + db.bodyDB, + db.receiptsDB, + db.Database, + db.logger, + ) + if err != nil { + return err + } + db.migrator = migrator + return nil +} + +// StartMigration begins the background migration of block data from the +// [ethdb.Database] to the height-indexed block databases. +// +// Returns an error if the databases are not initialized. +// No error if already running. +func (db *Database) StartMigration(ctx context.Context) error { + if !db.heightDBsReady { + return errNotInitialized + } + db.migrator.start(ctx) + return nil +} + +// targetBlockHeightKey stores the head height captured at first run for ETA. +var targetBlockHeightKey = []byte("migration_target_block_height") + +func targetBlockHeight(db database.KeyValueReader) (uint64, bool, error) { + has, err := db.Has(targetBlockHeightKey) + if err != nil { + return 0, false, err + } + if !has { + return 0, false, nil + } + numBytes, err := db.Get(targetBlockHeightKey) + if err != nil { + return 0, false, err + } + if len(numBytes) != blockNumberSize { + return 0, false, fmt.Errorf("invalid block number encoding length: %d", len(numBytes)) + } + height := binary.BigEndian.Uint64(numBytes) + return height, true, nil +} + +func writeTargetBlockHeight(db database.KeyValueWriter, endHeight uint64) error { + return db.Put(targetBlockHeightKey, encodeBlockNumber(endHeight)) +} + +func headBlockNumber(db ethdb.KeyValueReader) (uint64, bool) { + hash := rawdb.ReadHeadHeaderHash(db) + num := rawdb.ReadHeaderNumber(db, hash) + if num == nil || *num == 0 { + return 0, false + } + return *num, true +} + +func isMigratableKey(db ethdb.Reader, key []byte) bool { + if key[0] != evmBlockBodyPrefix { + return false + } + num, hash, ok := parseBlockKey(key) + if !ok { + return false + } + + // Skip genesis since all vms have it and to benefit from being able to have a + // minimum height greater than 0 when state sync is enabled. + if num == 0 { + return false + } + + canonHash := rawdb.ReadCanonicalHash(db, num) + return canonHash == hash +} + +func minBlockHeightToMigrate(db ethdb.Database) (uint64, bool, error) { + iter := db.NewIterator([]byte{evmBlockBodyPrefix}, nil) + defer iter.Release() + + for iter.Next() { + key := iter.Key() + if !isMigratableKey(db, key) { + continue + } + num, _, ok := parseBlockKey(key) + if !ok { + return 0, false, errUnexpectedKey + } + return num, true, nil + } + return 0, false, iter.Error() +} + func newMigrator( db database.Database, headerDB database.HeightIndex, @@ -110,6 +202,10 @@ func (m *migrator) isCompleted() bool { return m.completed.Load() } +// stopTimeout is the maximum time to wait for migration to stop gracefully. +// 5 seconds allows cleanup operations to complete without blocking shutdown indefinitely. +const stopTimeout = 5 * time.Second + func (m *migrator) stop() { // Snapshot cancel/done under lock to avoid data race with endRun. // We must release the lock before waiting on done to prevent deadlock. @@ -131,13 +227,35 @@ func (m *migrator) stop() { } } +func (m *migrator) beginRun(ctx context.Context) (context.Context, bool) { + m.mu.Lock() + defer m.mu.Unlock() + + if m.cancel != nil { + return nil, false // migration already running + } + ctx, cancel := context.WithCancel(ctx) + m.cancel = cancel + m.done = make(chan struct{}) + m.processed.Store(0) + return ctx, true +} + +func (m *migrator) endRun() { + m.mu.Lock() + defer m.mu.Unlock() + + m.cancel = nil + m.done = nil +} + // start begins the migration process in a background goroutine. // Returns immediately if migration is already completed or running. -func (m *migrator) start() { +func (m *migrator) start(ctx context.Context) { if m.isCompleted() { return } - ctx, ok := m.beginRun() + ctx, ok := m.beginRun(ctx) if !ok { m.logger.Warn("Migration already running") return @@ -156,29 +274,131 @@ func (m *migrator) start() { }() } -func (m *migrator) beginRun() (context.Context, bool) { +// waitMigratorDone waits until the current migration run completes. +// If timeout <= 0, it waits indefinitely. +// Returns true if completed, false on timeout. +func (m *migrator) waitMigratorDone(timeout time.Duration) bool { + // Snapshot done to avoid race with goroutine cleanup m.mu.Lock() - defer m.mu.Unlock() + done := m.done + m.mu.Unlock() - if m.cancel != nil { - return nil, false // migration already running + if done == nil { + return true + } + if timeout <= 0 { + <-done + return true + } + t := time.NewTimer(timeout) + defer t.Stop() + select { + case <-done: + return true + case <-t.C: + return false } - ctx, cancel := context.WithCancel(context.Background()) - m.cancel = cancel - m.done = make(chan struct{}) - m.processed.Store(0) - return ctx, true } -func (m *migrator) endRun() { - m.mu.Lock() - defer m.mu.Unlock() +func (m *migrator) migrateHeader(num uint64, hash common.Hash) error { + header := rawdb.ReadHeader(m.evmDB, hash, num) + if header == nil { + return fmt.Errorf("header not found for block %d hash %s", num, hash) + } + hBytes, err := rlp.EncodeToBytes(header) + if err != nil { + return fmt.Errorf("failed to encode block header: %w", err) + } + if err := writeHashAndData(m.headerDB, num, hash, hBytes); err != nil { + return fmt.Errorf("failed to write header to headerDB: %w", err) + } + return nil +} - m.cancel = nil - m.done = nil +func (m *migrator) migrateBody(num uint64, hash common.Hash, body []byte) error { + if err := writeHashAndData(m.bodyDB, num, hash, body); err != nil { + return fmt.Errorf("failed to write body to bodyDB: %w", err) + } + return nil +} + +func (m *migrator) migrateReceipts(num uint64, hash common.Hash) error { + receipts := rawdb.ReadReceiptsRLP(m.evmDB, hash, num) + if receipts == nil { + return nil + } + + if err := writeHashAndData(m.receiptsDB, num, hash, receipts); err != nil { + return fmt.Errorf("failed to write receipts to receiptsDB: %w", err) + } + return nil +} + +func deleteBlock(db ethdb.KeyValueWriter, num uint64, hash common.Hash) error { + // rawdb.DeleteHeader is not used to avoid deleting number/hash mappings. + headerKey := blockHeaderKey(num, hash) + if err := db.Delete(headerKey); err != nil { + return fmt.Errorf("failed to delete header from evmDB: %w", err) + } + rawdb.DeleteBody(db, hash, num) + rawdb.DeleteReceipts(db, hash, num) + return nil +} + +func blockHeaderKey(num uint64, hash common.Hash) []byte { + return slices.Concat([]byte{evmHeaderPrefix}, encodeBlockNumber(num), hash.Bytes()) +} + +func blockBodyKey(num uint64, hash common.Hash) []byte { + return slices.Concat([]byte{evmBlockBodyPrefix}, encodeBlockNumber(num), hash.Bytes()) +} + +func receiptsKey(num uint64, hash common.Hash) []byte { + return slices.Concat([]byte{evmReceiptsPrefix}, encodeBlockNumber(num), hash.Bytes()) +} + +func compactRange( + db ethdb.Compacter, + keyFunc func(uint64, common.Hash) []byte, + startNum, endNum uint64, + logger logging.Logger, +) { + startKey := keyFunc(startNum, common.Hash{}) + endKey := keyFunc(endNum+1, common.Hash{}) + if err := db.Compact(startKey, endKey); err != nil { + logger.Error("Failed to compact data in range", + zap.Uint64("startHeight", startNum), + zap.Uint64("endHeight", endNum), + zap.Error(err)) + } +} + +func (m *migrator) compactBlockRange(startNum, endNum uint64) { + start := time.Now() + + compactRange(m.evmDB, blockHeaderKey, startNum, endNum, m.logger) + compactRange(m.evmDB, blockBodyKey, startNum, endNum, m.logger) + compactRange(m.evmDB, receiptsKey, startNum, endNum, m.logger) + + m.logger.Info("Compaction of block range completed", + zap.Uint64("startHeight", startNum), + zap.Uint64("endHeight", endNum), + zap.Duration("duration", time.Since(start))) } +const ( + // logProgressInterval controls how often migration progress is logged. + logProgressInterval = 30 * time.Second + // compactionInterval is the number of blocks to process before compacting the database. + compactionInterval = 250_000 +) + func (m *migrator) run(ctx context.Context) error { + m.logger.Info( + "Block data migration started", + zap.Uint64("targetHeight", m.endHeight), + ) + var ( // Progress tracking etaTarget uint64 // target # of blocks to process @@ -197,7 +417,7 @@ func (m *migrator) run(ctx context.Context) error { // Iterate over block bodies instead of headers since there are keys // under the header prefix that we are not migrating (e.g., hash mappings). - iter = m.evmDB.NewIterator(evmBlockBodyPrefix, nil) + iter = m.evmDB.NewIterator([]byte{evmBlockBodyPrefix}, nil) ) defer func() { @@ -225,11 +445,6 @@ func (m *migrator) run(ctx context.Context) error { ) }() - m.logger.Info( - "Block data migration started", - zap.Uint64("targetHeight", m.endHeight), - ) - // Iterate over all block bodies in ascending order by block number. for iter.Next() { select { @@ -242,34 +457,33 @@ func (m *migrator) run(ctx context.Context) error { if !isMigratableKey(m.evmDB, key) { continue } - - blockNum, hash, err := parseBlockKey(key) - if err != nil { - return err + num, hash, ok := parseBlockKey(key) + if !ok { + return errUnexpectedKey } - if etaTarget == 0 && m.endHeight > 0 && blockNum < m.endHeight { - etaTarget = m.endHeight - blockNum + if etaTarget == 0 && m.endHeight > 0 && num < m.endHeight { + etaTarget = m.endHeight - num etaTracker.AddSample(0, etaTarget, start) } // track the range of blocks for compaction if !canCompact { - startBlockNum = blockNum + startBlockNum = num canCompact = true } - endBlockNum = blockNum + endBlockNum = num - if err := m.migrateHeader(blockNum, hash); err != nil { + if err := m.migrateHeader(num, hash); err != nil { return fmt.Errorf("failed to migrate header data: %w", err) } - if err := m.migrateBody(blockNum, hash, iter.Value()); err != nil { + if err := m.migrateBody(num, hash, iter.Value()); err != nil { return fmt.Errorf("failed to migrate body data: %w", err) } - if err := m.migrateReceipts(blockNum, hash); err != nil { + if err := m.migrateReceipts(num, hash); err != nil { return fmt.Errorf("failed to migrate receipts data: %w", err) } - if err := deleteBlock(batch, blockNum, hash); err != nil { + if err := deleteBlock(batch, num, hash); err != nil { return fmt.Errorf("failed to add block deletes to batch: %w", err) } processed := m.processed.Add(1) @@ -293,8 +507,8 @@ func (m *migrator) run(ctx context.Context) error { iter.Release() m.compactBlockRange(startBlockNum, endBlockNum) - startKey := encodeBlockNumber(blockNum + 1) - newIter := m.evmDB.NewIterator(evmBlockBodyPrefix, startKey) + startKey := encodeBlockNumber(num + 1) + newIter := m.evmDB.NewIterator([]byte{evmBlockBodyPrefix}, startKey) iter = newIter lastCompact = processed canCompact = false @@ -304,7 +518,7 @@ func (m *migrator) run(ctx context.Context) error { if now := time.Now(); now.After(nextLog) { fields := []zap.Field{ zap.Uint64("blocksProcessed", processed), - zap.Uint64("lastProcessedHeight", blockNum), + zap.Uint64("lastProcessedHeight", num), zap.Duration("timeElapsed", time.Since(start)), } if etaTarget > 0 { @@ -329,184 +543,3 @@ func (m *migrator) run(ctx context.Context) error { m.completed.Store(true) return nil } - -func (m *migrator) compactBlockRange(startNum, endNum uint64) { - start := time.Now() - - compactRange(m.evmDB, blockHeaderKey, startNum, endNum, m.logger) - compactRange(m.evmDB, blockBodyKey, startNum, endNum, m.logger) - compactRange(m.evmDB, receiptsKey, startNum, endNum, m.logger) - - m.logger.Info("Compaction of block range completed", - zap.Uint64("startHeight", startNum), - zap.Uint64("endHeight", endNum), - zap.Duration("duration", time.Since(start))) -} - -func (m *migrator) migrateHeader(num uint64, hash common.Hash) error { - header := rawdb.ReadHeader(m.evmDB, hash, num) - if header == nil { - return fmt.Errorf("header not found for block %d hash %s", num, hash) - } - hBytes, err := rlp.EncodeToBytes(header) - if err != nil { - return fmt.Errorf("failed to encode block header: %w", err) - } - if err := writeHashAndData(m.headerDB, num, hash, hBytes); err != nil { - return fmt.Errorf("failed to write header to headerDB: %w", err) - } - return nil -} - -func (m *migrator) migrateBody(num uint64, hash common.Hash, body []byte) error { - if err := writeHashAndData(m.bodyDB, num, hash, body); err != nil { - return fmt.Errorf("failed to write body to bodyDB: %w", err) - } - return nil -} - -func (m *migrator) migrateReceipts(num uint64, hash common.Hash) error { - receipts := rawdb.ReadReceiptsRLP(m.evmDB, hash, num) - if receipts == nil { - return nil - } - - if err := writeHashAndData(m.receiptsDB, num, hash, receipts); err != nil { - return fmt.Errorf("failed to write receipts to receiptsDB: %w", err) - } - return nil -} - -// waitMigratorDone waits until the current migration run completes. -// If timeout <= 0, it waits indefinitely. -// Returns true if completed, false on timeout. -func (m *migrator) waitMigratorDone(timeout time.Duration) bool { - // Snapshot done to avoid race with goroutine cleanup - m.mu.Lock() - done := m.done - m.mu.Unlock() - - if done == nil { - return true - } - if timeout <= 0 { - <-done - return true - } - t := time.NewTimer(timeout) - defer t.Stop() - select { - case <-done: - return true - case <-t.C: - return false - } -} - -func deleteBlock(db ethdb.KeyValueWriter, num uint64, hash common.Hash) error { - // rawdb.DeleteHeader is not used to avoid deleting number/hash mappings. - headerKey := blockHeaderKey(num, hash) - if err := db.Delete(headerKey); err != nil { - return fmt.Errorf("failed to delete header from evmDB: %w", err) - } - rawdb.DeleteBody(db, hash, num) - rawdb.DeleteReceipts(db, hash, num) - return nil -} - -func targetBlockHeight(db database.KeyValueReader) (uint64, bool, error) { - has, err := db.Has(targetBlockHeightKey) - if err != nil { - return 0, false, err - } - if !has { - return 0, false, nil - } - numBytes, err := db.Get(targetBlockHeightKey) - if err != nil { - return 0, false, err - } - if len(numBytes) != blockNumberSize { - return 0, false, fmt.Errorf("invalid block number encoding length: %d", len(numBytes)) - } - height := binary.BigEndian.Uint64(numBytes) - return height, true, nil -} - -func headBlockNumber(db ethdb.KeyValueReader) (uint64, bool) { - hash := rawdb.ReadHeadHeaderHash(db) - num := rawdb.ReadHeaderNumber(db, hash) - if num == nil || *num == 0 { - return 0, false - } - return *num, true -} - -func writeTargetBlockHeight(db database.KeyValueWriter, endHeight uint64) error { - return db.Put(targetBlockHeightKey, encodeBlockNumber(endHeight)) -} - -func isMigratableKey(db ethdb.Reader, key []byte) bool { - if !isBodyKey(key) { - return false - } - num, hash, err := parseBlockKey(key) - if err != nil { - return false - } - - // Skip genesis since all vms have it and to benefit from being able to have a - // minimum height greater than 0 when state sync is enabled. - if num == 0 { - return false - } - - canonHash := rawdb.ReadCanonicalHash(db, num) - return canonHash == hash -} - -func blockHeaderKey(num uint64, hash common.Hash) []byte { - return slices.Concat(evmHeaderPrefix, encodeBlockNumber(num), hash.Bytes()) -} - -func blockBodyKey(num uint64, hash common.Hash) []byte { - return slices.Concat(evmBlockBodyPrefix, encodeBlockNumber(num), hash.Bytes()) -} - -func receiptsKey(num uint64, hash common.Hash) []byte { - return slices.Concat(evmReceiptsPrefix, encodeBlockNumber(num), hash.Bytes()) -} - -func minBlockHeightToMigrate(db ethdb.Database) (uint64, bool, error) { - iter := db.NewIterator(evmBlockBodyPrefix, nil) - defer iter.Release() - - for iter.Next() { - key := iter.Key() - if !isMigratableKey(db, key) { - continue - } - num, _, err := parseBlockKey(key) - if err != nil { - return 0, false, err - } - return num, true, nil - } - return 0, false, iter.Error() -} - -func compactRange( - db ethdb.Compacter, - keyFunc func(uint64, common.Hash) []byte, - startNum, endNum uint64, - logger logging.Logger, -) { - startKey := keyFunc(startNum, common.Hash{}) - endKey := keyFunc(endNum+1, common.Hash{}) - if err := db.Compact(startKey, endKey); err != nil { - logger.Error("Failed to compact data in range", - zap.Uint64("startHeight", startNum), - zap.Uint64("endHeight", endNum), - zap.Error(err)) - } -}