From 240966de90d45b3b16874de6df1c79999a50cc4f Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Fri, 25 Jul 2025 11:06:52 +0200 Subject: [PATCH 1/6] graph/db+log: use v2 logger for graph subsystem --- graph/db/log.go | 2 +- log.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/graph/db/log.go b/graph/db/log.go index 242e78c99a7..dbe7914ef45 100644 --- a/graph/db/log.go +++ b/graph/db/log.go @@ -1,7 +1,7 @@ package graphdb import ( - "github.com/btcsuite/btclog" + "github.com/btcsuite/btclog/v2" "github.com/lightningnetwork/lnd/build" ) diff --git a/log.go b/log.go index e193ab520a7..49ee5afdf67 100644 --- a/log.go +++ b/log.go @@ -203,7 +203,7 @@ func SetupLoggers(root *build.SubLoggerManager, interceptor signal.Interceptor) AddSubLogger( root, blindedpath.Subsystem, interceptor, blindedpath.UseLogger, ) - AddV1SubLogger(root, graphdb.Subsystem, interceptor, graphdb.UseLogger) + AddSubLogger(root, graphdb.Subsystem, interceptor, graphdb.UseLogger) AddSubLogger(root, chainio.Subsystem, interceptor, chainio.UseLogger) AddSubLogger(root, msgmux.Subsystem, interceptor, msgmux.UseLogger) AddSubLogger(root, sqldb.Subsystem, interceptor, sqldb.UseLogger) From afbe6b12edf451cbe3177b0a731abac0e4f13f95 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Fri, 25 Jul 2025 11:23:54 +0200 Subject: [PATCH 2/6] graph/db: define various db connection helpers for incoming tests This is a prep commit that just adds all the boilerplate connection logic for connecting to various graph DBs. These will be used in upcoming commits which will add tests and benchmarks. --- graph/db/benchmark_test.go | 507 +++++++++++++++++++++++++++++++++++++ 1 file changed, 507 insertions(+) create mode 100644 graph/db/benchmark_test.go diff --git a/graph/db/benchmark_test.go b/graph/db/benchmark_test.go new file mode 100644 index 00000000000..fc724b993b1 --- /dev/null +++ b/graph/db/benchmark_test.go @@ -0,0 +1,507 @@ +package graphdb + +import ( + "context" + "database/sql" + "errors" + "path" + "sync" + "testing" + "time" + + "github.com/btcsuite/btcd/chaincfg" + "github.com/lightningnetwork/lnd/batch" + "github.com/lightningnetwork/lnd/graph/db/models" + "github.com/lightningnetwork/lnd/kvdb" + "github.com/lightningnetwork/lnd/kvdb/postgres" + "github.com/lightningnetwork/lnd/kvdb/sqlbase" + "github.com/lightningnetwork/lnd/kvdb/sqlite" + "github.com/lightningnetwork/lnd/sqldb" + "github.com/stretchr/testify/require" + "golang.org/x/time/rate" +) + +// Here we define various database paths, connection strings and file names that +// we will use to open the database connections. These should be changed to +// point to your actual local test databases. +const ( + bboltDBPath = "testdata/kvdb" + kvdbSqlitePath = "testdata/kvdb" + nativeSQLSqlitePath = "testdata" + kvdbPostgresDNS = "postgres://test@localhost/graphbenchmark_kvdb" + nativeSQLPostgresDNS = "postgres://test@localhost/graphbenchmark" + + kvdbSqliteFile = "channel.sqlite" + kvdbBBoltFile = "channel.db" + nativeSQLSqliteFile = "lnd.sqlite" + + testMaxSQLiteConnections = 2 + testMaxPostgresConnections = 50 + testSQLBusyTimeout = 5 * time.Second +) + +// Here we define some variables that will be used to configure the graph stores +// we open for testing. These can be modified to suit your testing needs. +var ( + // dbTestChain is the chain hash used for initialising the test + // databases. This should be changed to match the chain hash of the + // database you are testing against. + dbTestChain = *chaincfg.MainNetParams.GenesisHash + + // testStoreOptions is used to configure the graph stores we open for + // testing. + testStoreOptions = []StoreOptionModifier{ + WithBatchCommitInterval(500 * time.Millisecond), + } + + // testSQLPaginationCfg is used to configure the pagination settings for + // the SQL stores we open for testing. + testSQLPaginationCfg = sqldb.DefaultPagedQueryConfig() + + // testSqlitePragmaOpts is a set of SQLite pragma options that we apply + // to the SQLite databases we open for testing. + testSqlitePragmaOpts = []string{ + "synchronous=full", + "auto_vacuum=incremental", + "fullfsync=true", + } +) + +// dbConnection is a struct that holds the name of the database connection +// and a function to open the connection. +type dbConnection struct { + name string + open func(testing.TB) V1Store +} + +// This var block defines the various database connections that we will use +// for testing. Each connection is defined as a dbConnection struct that +// contains a name and an open function. The open function is used to create +// a new V1Store instance for the given database type. +var ( + // kvdbBBoltConn is a connection to a kvdb-bbolt database called + // channel.db. + kvdbBBoltConn = dbConnection{ + name: "kvdb-bbolt", + open: func(b testing.TB) V1Store { + return connectBBoltDB(b, bboltDBPath, kvdbBBoltFile) + }, + } + + // kvdbSqliteConn is a connection to a kvdb-sqlite database called + // channel.sqlite. + kvdbSqliteConn = dbConnection{ + name: "kvdb-sqlite", + open: func(b testing.TB) V1Store { + return connectKVDBSqlite( + b, kvdbSqlitePath, kvdbSqliteFile, + ) + }, + } + + // nativeSQLSqliteConn is a connection to a native SQL sqlite database + // called lnd.sqlite. + nativeSQLSqliteConn = dbConnection{ + name: "native-sqlite", + open: func(b testing.TB) V1Store { + return connectNativeSQLite( + b, nativeSQLSqlitePath, nativeSQLSqliteFile, + ) + }, + } + + // kvdbPostgresConn is a connection to a kvdb-postgres database + // using a postgres connection string. + kvdbPostgresConn = dbConnection{ + name: "kvdb-postgres", + open: func(b testing.TB) V1Store { + return connectKVDBPostgres(b, kvdbPostgresDNS) + }, + } + + // nativeSQLPostgresConn is a connection to a native SQL postgres + // database using a postgres connection string. + nativeSQLPostgresConn = dbConnection{ + name: "native-postgres", + open: func(b testing.TB) V1Store { + return connectNativePostgres(b, nativeSQLPostgresDNS) + }, + } +) + +// connectNativePostgres creates a V1Store instance backed by a native Postgres +// database for testing purposes. +func connectNativePostgres(t testing.TB, dsn string) V1Store { + store, err := sqldb.NewPostgresStore(&sqldb.PostgresConfig{ + Dsn: dsn, + MaxConnections: testMaxPostgresConnections, + }) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, store.Close()) + }) + + return newSQLStore(t, store) +} + +// connectNativeSQLite creates a V1Store instance backed by a native SQLite +// database for testing purposes. +func connectNativeSQLite(t testing.TB, dbPath, file string) V1Store { + store, err := sqldb.NewSqliteStore( + &sqldb.SqliteConfig{ + MaxConnections: testMaxSQLiteConnections, + BusyTimeout: testSQLBusyTimeout, + PragmaOptions: testSqlitePragmaOpts, + }, + path.Join(dbPath, file), + ) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, store.Close()) + }) + + return newSQLStore(t, store) +} + +// connectKVDBPostgres creates a V1Store instance backed by a kvdb-postgres +// database for testing purposes. +func connectKVDBPostgres(t testing.TB, dsn string) V1Store { + kvStore, err := kvdb.Open( + kvdb.PostgresBackendName, context.Background(), + &postgres.Config{ + Dsn: dsn, + MaxConnections: testMaxPostgresConnections, + }, + // NOTE: we use the raw string here else we get an + // import cycle if we try to import lncfg.NSChannelDB. + "channeldb", + ) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, kvStore.Close()) + }) + + return newKVStore(t, kvStore) +} + +// connectKVDBSqlite creates a V1Store instance backed by a kvdb-sqlite +// database for testing purposes. +func connectKVDBSqlite(t testing.TB, dbPath, fileName string) V1Store { + sqlbase.Init(testMaxSQLiteConnections) + kvStore, err := kvdb.Open( + kvdb.SqliteBackendName, context.Background(), + &sqlite.Config{ + BusyTimeout: testSQLBusyTimeout, + MaxConnections: testMaxSQLiteConnections, + PragmaOptions: testSqlitePragmaOpts, + }, dbPath, fileName, + // NOTE: we use the raw string here else we get an + // import cycle if we try to import lncfg.NSChannelDB. + "channeldb", + ) + require.NoError(t, err) + + return newKVStore(t, kvStore) +} + +// connectBBoltDB creates a new BBolt database connection for testing. +func connectBBoltDB(t testing.TB, dbPath, fileName string) V1Store { + cfg := &kvdb.BoltBackendConfig{ + DBPath: dbPath, + DBFileName: fileName, + NoFreelistSync: true, + AutoCompact: false, + AutoCompactMinAge: kvdb.DefaultBoltAutoCompactMinAge, + DBTimeout: kvdb.DefaultDBTimeout, + } + + kvStore, err := kvdb.GetBoltBackend(cfg) + require.NoError(t, err) + + return newKVStore(t, kvStore) +} + +// newKVStore creates a new KVStore instance for testing using a provided +// kvdb.Backend instance. +func newKVStore(t testing.TB, backend kvdb.Backend) V1Store { + store, err := NewKVStore(backend, testStoreOptions...) + require.NoError(t, err) + + return store +} + +// newSQLStore creates a new SQLStore instance for testing using a provided +// sqldb.DB instance. +func newSQLStore(t testing.TB, db sqldb.DB) V1Store { + err := db.ApplyAllMigrations( + context.Background(), sqldb.GetMigrations(), + ) + require.NoError(t, err) + + graphExecutor := sqldb.NewTransactionExecutor( + db.GetBaseDB(), func(tx *sql.Tx) SQLQueries { + return db.GetBaseDB().WithTx(tx) + }, + ) + + store, err := NewSQLStore( + &SQLStoreConfig{ + ChainHash: dbTestChain, + PaginationCfg: testSQLPaginationCfg, + }, + graphExecutor, testStoreOptions..., + ) + require.NoError(t, err) + + return store +} + +// TestPopulateDBs is a helper test that can be used to populate various local +// graph DBs from some source graph DB. This can then be used to run the +// various benchmark tests against the same graph data. +// +// TODO(elle): this test reveals that the batching logic we use might be +// problematic for postgres backends. This needs some investigation & it might +// make sense to only use LazyAdd for sqlite backends & it may make sense to +// also add a maximum batch size to avoid grouping too many updates at once. +// Observations: +// - the LazyAdd options need to be turned off for channel/policy update calls +// for both the native SQL postgres & kvdb postgres backend for this test to +// succeed. +// - The LazyAdd option must be added for the sqlite backends, else it takes +// very long to sync the graph. +func TestPopulateDBs(t *testing.T) { + t.Parallel() + ctx := context.Background() + + // NOTE: uncomment the line below to run this test locally, then provide + // the desired source database (and make sure the destination Postgres + // databases exist and are running). + t.Skipf("Skipping local helper test") + + // Set your desired source database here. For kvdbSqliteConn, a file + // called testdata/kvdb/channel.sqlite must exist and be populated with + // a KVDB based channel graph. + sourceDB := kvdbSqliteConn + + // Populate this list with the desired destination databases. + destinations := []dbConnection{ + kvdbBBoltConn, + nativeSQLSqliteConn, + kvdbPostgresConn, + nativeSQLPostgresConn, + } + + // Open and start the source graph. + src, err := NewChannelGraph(sourceDB.open(t)) + require.NoError(t, err) + require.NoError(t, src.Start()) + t.Cleanup(func() { + require.NoError(t, src.Stop()) + }) + + // countNodes is a helper function to count the number of nodes in the + // graph. + countNodes := func(graph *ChannelGraph) int { + numNodes := 0 + err := graph.ForEachNode(ctx, func(tx NodeRTx) error { + numNodes++ + return nil + }, func() { + numNodes = 0 + }) + require.NoError(t, err) + + return numNodes + } + + // countChannels is a helper function to count the number of channels + // in the graph. + countChannels := func(graph *ChannelGraph) (int, int) { + var ( + numChans = 0 + numPolicies = 0 + ) + err := graph.ForEachChannel( + ctx, func(info *models.ChannelEdgeInfo, + policy, + policy2 *models.ChannelEdgePolicy) error { + + numChans++ + if policy != nil { + numPolicies++ + } + if policy2 != nil { + numPolicies++ + } + + return nil + }, func() { + numChans = 0 + numPolicies = 0 + }) + require.NoError(t, err) + + return numChans, numPolicies + } + + t.Logf("Number of nodes in source graph (%s): %d", sourceDB.name, + countNodes(src)) + numChan, numPol := countChannels(src) + t.Logf("Number of channels & policies in source graph (%s): %d "+ + "channels, %d policies", sourceDB.name, numChan, numPol) + + for _, destDB := range destinations { + t.Run(destDB.name, func(t *testing.T) { + t.Parallel() + + // Open and start the destination graph. + dest, err := NewChannelGraph(destDB.open(t)) + require.NoError(t, err) + require.NoError(t, dest.Start()) + t.Cleanup(func() { + require.NoError(t, dest.Stop()) + }) + + t.Logf("Number of nodes in %s graph: %d", destDB.name, + countNodes(dest)) + numChan, numPol := countChannels(dest) + t.Logf("Number of channels in %s graph: %d, %d", + destDB.name, numChan, numPol) + + // Sync the source graph to the destination graph. + syncGraph(t, src, dest) + + t.Logf("Number of nodes in %s graph after sync: %d", + destDB.name, countNodes(dest)) + numChan, numPol = countChannels(dest) + t.Logf("Number of channels in %s graph after sync: "+ + "%d, %d", destDB.name, numChan, numPol) + }) + } +} + +// syncGraph synchronizes the source graph with the destination graph by +// copying all nodes and channels from the source to the destination. +func syncGraph(t *testing.T, src, dest *ChannelGraph) { + ctx := context.Background() + + var ( + s = rate.Sometimes{ + Interval: 10 * time.Second, + } + t0 = time.Now() + + chunk = 0 + total = 0 + mu sync.Mutex + ) + + reportNodeStats := func() { + elapsed := time.Since(t0).Seconds() + ratePerSec := float64(chunk) / elapsed + t.Logf("Synced %d nodes (last chunk: %d) "+ + "(%.2f nodes/second)", + total, chunk, ratePerSec) + + t0 = time.Now() + } + + var wgNodes sync.WaitGroup + err := src.ForEachNode(ctx, func(tx NodeRTx) error { + wgNodes.Add(1) + go func() { + defer wgNodes.Done() + + // NOTE: even though the transaction (tx) may have + // already been aborted, it is still ok to use the + // Node() result since that is a static object that + // is not affected by the transaction state. + err := dest.AddLightningNode( + ctx, tx.Node(), batch.LazyAdd(), + ) + require.NoError(t, err) + + mu.Lock() + total++ + chunk++ + s.Do(func() { + reportNodeStats() + chunk = 0 + }) + mu.Unlock() + }() + + return nil + }, func() {}) + require.NoError(t, err) + + wgNodes.Wait() + reportNodeStats() + t.Logf("Done syncing %d nodes", total) + + total = 0 + chunk = 0 + t0 = time.Now() + + reportChanStats := func() { + elapsed := time.Since(t0).Seconds() + ratePerSec := float64(chunk) / elapsed + t.Logf("Synced %d channels (and its "+ + "policies) (last chunk: %d) "+ + "(%.2f channels/second)", + total, chunk, ratePerSec) + + t0 = time.Now() + } + + var wgChans sync.WaitGroup + err = src.ForEachChannel(ctx, func(info *models.ChannelEdgeInfo, + policy1, policy2 *models.ChannelEdgePolicy) error { + + // Add each channel & policy. We do this in a goroutine to + // take advantage of batch processing. + wgChans.Add(1) + go func() { + defer wgChans.Done() + + err := dest.AddChannelEdge( + ctx, info, batch.LazyAdd(), + ) + if !errors.Is(err, ErrEdgeAlreadyExist) { + require.NoError(t, err) + } + + if policy1 != nil { + err = dest.UpdateEdgePolicy( + ctx, policy1, batch.LazyAdd(), + ) + require.NoError(t, err) + } + + if policy2 != nil { + err = dest.UpdateEdgePolicy( + ctx, policy2, batch.LazyAdd(), + ) + require.NoError(t, err) + } + + mu.Lock() + total++ + chunk++ + s.Do(func() { + reportChanStats() + chunk = 0 + }) + mu.Unlock() + }() + + return nil + }, func() {}) + require.NoError(t, err) + + wgChans.Wait() + reportChanStats() + + t.Logf("Done syncing %d channels", total) +} From 77fe1816f62ef9068f52cc7f67e7ac1c3949f8d7 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Fri, 25 Jul 2025 14:01:29 +0200 Subject: [PATCH 3/6] graph/db: add test helper for populating via migration This commit adds a helper test that is similar to TestPopulateDBs except for the fact that it uses the MigrateGraphToSQL function directly to migrate a local kvdb-sql graph to a native SQL one. --- graph/db/benchmark_test.go | 101 ++++++++++++++++++++++++++++++++----- 1 file changed, 88 insertions(+), 13 deletions(-) diff --git a/graph/db/benchmark_test.go b/graph/db/benchmark_test.go index fc724b993b1..10505ac124d 100644 --- a/graph/db/benchmark_test.go +++ b/graph/db/benchmark_test.go @@ -4,12 +4,14 @@ import ( "context" "database/sql" "errors" + "os" "path" "sync" "testing" "time" "github.com/btcsuite/btcd/chaincfg" + "github.com/btcsuite/btclog/v2" "github.com/lightningnetwork/lnd/batch" "github.com/lightningnetwork/lnd/graph/db/models" "github.com/lightningnetwork/lnd/kvdb" @@ -132,6 +134,12 @@ var ( // connectNativePostgres creates a V1Store instance backed by a native Postgres // database for testing purposes. func connectNativePostgres(t testing.TB, dsn string) V1Store { + return newSQLStore(t, sqlPostgres(t, dsn)) +} + +// sqlPostgres creates a sqldb.DB instance backed by a native Postgres database +// for testing purposes. +func sqlPostgres(t testing.TB, dsn string) BatchedSQLQueries { store, err := sqldb.NewPostgresStore(&sqldb.PostgresConfig{ Dsn: dsn, MaxConnections: testMaxPostgresConnections, @@ -141,12 +149,18 @@ func connectNativePostgres(t testing.TB, dsn string) V1Store { require.NoError(t, store.Close()) }) - return newSQLStore(t, store) + return newSQLExecutor(t, store) } // connectNativeSQLite creates a V1Store instance backed by a native SQLite // database for testing purposes. func connectNativeSQLite(t testing.TB, dbPath, file string) V1Store { + return newSQLStore(t, sqlSQLite(t, dbPath, file)) +} + +// sqlSQLite creates a sqldb.DB instance backed by a native SQLite database for +// testing purposes. +func sqlSQLite(t testing.TB, dbPath, file string) BatchedSQLQueries { store, err := sqldb.NewSqliteStore( &sqldb.SqliteConfig{ MaxConnections: testMaxSQLiteConnections, @@ -160,12 +174,12 @@ func connectNativeSQLite(t testing.TB, dbPath, file string) V1Store { require.NoError(t, store.Close()) }) - return newSQLStore(t, store) + return newSQLExecutor(t, store) } -// connectKVDBPostgres creates a V1Store instance backed by a kvdb-postgres +// kvdbPostgres creates a kvdb.Backend instance backed by a kvdb-postgres // database for testing purposes. -func connectKVDBPostgres(t testing.TB, dsn string) V1Store { +func kvdbPostgres(t testing.TB, dsn string) kvdb.Backend { kvStore, err := kvdb.Open( kvdb.PostgresBackendName, context.Background(), &postgres.Config{ @@ -181,12 +195,18 @@ func connectKVDBPostgres(t testing.TB, dsn string) V1Store { require.NoError(t, kvStore.Close()) }) - return newKVStore(t, kvStore) + return kvStore } -// connectKVDBSqlite creates a V1Store instance backed by a kvdb-sqlite +// connectKVDBPostgres creates a V1Store instance backed by a kvdb-postgres // database for testing purposes. -func connectKVDBSqlite(t testing.TB, dbPath, fileName string) V1Store { +func connectKVDBPostgres(t testing.TB, dsn string) V1Store { + return newKVStore(t, kvdbPostgres(t, dsn)) +} + +// kvdbSqlite creates a kvdb.Backend instance backed by a kvdb-sqlite +// database for testing purposes. +func kvdbSqlite(t testing.TB, dbPath, fileName string) kvdb.Backend { sqlbase.Init(testMaxSQLiteConnections) kvStore, err := kvdb.Open( kvdb.SqliteBackendName, context.Background(), @@ -201,7 +221,13 @@ func connectKVDBSqlite(t testing.TB, dbPath, fileName string) V1Store { ) require.NoError(t, err) - return newKVStore(t, kvStore) + return kvStore +} + +// connectKVDBSqlite creates a V1Store instance backed by a kvdb-sqlite +// database for testing purposes. +func connectKVDBSqlite(t testing.TB, dbPath, fileName string) V1Store { + return newKVStore(t, kvdbSqlite(t, dbPath, fileName)) } // connectBBoltDB creates a new BBolt database connection for testing. @@ -230,26 +256,30 @@ func newKVStore(t testing.TB, backend kvdb.Backend) V1Store { return store } -// newSQLStore creates a new SQLStore instance for testing using a provided -// sqldb.DB instance. -func newSQLStore(t testing.TB, db sqldb.DB) V1Store { +// newSQLExecutor creates a new BatchedSQLQueries instance for testing using a +// provided sqldb.DB instance. +func newSQLExecutor(t testing.TB, db sqldb.DB) BatchedSQLQueries { err := db.ApplyAllMigrations( context.Background(), sqldb.GetMigrations(), ) require.NoError(t, err) - graphExecutor := sqldb.NewTransactionExecutor( + return sqldb.NewTransactionExecutor( db.GetBaseDB(), func(tx *sql.Tx) SQLQueries { return db.GetBaseDB().WithTx(tx) }, ) +} +// newSQLStore creates a new SQLStore instance for testing using a provided +// sqldb.DB instance. +func newSQLStore(t testing.TB, db BatchedSQLQueries) V1Store { store, err := NewSQLStore( &SQLStoreConfig{ ChainHash: dbTestChain, PaginationCfg: testSQLPaginationCfg, }, - graphExecutor, testStoreOptions..., + db, testStoreOptions..., ) require.NoError(t, err) @@ -381,6 +411,51 @@ func TestPopulateDBs(t *testing.T) { } } +// TestPopulateViaMigration is a helper test that can be used to populate a +// local native SQL graph from a kvdb-sql graph using the migration logic. +// +// NOTE: the testPostgres variable can be set to true to test with a +// postgres backend instead of the kvdb-sqlite backend. +// +// NOTE: this is a helper test and is not run by default. +// +// TODO(elle): this test reveals tht there may be an issue with the postgres +// migration as it is super slow. +func TestPopulateViaMigration(t *testing.T) { + t.Skipf("Skipping local helper test") + + // Set this to true if you want to test with a postgres backend. + // By default, we use a kvdb-sqlite backend. + testPostgres := false + + ctx := context.Background() + + // Set up a logger so we can see the migration progress. + logger := btclog.NewDefaultHandler(os.Stdout) + UseLogger(btclog.NewSLogger(logger)) + log.SetLevel(btclog.LevelDebug) + + var ( + srcKVDB = kvdbSqlite(t, kvdbSqlitePath, kvdbSqliteFile) + dstSQL = sqlSQLite(t, nativeSQLSqlitePath, nativeSQLSqliteFile) + ) + if testPostgres { + srcKVDB = kvdbPostgres(t, kvdbPostgresDNS) + dstSQL = sqlPostgres(t, nativeSQLPostgresDNS) + } + + // Use the graph migration to populate the SQL graph from the + // kvdb graph. + err := dstSQL.ExecTx( + ctx, sqldb.WriteTxOpt(), func(queries SQLQueries) error { + return MigrateGraphToSQL( + ctx, srcKVDB, queries, dbTestChain, + ) + }, func() {}, + ) + require.NoError(t, err) +} + // syncGraph synchronizes the source graph with the destination graph by // copying all nodes and channels from the source to the destination. func syncGraph(t *testing.T, src, dest *ChannelGraph) { From 45033b8c54deb92fd916e2ca26f99dae42a56ded Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Fri, 25 Jul 2025 14:58:31 +0200 Subject: [PATCH 4/6] graph/db: add SQL migration progress logs --- graph/db/sql_migration.go | 101 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 98 insertions(+), 3 deletions(-) diff --git a/graph/db/sql_migration.go b/graph/db/sql_migration.go index dd7cf62b253..2f1792b1f59 100644 --- a/graph/db/sql_migration.go +++ b/graph/db/sql_migration.go @@ -17,6 +17,7 @@ import ( "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/sqldb" "github.com/lightningnetwork/lnd/sqldb/sqlc" + "golang.org/x/time/rate" ) // MigrateGraphToSQL migrates the graph store from a KV backend to a SQL @@ -115,6 +116,12 @@ func migrateNodes(ctx context.Context, kvBackend kvdb.Backend, var ( count uint64 skipped uint64 + + t0 = time.Now() + chunk uint64 + s = rate.Sometimes{ + Interval: 10 * time.Second, + } ) // Loop through each node in the KV store and insert it into the SQL @@ -146,6 +153,7 @@ func migrateNodes(ctx context.Context, kvBackend kvdb.Backend, } count++ + chunk++ // TODO(elle): At this point, we should check the loaded node // to see if we should extract any DNS addresses from its @@ -203,9 +211,25 @@ func migrateNodes(ctx context.Context, kvBackend kvdb.Backend, }, ) - return sqldb.CompareRecords( + err = sqldb.CompareRecords( node, migratedNode, fmt.Sprintf("node %x", pub), ) + if err != nil { + return fmt.Errorf("node mismatch after migration "+ + "for node %x: %w", pub, err) + } + + s.Do(func() { + elapsed := time.Since(t0).Seconds() + ratePerSec := float64(chunk) / elapsed + log.Debugf("Migrated %d nodes (%.2f nodes/sec)", + count, ratePerSec) + + t0 = time.Now() + chunk = 0 + }) + + return nil }, func() { // No reset is needed since if a retry occurs, the entire // migration will be retried from the start. @@ -225,6 +249,8 @@ func migrateNodes(ctx context.Context, kvBackend kvdb.Backend, func migrateSourceNode(ctx context.Context, kvdb kvdb.Backend, sqlDB SQLQueries) error { + log.Debugf("Migrating source node from KV to SQL") + sourceNode, err := sourceNode(kvdb) if errors.Is(err, ErrSourceNodeNotSet) { // If the source node has not been set yet, we can skip this @@ -302,6 +328,12 @@ func migrateChannelsAndPolicies(ctx context.Context, kvBackend kvdb.Backend, skippedChanCount uint64 policyCount uint64 skippedPolicyCount uint64 + + t0 = time.Now() + chunk uint64 + s = rate.Sometimes{ + Interval: 10 * time.Second, + } ) migChanPolicy := func(policy *models.ChannelEdgePolicy) error { // If the policy is nil, we can skip it. @@ -386,6 +418,16 @@ func migrateChannelsAndPolicies(ctx context.Context, kvBackend kvdb.Backend, scid, err) } + s.Do(func() { + elapsed := time.Since(t0).Seconds() + ratePerSec := float64(chunk) / elapsed + log.Debugf("Migrated %d channels (%.2f channels/sec)", + channelCount, ratePerSec) + + t0 = time.Now() + chunk = 0 + }) + return nil }, func() { // No reset is needed since if a retry occurs, the entire @@ -544,6 +586,12 @@ func migratePruneLog(ctx context.Context, kvBackend kvdb.Backend, count uint64 pruneTipHeight uint32 pruneTipHash chainhash.Hash + + t0 = time.Now() + chunk uint64 + s = rate.Sometimes{ + Interval: 10 * time.Second, + } ) // migrateSinglePruneEntry is a helper function that inserts a single @@ -596,6 +644,17 @@ func migratePruneLog(ctx context.Context, kvBackend kvdb.Backend, height, err) } + s.Do(func() { + elapsed := time.Since(t0).Seconds() + ratePerSec := float64(chunk) / elapsed + log.Debugf("Migrated %d prune log "+ + "entries (%.2f entries/sec)", + count, ratePerSec) + + t0 = time.Now() + chunk = 0 + }) + return nil }, ) @@ -715,7 +774,15 @@ func forEachPruneLogEntry(db kvdb.Backend, cb func(height uint32, func migrateClosedSCIDIndex(ctx context.Context, kvBackend kvdb.Backend, sqlDB SQLQueries) error { - var count uint64 + var ( + count uint64 + + t0 = time.Now() + chunk uint64 + s = rate.Sometimes{ + Interval: 10 * time.Second, + } + ) migrateSingleClosedSCID := func(scid lnwire.ShortChannelID) error { count++ @@ -739,6 +806,16 @@ func migrateClosedSCIDIndex(ctx context.Context, kvBackend kvdb.Backend, "but is not", scid) } + s.Do(func() { + elapsed := time.Since(t0).Seconds() + ratePerSec := float64(chunk) / elapsed + log.Debugf("Migrated %d closed scids "+ + "(%.2f entries/sec)", count, ratePerSec) + + t0 = time.Now() + chunk = 0 + }) + return nil } @@ -766,7 +843,15 @@ func migrateClosedSCIDIndex(ctx context.Context, kvBackend kvdb.Backend, func migrateZombieIndex(ctx context.Context, kvBackend kvdb.Backend, sqlDB SQLQueries) error { - var count uint64 + var ( + count uint64 + + t0 = time.Now() + chunk uint64 + s = rate.Sometimes{ + Interval: 10 * time.Second, + } + ) err := forEachZombieEntry(kvBackend, func(chanID uint64, pubKey1, pubKey2 [33]byte) error { @@ -820,6 +905,16 @@ func migrateZombieIndex(ctx context.Context, kvBackend kvdb.Backend, "a zombie, but is not", chanID) } + s.Do(func() { + elapsed := time.Since(t0).Seconds() + ratePerSec := float64(chunk) / elapsed + log.Debugf("Migrated %d zombie index entries "+ + "(%.2f entries/sec)", count, ratePerSec) + + t0 = time.Now() + chunk = 0 + }) + return nil }) if err != nil { From f5ce4a56563f393627e863cb0f2211609710aaff Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Fri, 25 Jul 2025 15:02:16 +0200 Subject: [PATCH 5/6] graph/db: add BenchmarkCacheLoading Add a benchmark test to test graph cache loading against various local graph DBs. --- graph/db/benchmark_test.go | 40 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/graph/db/benchmark_test.go b/graph/db/benchmark_test.go index 10505ac124d..36a8cf27351 100644 --- a/graph/db/benchmark_test.go +++ b/graph/db/benchmark_test.go @@ -580,3 +580,43 @@ func syncGraph(t *testing.T, src, dest *ChannelGraph) { t.Logf("Done syncing %d channels", total) } + +// BenchmarkCacheLoading benchmarks how long it takes to load the in-memory +// graph cache from a populated database. +// +// NOTE: this is to be run against a local graph database. It can be run +// either against a kvdb-bbolt channel.db file, or a kvdb-sqlite channel.sqlite +// file or a postgres connection containing the channel graph in kvdb format and +// finally, it can be run against a native SQL sqlite or postgres database. +// +// NOTE: the TestPopulateDBs test helper can be used to populate a set of test +// DBs from a single source db. +func BenchmarkCacheLoading(b *testing.B) { + ctx := context.Background() + + tests := []dbConnection{ + kvdbBBoltConn, + kvdbSqliteConn, + nativeSQLSqliteConn, + kvdbPostgresConn, + nativeSQLPostgresConn, + } + + for _, test := range tests { + b.Run(test.name, func(b *testing.B) { + store := test.open(b) + + // Reset timer to exclude setup time. + b.ResetTimer() + + for i := 0; i < b.N; i++ { + b.StopTimer() + graph, err := NewChannelGraph(store) + require.NoError(b, err) + b.StartTimer() + + require.NoError(b, graph.populateCache(ctx)) + } + }) + } +} From 5800a3e0548d6cdc1f491c776bf1677e0a83817d Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Fri, 25 Jul 2025 15:04:38 +0200 Subject: [PATCH 6/6] graph/db: benchmark various graph read calls This commit adds benchmark tests for the ForEachNode and ForEachChannel DB calls which are called by DescribeGraph. --- graph/db/benchmark_test.go | 71 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/graph/db/benchmark_test.go b/graph/db/benchmark_test.go index 36a8cf27351..81083a97f93 100644 --- a/graph/db/benchmark_test.go +++ b/graph/db/benchmark_test.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "errors" + "fmt" "os" "path" "sync" @@ -620,3 +621,73 @@ func BenchmarkCacheLoading(b *testing.B) { }) } } + +// BenchmarkGraphReadMethods benchmarks various read calls of various V1Store +// implementations. +// +// NOTE: this is to be run against a local graph database. It can be run +// either against a kvdb-bbolt channel.db file, or a kvdb-sqlite channel.sqlite +// file or a postgres connection containing the channel graph in kvdb format and +// finally, it can be run against a native SQL sqlite or postgres database. +// +// NOTE: the TestPopulateDBs test helper can be used to populate a set of test +// DBs from a single source db. +func BenchmarkGraphReadMethods(b *testing.B) { + ctx := context.Background() + + backends := []dbConnection{ + kvdbBBoltConn, + kvdbSqliteConn, + nativeSQLSqliteConn, + kvdbPostgresConn, + nativeSQLPostgresConn, + } + + tests := []struct { + name string + fn func(b testing.TB, store V1Store) + }{ + { + name: "ForEachNode", + fn: func(b testing.TB, store V1Store) { + err := store.ForEachNode( + ctx, func(_ NodeRTx) error { + return nil + }, func() {}, + ) + require.NoError(b, err) + }, + }, + { + name: "ForEachChannel", + fn: func(b testing.TB, store V1Store) { + //nolint:ll + err := store.ForEachChannel( + ctx, func(_ *models.ChannelEdgeInfo, + _ *models.ChannelEdgePolicy, + _ *models.ChannelEdgePolicy) error { + + return nil + }, func() {}, + ) + require.NoError(b, err) + }, + }, + } + + for _, test := range tests { + for _, db := range backends { + name := fmt.Sprintf("%s-%s", test.name, db.name) + b.Run(name, func(b *testing.B) { + store := db.open(b) + + // Reset timer to exclude setup time. + b.ResetTimer() + + for i := 0; i < b.N; i++ { + test.fn(b, store) + } + }) + } + } +}