Skip to content

Commit 96d451d

Browse files
Periodically PullGossip only from connected validators (#2399)
1 parent 0da5bcc commit 96d451d

File tree

10 files changed

+61
-80
lines changed

10 files changed

+61
-80
lines changed

chains/manager.go

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -859,13 +859,14 @@ func (m *manager) createAvalancheChain(
859859
// Create engine, bootstrapper and state-syncer in this order,
860860
// to make sure start callbacks are duly initialized
861861
snowmanEngineConfig := smeng.Config{
862-
Ctx: ctx,
863-
AllGetsServer: snowGetHandler,
864-
VM: vmWrappingProposerVM,
865-
Sender: snowmanMessageSender,
866-
Validators: vdrs,
867-
Params: consensusParams,
868-
Consensus: snowmanConsensus,
862+
Ctx: ctx,
863+
AllGetsServer: snowGetHandler,
864+
VM: vmWrappingProposerVM,
865+
Sender: snowmanMessageSender,
866+
Validators: vdrs,
867+
ConnectedValidators: connectedValidators,
868+
Params: consensusParams,
869+
Consensus: snowmanConsensus,
869870
}
870871
snowmanEngine, err := smeng.New(snowmanEngineConfig)
871872
if err != nil {
@@ -1201,14 +1202,15 @@ func (m *manager) createSnowmanChain(
12011202
// Create engine, bootstrapper and state-syncer in this order,
12021203
// to make sure start callbacks are duly initialized
12031204
engineConfig := smeng.Config{
1204-
Ctx: ctx,
1205-
AllGetsServer: snowGetHandler,
1206-
VM: vm,
1207-
Sender: messageSender,
1208-
Validators: vdrs,
1209-
Params: consensusParams,
1210-
Consensus: consensus,
1211-
PartialSync: m.PartialSyncPrimaryNetwork && ctx.ChainID == constants.PlatformChainID,
1205+
Ctx: ctx,
1206+
AllGetsServer: snowGetHandler,
1207+
VM: vm,
1208+
Sender: messageSender,
1209+
Validators: vdrs,
1210+
ConnectedValidators: connectedValidators,
1211+
Params: consensusParams,
1212+
Consensus: consensus,
1213+
PartialSync: m.PartialSyncPrimaryNetwork && ctx.ChainID == constants.PlatformChainID,
12121214
}
12131215
engine, err := smeng.New(engineConfig)
12141216
if err != nil {

node/overridden_manager.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,6 @@ func (o *overriddenManager) Sample(_ ids.ID, size int) ([]ids.NodeID, error) {
6868
return o.manager.Sample(o.subnetID, size)
6969
}
7070

71-
func (o *overriddenManager) UniformSample(_ ids.ID, size int) ([]ids.NodeID, error) {
72-
return o.manager.UniformSample(o.subnetID, size)
73-
}
74-
7571
func (o *overriddenManager) GetMap(ids.ID) map[ids.NodeID]*validators.GetValidatorOutput {
7672
return o.manager.GetMap(o.subnetID)
7773
}

snow/engine/common/tracker/peers.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ type Peers interface {
3333
ConnectedPercent() float64
3434
// TotalWeight returns the total validator weight
3535
TotalWeight() uint64
36+
// SampleValidator returns a randomly selected connected validator. If there
37+
// are no currently connected validators then it will return false.
38+
SampleValidator() (ids.NodeID, bool)
3639
// PreferredPeers returns the currently connected validators. If there are
3740
// no currently connected validators then it will return the currently
3841
// connected peers.
@@ -108,6 +111,13 @@ func (p *lockedPeers) TotalWeight() uint64 {
108111
return p.peers.TotalWeight()
109112
}
110113

114+
func (p *lockedPeers) SampleValidator() (ids.NodeID, bool) {
115+
p.lock.RLock()
116+
defer p.lock.RUnlock()
117+
118+
return p.peers.SampleValidator()
119+
}
120+
111121
func (p *lockedPeers) PreferredPeers() set.Set[ids.NodeID] {
112122
p.lock.RLock()
113123
defer p.lock.RUnlock()
@@ -263,6 +273,10 @@ func (p *peerData) TotalWeight() uint64 {
263273
return p.totalWeight
264274
}
265275

276+
func (p *peerData) SampleValidator() (ids.NodeID, bool) {
277+
return p.connectedValidators.Peek()
278+
}
279+
266280
func (p *peerData) PreferredPeers() set.Set[ids.NodeID] {
267281
if p.connectedValidators.Len() == 0 {
268282
connectedPeers := set.NewSet[ids.NodeID](p.connectedPeers.Len())

snow/engine/snowman/config.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/ava-labs/avalanchego/snow/consensus/snowball"
99
"github.com/ava-labs/avalanchego/snow/consensus/snowman"
1010
"github.com/ava-labs/avalanchego/snow/engine/common"
11+
"github.com/ava-labs/avalanchego/snow/engine/common/tracker"
1112
"github.com/ava-labs/avalanchego/snow/engine/snowman/block"
1213
"github.com/ava-labs/avalanchego/snow/validators"
1314
)
@@ -16,11 +17,12 @@ import (
1617
type Config struct {
1718
common.AllGetsServer
1819

19-
Ctx *snow.ConsensusContext
20-
VM block.ChainVM
21-
Sender common.Sender
22-
Validators validators.Manager
23-
Params snowball.Parameters
24-
Consensus snowman.Consensus
25-
PartialSync bool
20+
Ctx *snow.ConsensusContext
21+
VM block.ChainVM
22+
Sender common.Sender
23+
Validators validators.Manager
24+
ConnectedValidators tracker.Peers
25+
Params snowball.Parameters
26+
Consensus snowman.Consensus
27+
PartialSync bool
2628
}

snow/engine/snowman/config_test.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,18 @@ import (
88
"github.com/ava-labs/avalanchego/snow/consensus/snowball"
99
"github.com/ava-labs/avalanchego/snow/consensus/snowman"
1010
"github.com/ava-labs/avalanchego/snow/engine/common"
11+
"github.com/ava-labs/avalanchego/snow/engine/common/tracker"
1112
"github.com/ava-labs/avalanchego/snow/engine/snowman/block"
1213
"github.com/ava-labs/avalanchego/snow/validators"
1314
)
1415

1516
func DefaultConfig() Config {
1617
return Config{
17-
Ctx: snow.DefaultConsensusContextTest(),
18-
VM: &block.TestVM{},
19-
Sender: &common.SenderTest{},
20-
Validators: validators.NewManager(),
18+
Ctx: snow.DefaultConsensusContextTest(),
19+
VM: &block.TestVM{},
20+
Sender: &common.SenderTest{},
21+
Validators: validators.NewManager(),
22+
ConnectedValidators: tracker.NewPeers(),
2123
Params: snowball.Parameters{
2224
K: 1,
2325
AlphaPreference: 1,

snow/engine/snowman/transitive.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -169,11 +169,10 @@ func (t *Transitive) Gossip(ctx context.Context) error {
169169

170170
// Uniform sampling is used here to reduce bandwidth requirements of
171171
// nodes with a large amount of stake weight.
172-
vdrIDs, err := t.Validators.UniformSample(t.Ctx.SubnetID, 1)
173-
if err != nil {
172+
vdrID, ok := t.ConnectedValidators.SampleValidator()
173+
if !ok {
174174
t.Ctx.Log.Error("skipping block gossip",
175-
zap.String("reason", "no validators"),
176-
zap.Error(err),
175+
zap.String("reason", "no connected validators"),
177176
)
178177
return nil
179178
}
@@ -190,9 +189,13 @@ func (t *Transitive) Gossip(ctx context.Context) error {
190189
}
191190

192191
t.requestID++
193-
vdrSet := set.Of(vdrIDs...)
194-
preferredID := t.Consensus.Preference()
195-
t.Sender.SendPullQuery(ctx, vdrSet, t.requestID, preferredID, nextHeightToAccept)
192+
t.Sender.SendPullQuery(
193+
ctx,
194+
set.Of(vdrID),
195+
t.requestID,
196+
t.Consensus.Preference(),
197+
nextHeightToAccept,
198+
)
196199
} else {
197200
t.Ctx.Log.Debug("skipping block gossip",
198201
zap.String("reason", "blocks currently processing"),

snow/engine/snowman/transitive_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/ava-labs/avalanchego/snow/validators"
2323
"github.com/ava-labs/avalanchego/utils/constants"
2424
"github.com/ava-labs/avalanchego/utils/set"
25+
"github.com/ava-labs/avalanchego/version"
2526
)
2627

2728
var (
@@ -41,6 +42,9 @@ func setup(t *testing.T, engCfg Config) (ids.NodeID, validators.Manager, *common
4142

4243
vdr := ids.GenerateTestNodeID()
4344
require.NoError(vals.AddStaker(engCfg.Ctx.SubnetID, vdr, nil, ids.Empty, 1))
45+
require.NoError(engCfg.ConnectedValidators.Connected(context.Background(), vdr, version.CurrentApp))
46+
47+
vals.RegisterCallbackListener(engCfg.Ctx.SubnetID, engCfg.ConnectedValidators)
4448

4549
sender := &common.SenderTest{T: t}
4650
engCfg.Sender = sender

snow/validators/manager.go

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -85,10 +85,6 @@ type Manager interface {
8585
// If sampling the requested size isn't possible, an error will be returned.
8686
Sample(subnetID ids.ID, size int) ([]ids.NodeID, error)
8787

88-
// UniformSample returns a collection of validatorIDs in the subnet.
89-
// If sampling the requested size isn't possible, an error will be returned.
90-
UniformSample(subnetID ids.ID, size int) ([]ids.NodeID, error)
91-
9288
// Map of the validators in this subnet
9389
GetMap(subnetID ids.ID) map[ids.NodeID]*GetValidatorOutput
9490

@@ -257,21 +253,6 @@ func (m *manager) Sample(subnetID ids.ID, size int) ([]ids.NodeID, error) {
257253
return set.Sample(size)
258254
}
259255

260-
func (m *manager) UniformSample(subnetID ids.ID, size int) ([]ids.NodeID, error) {
261-
if size == 0 {
262-
return nil, nil
263-
}
264-
265-
m.lock.RLock()
266-
set, exists := m.subnetToVdrs[subnetID]
267-
m.lock.RUnlock()
268-
if !exists {
269-
return nil, ErrMissingValidators
270-
}
271-
272-
return set.UniformSample(size)
273-
}
274-
275256
func (m *manager) GetMap(subnetID ids.ID) map[ids.NodeID]*GetValidatorOutput {
276257
m.lock.RLock()
277258
set, exists := m.subnetToVdrs[subnetID]

snow/validators/set.go

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -243,13 +243,6 @@ func (s *vdrSet) Sample(size int) ([]ids.NodeID, error) {
243243
return s.sample(size)
244244
}
245245

246-
func (s *vdrSet) UniformSample(size int) ([]ids.NodeID, error) {
247-
s.lock.RLock()
248-
defer s.lock.RUnlock()
249-
250-
return s.uniformSample(size)
251-
}
252-
253246
func (s *vdrSet) sample(size int) ([]ids.NodeID, error) {
254247
if !s.samplerInitialized {
255248
if err := s.sampler.Initialize(s.weights); err != nil {
@@ -270,22 +263,6 @@ func (s *vdrSet) sample(size int) ([]ids.NodeID, error) {
270263
return list, nil
271264
}
272265

273-
func (s *vdrSet) uniformSample(size int) ([]ids.NodeID, error) {
274-
uniform := sampler.NewUniform()
275-
uniform.Initialize(uint64(len(s.vdrSlice)))
276-
277-
indices, err := uniform.Sample(size)
278-
if err != nil {
279-
return nil, err
280-
}
281-
282-
list := make([]ids.NodeID, size)
283-
for i, index := range indices {
284-
list[i] = s.vdrSlice[index].NodeID
285-
}
286-
return list, nil
287-
}
288-
289266
func (s *vdrSet) TotalWeight() (uint64, error) {
290267
s.lock.RLock()
291268
defer s.lock.RUnlock()

utils/set/set.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ func (s Set[_]) MarshalJSON() ([]byte, error) {
184184
return jsonBuf.Bytes(), errs.Err
185185
}
186186

187-
// Returns an element. If the set is empty, returns false
187+
// Returns a random element. If the set is empty, returns false
188188
func (s *Set[T]) Peek() (T, bool) {
189189
for elt := range *s {
190190
return elt, true

0 commit comments

Comments
 (0)