@@ -17,6 +17,7 @@ import (
17
17
"github.com/lightningnetwork/lnd/lnwire"
18
18
"github.com/lightningnetwork/lnd/sqldb"
19
19
"github.com/lightningnetwork/lnd/sqldb/sqlc"
20
+ "golang.org/x/time/rate"
20
21
)
21
22
22
23
// MigrateGraphToSQL migrates the graph store from a KV backend to a SQL
@@ -115,6 +116,12 @@ func migrateNodes(ctx context.Context, kvBackend kvdb.Backend,
115
116
var (
116
117
count uint64
117
118
skipped uint64
119
+
120
+ t0 = time .Now ()
121
+ chunk uint64
122
+ s = rate.Sometimes {
123
+ Interval : 10 * time .Second ,
124
+ }
118
125
)
119
126
120
127
// Loop through each node in the KV store and insert it into the SQL
@@ -146,6 +153,7 @@ func migrateNodes(ctx context.Context, kvBackend kvdb.Backend,
146
153
}
147
154
148
155
count ++
156
+ chunk ++
149
157
150
158
// TODO(elle): At this point, we should check the loaded node
151
159
// to see if we should extract any DNS addresses from its
@@ -203,9 +211,25 @@ func migrateNodes(ctx context.Context, kvBackend kvdb.Backend,
203
211
},
204
212
)
205
213
206
- return sqldb .CompareRecords (
214
+ err = sqldb .CompareRecords (
207
215
node , migratedNode , fmt .Sprintf ("node %x" , pub ),
208
216
)
217
+ if err != nil {
218
+ return fmt .Errorf ("node mismatch after migration " +
219
+ "for node %x: %w" , pub , err )
220
+ }
221
+
222
+ s .Do (func () {
223
+ elapsed := time .Since (t0 ).Seconds ()
224
+ ratePerSec := float64 (chunk ) / elapsed
225
+ log .Debugf ("Migrated %d nodes (%.2f nodes/sec)" ,
226
+ count , ratePerSec )
227
+
228
+ t0 = time .Now ()
229
+ chunk = 0
230
+ })
231
+
232
+ return nil
209
233
}, func () {
210
234
// No reset is needed since if a retry occurs, the entire
211
235
// migration will be retried from the start.
@@ -225,6 +249,8 @@ func migrateNodes(ctx context.Context, kvBackend kvdb.Backend,
225
249
func migrateSourceNode (ctx context.Context , kvdb kvdb.Backend ,
226
250
sqlDB SQLQueries ) error {
227
251
252
+ log .Debugf ("Migrating source node from KV to SQL" )
253
+
228
254
sourceNode , err := sourceNode (kvdb )
229
255
if errors .Is (err , ErrSourceNodeNotSet ) {
230
256
// If the source node has not been set yet, we can skip this
@@ -302,6 +328,12 @@ func migrateChannelsAndPolicies(ctx context.Context, kvBackend kvdb.Backend,
302
328
skippedChanCount uint64
303
329
policyCount uint64
304
330
skippedPolicyCount uint64
331
+
332
+ t0 = time .Now ()
333
+ chunk uint64
334
+ s = rate.Sometimes {
335
+ Interval : 10 * time .Second ,
336
+ }
305
337
)
306
338
migChanPolicy := func (policy * models.ChannelEdgePolicy ) error {
307
339
// If the policy is nil, we can skip it.
@@ -386,6 +418,16 @@ func migrateChannelsAndPolicies(ctx context.Context, kvBackend kvdb.Backend,
386
418
scid , err )
387
419
}
388
420
421
+ s .Do (func () {
422
+ elapsed := time .Since (t0 ).Seconds ()
423
+ ratePerSec := float64 (chunk ) / elapsed
424
+ log .Debugf ("Migrated %d channels (%.2f channels/sec)" ,
425
+ channelCount , ratePerSec )
426
+
427
+ t0 = time .Now ()
428
+ chunk = 0
429
+ })
430
+
389
431
return nil
390
432
}, func () {
391
433
// No reset is needed since if a retry occurs, the entire
@@ -544,6 +586,12 @@ func migratePruneLog(ctx context.Context, kvBackend kvdb.Backend,
544
586
count uint64
545
587
pruneTipHeight uint32
546
588
pruneTipHash chainhash.Hash
589
+
590
+ t0 = time .Now ()
591
+ chunk uint64
592
+ s = rate.Sometimes {
593
+ Interval : 10 * time .Second ,
594
+ }
547
595
)
548
596
549
597
// migrateSinglePruneEntry is a helper function that inserts a single
@@ -596,6 +644,17 @@ func migratePruneLog(ctx context.Context, kvBackend kvdb.Backend,
596
644
height , err )
597
645
}
598
646
647
+ s .Do (func () {
648
+ elapsed := time .Since (t0 ).Seconds ()
649
+ ratePerSec := float64 (chunk ) / elapsed
650
+ log .Debugf ("Migrated %d prune log " +
651
+ "entries (%.2f entries/sec)" ,
652
+ count , ratePerSec )
653
+
654
+ t0 = time .Now ()
655
+ chunk = 0
656
+ })
657
+
599
658
return nil
600
659
},
601
660
)
@@ -715,7 +774,15 @@ func forEachPruneLogEntry(db kvdb.Backend, cb func(height uint32,
715
774
func migrateClosedSCIDIndex (ctx context.Context , kvBackend kvdb.Backend ,
716
775
sqlDB SQLQueries ) error {
717
776
718
- var count uint64
777
+ var (
778
+ count uint64
779
+
780
+ t0 = time .Now ()
781
+ chunk uint64
782
+ s = rate.Sometimes {
783
+ Interval : 10 * time .Second ,
784
+ }
785
+ )
719
786
migrateSingleClosedSCID := func (scid lnwire.ShortChannelID ) error {
720
787
count ++
721
788
@@ -739,6 +806,16 @@ func migrateClosedSCIDIndex(ctx context.Context, kvBackend kvdb.Backend,
739
806
"but is not" , scid )
740
807
}
741
808
809
+ s .Do (func () {
810
+ elapsed := time .Since (t0 ).Seconds ()
811
+ ratePerSec := float64 (chunk ) / elapsed
812
+ log .Debugf ("Migrated %d closed scids " +
813
+ "(%.2f entries/sec)" , count , ratePerSec )
814
+
815
+ t0 = time .Now ()
816
+ chunk = 0
817
+ })
818
+
742
819
return nil
743
820
}
744
821
@@ -766,7 +843,15 @@ func migrateClosedSCIDIndex(ctx context.Context, kvBackend kvdb.Backend,
766
843
func migrateZombieIndex (ctx context.Context , kvBackend kvdb.Backend ,
767
844
sqlDB SQLQueries ) error {
768
845
769
- var count uint64
846
+ var (
847
+ count uint64
848
+
849
+ t0 = time .Now ()
850
+ chunk uint64
851
+ s = rate.Sometimes {
852
+ Interval : 10 * time .Second ,
853
+ }
854
+ )
770
855
err := forEachZombieEntry (kvBackend , func (chanID uint64 , pubKey1 ,
771
856
pubKey2 [33 ]byte ) error {
772
857
@@ -820,6 +905,16 @@ func migrateZombieIndex(ctx context.Context, kvBackend kvdb.Backend,
820
905
"a zombie, but is not" , chanID )
821
906
}
822
907
908
+ s .Do (func () {
909
+ elapsed := time .Since (t0 ).Seconds ()
910
+ ratePerSec := float64 (chunk ) / elapsed
911
+ log .Debugf ("Migrated %d zombie index entries " +
912
+ "(%.2f entries/sec)" , count , ratePerSec )
913
+
914
+ t0 = time .Now ()
915
+ chunk = 0
916
+ })
917
+
823
918
return nil
824
919
})
825
920
if err != nil {
0 commit comments