
Commit 8e996cf

feat(mempool): Block EVM mempool Select on legacypool reorg (#867)
* block mempool.Select on reorg completion at height
* launch next run on subscribe and move updating latest height inside lock
* wait for latest reorg height tests
* typo
* comment
* remove launchNextRun = true during arbitrary subscriptions and fix test
* fix test
1 parent 67034a0 commit 8e996cf

File tree

3 files changed: +213 −22 lines

mempool/mempool.go

Lines changed: 4 additions & 0 deletions

```diff
@@ -290,6 +290,10 @@ func (m *ExperimentalEVMMempool) Select(goCtx context.Context, i [][]byte) sdkme
 	defer m.mtx.Unlock()
 	ctx := sdk.UnwrapSDKContext(goCtx)
 
+	// Wait for the legacypool to Reset at >= blockHeight (this may have
+	// already happened), to ensure all txs in pending pool are valid.
+	m.legacyTxPool.WaitForReorgHeight(ctx, ctx.BlockHeight())
+
 	evmIterator, cosmosIterator := m.getIterators(goCtx, i)
 
 	combinedIterator := NewEVMMempoolIterator(evmIterator, cosmosIterator, m.logger, m.txConfig, m.vmKeeper.GetEvmCoinInfo(ctx).Denom, m.blockchain.Config().ChainID, m.blockchain)
```
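Because WaitForReorgHeight also returns when its context is cancelled, the wait in Select is bounded by whatever deadline the caller's context carries. A minimal sketch of capping the wait explicitly; `boundedWait`, `reorgWaiter`, and `limit` are illustrative names, not part of this commit:

```go
package mempoolwait // illustrative package name

import (
	"context"
	"time"
)

// reorgWaiter is the one method of the legacy pool this sketch needs.
type reorgWaiter interface {
	WaitForReorgHeight(ctx context.Context, height int64)
}

// boundedWait caps the wait: WaitForReorgHeight returns either when the pool
// has reset at a head >= height or when the context is cancelled, whichever
// comes first, so a deadline context keeps block building from stalling on a
// wedged reorg loop.
func boundedWait(pool reorgWaiter, height int64, limit time.Duration) {
	ctx, cancel := context.WithTimeout(context.Background(), limit)
	defer cancel()
	pool.WaitForReorgHeight(ctx, height)
}
```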

mempool/txpool/legacypool/legacypool.go

Lines changed: 68 additions & 22 deletions

```diff
@@ -18,6 +18,7 @@
 package legacypool
 
 import (
+	"context"
 	"errors"
 	"maps"
 	"math/big"
@@ -255,13 +256,15 @@ type LegacyPool struct {
 	all    *lookup     // All transactions to allow lookups
 	priced *pricedList // All transactions sorted by price
 
-	reqResetCh      chan *txpoolResetRequest
-	reqPromoteCh    chan *accountSet
-	queueTxEventCh  chan *types.Transaction
-	reorgDoneCh     chan chan struct{}
-	reorgShutdownCh chan struct{}  // requests shutdown of scheduleReorgLoop
-	wg              sync.WaitGroup // tracks loop, scheduleReorgLoop
-	initDoneCh      chan struct{}  // is closed once the pool is initialized (for tests)
+	reqResetCh          chan *txpoolResetRequest
+	reqPromoteCh        chan *accountSet
+	queueTxEventCh      chan *types.Transaction
+	reorgDoneCh         chan chan struct{}
+	reorgShutdownCh     chan struct{}  // requests shutdown of scheduleReorgLoop
+	reorgSubscriptionCh chan struct{}  // notifies the reorg loop that a subscriber wants to wait on nextDone
+	wg                  sync.WaitGroup // tracks loop, scheduleReorgLoop
+	initDoneCh          chan struct{}  // is closed once the pool is initialized (for tests)
+	latestReorgHeight   atomic.Int64   // Latest height that the reorg loop has completed
 
 	changesSinceReorg int // A counter for how many drops we've performed in-between reorg.
 
@@ -282,22 +285,24 @@ func New(config Config, chain BlockChain) *LegacyPool {
 
 	// Create the transaction pool with its initial settings
 	pool := &LegacyPool{
-		config:          config,
-		chain:           chain,
-		chainconfig:     chain.Config(),
-		signer:          types.LatestSigner(chain.Config()),
-		pending:         make(map[common.Address]*list),
-		queue:           make(map[common.Address]*list),
-		beats:           make(map[common.Address]time.Time),
-		all:             newLookup(),
-		reqResetCh:      make(chan *txpoolResetRequest),
-		reqPromoteCh:    make(chan *accountSet),
-		queueTxEventCh:  make(chan *types.Transaction),
-		reorgDoneCh:     make(chan chan struct{}),
-		reorgShutdownCh: make(chan struct{}),
-		initDoneCh:      make(chan struct{}),
+		config:              config,
+		chain:               chain,
+		chainconfig:         chain.Config(),
+		signer:              types.LatestSigner(chain.Config()),
+		pending:             make(map[common.Address]*list),
+		queue:               make(map[common.Address]*list),
+		beats:               make(map[common.Address]time.Time),
+		all:                 newLookup(),
+		reqResetCh:          make(chan *txpoolResetRequest),
+		reqPromoteCh:        make(chan *accountSet),
+		queueTxEventCh:      make(chan *types.Transaction),
+		reorgDoneCh:         make(chan chan struct{}),
+		reorgShutdownCh:     make(chan struct{}),
+		reorgSubscriptionCh: make(chan struct{}),
+		initDoneCh:          make(chan struct{}),
 	}
 	pool.priced = newPricedList(pool.all)
+	pool.latestReorgHeight.Store(0)
 
 	return pool
 }
@@ -1262,7 +1267,8 @@ func (pool *LegacyPool) scheduleReorgLoop() {
 				queuedEvents[addr] = NewSortedMap()
 			}
 			queuedEvents[addr].Put(tx)
-
+		case <-pool.reorgSubscriptionCh:
+			pool.reorgDoneCh <- nextDone
 		case <-curDone:
 			curDone = nil
 
@@ -1342,6 +1348,9 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
 
 	dropBetweenReorgHistogram.Update(int64(pool.changesSinceReorg))
 	pool.changesSinceReorg = 0 // Reset change counter
+	if reset != nil && reset.newHead != nil {
+		pool.latestReorgHeight.Store(reset.newHead.Number.Int64())
+	}
 	pool.mu.Unlock()
 
 	// Notify subsystems for newly added transactions
@@ -1963,6 +1972,43 @@ func (pool *LegacyPool) Clear() {
 	pool.pendingNonces = newNoncer(pool.currentState)
 }
 
+// WaitForReorgHeight blocks until the reorg loop has reset at a head with
+// height >= height. If the context is cancelled or the pool is shutting down,
+// this will also return.
+func (pool *LegacyPool) WaitForReorgHeight(ctx context.Context, height int64) {
+	for pool.latestReorgHeight.Load() < height {
+		// reorg loop has not run at the target height, subscribe to the
+		// outcome of the next reorg loop iteration to know when to check again
+		sub, err := pool.SubscribeToNextReorg()
+		if err != nil {
+			return
+		}
+
+		// need to check again in case reorg has finished in between initial
+		// check and subscribing to next reorg
+		if pool.latestReorgHeight.Load() >= height {
+			return
+		}
+
+		select {
+		case <-sub:
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
+// SubscribeToNextReorg returns a channel that will close when the next reorg
+// loop completes. An error is returned if the loop is shutting down.
+func (pool *LegacyPool) SubscribeToNextReorg() (chan struct{}, error) {
+	select {
+	case pool.reorgSubscriptionCh <- struct{}{}:
+		return <-pool.reorgDoneCh, nil
+	case <-pool.reorgShutdownCh:
+		return nil, errors.New("shutdown")
+	}
+}
+
 // HasPendingAuth returns a flag indicating whether there are pending
 // authorizations from the specific address cached in the pool.
 func (pool *LegacyPool) HasPendingAuth(addr common.Address) bool {
```
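The correctness of WaitForReorgHeight hinges on the re-check after subscribing: a reorg can complete between the first height check and the subscription, and the subscriber would otherwise block on a wakeup that has already fired. Below is a minimal, self-contained model of that check, subscribe, re-check shape (illustrative names throughout; the real pool replaces the mutex-guarded waiter list with a channel rendezvous against scheduleReorgLoop's nextDone):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// worker models the reorg loop: it finishes numbered runs and records the
// latest completed run, as runReorg does with latestReorgHeight. The waiter
// list is a mutex-guarded slice here for brevity; the real pool hands out
// the loop's nextDone channel through a reorgSubscriptionCh/reorgDoneCh
// rendezvous instead.
type worker struct {
	latest  atomic.Int64
	mu      sync.Mutex
	waiters []chan struct{}
}

// subscribe returns a channel that closes when the next run completes,
// standing in for SubscribeToNextReorg.
func (w *worker) subscribe() <-chan struct{} {
	ch := make(chan struct{})
	w.mu.Lock()
	w.waiters = append(w.waiters, ch)
	w.mu.Unlock()
	return ch
}

// finishRun records a completed run and wakes every current subscriber.
func (w *worker) finishRun(n int64) {
	w.latest.Store(n)
	w.mu.Lock()
	for _, ch := range w.waiters {
		close(ch)
	}
	w.waiters = nil
	w.mu.Unlock()
}

// waitForRun blocks until a run numbered >= n has completed, using the same
// check -> subscribe -> re-check shape as WaitForReorgHeight. The re-check
// matters: a run can finish between the loop condition and subscribe(), and
// without it the waiter would sleep on a wakeup that already happened.
func (w *worker) waitForRun(n int64) {
	for w.latest.Load() < n {
		sub := w.subscribe()
		if w.latest.Load() >= n { // run finished while we were subscribing
			return
		}
		<-sub
	}
}

func main() {
	w := &worker{}
	go func() {
		for i := int64(1); i <= 5; i++ {
			time.Sleep(10 * time.Millisecond)
			w.finishRun(i)
		}
	}()
	w.waitForRun(5)
	fmt.Println("observed run", w.latest.Load())
}
```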

mempool/txpool/legacypool/legacypool_test.go

Lines changed: 141 additions & 0 deletions

```diff
@@ -17,6 +17,7 @@
 package legacypool
 
 import (
+	"context"
 	"crypto/ecdsa"
 	crand "crypto/rand"
 	"errors"
@@ -2668,6 +2669,146 @@ func TestRemoveTxTruncatePoolRace(t *testing.T) {
 	wg.Wait()
 }
 
+// TestWaitForReorgHeight tests that WaitForReorgHeight properly blocks until
+// the reorg loop has completed for the specified height.
+func TestWaitForReorgHeight(t *testing.T) {
+	t.Run("waits for reorg to complete", func(t *testing.T) {
+		pool, _ := setupPool()
+		defer pool.Close()
+
+		if pool.latestReorgHeight.Load() != 0 {
+			t.Fatalf("expected initial height 0, got %d", pool.latestReorgHeight.Load())
+		}
+
+		// Create headers for the reset
+		oldHead := &types.Header{Number: big.NewInt(0), BaseFee: big.NewInt(10)}
+		newHead := &types.Header{Number: big.NewInt(5), BaseFee: big.NewInt(10)}
+
+		var reorgCompleted atomic.Bool
+		var waitCompleted atomic.Bool
+		var wg sync.WaitGroup
+
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			ctx := context.Background()
+			pool.WaitForReorgHeight(ctx, 5)
+			waitCompleted.Store(true)
+		}()
+
+		// Give the waiter a chance to subscribe
+		time.Sleep(50 * time.Millisecond)
+
+		wg.Add(1)
+		go func() {
+			pool.Reset(oldHead, newHead)
+			reorgCompleted.Store(true)
+			wg.Done()
+		}()
+
+		// Wait for waiters
+		waitChan := make(chan struct{})
+		go func() {
+			wg.Wait()
+			close(waitChan)
+		}()
+		select {
+		case <-waitChan:
+		case <-time.After(time.Second):
+			t.Fatal("timeout waiting for waiters")
+		}
+
+		if pool.latestReorgHeight.Load() != newHead.Number.Int64() {
+			t.Errorf("expected height 5 after reorg, got %d", pool.latestReorgHeight.Load())
+		}
+		if !reorgCompleted.Load() {
+			t.Errorf("WaitForReorgHeight returned before reorg completed")
+		}
+	})
+
+	t.Run("multiple height wait", func(t *testing.T) {
+		pool, _ := setupPool()
+		defer pool.Close()
+
+		if pool.latestReorgHeight.Load() != 0 {
+			t.Fatalf("expected initial height 0, got %d", pool.latestReorgHeight.Load())
+		}
+
+		var waitCompleted atomic.Bool
+		var wg sync.WaitGroup
+
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			ctx := context.Background()
+			pool.WaitForReorgHeight(ctx, 10)
+			waitCompleted.Store(true)
+		}()
+
+		// Give the waiter a chance to subscribe
+		time.Sleep(50 * time.Millisecond)
+
+		go func() {
+			for i := 0; i < 20; i++ {
+				oldHead := &types.Header{Number: big.NewInt(int64(i)), BaseFee: big.NewInt(10)}
+				newHead := &types.Header{Number: big.NewInt(int64(i + 1)), BaseFee: big.NewInt(10)}
+				pool.Reset(oldHead, newHead)
+			}
+		}()
+
+		// Wait for waiters
+		waitChan := make(chan struct{})
+		go func() {
+			wg.Wait()
+			close(waitChan)
+		}()
+
+		select {
+		case <-waitChan:
+		case <-time.After(2 * time.Second):
+			t.Fatal("timeout waiting for waiters")
+		}
+
+		if pool.latestReorgHeight.Load() < 10 {
+			t.Errorf("expected height >= 10 after reorg, got %d", pool.latestReorgHeight.Load())
+		}
+	})
+
+	t.Run("concurrent waiters", func(t *testing.T) {
+		pool, _ := setupPool()
+		defer pool.Close()
+
+		var wg sync.WaitGroup
+		for i := 0; i < 5; i++ {
+			wg.Add(1)
+			go func(id int) {
+				defer wg.Done()
+				pool.WaitForReorgHeight(context.Background(), 7)
+			}(i)
+		}
+
+		// Give all waiters time to subscribe
+		time.Sleep(100 * time.Millisecond)
+
+		// Trigger a single reorg
+		oldHead := &types.Header{Number: big.NewInt(0), BaseFee: big.NewInt(10)}
+		newHead := &types.Header{Number: big.NewInt(7), BaseFee: big.NewInt(10)}
+		pool.Reset(oldHead, newHead)
+
+		// Wait for all waiters to complete
+		waitChan := make(chan struct{})
+		go func() {
+			wg.Wait()
+			close(waitChan)
+		}()
+		select {
+		case <-waitChan:
+		case <-time.After(2 * time.Second):
+			t.Errorf("not all waiters completed in 2 seconds")
+		}
+	})
+}
+
 // TestPromoteExecutablesRecheckTx tests that promoteExecutables properly removes
 // a transaction from all pools if it fails the RecheckTxFn.
 func TestPromoteExecutablesRecheckTx(t *testing.T) {
```
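The new test can be run in isolation with the standard Go toolchain; assuming you are at the repository root, something like:

```sh
go test ./mempool/txpool/legacypool -run TestWaitForReorgHeight -v
```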
