Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 26 additions & 23 deletions graft/coreth/plugin/evm/eth_gossiper.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (
"context"
"fmt"
"sync"
"sync/atomic"

"github.com/ava-labs/libevm/core/types"
"github.com/ava-labs/libevm/event"
"github.com/ava-labs/libevm/log"
"github.com/prometheus/client_golang/prometheus"

Expand All @@ -33,8 +33,13 @@ var (
_ gossip.Set[*GossipEthTx] = (*GossipEthTxPool)(nil)

_ eth.PushGossiper = (*EthPushGossiper)(nil)

errSubscribing = fmt.Errorf("subscribing to the mempool failed")
)

// NewGossipEthTxPool creates a new GossipEthTxPool.
//
// If a nil error is returned, UpdateBloomFilter must be called.
func NewGossipEthTxPool(mempool *txpool.TxPool, registerer prometheus.Registerer) (*GossipEthTxPool, error) {
bloom, err := gossip.NewBloomFilter(
registerer,
Expand All @@ -47,48 +52,46 @@ func NewGossipEthTxPool(mempool *txpool.TxPool, registerer prometheus.Registerer
return nil, fmt.Errorf("failed to initialize bloom filter: %w", err)
}

pendingTxs := make(chan core.NewTxsEvent, pendingTxsBuffer)
sub := mempool.SubscribeTransactions(pendingTxs, true)
if sub == nil {
return nil, errSubscribing
}

return &GossipEthTxPool{
mempool: mempool,
pendingTxs: make(chan core.NewTxsEvent, pendingTxsBuffer),
sub: sub,
bloom: bloom,
}, nil
}

type GossipEthTxPool struct {
mempool *txpool.TxPool
pendingTxs chan core.NewTxsEvent
sub event.Subscription

bloom *gossip.BloomFilter
lock sync.RWMutex

// subscribed is set to true when the gossip subscription is active
// mostly used for testing
subscribed atomic.Bool
}

// IsSubscribed returns whether or not the gossip subscription is active.
func (g *GossipEthTxPool) IsSubscribed() bool {
return g.subscribed.Load()
bloom *gossip.BloomFilter
}

func (g *GossipEthTxPool) Subscribe(ctx context.Context) {
sub := g.mempool.SubscribeTransactions(g.pendingTxs, false)
if sub == nil {
log.Warn("failed to subscribe to new txs event")
return
}
g.subscribed.CompareAndSwap(false, true)
defer func() {
sub.Unsubscribe()
g.subscribed.CompareAndSwap(true, false)
}()
// UpdateBloomFilter continuously listens for new pending transactions from the
// mempool and adds them to the bloom filter. If the bloom filter reaches its
// capacity, it is reset and all pending transactions are re-added.
func (g *GossipEthTxPool) UpdateBloomFilter(ctx context.Context) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe can be renamedBloomFilterUpdateLoop or DispatchBloomUpdater etc etc to indicate the continous loop.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or really Subscribe haha

defer g.sub.Unsubscribe()

for {
select {
case <-ctx.Done():
log.Debug("shutting down subscription")
return
case pendingTxs := <-g.pendingTxs:
case pendingTxs, ok := <-g.pendingTxs:
if !ok {
log.Debug("pending txs channel closed, shutting down subscription")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I don't think this channel closes normally, should this be a warn/error log?

return
}

g.lock.Lock()
optimalElements := (g.mempool.PendingSize(txpool.PendingFilter{}) + len(pendingTxs.Txs)) * config.TxGossipBloomChurnMultiplier
for _, pendingTx := range pendingTxs.Txs {
Expand Down
6 changes: 1 addition & 5 deletions graft/coreth/plugin/evm/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,7 @@ func TestGossipSubscribe(t *testing.T) {
require.NoError(err)
ctx, cancel := context.WithCancel(t.Context())
defer cancel()
go gossipTxPool.Subscribe(ctx)

require.Eventually(func() bool {
return gossipTxPool.IsSubscribed()
}, 10*time.Second, 500*time.Millisecond, "expected gossipTxPool to be subscribed")
go gossipTxPool.UpdateBloomFilter(ctx)

// create eth txs
ethTxs := getValidEthTxs(key, 10, big.NewInt(226*utils.GWei))
Expand Down
2 changes: 1 addition & 1 deletion graft/coreth/plugin/evm/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,7 @@ func (vm *VM) initBlockBuilding() error {
}
vm.shutdownWg.Add(1)
go func() {
ethTxPool.Subscribe(ctx)
ethTxPool.UpdateBloomFilter(ctx)
vm.shutdownWg.Done()
}()
pushGossipParams := avalanchegossip.BranchingFactor{
Expand Down
Loading