diff --git a/go.mod b/go.mod index c54a787e7..97a54488c 100644 --- a/go.mod +++ b/go.mod @@ -35,7 +35,6 @@ require ( github.com/stretchr/testify v1.10.0 github.com/urfave/cli v1.22.14 go.etcd.io/bbolt v1.3.11 - golang.org/x/net v0.38.0 golang.org/x/sync v0.12.0 google.golang.org/grpc v1.64.1 google.golang.org/protobuf v1.34.2 @@ -184,6 +183,7 @@ require ( golang.org/x/crypto v0.36.0 // indirect golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 // indirect golang.org/x/mod v0.21.0 // indirect + golang.org/x/net v0.38.0 // indirect golang.org/x/sys v0.31.0 // indirect golang.org/x/term v0.30.0 // indirect golang.org/x/text v0.23.0 // indirect diff --git a/loopout.go b/loopout.go index 6fc740916..182f9a5a7 100644 --- a/loopout.go +++ b/loopout.go @@ -1246,7 +1246,11 @@ func (s *loopOutSwap) waitForHtlcSpendConfirmedV2(globalCtx context.Context, s.height = notification.(int32) timerChan = s.timerFactory(repushDelay) + s.log.Infof("Received block %d", s.height) + case <-timerChan: + s.log.Infof("Checking the sweep") + // canSweep will return false if the preimage is // not revealed yet but the conf target is closer than // 20 blocks. In this case to be sure we won't attempt @@ -1268,6 +1272,7 @@ func (s *loopOutSwap) waitForHtlcSpendConfirmedV2(globalCtx context.Context, } // Send the sweep to the sweeper. + s.log.Infof("(Re)adding the sweep to sweepbatcher") err := s.batcher.AddSweep(ctx, &sweepReq) if err != nil { return nil, err diff --git a/sweepbatcher/sweep_batch.go b/sweepbatcher/sweep_batch.go index 933077fcf..3eb65faf3 100644 --- a/sweepbatcher/sweep_batch.go +++ b/sweepbatcher/sweep_batch.go @@ -169,9 +169,10 @@ type batchConfig struct { // initial delay completion and publishing the batch transaction. batchPublishDelay time.Duration - // noBumping instructs sweepbatcher not to fee bump itself and rely on - // external source of fee rates (FeeRateProvider). - noBumping bool + // customFeeRate provides custom min fee rate per swap. The batch uses + // max of the fee rates of its swaps. In this mode confTarget is + // ignored and fee bumping by sweepbatcher is disabled. + customFeeRate FeeRateProvider // txLabeler is a function generating a transaction label. It is called // before publishing a batch transaction. Batch ID is passed to it. @@ -232,6 +233,10 @@ type batch struct { // spendChan is the channel over which spend notifications are received. spendChan chan *chainntnfs.SpendDetail + // spendErrChan is the channel over which spend notifier errors are + // received. + spendErrChan chan error + // confChan is the channel over which confirmation notifications are // received. confChan chan *chainntnfs.TxConfirmation @@ -378,9 +383,7 @@ func NewBatch(cfg batchConfig, bk batchKit) *batch { id: -1, state: Open, sweeps: make(map[wire.OutPoint]sweep), - spendChan: make(chan *chainntnfs.SpendDetail), confChan: make(chan *chainntnfs.TxConfirmation, 1), - reorgChan: make(chan struct{}, 1), testReqs: make(chan *testRequest), errChan: make(chan error, 1), callEnter: make(chan struct{}), @@ -423,9 +426,7 @@ func NewBatchFromDB(cfg batchConfig, bk batchKit) (*batch, error) { state: bk.state, primarySweepID: bk.primaryID, sweeps: bk.sweeps, - spendChan: make(chan *chainntnfs.SpendDetail), confChan: make(chan *chainntnfs.TxConfirmation, 1), - reorgChan: make(chan struct{}, 1), testReqs: make(chan *testRequest), errChan: make(chan error, 1), callEnter: make(chan struct{}), @@ -723,6 +724,9 @@ func (b *batch) addSweeps(ctx context.Context, sweeps []*sweep) (bool, error) { // lower that minFeeRate of other sweeps (so it is // applied). if b.rbfCache.FeeRate < s.minFeeRate { + b.Infof("Increasing feerate of the batch "+ + "from %v to %v", b.rbfCache.FeeRate, + s.minFeeRate) b.rbfCache.FeeRate = s.minFeeRate } } @@ -769,6 +773,9 @@ func (b *batch) addSweeps(ctx context.Context, sweeps []*sweep) (bool, error) { // Update FeeRate. Max(s.minFeeRate) for all the sweeps of // the batch is the basis for fee bumps. if b.rbfCache.FeeRate < s.minFeeRate { + b.Infof("Increasing feerate of the batch "+ + "from %v to %v", b.rbfCache.FeeRate, + s.minFeeRate) b.rbfCache.FeeRate = s.minFeeRate b.rbfCache.SkipNextBump = true } @@ -968,6 +975,12 @@ func (b *batch) Run(ctx context.Context) error { continue } + // Update feerate of sweeps. This is normally done by + // AddSweep, but it may not be called after the sweep + // is confirmed, but fresh feerate is still needed to + // keep publishing in case of reorg. + b.updateFeeRate(ctx) + err := b.publish(ctx) if err != nil { return fmt.Errorf("publish error: %w", err) @@ -979,6 +992,11 @@ func (b *batch) Run(ctx context.Context) error { return fmt.Errorf("handleSpend error: %w", err) } + case err := <-b.spendErrChan: + b.writeToSpendErrChan(ctx, err) + + return fmt.Errorf("spend notifier failed: %w", err) + case conf := <-b.confChan: if err := b.handleConf(runCtx, conf); err != nil { return fmt.Errorf("handleConf error: %w", err) @@ -986,16 +1004,14 @@ func (b *batch) Run(ctx context.Context) error { return nil + // A re-org has been detected. We set the batch state back to + // open since our batch transaction is no longer present in any + // block. We can accept more sweeps and try to publish. case <-b.reorgChan: b.state = Open b.Warnf("reorg detected, batch is able to " + "accept new sweeps") - err := b.monitorSpend(ctx, b.sweeps[b.primarySweepID]) - if err != nil { - return fmt.Errorf("monitorSpend error: %w", err) - } - case testReq := <-b.testReqs: testReq.handler() close(testReq.quit) @@ -1013,6 +1029,41 @@ func (b *batch) Run(ctx context.Context) error { } } +// updateFeeRate gets fresh values of minFeeRate for sweeps and updates the +// feerate of the batch if needed. This method must be called from event loop. +func (b *batch) updateFeeRate(ctx context.Context) { + for outpoint, s := range b.sweeps { + minFeeRate, err := minimumSweepFeeRate( + ctx, b.cfg.customFeeRate, b.wallet, + s.swapHash, s.outpoint, s.confTarget, + ) + if err != nil { + b.Warnf("failed to determine feerate for sweep %v of "+ + "swap %x, confTarget %d: %w", s.outpoint, + s.swapHash[:6], s.confTarget, err) + continue + } + + if minFeeRate <= s.minFeeRate { + continue + } + + b.Infof("Increasing feerate of sweep %v of swap %x from %v "+ + "to %v", s.outpoint, s.swapHash[:6], s.minFeeRate, + minFeeRate) + s.minFeeRate = minFeeRate + b.sweeps[outpoint] = s + + if s.minFeeRate <= b.rbfCache.FeeRate { + continue + } + + b.Infof("Increasing feerate of the batch from %v to %v", + b.rbfCache.FeeRate, s.minFeeRate) + b.rbfCache.FeeRate = s.minFeeRate + } +} + // testRunInEventLoop runs a function in the event loop blocking until // the function returns. For unit tests only! func (b *batch) testRunInEventLoop(ctx context.Context, handler func()) { @@ -1790,7 +1841,7 @@ func (b *batch) updateRbfRate(ctx context.Context) error { // Set the initial value for our fee rate. b.rbfCache.FeeRate = rate - } else if !b.cfg.noBumping { + } else if noBumping := b.cfg.customFeeRate != nil; !noBumping { if b.rbfCache.SkipNextBump { // Skip fee bumping, unset the flag, to bump next time. b.rbfCache.SkipNextBump = false @@ -1812,44 +1863,33 @@ func (b *batch) updateRbfRate(ctx context.Context) error { // of the batch transaction gets confirmed, due to the uncertainty of RBF // replacements and network propagation, we can always detect the transaction. func (b *batch) monitorSpend(ctx context.Context, primarySweep sweep) error { - spendCtx, cancel := context.WithCancel(ctx) + if b.spendChan != nil || b.spendErrChan != nil || b.reorgChan != nil { + return fmt.Errorf("an attempt to run monitorSpend multiple " + + "times per batch") + } + + reorgChan := make(chan struct{}, 1) - spendChan, spendErr, err := b.chainNotifier.RegisterSpendNtfn( - spendCtx, &primarySweep.outpoint, primarySweep.htlc.PkScript, + spendChan, spendErrChan, err := b.chainNotifier.RegisterSpendNtfn( + ctx, &primarySweep.outpoint, primarySweep.htlc.PkScript, primarySweep.initiationHeight, + lndclient.WithReOrgChan(reorgChan), ) if err != nil { - cancel() - - return err + return fmt.Errorf("failed to register spend notifier for "+ + "primary sweep %v, pkscript %x, height %d: %w", + primarySweep.outpoint, primarySweep.htlc.PkScript, + primarySweep.initiationHeight, err) } - b.wg.Add(1) - go func() { - defer cancel() - defer b.wg.Done() - - b.Infof("monitoring spend for outpoint %s", - primarySweep.outpoint.String()) + b.Infof("monitoring spend for outpoint %s", + primarySweep.outpoint.String()) - select { - case spend := <-spendChan: - select { - case b.spendChan <- spend: - - case <-ctx.Done(): - } - - case err := <-spendErr: - b.writeToSpendErrChan(ctx, err) - - b.writeToErrChan( - fmt.Errorf("spend error: %w", err), - ) - - case <-ctx.Done(): - } - }() + // This is safe to do as we always call monitorSpend from the event + // loop's goroutine. + b.spendChan = spendChan + b.spendErrChan = spendErrChan + b.reorgChan = reorgChan return nil } @@ -1862,14 +1902,11 @@ func (b *batch) monitorConfirmations(ctx context.Context) error { return fmt.Errorf("can't find primarySweep") } - reorgChan := make(chan struct{}) - confCtx, cancel := context.WithCancel(ctx) confChan, errChan, err := b.chainNotifier.RegisterConfirmationsNtfn( confCtx, b.batchTxid, b.batchPkScript, batchConfHeight, primarySweep.initiationHeight, - lndclient.WithReOrgChan(reorgChan), ) if err != nil { cancel() @@ -1895,18 +1932,6 @@ func (b *batch) monitorConfirmations(ctx context.Context) error { b.writeToErrChan(fmt.Errorf("confirmations "+ "monitoring error: %w", err)) - case <-reorgChan: - // A re-org has been detected. We set the batch - // state back to open since our batch - // transaction is no longer present in any - // block. We can accept more sweeps and try to - // publish new transactions, at this point we - // need to monitor again for a new spend. - select { - case b.reorgChan <- struct{}{}: - case <-ctx.Done(): - } - case <-ctx.Done(): } }() @@ -2395,12 +2420,6 @@ func (b *batch) writeToErrChan(err error) { // writeToSpendErrChan sends an error to spend error channels of all the sweeps. func (b *batch) writeToSpendErrChan(ctx context.Context, spendErr error) { - done, err := b.scheduleNextCall() - if err != nil { - done() - - return - } notifiers := make([]*SpendNotifier, 0, len(b.sweeps)) for _, s := range b.sweeps { // If the sweep's notifier is empty then this means that a swap @@ -2412,7 +2431,6 @@ func (b *batch) writeToSpendErrChan(ctx context.Context, spendErr error) { notifiers = append(notifiers, s.notifier) } - done() for _, notifier := range notifiers { select { diff --git a/sweepbatcher/sweep_batcher.go b/sweepbatcher/sweep_batcher.go index 6bab035e7..dcd987efe 100644 --- a/sweepbatcher/sweep_batcher.go +++ b/sweepbatcher/sweep_batcher.go @@ -1522,28 +1522,12 @@ func (b *Batcher) loadSweep(ctx context.Context, swapHash lntypes.Hash, // Find minimum fee rate for the sweep. Use customFeeRate if it is // provided, otherwise use wallet's EstimateFeeRate. - var minFeeRate chainfee.SatPerKWeight - if b.customFeeRate != nil { - minFeeRate, err = b.customFeeRate(ctx, swapHash, outpoint) - if err != nil { - return nil, fmt.Errorf("failed to fetch min fee rate "+ - "for %x: %w", swapHash[:6], err) - } - if minFeeRate < chainfee.AbsoluteFeePerKwFloor { - return nil, fmt.Errorf("min fee rate too low (%v) for "+ - "%x", minFeeRate, swapHash[:6]) - } - } else { - if s.ConfTarget == 0 { - warnf("Fee estimation was requested for zero "+ - "confTarget for sweep %x.", swapHash[:6]) - } - minFeeRate, err = b.wallet.EstimateFeeRate(ctx, s.ConfTarget) - if err != nil { - return nil, fmt.Errorf("failed to estimate fee rate "+ - "for %x, confTarget=%d: %w", swapHash[:6], - s.ConfTarget, err) - } + minFeeRate, err := minimumSweepFeeRate( + ctx, b.customFeeRate, b.wallet, + swapHash, outpoint, s.ConfTarget, + ) + if err != nil { + return nil, err } return &sweep{ @@ -1567,11 +1551,61 @@ func (b *Batcher) loadSweep(ctx context.Context, swapHash lntypes.Hash, }, nil } +// feeRateEstimator determines feerate by confTarget. +type feeRateEstimator interface { + // EstimateFeeRate returns feerate corresponding to the confTarget. + EstimateFeeRate(ctx context.Context, + confTarget int32) (chainfee.SatPerKWeight, error) +} + +// minimumSweepFeeRate determines minimum feerate for a sweep. +func minimumSweepFeeRate(ctx context.Context, customFeeRate FeeRateProvider, + wallet feeRateEstimator, swapHash lntypes.Hash, outpoint wire.OutPoint, + sweepConfTarget int32) (chainfee.SatPerKWeight, error) { + + // Find minimum fee rate for the sweep. Use customFeeRate if it is + // provided, otherwise use wallet's EstimateFeeRate. + if customFeeRate != nil { + minFeeRate, err := customFeeRate(ctx, swapHash, outpoint) + if err != nil { + return 0, fmt.Errorf("failed to fetch min fee rate "+ + "for %x: %w", swapHash[:6], err) + } + if minFeeRate < chainfee.AbsoluteFeePerKwFloor { + return 0, fmt.Errorf("min fee rate too low (%v) for "+ + "%x", minFeeRate, swapHash[:6]) + } + + return minFeeRate, nil + } + + // Make sure sweepConfTarget is at least 2. LND's walletkit fails with + // conftarget of 0 or 1. + // TODO: when https://github.com/lightningnetwork/lnd/pull/10087 is + // merged and that LND version becomes a requirement, we can decrease + // this from 2 to 1. + if sweepConfTarget < 2 { + warnf("Fee estimation was requested for confTarget=%d for "+ + "sweep %x; changing confTarget to 2", sweepConfTarget, + swapHash[:6]) + sweepConfTarget = 2 + } + + minFeeRate, err := wallet.EstimateFeeRate(ctx, sweepConfTarget) + if err != nil { + return 0, fmt.Errorf("failed to estimate fee rate "+ + "for %x, confTarget=%d: %w", swapHash[:6], + sweepConfTarget, err) + } + + return minFeeRate, nil +} + // newBatchConfig creates new batch config. func (b *Batcher) newBatchConfig(maxTimeoutDistance int32) batchConfig { return batchConfig{ maxTimeoutDistance: maxTimeoutDistance, - noBumping: b.customFeeRate != nil, + customFeeRate: b.customFeeRate, txLabeler: b.txLabeler, customMuSig2Signer: b.customMuSig2Signer, presignedHelper: b.presignedHelper, diff --git a/sweepbatcher/sweep_batcher_test.go b/sweepbatcher/sweep_batcher_test.go index 590d83ae4..126724435 100644 --- a/sweepbatcher/sweep_batcher_test.go +++ b/sweepbatcher/sweep_batcher_test.go @@ -2,6 +2,7 @@ package sweepbatcher import ( "context" + "database/sql" "errors" "fmt" "os" @@ -3997,6 +3998,9 @@ func testSweepBatcherCloseDuringAdding(t *testing.T, store testStore, if errors.Is(err, context.Canceled) { break } + if errors.Is(err, sql.ErrTxDone) { + break + } require.NoError(t, err) } }() @@ -4834,8 +4838,6 @@ func testFeeRateGrows(t *testing.T, store testStore, // Now update fee rate of second sweep (which is not primary) to // feeRateHigh. Fee rate of sweep 1 is still feeRateLow. setFeeRate(swapHash2, feeRateHigh) - require.NoError(t, batcher.AddSweep(ctx, &sweepReq1)) - require.NoError(t, batcher.AddSweep(ctx, &sweepReq2)) // Tick tock next block. err = lnd.NotifyHeight(603) diff --git a/test/chainnotifier_mock.go b/test/chainnotifier_mock.go index 7dbf0df4c..d9a82202b 100644 --- a/test/chainnotifier_mock.go +++ b/test/chainnotifier_mock.go @@ -2,6 +2,7 @@ package test import ( "bytes" + "context" "sync" "time" @@ -10,7 +11,6 @@ import ( "github.com/lightninglabs/lndclient" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/lnrpc/chainrpc" - "golang.org/x/net/context" ) type mockChainNotifier struct { diff --git a/test/lightning_client_mock.go b/test/lightning_client_mock.go index 947c6ec6a..bcd38e7ba 100644 --- a/test/lightning_client_mock.go +++ b/test/lightning_client_mock.go @@ -1,6 +1,7 @@ package test import ( + "context" "crypto/rand" "encoding/hex" "fmt" @@ -17,7 +18,6 @@ import ( "github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/routing/route" "github.com/lightningnetwork/lnd/zpay32" - "golang.org/x/net/context" ) type mockLightningClient struct { diff --git a/test/router_mock.go b/test/router_mock.go index e3e3e6603..9ef84947e 100644 --- a/test/router_mock.go +++ b/test/router_mock.go @@ -1,9 +1,10 @@ package test import ( + "context" + "github.com/lightninglabs/lndclient" "github.com/lightningnetwork/lnd/lntypes" - "golang.org/x/net/context" ) type mockRouter struct {