-
Notifications
You must be signed in to change notification settings - Fork 836
Cleanup GossipEthTxPool #4619
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Cleanup GossipEthTxPool #4619
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,9 +9,9 @@ import ( | |
| "context" | ||
| "fmt" | ||
| "sync" | ||
| "sync/atomic" | ||
|
|
||
| "github.com/ava-labs/libevm/core/types" | ||
| "github.com/ava-labs/libevm/event" | ||
| "github.com/ava-labs/libevm/log" | ||
| "github.com/prometheus/client_golang/prometheus" | ||
|
|
||
|
|
@@ -33,8 +33,13 @@ var ( | |
| _ gossip.Set[*GossipEthTx] = (*GossipEthTxPool)(nil) | ||
|
|
||
| _ eth.PushGossiper = (*EthPushGossiper)(nil) | ||
|
|
||
| errSubscribing = fmt.Errorf("subscribing to the mempool failed") | ||
| ) | ||
|
|
||
| // NewGossipEthTxPool creates a new GossipEthTxPool. | ||
| // | ||
| // If a nil error is returned, UpdateBloomFilter must be called. | ||
| func NewGossipEthTxPool(mempool *txpool.TxPool, registerer prometheus.Registerer) (*GossipEthTxPool, error) { | ||
| bloom, err := gossip.NewBloomFilter( | ||
| registerer, | ||
|
|
@@ -47,48 +52,46 @@ func NewGossipEthTxPool(mempool *txpool.TxPool, registerer prometheus.Registerer | |
| return nil, fmt.Errorf("failed to initialize bloom filter: %w", err) | ||
| } | ||
|
|
||
| pendingTxs := make(chan core.NewTxsEvent, pendingTxsBuffer) | ||
| sub := mempool.SubscribeTransactions(pendingTxs, true) | ||
| if sub == nil { | ||
| return nil, errSubscribing | ||
| } | ||
|
|
||
| return &GossipEthTxPool{ | ||
| mempool: mempool, | ||
| pendingTxs: make(chan core.NewTxsEvent, pendingTxsBuffer), | ||
StephenButtolph marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| sub: sub, | ||
| bloom: bloom, | ||
| }, nil | ||
| } | ||
|
|
||
| type GossipEthTxPool struct { | ||
| mempool *txpool.TxPool | ||
| pendingTxs chan core.NewTxsEvent | ||
| sub event.Subscription | ||
|
|
||
| bloom *gossip.BloomFilter | ||
| lock sync.RWMutex | ||
|
|
||
| // subscribed is set to true when the gossip subscription is active | ||
| // mostly used for testing | ||
| subscribed atomic.Bool | ||
| } | ||
|
|
||
| // IsSubscribed returns whether or not the gossip subscription is active. | ||
| func (g *GossipEthTxPool) IsSubscribed() bool { | ||
| return g.subscribed.Load() | ||
| bloom *gossip.BloomFilter | ||
| } | ||
|
|
||
| func (g *GossipEthTxPool) Subscribe(ctx context.Context) { | ||
| sub := g.mempool.SubscribeTransactions(g.pendingTxs, false) | ||
| if sub == nil { | ||
| log.Warn("failed to subscribe to new txs event") | ||
| return | ||
| } | ||
| g.subscribed.CompareAndSwap(false, true) | ||
| defer func() { | ||
| sub.Unsubscribe() | ||
| g.subscribed.CompareAndSwap(true, false) | ||
| }() | ||
| // UpdateBloomFilter continuously listens for new pending transactions from the | ||
| // mempool and adds them to the bloom filter. If the bloom filter reaches its | ||
| // capacity, it is reset and all pending transactions are re-added. | ||
| func (g *GossipEthTxPool) UpdateBloomFilter(ctx context.Context) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: maybe can be renamed
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. or really |
||
| defer g.sub.Unsubscribe() | ||
|
|
||
| for { | ||
| select { | ||
| case <-ctx.Done(): | ||
| log.Debug("shutting down subscription") | ||
| return | ||
| case pendingTxs := <-g.pendingTxs: | ||
| case pendingTxs, ok := <-g.pendingTxs: | ||
| if !ok { | ||
| log.Debug("pending txs channel closed, shutting down subscription") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: I don't think this channel closes normally, should this be a warn/error log? |
||
| return | ||
| } | ||
|
|
||
| g.lock.Lock() | ||
| optimalElements := (g.mempool.PendingSize(txpool.PendingFilter{}) + len(pendingTxs.Txs)) * config.TxGossipBloomChurnMultiplier | ||
| for _, pendingTx := range pendingTxs.Txs { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.