Skip to content

Commit 0388b96

Browse files
chainimport: process overlap headers region
1 parent ee14ea8 commit 0388b96

File tree

6 files changed

+932
-62
lines changed

6 files changed

+932
-62
lines changed

chainimport/headers_import.go

Lines changed: 179 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"net/http"
1111
"os"
1212
"path/filepath"
13+
"runtime"
1314
"strings"
1415
"time"
1516

@@ -18,6 +19,7 @@ import (
1819
"github.com/btcsuite/btcd/wire"
1920
"github.com/lightninglabs/neutrino/headerfs"
2021
"golang.org/x/exp/mmap"
22+
"golang.org/x/sync/errgroup"
2123
)
2224

2325
const (
@@ -45,6 +47,40 @@ const (
4547
DefaultWriteBatchSizePerRegion = 16384
4648
)
4749

50+
// OverlapMode selects the strategy applied to headers that are present both in
// the import source and in the target stores.
type OverlapMode uint8

const (
	// AppendOnly leaves overlapping headers untouched: they are skipped
	// without being compared against the import source, and only headers
	// beyond what each store already holds are appended. It trades the
	// extra cross-validation for speed, which suits large header sets
	// coming from trusted sources, and it never rewrites existing data.
	// Being the zero value of OverlapMode, it is the default mode.
	AppendOnly OverlapMode = iota

	// ValidateAndAppend checks that every overlapping header matches
	// between the import source and the target stores before new headers
	// are appended. Any mismatch aborts the import operation.
	ValidateAndAppend
)

// String returns a human-readable name for the overlap mode. Values that do
// not correspond to a declared mode are rendered as "OverlapMode(<n>)".
func (m OverlapMode) String() string {
	if m == AppendOnly {
		return "AppendOnlyMode"
	}

	if m == ValidateAndAppend {
		return "ValidateAndAppendMode"
	}

	return fmt.Sprintf("OverlapMode(%d)", m)
}
83+
4884
// HeaderMetadata contains the metadata about the header source.
4985
type HeaderMetadata struct {
5086
BitcoinChainType wire.BitcoinNet
@@ -326,8 +362,8 @@ func (v *FilterHeadersImportSourceValidator) Validate(
326362
iterator HeaderIterator,
327363
targetChainParams chaincfg.Params) error {
328364

329-
log.Debug("Skipping filter headers validation - missing access to " +
330-
"original compact filters")
365+
log.Debug("Skipping filter headers import validation - missing " +
366+
"access to original compact filters")
331367
return nil
332368
}
333369

@@ -772,27 +808,29 @@ func (s *HeadersImport) Import(ctx context.Context) (*ImportResult, error) {
772808
}
773809

774810
// Validate all block headers from import source.
775-
log.Debugf("Validating %d block headers", metadata.HeadersCount)
811+
log.Debugf("Validating %d block headers from import source",
812+
metadata.HeadersCount)
776813
if err := s.BlockHeadersValidator.Validate(
777814
s.BlockHeadersImportSource.Iterator(
778815
0, metadata.HeadersCount-1,
779816
),
780817
s.options.TargetChainParams,
781818
); err != nil {
782819
return nil, fmt.Errorf("failed to validate block "+
783-
"headers: %w", err)
820+
"headers from import source: %w", err)
784821
}
785822

786823
// Validate all filter headers from import source.
787-
log.Debugf("Validating %d filter headers", metadata.HeadersCount)
824+
log.Debugf("Validating %d filter headers from import source",
825+
metadata.HeadersCount)
788826
if err := s.FilterHeadersValidator.Validate(
789827
s.FilterHeadersImportSource.Iterator(
790828
0, metadata.HeadersCount-1,
791829
),
792830
s.options.TargetChainParams,
793831
); err != nil {
794832
return nil, fmt.Errorf("failed to validate filter "+
795-
"headers: %w", err)
833+
"headers from import source: %w", err)
796834
}
797835

