Skip to content

Commit 4e80be5

Browse files
Provide alternative to gossip.BloomFilter
1 parent 5950ba3 commit 4e80be5

File tree

13 files changed

+448
-301
lines changed

13 files changed

+448
-301
lines changed

network/p2p/gossip/bloom.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ import (
1818
//
1919
// Invariant: The returned bloom filter is not safe to reset concurrently with
2020
// other operations. However, it is otherwise safe to access concurrently.
21+
//
22+
// Deprecated: [SetWithBloomFilter] should be used to manage bloom filters.
2123
func NewBloomFilter(
2224
registerer prometheus.Registerer,
2325
namespace string,
@@ -45,6 +47,7 @@ func NewBloomFilter(
4547
return filter, err
4648
}
4749

50+
// Deprecated: [SetWithBloomFilter] should be used to manage bloom filters.
4851
type BloomFilter struct {
4952
minTargetElements int
5053
targetFalsePositiveProbability float64
@@ -85,6 +88,8 @@ func (b *BloomFilter) Marshal() ([]byte, []byte) {
8588
// the same [targetFalsePositiveProbability].
8689
//
8790
// Returns true if the bloom filter was reset.
91+
//
92+
// Deprecated: [SetWithBloomFilter] should be used to manage bloom filters.
8893
func ResetBloomFilterIfNeeded(
8994
bloomFilter *BloomFilter,
9095
targetElements int,

network/p2p/gossip/gossip.go

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/ava-labs/avalanchego/ids"
1919
"github.com/ava-labs/avalanchego/network/p2p"
2020
"github.com/ava-labs/avalanchego/snow/engine/common"
21+
"github.com/ava-labs/avalanchego/utils/bloom"
2122
"github.com/ava-labs/avalanchego/utils/buffer"
2223
"github.com/ava-labs/avalanchego/utils/logging"
2324
"github.com/ava-labs/avalanchego/utils/set"
@@ -40,6 +41,7 @@ const (
4041
var (
4142
_ Gossiper = (*ValidatorGossiper)(nil)
4243
_ Gossiper = (*PullGossiper[Gossipable])(nil)
44+
_ Gossiper = (*PushGossiper[Gossipable])(nil)
4345

4446
ioTypeLabels = []string{ioLabel, typeLabel}
4547
sentPushLabels = prometheus.Labels{
@@ -75,6 +77,17 @@ var (
7577
ErrInvalidRegossipFrequency = errors.New("re-gossip frequency cannot be negative")
7678
)
7779

80+
// Gossipable is an item that can be gossiped across the network
81+
type Gossipable interface {
82+
GossipID() ids.ID
83+
}
84+
85+
// Marshaller handles parsing logic for a concrete Gossipable type
86+
type Marshaller[T Gossipable] interface {
87+
MarshalGossip(T) ([]byte, error)
88+
UnmarshalGossip([]byte) (T, error)
89+
}
90+
7891
// Gossiper gossips Gossipables to other nodes
7992
type Gossiper interface {
8093
// Gossip runs a cycle of gossip. Returns an error if we failed to gossip.
@@ -191,7 +204,7 @@ func (v ValidatorGossiper) Gossip(ctx context.Context) error {
191204
func NewPullGossiper[T Gossipable](
192205
log logging.Logger,
193206
marshaller Marshaller[T],
194-
set Set[T],
207+
set PullGossiperSet[T],
195208
client *p2p.Client,
196209
metrics Metrics,
197210
pollSize int,
@@ -206,17 +219,27 @@ func NewPullGossiper[T Gossipable](
206219
}
207220
}
208221

222+
// PullGossiperSet exposes the current bloom filter and allows adding new items
223+
// that were not included in the filter.
224+
type PullGossiperSet[T Gossipable] interface {
225+
// Add adds a value to the set. Returns an error if v was not added.
226+
Add(v T) error
227+
// BloomFilter returns the bloom filter and its corresponding salt.
228+
BloomFilter() (bloom *bloom.Filter, salt ids.ID)
229+
}
230+
209231
type PullGossiper[T Gossipable] struct {
210232
log logging.Logger
211233
marshaller Marshaller[T]
212-
set Set[T]
234+
set PullGossiperSet[T]
213235
client *p2p.Client
214236
metrics Metrics
215237
pollSize int
216238
}
217239

218240
func (p *PullGossiper[_]) Gossip(ctx context.Context) error {
219-
msgBytes, err := MarshalAppRequest(p.set.GetFilter())
241+
bf, salt := p.set.BloomFilter()
242+
msgBytes, err := MarshalAppRequest(bf.Marshal(), salt[:])
220243
if err != nil {
221244
return err
222245
}
@@ -293,7 +316,7 @@ func (p *PullGossiper[_]) handleResponse(
293316
// NewPushGossiper returns an instance of PushGossiper
294317
func NewPushGossiper[T Gossipable](
295318
marshaller Marshaller[T],
296-
mempool Set[T],
319+
set PushGossiperSet,
297320
validators p2p.ValidatorSubset,
298321
client *p2p.Client,
299322
metrics Metrics,
@@ -320,7 +343,7 @@ func NewPushGossiper[T Gossipable](
320343

321344
return &PushGossiper[T]{
322345
marshaller: marshaller,
323-
set: mempool,
346+
set: set,
324347
validators: validators,
325348
client: client,
326349
metrics: metrics,
@@ -336,10 +359,16 @@ func NewPushGossiper[T Gossipable](
336359
}, nil
337360
}
338361

362+
// PushGossiperSet exposes whether hashes are still included in a set.
363+
type PushGossiperSet interface {
364+
// Has returns true if the hash is in the set.
365+
Has(h ids.ID) bool
366+
}
367+
339368
// PushGossiper broadcasts gossip to peers randomly in the network
340369
type PushGossiper[T Gossipable] struct {
341370
marshaller Marshaller[T]
342-
set Set[T]
371+
set PushGossiperSet
343372
validators p2p.ValidatorSubset
344373
client *p2p.Client
345374
metrics Metrics

network/p2p/gossip/gossip_test.go

Lines changed: 38 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"github.com/ava-labs/avalanchego/snow/engine/enginetest"
2121
"github.com/ava-labs/avalanchego/snow/validators"
2222
"github.com/ava-labs/avalanchego/snow/validators/validatorstest"
23-
"github.com/ava-labs/avalanchego/utils/bloom"
2423
"github.com/ava-labs/avalanchego/utils/constants"
2524
"github.com/ava-labs/avalanchego/utils/logging"
2625
"github.com/ava-labs/avalanchego/utils/set"
@@ -69,38 +68,35 @@ func (marshaller) UnmarshalGossip(bytes []byte) (tx, error) {
6968

7069
type setDouble struct {
7170
txs set.Set[tx]
72-
bloom *BloomFilter
7371
onAdd func(tx tx)
7472
}
7573

76-
func (s *setDouble) Add(gossipable tx) error {
77-
if s.txs.Contains(gossipable) {
78-
return fmt.Errorf("%s already present", ids.ID(gossipable))
74+
func (s *setDouble) Add(t tx) error {
75+
if s.txs.Contains(t) {
76+
return fmt.Errorf("%s already present", t)
7977
}
8078

81-
s.txs.Add(gossipable)
82-
s.bloom.Add(gossipable)
79+
s.txs.Add(t)
8380
if s.onAdd != nil {
84-
s.onAdd(gossipable)
81+
s.onAdd(t)
8582
}
86-
8783
return nil
8884
}
8985

90-
func (s *setDouble) Has(gossipID ids.ID) bool {
91-
return s.txs.Contains(tx(gossipID))
86+
func (s *setDouble) Has(id ids.ID) bool {
87+
return s.txs.Contains(tx(id))
9288
}
9389

94-
func (s *setDouble) Iterate(f func(gossipable tx) bool) {
90+
func (s *setDouble) Iterate(f func(t tx) bool) {
9591
for tx := range s.txs {
9692
if !f(tx) {
9793
return
9894
}
9995
}
10096
}
10197

102-
func (s *setDouble) GetFilter() ([]byte, []byte) {
103-
return s.bloom.Marshal()
98+
func (s *setDouble) Len() int {
99+
return s.txs.Len()
104100
}
105101

106102
func TestGossiperGossip(t *testing.T) {
@@ -175,13 +171,18 @@ func TestGossiperGossip(t *testing.T) {
175171
)
176172
require.NoError(err)
177173

178-
responseBloom, err := NewBloomFilter(prometheus.NewRegistry(), "", 1000, 0.01, 0.05)
174+
var serverSet setDouble
175+
serverMempool, err := NewSetWithBloomFilter(
176+
&serverSet,
177+
prometheus.NewRegistry(),
178+
"",
179+
1000,
180+
0.01,
181+
0.05,
182+
)
179183
require.NoError(err)
180-
responseSet := &setDouble{
181-
bloom: responseBloom,
182-
}
183184
for _, item := range tt.responder {
184-
require.NoError(responseSet.Add(item))
185+
require.NoError(serverMempool.Add(item))
185186
}
186187

187188
metrics, err := NewMetrics(prometheus.NewRegistry(), "")
@@ -196,7 +197,7 @@ func TestGossiperGossip(t *testing.T) {
196197
handler := NewHandler[tx](
197198
logging.NoLog{},
198199
marshaller,
199-
responseSet,
200+
serverMempool,
200201
metrics,
201202
tt.targetResponseSize,
202203
)
@@ -218,13 +219,18 @@ func TestGossiperGossip(t *testing.T) {
218219
require.NoError(err)
219220
require.NoError(requestNetwork.Connected(t.Context(), ids.EmptyNodeID, nil))
220221

221-
bloom, err := NewBloomFilter(prometheus.NewRegistry(), "", 1000, 0.01, 0.05)
222+
var requestSet setDouble
223+
clientMempool, err := NewSetWithBloomFilter(
224+
&requestSet,
225+
prometheus.NewRegistry(),
226+
"",
227+
1000,
228+
0.01,
229+
0.05,
230+
)
222231
require.NoError(err)
223-
requestSet := &setDouble{
224-
bloom: bloom,
225-
}
226232
for _, item := range tt.requester {
227-
require.NoError(requestSet.Add(item))
233+
require.NoError(clientMempool.Add(item))
228234
}
229235

230236
requestClient := requestNetwork.NewClient(
@@ -236,7 +242,7 @@ func TestGossiperGossip(t *testing.T) {
236242
gossiper := NewPullGossiper[tx](
237243
logging.NoLog{},
238244
marshaller,
239-
requestSet,
245+
clientMempool,
240246
requestClient,
241247
metrics,
242248
1,
@@ -457,20 +463,10 @@ func TestPushGossiperNew(t *testing.T) {
457463
}
458464
}
459465

460-
type fullSet[T Gossipable] struct{}
461-
462-
func (fullSet[T]) Add(T) error {
463-
return nil
464-
}
465-
466-
func (fullSet[T]) Has(ids.ID) bool {
467-
return true
468-
}
469-
470-
func (fullSet[T]) Iterate(func(gossipable T) bool) {}
466+
type hasFunc func(h ids.ID) bool
471467

472-
func (fullSet[_]) GetFilter() ([]byte, []byte) {
473-
return bloom.FullFilter.Marshal(), ids.Empty[:]
468+
func (f hasFunc) Has(h ids.ID) bool {
469+
return f(h)
474470
}
475471

476472
// Tests that the outgoing gossip is equivalent to what was accumulated
@@ -593,7 +589,9 @@ func TestPushGossiper(t *testing.T) {
593589

594590
gossiper, err := NewPushGossiper[tx](
595591
marshaller,
596-
fullSet[tx]{},
592+
hasFunc(func(ids.ID) bool {
593+
return true // Never remove the items from the set
594+
}),
597595
validators,
598596
client,
599597
metrics,

network/p2p/gossip/gossipable.go

Lines changed: 0 additions & 31 deletions
This file was deleted.

network/p2p/gossip/handler.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,19 @@ import (
1818

1919
var _ p2p.Handler = (*Handler[Gossipable])(nil)
2020

21+
// HandlerSet exposes the ability to add new values to the set in response to
22+
// pushed information and for responding to pull requests.
23+
type HandlerSet[T Gossipable] interface {
24+
// Add adds a value to the set. Returns an error if v was not added.
25+
Add(v T) error
26+
// Iterate iterates over elements until f returns false.
27+
Iterate(f func(v T) bool)
28+
}
29+
2130
func NewHandler[T Gossipable](
2231
log logging.Logger,
2332
marshaller Marshaller[T],
24-
set Set[T],
33+
set HandlerSet[T],
2534
metrics Metrics,
2635
targetResponseSize int,
2736
) *Handler[T] {
@@ -39,7 +48,7 @@ type Handler[T Gossipable] struct {
3948
p2p.Handler
4049
marshaller Marshaller[T]
4150
log logging.Logger
42-
set Set[T]
51+
set HandlerSet[T]
4352
metrics Metrics
4453
targetResponseSize int
4554
}

0 commit comments

Comments
 (0)