Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 56 additions & 31 deletions network/p2p/gossip/bloom.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package gossip

import (
"crypto/rand"
"sync"

"github.com/prometheus/client_golang/prometheus"

Expand All @@ -16,8 +17,7 @@ import (
// anticipated at any moment, and a false positive probability of [targetFalsePositiveProbability]. If the
// false positive probability exceeds [resetFalsePositiveProbability], the bloom filter will be reset.
//
// Invariant: The returned bloom filter is not safe to reset concurrently with
// other operations. However, it is otherwise safe to access concurrently.
// The returned bloom filter is safe for concurrent usage.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this refactor encourage misuse of this bloom filter?

Specifically, it seems like this change does not expect for the bloom filter to be refilled atomically with its reset. This code certainly can be cleaned up... But adding this locking behavior doesn't seem useful (when using the bloom filter correctly).

If the bloom filter is accessed in a racy manner (Specifically Marshal being called concurrently with ResetIfNeeded followed by the expected Add calls), wouldn't we send a (potentially empty) bloom filter even though we may have a large number of items still in the set?

Copy link
Contributor Author

@ARR4N ARR4N Nov 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I see what you mean. I think the best way to address that along with my intended improvement is to add a afterReset func() argument, which is called while still holding the write lock, and expect refills to be in there. WDYT?

EDIT: I implemented it in the latest commit (7118ad6) because it's easy to revert but does a better job of communicating my thinking.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed from a general callback to an iter.Seq[Gossipable] to reduce the scope to exactly what's needed.

func NewBloomFilter(
registerer prometheus.Registerer,
namespace string,
Expand All @@ -36,12 +36,8 @@ func NewBloomFilter(

metrics: metrics,
}
err = resetBloomFilter(
filter,
minTargetElements,
targetFalsePositiveProbability,
resetFalsePositiveProbability,
)
// A lock is unnecessary as no other goroutine could have access.
err = filter.resetWhenLocked(minTargetElements)
return filter, err
}

Expand All @@ -52,6 +48,11 @@ type BloomFilter struct {

metrics *bloom.Metrics

// [bloom.Filter] itself is threadsafe, but resetting requires replacing it
// entirely. This mutex protects the [BloomFilter] fields, not the
// [bloom.Filter], so resetting is a write while everything else is a read.
resetMu sync.RWMutex

maxCount int
bloom *bloom.Filter
// salt is provided to eventually unblock collisions in Bloom. It's possible
Expand All @@ -61,17 +62,26 @@ type BloomFilter struct {
}

func (b *BloomFilter) Add(gossipable Gossipable) {
b.resetMu.RLock()
defer b.resetMu.RUnlock()

h := gossipable.GossipID()
bloom.Add(b.bloom, h[:], b.salt[:])
b.metrics.Count.Inc()
}

func (b *BloomFilter) Has(gossipable Gossipable) bool {
b.resetMu.RLock()
defer b.resetMu.RUnlock()

h := gossipable.GossipID()
return bloom.Contains(b.bloom, h[:], b.salt[:])
}

func (b *BloomFilter) Marshal() ([]byte, []byte) {
b.resetMu.RLock()
defer b.resetMu.RUnlock()

bloomBytes := b.bloom.Marshal()
// salt must be copied here to ensure the bytes aren't overwritten if salt
// is later modified.
Expand All @@ -81,37 +91,52 @@ func (b *BloomFilter) Marshal() ([]byte, []byte) {

// ResetBloomFilterIfNeeded resets a bloom filter if it breaches [targetFalsePositiveProbability].
//
// If [targetElements] exceeds [minTargetElements], the size of the bloom filter will grow to maintain
// the same [targetFalsePositiveProbability].
//
// Returns true if the bloom filter was reset.
// Deprecated: use [BloomFilter.ResetIfNeeded].
func ResetBloomFilterIfNeeded(
bloomFilter *BloomFilter,
targetElements int,
) (bool, error) {
if bloomFilter.bloom.Count() <= bloomFilter.maxCount {
return bloomFilter.ResetIfNeeded(targetElements)
}

// ResetIfNeeded resets the bloom filter if it breaches [targetFalsePositiveProbability].
//
// If [targetElements] exceeds [minTargetElements], the size of the bloom filter will grow to maintain
// the same [targetFalsePositiveProbability].
//
// Returns true if the bloom filter was reset.
func (b *BloomFilter) ResetIfNeeded(targetElements int) (bool, error) {
mu := &b.resetMu

// Although this pattern requires a double checking of the same property,
// it's cheap and avoids unnecessarily locking out all other goroutines on
// every call to this method.
isResetNeeded := func() bool {
return b.bloom.Count() > b.maxCount
}
mu.RLock()
reset := isResetNeeded()
mu.RUnlock()
if !reset {
return false, nil
}

targetElements = max(bloomFilter.minTargetElements, targetElements)
err := resetBloomFilter(
bloomFilter,
targetElements,
bloomFilter.targetFalsePositiveProbability,
bloomFilter.resetFalsePositiveProbability,
)
mu.Lock()
defer mu.Unlock()
// Another thread may have beaten us to acquire the write lock.
if !isResetNeeded() {
return false, nil
}

targetElements = max(b.minTargetElements, targetElements)
err := b.resetWhenLocked(targetElements)
return err == nil, err
}

func resetBloomFilter(
bloomFilter *BloomFilter,
targetElements int,
targetFalsePositiveProbability,
resetFalsePositiveProbability float64,
) error {
func (b *BloomFilter) resetWhenLocked(targetElements int) error {
numHashes, numEntries := bloom.OptimalParameters(
targetElements,
targetFalsePositiveProbability,
b.targetFalsePositiveProbability,
)
newBloom, err := bloom.New(numHashes, numEntries)
if err != nil {
Expand All @@ -122,10 +147,10 @@ func resetBloomFilter(
return err
}

bloomFilter.maxCount = bloom.EstimateCount(numHashes, numEntries, resetFalsePositiveProbability)
bloomFilter.bloom = newBloom
bloomFilter.salt = newSalt
b.maxCount = bloom.EstimateCount(numHashes, numEntries, b.resetFalsePositiveProbability)
b.bloom = newBloom
b.salt = newSalt

bloomFilter.metrics.Reset(newBloom, bloomFilter.maxCount)
b.metrics.Reset(newBloom, b.maxCount)
return nil
}
31 changes: 31 additions & 0 deletions network/p2p/gossip/bloom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package gossip

import (
"slices"
"sync"
"testing"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -106,3 +107,33 @@ func TestBloomFilterRefresh(t *testing.T) {
})
}
}

func TestBloomFilterClobber(t *testing.T) {
b, err := NewBloomFilter(prometheus.NewRegistry(), "", 1, 0.5, 0.5)
require.NoError(t, err, "NewBloomFilter()")

start := make(chan struct{})
var wg sync.WaitGroup

for _, fn := range []func(){
func() { b.Add(&testTx{}) },
func() { b.Has(&testTx{}) },
func() { b.Marshal() },
func() {
_, err := b.ResetIfNeeded(1)
require.NoErrorf(t, err, "%T.ResetIfNeeded()", b)
},
} {
for range 10_000 {
wg.Add(1)
go func() {
<-start
fn()
wg.Done()
}()
}
}

close(start)
wg.Wait()
}