Skip to content

Commit cc10153

Browse files
authored
Merge branch 'dev' into e2e-staking-rewards
2 parents ac2c4ac + 484a72f commit cc10153

File tree

11 files changed

+948
-40
lines changed

11 files changed

+948
-40
lines changed

network/p2p/gossip/bloom.go

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
2+
// See the file LICENSE for licensing terms.
3+
4+
package gossip
5+
6+
import (
7+
"crypto/rand"
8+
"encoding/binary"
9+
"hash"
10+
11+
bloomfilter "github.com/holiman/bloomfilter/v2"
12+
13+
"github.com/ava-labs/avalanchego/ids"
14+
)
15+
16+
var _ hash.Hash64 = (*hasher)(nil)
17+
18+
// NewBloomFilter returns a new instance of a bloom filter with at most
19+
// [maxExpectedElements] elements anticipated at any moment, and a false
20+
// positive probability of [falsePositiveProbability].
21+
func NewBloomFilter(
22+
maxExpectedElements uint64,
23+
falsePositiveProbability float64,
24+
) (*BloomFilter, error) {
25+
bloom, err := bloomfilter.NewOptimal(
26+
maxExpectedElements,
27+
falsePositiveProbability,
28+
)
29+
if err != nil {
30+
return nil, err
31+
}
32+
33+
salt, err := randomSalt()
34+
return &BloomFilter{
35+
Bloom: bloom,
36+
Salt: salt,
37+
}, err
38+
}
39+
40+
type BloomFilter struct {
41+
Bloom *bloomfilter.Filter
42+
// Salt is provided to eventually unblock collisions in Bloom. It's possible
43+
// that conflicting Gossipable items collide in the bloom filter, so a salt
44+
// is generated to eventually resolve collisions.
45+
Salt ids.ID
46+
}
47+
48+
func (b *BloomFilter) Add(gossipable Gossipable) {
49+
h := gossipable.GetID()
50+
salted := &hasher{
51+
hash: h[:],
52+
salt: b.Salt,
53+
}
54+
b.Bloom.Add(salted)
55+
}
56+
57+
func (b *BloomFilter) Has(gossipable Gossipable) bool {
58+
h := gossipable.GetID()
59+
salted := &hasher{
60+
hash: h[:],
61+
salt: b.Salt,
62+
}
63+
return b.Bloom.Contains(salted)
64+
}
65+
66+
// ResetBloomFilterIfNeeded resets a bloom filter if it breaches a target false
67+
// positive probability. Returns true if the bloom filter was reset.
68+
func ResetBloomFilterIfNeeded(
69+
bloomFilter *BloomFilter,
70+
falsePositiveProbability float64,
71+
) (bool, error) {
72+
if bloomFilter.Bloom.FalsePosititveProbability() < falsePositiveProbability {
73+
return false, nil
74+
}
75+
76+
newBloom, err := bloomfilter.New(bloomFilter.Bloom.M(), bloomFilter.Bloom.K())
77+
if err != nil {
78+
return false, err
79+
}
80+
salt, err := randomSalt()
81+
if err != nil {
82+
return false, err
83+
}
84+
85+
bloomFilter.Bloom = newBloom
86+
bloomFilter.Salt = salt
87+
return true, nil
88+
}
89+
90+
func randomSalt() (ids.ID, error) {
91+
salt := ids.ID{}
92+
_, err := rand.Read(salt[:])
93+
return salt, err
94+
}
95+
96+
type hasher struct {
97+
hash []byte
98+
salt ids.ID
99+
}
100+
101+
func (h *hasher) Write(p []byte) (n int, err error) {
102+
h.hash = append(h.hash, p...)
103+
return len(p), nil
104+
}
105+
106+
func (h *hasher) Sum(b []byte) []byte {
107+
h.hash = append(h.hash, b...)
108+
return h.hash
109+
}
110+
111+
func (h *hasher) Reset() {
112+
h.hash = ids.Empty[:]
113+
}
114+
115+
func (*hasher) BlockSize() int {
116+
return ids.IDLen
117+
}
118+
119+
func (h *hasher) Sum64() uint64 {
120+
salted := ids.ID{}
121+
for i := 0; i < len(h.hash) && i < ids.IDLen; i++ {
122+
salted[i] = h.hash[i] ^ h.salt[i]
123+
}
124+
125+
return binary.BigEndian.Uint64(salted[:])
126+
}
127+
128+
func (h *hasher) Size() int {
129+
return len(h.hash)
130+
}

