Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion core/txpool/locals/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,25 @@ func (journal *journal) load(add func([]*types.Transaction) []error) error {
return failure
}

func (journal *journal) setupWriter() error {
if journal.writer != nil {
if err := journal.writer.Close(); err != nil {
return err
}
journal.writer = nil
}

// Re-open the journal file for appending
// Use O_APPEND to ensure we always write to the end of the file
sink, err := os.OpenFile(journal.path, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
return err
}
journal.writer = sink

return nil
}

// insert adds the specified transaction to the local disk journal.
func (journal *journal) insert(tx *types.Transaction) error {
if journal.writer == nil {
Expand Down Expand Up @@ -177,7 +196,6 @@ func (journal *journal) rotate(all map[common.Address]types.Transactions) error
// close flushes the transaction journal contents to disk and closes the file.
func (journal *journal) close() error {
var err error

if journal.writer != nil {
err = journal.writer.Close()
journal.writer = nil
Expand Down
38 changes: 24 additions & 14 deletions core/txpool/locals/tx_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,14 @@ func (tracker *TxTracker) TrackAll(txs []*types.Transaction) {
}

// recheck checks and returns any transactions that needs to be resubmitted.
func (tracker *TxTracker) recheck(journalCheck bool) (resubmits []*types.Transaction, rejournal map[common.Address]types.Transactions) {
func (tracker *TxTracker) recheck(journalCheck bool) []*types.Transaction {
tracker.mu.Lock()
defer tracker.mu.Unlock()

var (
numStales = 0
numOk = 0
resubmits []*types.Transaction
)
for sender, txs := range tracker.byAddr {
// Wipe the stales
Expand All @@ -141,7 +142,7 @@ func (tracker *TxTracker) recheck(journalCheck bool) (resubmits []*types.Transac
}

if journalCheck { // rejournal
rejournal = make(map[common.Address]types.Transactions)
rejournal := make(map[common.Address]types.Transactions)
for _, tx := range tracker.all {
addr, _ := types.Sender(tracker.signer, tx)
rejournal[addr] = append(rejournal[addr], tx)
Expand All @@ -153,10 +154,18 @@ func (tracker *TxTracker) recheck(journalCheck bool) (resubmits []*types.Transac
return int(a.Nonce() - b.Nonce())
})
}
// Rejournal the tracker while holding the lock. No new transactions will
// be added to the old journal during this period, preventing any potential
// transaction loss.
if tracker.journal != nil {
if err := tracker.journal.rotate(rejournal); err != nil {
log.Warn("Transaction journal rotation failed", "err", err)
}
}
}
localGauge.Update(int64(len(tracker.all)))
log.Debug("Tx tracker status", "need-resubmit", len(resubmits), "stale", numStales, "ok", numOk)
return resubmits, rejournal
return resubmits
}

// Start implements node.Lifecycle interface
Expand Down Expand Up @@ -185,6 +194,12 @@ func (tracker *TxTracker) loop() {
tracker.TrackAll(transactions)
return nil
})

// Setup the writer for the upcoming transactions
if err := tracker.journal.setupWriter(); err != nil {
log.Error("Failed to setup the journal writer", "err", err)
return
}
defer tracker.journal.close()
}
var (
Expand All @@ -196,20 +211,15 @@ func (tracker *TxTracker) loop() {
case <-tracker.shutdownCh:
return
case <-timer.C:
checkJournal := tracker.journal != nil && time.Since(lastJournal) > tracker.rejournal
resubmits, rejournal := tracker.recheck(checkJournal)
var rejournal bool
if tracker.journal != nil && time.Since(lastJournal) > tracker.rejournal {
rejournal, lastJournal = true, time.Now()
log.Debug("Rejournal the transaction tracker")
}
resubmits := tracker.recheck(rejournal)
if len(resubmits) > 0 {
tracker.pool.Add(resubmits, false)
}
if checkJournal {
// Lock to prevent journal.rotate <-> journal.insert (via TrackAll) conflicts
tracker.mu.Lock()
lastJournal = time.Now()
if err := tracker.journal.rotate(rejournal); err != nil {
log.Warn("Transaction journal rotation failed", "err", err)
}
tracker.mu.Unlock()
}
timer.Reset(recheckInterval)
}
}
Expand Down
53 changes: 48 additions & 5 deletions core/txpool/locals/tx_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@
package locals

import (
"fmt"
"maps"
"math/big"
"math/rand"
"path/filepath"
"testing"
"time"

Expand Down Expand Up @@ -146,20 +150,59 @@ func TestResubmit(t *testing.T) {
txsA := txs[:len(txs)/2]
txsB := txs[len(txs)/2:]
env.pool.Add(txsA, true)

pending, queued := env.pool.ContentFrom(address)
if len(pending) != len(txsA) || len(queued) != 0 {
t.Fatalf("Unexpected txpool content: %d, %d", len(pending), len(queued))
}
env.tracker.TrackAll(txs)

resubmit, all := env.tracker.recheck(true)
resubmit := env.tracker.recheck(true)
if len(resubmit) != len(txsB) {
t.Fatalf("Unexpected transactions to resubmit, got: %d, want: %d", len(resubmit), len(txsB))
}
if len(all) == 0 || len(all[address]) == 0 {
t.Fatalf("Unexpected transactions being tracked, got: %d, want: %d", 0, len(txs))
env.tracker.mu.Lock()
allCopy := maps.Clone(env.tracker.all)
env.tracker.mu.Unlock()

if len(allCopy) != len(txs) {
t.Fatalf("Unexpected transactions being tracked, got: %d, want: %d", len(allCopy), len(txs))
}
if len(all[address]) != len(txs) {
t.Fatalf("Unexpected transactions being tracked, got: %d, want: %d", len(all[address]), len(txs))
}

func TestJournal(t *testing.T) {
journalPath := filepath.Join(t.TempDir(), fmt.Sprintf("%d", rand.Int63()))
env := newTestEnv(t, 10, 0, journalPath)
defer env.close()

env.tracker.Start()
defer env.tracker.Stop()

txs := env.makeTxs(10)
txsA := txs[:len(txs)/2]
txsB := txs[len(txs)/2:]
env.pool.Add(txsA, true)

pending, queued := env.pool.ContentFrom(address)
if len(pending) != len(txsA) || len(queued) != 0 {
t.Fatalf("Unexpected txpool content: %d, %d", len(pending), len(queued))
}
env.tracker.TrackAll(txsA)
env.tracker.TrackAll(txsB)
env.tracker.recheck(true) // manually rejournal the tracker

// Make sure all the transactions are properly journalled
trackerB := New(journalPath, time.Minute, gspec.Config, env.pool)
trackerB.journal.load(func(transactions []*types.Transaction) []error {
trackerB.TrackAll(transactions)
return nil
})

trackerB.mu.Lock()
allCopy := maps.Clone(trackerB.all)
trackerB.mu.Unlock()

if len(allCopy) != len(txs) {
t.Fatalf("Unexpected transactions being tracked, got: %d, want: %d", len(allCopy), len(txs))
}
}