Skip to content

Commit 88576c5

Browse files
authored
eth/fetcher: remove dangling peers from alternates (#32947)
This PR removes dangling peers in `alternates` map In the current code, a dropped peer is removed from alternates for only the specific transaction hash it was requesting. If that peer is listed as an alternate for other transaction hashes, those entries still stick around in alternates/announced even though that peer already got dropped.
1 parent a9e6626 commit 88576c5

File tree

2 files changed

+55
-1
lines changed

2 files changed

+55
-1
lines changed

eth/fetcher/tx_fetcher.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -795,6 +795,10 @@ func (f *TxFetcher) loop() {
795795
if len(f.announced[hash]) == 0 {
796796
delete(f.announced, hash)
797797
}
798+
delete(f.alternates[hash], drop.peer)
799+
if len(f.alternates[hash]) == 0 {
800+
delete(f.alternates, hash)
801+
}
798802
}
799803
delete(f.announces, drop.peer)
800804
}
@@ -858,7 +862,7 @@ func (f *TxFetcher) rescheduleWait(timer *mclock.Timer, trigger chan struct{}) {
858862
// This method is a bit "flaky" "by design". In theory the timeout timer only ever
859863
// should be rescheduled if some request is pending. In practice, a timeout will
860864
// cause the timer to be rescheduled every 5 secs (until the peer comes through or
861-
// disconnects). This is a limitation of the fetcher code because we don't trac
865+
// disconnects). This is a limitation of the fetcher code because we don't track
862866
// pending requests and timed out requests separately. Without double tracking, if
863867
// we simply didn't reschedule the timer on all-timeout then the timer would never
864868
// be set again since len(request) > 0 => something's running.

eth/fetcher/tx_fetcher_test.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1858,6 +1858,56 @@ func TestBlobTransactionAnnounce(t *testing.T) {
18581858
})
18591859
}
18601860

1861+
func TestTransactionFetcherDropAlternates(t *testing.T) {
1862+
testTransactionFetcherParallel(t, txFetcherTest{
1863+
init: func() *TxFetcher {
1864+
return NewTxFetcher(
1865+
func(common.Hash) bool { return false },
1866+
func(txs []*types.Transaction) []error {
1867+
return make([]error, len(txs))
1868+
},
1869+
func(string, []common.Hash) error { return nil },
1870+
nil,
1871+
)
1872+
},
1873+
steps: []interface{}{
1874+
doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{uint32(testTxs[0].Size())}},
1875+
doWait{time: txArriveTimeout, step: true},
1876+
doTxNotify{peer: "B", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{uint32(testTxs[0].Size())}},
1877+
1878+
isScheduled{
1879+
tracking: map[string][]announce{
1880+
"A": {
1881+
{testTxsHashes[0], testTxs[0].Type(), uint32(testTxs[0].Size())},
1882+
},
1883+
"B": {
1884+
{testTxsHashes[0], testTxs[0].Type(), uint32(testTxs[0].Size())},
1885+
},
1886+
},
1887+
fetching: map[string][]common.Hash{
1888+
"A": {testTxsHashes[0]},
1889+
},
1890+
},
1891+
doDrop("B"),
1892+
1893+
isScheduled{
1894+
tracking: map[string][]announce{
1895+
"A": {
1896+
{testTxsHashes[0], testTxs[0].Type(), uint32(testTxs[0].Size())},
1897+
},
1898+
},
1899+
fetching: map[string][]common.Hash{
1900+
"A": {testTxsHashes[0]},
1901+
},
1902+
},
1903+
doDrop("A"),
1904+
isScheduled{
1905+
tracking: nil, fetching: nil,
1906+
},
1907+
},
1908+
})
1909+
}
1910+
18611911
func testTransactionFetcherParallel(t *testing.T, tt txFetcherTest) {
18621912
t.Parallel()
18631913
testTransactionFetcher(t, tt)

0 commit comments

Comments
 (0)