798836
// Determine processing regions that partition the import task into
@@ -805,9 +843,15 @@ func (s *HeadersImport) Import(ctx context.Context) (*ImportResult, error) {
805843
result.StartHeight = regions.ImportStartHeight
806844
result.EndHeight = regions.ImportEndHeight
807845

808-
// TODO(mohamedawnallah): process the overlap region. This mainly
809-
// includes a validation strategy for the overlap region between headers
810-
// from import and target sources.
846+
// Process overlap headers region.
847+
// Process headers in the overlap region by either skipping validation
848+
// (AppendOnly mode) or validating that headers match exactly between
849+
// import and target sources (ValidateAndAppend mode).
850+
err = s.processOverlapHeadersRegion(ctx, regions.Overlap, result)
851+
if err != nil {
852+
return nil, fmt.Errorf("headers import failed: overlap region "+
853+
"headers validation failed: %w", err)
854+
}
811855

812856
// TODO(mohamedawnallah): Process the divergence region. This includes
813857
// strategy/strategies for handling divergence region that may exist in
@@ -837,32 +881,6 @@ func (s *HeadersImport) Import(ctx context.Context) (*ImportResult, error) {
837881
return result, nil
838882
}
839883

840-
// isTargetFresh checks if the target header stores are in their initial state,
841-
// meaning they contain only the genesis header (height 0).
842-
func (s *HeadersImport) isTargetFresh(
843-
targetBlockHeaderStore headerfs.BlockHeaderStore,
844-
targetFilterHeaderStore headerfs.FilterHeaderStore) (bool, error) {
845-
846-
// Get the chain tip from both target stores.
847-
_, blockTipHeight, err := targetBlockHeaderStore.ChainTip()
848-
if err != nil {
849-
return false, fmt.Errorf("failed to get target block header "+
850-
"chain tip: %w", err)
851-
}
852-
853-
_, filterTipHeight, err := targetFilterHeaderStore.ChainTip()
854-
if err != nil {
855-
return false, fmt.Errorf("failed to get target filter header "+
856-
"chain tip: %w", err)
857-
}
858-
859-
if blockTipHeight == 0 && filterTipHeight == 0 {
860-
return true, nil
861-
}
862-
863-
return false, nil
864-
}
865-
866884
// openSources initializes and opens all required header import sources. It
867885
// verifies that all necessary import sources and validators are properly
868886
// configured, then opens each source to prepare for data reading. Returns an
@@ -1301,16 +1319,131 @@ func (s *HeadersImport) verifyHeadersAtTargetHeight(height uint32) error {
13011319
sourceFilterHeaderHash, targetFilterHeaderHash)
13021320
}
13031321