network/p2p/gossip/bloom_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
2+
// See the file LICENSE for licensing terms.
3+
4+
package gossip
5+
6+
import (
7+
"testing"
8+
9+
bloomfilter "github.com/holiman/bloomfilter/v2"
10+
11+
"github.com/stretchr/testify/require"
12+
13+
"github.com/ava-labs/avalanchego/ids"
14+
)
15+
16+
func TestBloomFilterRefresh(t *testing.T) {
17+
tests := []struct {
18+
name string
19+
falsePositiveProbability float64
20+
add []*testTx
21+
expected []*testTx
22+
}{
23+
{
24+
name: "no refresh",
25+
falsePositiveProbability: 1,
26+
add: []*testTx{
27+
{id: ids.ID{0}},
28+
},
29+
expected: []*testTx{
30+
{id: ids.ID{0}},
31+
},
32+
},
33+
{
34+
name: "refresh",
35+
falsePositiveProbability: 0.1,
36+
add: []*testTx{
37+
{id: ids.ID{0}},
38+
{id: ids.ID{1}},
39+
},
40+
expected: []*testTx{
41+
{id: ids.ID{1}},
42+
},
43+
},
44+
}
45+
46+
for _, tt := range tests {
47+
t.Run(tt.name, func(t *testing.T) {
48+
require := require.New(t)
49+
b, err := bloomfilter.New(10, 1)
50+
require.NoError(err)
51+
bloom := BloomFilter{
52+
Bloom: b,
53+
}
54+
55+
for _, item := range tt.add {
56+
_, err = ResetBloomFilterIfNeeded(&bloom, tt.falsePositiveProbability)
57+
require.NoError(err)
58+
bloom.Add(item)
59+
}
60+
61+
require.Equal(uint64(len(tt.expected)), bloom.Bloom.N())
62+
63+
for _, expected := range tt.expected {
64+
require.True(bloom.Has(expected))
65+
}
66+
})
67+
}
68+
}

network/p2p/gossip/gossip.go

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
2+
// See the file LICENSE for licensing terms.
3+
4+
package gossip
5+
6+
import (
7+
"context"
8+
"time"
9+
10+
"go.uber.org/zap"
11+
12+
"google.golang.org/protobuf/proto"
13+
14+
"github.com/ava-labs/avalanchego/ids"
15+
"github.com/ava-labs/avalanchego/network/p2p"
16+
"github.com/ava-labs/avalanchego/proto/pb/sdk"
17+
"github.com/ava-labs/avalanchego/utils/logging"
18+
)
19+
20+
// GossipableAny exists to help create non-nil pointers to a concrete Gossipable
21+
// ref: https://stackoverflow.com/questions/69573113/how-can-i-instantiate-a-non-nil-pointer-of-type-argument-with-generic-go
22+
type GossipableAny[T any] interface {
23+
*T
24+
Gossipable
25+
}
26+
27+
type Config struct {
28+
Frequency time.Duration
29+
PollSize int
30+
}
31+
32+
func NewGossiper[T any, U GossipableAny[T]](
33+
config Config,
34+
log logging.Logger,
35+
set Set[U],
36+
client *p2p.Client,
37+
) *Gossiper[T, U] {
38+
return &Gossiper[T, U]{
39+
config: config,
40+
log: log,
41+
set: set,
42+
client: client,
43+
}
44+
}
45+
46+
type Gossiper[T any, U GossipableAny[T]] struct {
47+
config Config
48+
log logging.Logger
49+
set Set[U]
50+
client *p2p.Client
51+
}
52+
53+
func (g *Gossiper[_, _]) Gossip(ctx context.Context) {
54+
gossipTicker := time.NewTicker(g.config.Frequency)
55+
defer gossipTicker.Stop()
56+
57+
for {
58+
select {
59+
case <-gossipTicker.C:
60+
if err := g.gossip(ctx); err != nil {
61+
g.log.Warn("failed to gossip", zap.Error(err))
62+
}
63+
case <-ctx.Done():
64+
g.log.Debug("shutting down gossip")
65+
return
66+
}
67+
}
68+
}
69+
70+
func (g *Gossiper[_, _]) gossip(ctx context.Context) error {
71+
bloom, salt, err := g.set.GetFilter()
72+
if err != nil {
73+
return err
74+
}
75+
76+
request := &sdk.PullGossipRequest{
77+
Filter: bloom,
78+
Salt: salt,
79+
}
80+
msgBytes, err := proto.Marshal(request)
81+
if err != nil {
82+
return err
83+
}
84+
85+
for i := 0; i < g.config.PollSize; i++ {
86+
if err := g.client.AppRequestAny(ctx, msgBytes, g.handleResponse); err != nil {
87+
return err
88+
}
89+
}
90+
91+
return nil
92+
}
93+
94+
func (g *Gossiper[T, U]) handleResponse(
95+
nodeID ids.NodeID,
96+
responseBytes []byte,
97+
err error,
98+
) {
99+
if err != nil {
100+
g.log.Debug(
101+
"failed gossip request",
102+
zap.Stringer("nodeID", nodeID),
103+
zap.Error(err),
104+
)
105+
return
106+
}
107+
108+
response := &sdk.PullGossipResponse{}
109+
if err := proto.Unmarshal(responseBytes, response); err != nil {
110+
g.log.Debug("failed to unmarshal gossip response", zap.Error(err))
111+
return
112+
}
113+
114+
for _, bytes := range response.Gossip {
115+
gossipable := U(new(T))
116+
if err := gossipable.Unmarshal(bytes); err != nil {
117+
g.log.Debug(
118+
"failed to unmarshal gossip",
119+
zap.Stringer("nodeID", nodeID),
120+
zap.Error(err),
121+
)
122+
continue
123+
}
124+
125+
hash := gossipable.GetID()
126+
g.log.Debug(
127+
"received gossip",
128+
zap.Stringer("nodeID", nodeID),
129+
zap.Stringer("id", hash),
130+
)
131+
if err := g.set.Add(gossipable); err != nil {
132+
g.log.Debug(
133+
"failed to add gossip to the known set",
134+
zap.Stringer("nodeID", nodeID),
135+
zap.Stringer("id", hash),
136+
zap.Error(err),
137+
)
138+
continue
139+
}
140+
}
141+
}

0 commit comments

Comments
 (0)