Skip to content

Commit d73bfeb

Browse files
core/txpool: Initialize journal writer for tx tracker (#32921)
Previously, the journal writer is nil until the first time rejournal (default 1h), which means during this period, txs submitted to this node are not written into journal file (transactions.rlp). If this node is shutdown before the first time rejournal, then txs in pending or queue will get lost. Here, this PR initializes the journal writer soon after launch to solve this issue. --------- Co-authored-by: Gary Rong <garyrong0905@gmail.com>
1 parent b81f03e commit d73bfeb

File tree

3 files changed

+91
-20
lines changed

3 files changed

+91
-20
lines changed

core/txpool/locals/journal.go

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,25 @@ func (journal *journal) load(add func([]*types.Transaction) []error) error {
117117
return failure
118118
}
119119

120+
func (journal *journal) setupWriter() error {
121+
if journal.writer != nil {
122+
if err := journal.writer.Close(); err != nil {
123+
return err
124+
}
125+
journal.writer = nil
126+
}
127+
128+
// Re-open the journal file for appending
129+
// Use O_APPEND to ensure we always write to the end of the file
130+
sink, err := os.OpenFile(journal.path, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
131+
if err != nil {
132+
return err
133+
}
134+
journal.writer = sink
135+
136+
return nil
137+
}
138+
120139
// insert adds the specified transaction to the local disk journal.
121140
func (journal *journal) insert(tx *types.Transaction) error {
122141
if journal.writer == nil {
@@ -177,7 +196,6 @@ func (journal *journal) rotate(all map[common.Address]types.Transactions) error
177196
// close flushes the transaction journal contents to disk and closes the file.
178197
func (journal *journal) close() error {
179198
var err error
180-
181199
if journal.writer != nil {
182200
err = journal.writer.Close()
183201
journal.writer = nil

core/txpool/locals/tx_tracker.go

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -114,13 +114,14 @@ func (tracker *TxTracker) TrackAll(txs []*types.Transaction) {
114114
}
115115

116116
// recheck checks and returns any transactions that needs to be resubmitted.
117-
func (tracker *TxTracker) recheck(journalCheck bool) (resubmits []*types.Transaction, rejournal map[common.Address]types.Transactions) {
117+
func (tracker *TxTracker) recheck(journalCheck bool) []*types.Transaction {
118118
tracker.mu.Lock()
119119
defer tracker.mu.Unlock()
120120

121121
var (
122122
numStales = 0
123123
numOk = 0
124+
resubmits []*types.Transaction
124125
)
125126
for sender, txs := range tracker.byAddr {
126127
// Wipe the stales
@@ -141,7 +142,7 @@ func (tracker *TxTracker) recheck(journalCheck bool) (resubmits []*types.Transac
141142
}
142143

143144
if journalCheck { // rejournal
144-
rejournal = make(map[common.Address]types.Transactions)
145+
rejournal := make(map[common.Address]types.Transactions)
145146
for _, tx := range tracker.all {
146147
addr, _ := types.Sender(tracker.signer, tx)
147148
rejournal[addr] = append(rejournal[addr], tx)
@@ -153,10 +154,18 @@ func (tracker *TxTracker) recheck(journalCheck bool) (resubmits []*types.Transac
153154
return int(a.Nonce() - b.Nonce())
154155
})
155156
}
157+
// Rejournal the tracker while holding the lock. No new transactions will
158+
// be added to the old journal during this period, preventing any potential
159+
// transaction loss.
160+
if tracker.journal != nil {
161+
if err := tracker.journal.rotate(rejournal); err != nil {
162+
log.Warn("Transaction journal rotation failed", "err", err)
163+
}
164+
}
156165
}
157166
localGauge.Update(int64(len(tracker.all)))
158167
log.Debug("Tx tracker status", "need-resubmit", len(resubmits), "stale", numStales, "ok", numOk)
159-
return resubmits, rejournal
168+
return resubmits
160169
}
161170

162171
// Start implements node.Lifecycle interface
@@ -185,6 +194,12 @@ func (tracker *TxTracker) loop() {
185194
tracker.TrackAll(transactions)
186195
return nil
187196
})
197+
198+
// Setup the writer for the upcoming transactions
199+
if err := tracker.journal.setupWriter(); err != nil {
200+
log.Error("Failed to setup the journal writer", "err", err)
201+
return
202+
}
188203
defer tracker.journal.close()
189204
}
190205
var (
@@ -196,20 +211,15 @@ func (tracker *TxTracker) loop() {
196211
case <-tracker.shutdownCh:
197212
return
198213
case <-timer.C:
199-
checkJournal := tracker.journal != nil && time.Since(lastJournal) > tracker.rejournal
200-
resubmits, rejournal := tracker.recheck(checkJournal)
214+
var rejournal bool
215+
if tracker.journal != nil && time.Since(lastJournal) > tracker.rejournal {
216+
rejournal, lastJournal = true, time.Now()
217+
log.Debug("Rejournal the transaction tracker")
218+
}
219+
resubmits := tracker.recheck(rejournal)
201220
if len(resubmits) > 0 {
202221
tracker.pool.Add(resubmits, false)
203222
}
204-
if checkJournal {
205-
// Lock to prevent journal.rotate <-> journal.insert (via TrackAll) conflicts
206-
tracker.mu.Lock()
207-
lastJournal = time.Now()
208-
if err := tracker.journal.rotate(rejournal); err != nil {
209-
log.Warn("Transaction journal rotation failed", "err", err)
210-
}
211-
tracker.mu.Unlock()
212-
}
213223
timer.Reset(recheckInterval)
214224
}
215225
}

core/txpool/locals/tx_tracker_test.go

Lines changed: 48 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,11 @@
1717
package locals
1818

1919
import (
20+
"fmt"
21+
"maps"
2022
"math/big"
23+
"math/rand"
24+
"path/filepath"
2125
"testing"
2226
"time"
2327

@@ -146,20 +150,59 @@ func TestResubmit(t *testing.T) {
146150
txsA := txs[:len(txs)/2]
147151
txsB := txs[len(txs)/2:]
148152
env.pool.Add(txsA, true)
153+
149154
pending, queued := env.pool.ContentFrom(address)
150155
if len(pending) != len(txsA) || len(queued) != 0 {
151156
t.Fatalf("Unexpected txpool content: %d, %d", len(pending), len(queued))
152157
}
153158
env.tracker.TrackAll(txs)
154159

155-
resubmit, all := env.tracker.recheck(true)
160+
resubmit := env.tracker.recheck(true)
156161
if len(resubmit) != len(txsB) {
157162
t.Fatalf("Unexpected transactions to resubmit, got: %d, want: %d", len(resubmit), len(txsB))
158163
}
159-
if len(all) == 0 || len(all[address]) == 0 {
160-
t.Fatalf("Unexpected transactions being tracked, got: %d, want: %d", 0, len(txs))
164+
env.tracker.mu.Lock()
165+
allCopy := maps.Clone(env.tracker.all)
166+
env.tracker.mu.Unlock()
167+
168+
if len(allCopy) != len(txs) {
169+
t.Fatalf("Unexpected transactions being tracked, got: %d, want: %d", len(allCopy), len(txs))
161170
}
162-
if len(all[address]) != len(txs) {
163-
t.Fatalf("Unexpected transactions being tracked, got: %d, want: %d", len(all[address]), len(txs))
171+
}
172+
173+
func TestJournal(t *testing.T) {
174+
journalPath := filepath.Join(t.TempDir(), fmt.Sprintf("%d", rand.Int63()))
175+
env := newTestEnv(t, 10, 0, journalPath)
176+
defer env.close()
177+
178+
env.tracker.Start()
179+
defer env.tracker.Stop()
180+
181+
txs := env.makeTxs(10)
182+
txsA := txs[:len(txs)/2]
183+
txsB := txs[len(txs)/2:]
184+
env.pool.Add(txsA, true)
185+
186+
pending, queued := env.pool.ContentFrom(address)
187+
if len(pending) != len(txsA) || len(queued) != 0 {
188+
t.Fatalf("Unexpected txpool content: %d, %d", len(pending), len(queued))
189+
}
190+
env.tracker.TrackAll(txsA)
191+
env.tracker.TrackAll(txsB)
192+
env.tracker.recheck(true) // manually rejournal the tracker
193+
194+
// Make sure all the transactions are properly journalled
195+
trackerB := New(journalPath, time.Minute, gspec.Config, env.pool)
196+
trackerB.journal.load(func(transactions []*types.Transaction) []error {
197+
trackerB.TrackAll(transactions)
198+
return nil
199+
})
200+
201+
trackerB.mu.Lock()
202+
allCopy := maps.Clone(trackerB.all)
203+
trackerB.mu.Unlock()
204+
205+
if len(allCopy) != len(txs) {
206+
t.Fatalf("Unexpected transactions being tracked, got: %d, want: %d", len(allCopy), len(txs))
164207
}
165208
}

0 commit comments

Comments
 (0)