1304-
log.Debugf("Headers from %s (block) and %s (filter) verified at "+
1305-
"height %d", s.BlockHeadersImportSource.GetURI(),
1306-
s.FilterHeadersImportSource.GetURI(), height)
1322+
log.Debugf("Headers from import sources verified at height %d", height)
1323+
return nil
1324+
}
1325+
1326+
// processOverlapHeadersRegion processes the headers in the overlap region by
1327+
// either skipping validation (AppendOnly mode) or validating that headers match
1328+
// exactly between import and target sources (ValidateAndAppend mode). When
1329+
// using ValidateAndAppend mode, if any mismatches are found during validation,
1330+
// the import operation is aborted.
1331+
func (s *HeadersImport) processOverlapHeadersRegion(ctx context.Context,
1332+
region HeaderRegion, result *ImportResult) error {
1333+
1334+
if !region.Exists {
1335+
return nil
1336+
}
1337+
1338+
log.Infof("Validating headers in the overlap region between import "+
1339+
"and target sources from heights %d to %d", region.Start,
1340+
region.End)
1341+
1342+
switch s.options.OverlapMode {
1343+
case AppendOnly:
1344+
// Skip all headers in overlap region.
1345+
log.Infof("Skipping validating %d headers (block and filter) "+
1346+
"in overlap region due to %s mode",
1347+
region.End-region.Start+1, s.options.OverlapMode)
1348+
case ValidateAndAppend:
1349+
// Validate all headers in overlap region match. If mismatches
1350+
// are found during validation, the import operation is aborted.
1351+
if err := s.validateHeadersExactMatch(
1352+
ctx, region.Start, region.End,
1353+
); err != nil {
1354+
return fmt.Errorf("overlap region validation failed: "+
1355+
"%w", err)
1356+
}
1357+
1358+
log.Infof("Successfully validated %d headers "+
1359+
"(block and filter) in overlap region",
1360+
region.End-region.Start+1)
1361+
}
1362+
1363+
result.SkippedCount += int(region.End - region.Start + 1)
1364+
1365+
return nil
1366+
}
1367+
1368+
// validateHeadersExactMatch validates all headers in the specified range match
1369+
// exactly between import and target sources.
1370+
//
1371+
// The function heuristically processes sequentially for smaller ranges and uses
1372+
// parallel validation for larger ranges to optimize performance.
1373+
func (s *HeadersImport) validateHeadersExactMatch(ctx context.Context,
1374+
startHeight, endHeight uint32) error {
1375+
1376+
// Calculate the range size.
1377+
rangeSize := endHeight - startHeight + 1
1378+
1379+
// If range is small, heuristically process sequentially.
1380+
if rangeSize <= 100 {
1381+
return s.validateHeadersSequential(startHeight, endHeight)
1382+
}
1383+
1384+
// Create an errgroup with a derived context.
1385+
g, ctx := errgroup.WithContext(ctx)
1386+
1387+
// Set concurrency limit based on CPU cores, but cap reasonably.
1388+
g.SetLimit(min(runtime.NumCPU(), 8))
1389+
1390+
// Queue up all the heights to validate.
1391+
for height := startHeight; height <= endHeight; height++ {
1392+
// Add work to the errgroup.
1393+
g.Go(func() error {
1394+
// Check if context has been canceled before starting
1395+
// work.
1396+
select {
1397+
case <-ctx.Done():
1398+
return ctx.Err()
1399+
default:
1400+
}
1401+
1402+
// Verify headers at this target height.
1403+
err := s.verifyHeadersAtTargetHeight(height)
1404+
if err != nil {
1405+
return fmt.Errorf("header verification failed "+
1406+
"at height %d: %w", height, err)
1407+
}
1408+
1409+
return nil
1410+
})
1411+
}
1412+
1413+
// Wait for all verification goroutines to complete or for any error.
1414+
err := g.Wait()
1415+
if err != nil {
1416+
return err
1417+
}
1418+
1419+
log.Infof("Validated %d headers in the overlap region between import "+
1420+
"and target sources from heights %d to %d",
1421+
endHeight-startHeight+1, startHeight, endHeight)
13071422

13081423
return nil
13091424
}
13101425

1311-
// processNewHeadersRegion imports headers from the specified region into the
1312-
// target stores. This method handles the case where headers exist in the import
1313-
// source but not in the target stores.
1426+
// validateHeadersSequential performs sequential validation for smaller ranges.
1427+
//
1428+
// The function validates headers sequentially for smaller ranges to optimize
1429+
// performance.
1430+
func (s *HeadersImport) validateHeadersSequential(startHeight,
1431+
endHeight uint32) error {
1432+
1433+
for height := startHeight; height <= endHeight; height++ {
1434+
err := s.verifyHeadersAtTargetHeight(height)
1435+
if err != nil {
1436+
return fmt.Errorf("header verification failed at "+
1437+
"height %d: %w", height, err)
1438+
}
1439+
}
1440+
1441+
return nil
1442+
}
1443+
1444+
// processNewHeadersRegion processes the headers in the new headers region by
1445+
// importing them into the target stores. This method handles the case where
1446+
// headers exist in the import source but not in the target stores.
13141447
func (s *HeadersImport) processNewHeadersRegion(region HeaderRegion,
13151448
result *ImportResult) error {
13161449

@@ -1548,6 +1681,11 @@ type ImportOptions struct {
15481681
// each batch per region. This controls performance characteristics of
15491682
// the import.
15501683
WriteBatchSizePerRegion int
1684+
1685+
// OverlapMode defines how to handle headers that overlap between the
1686+
// import source and existing data in the target stores. Defaults to
1687+
// AppendOnly.
1688+
OverlapMode OverlapMode
15511689
}
15521690

15531691
// Import executes the header import process with the configuration specified in

0 commit comments

Comments
 (0)