Skip to content

Replace periodic push accepted gossip with pull preference gossip for block discovery #2367

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 58 commits into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
90c5ea4
Replace periodic push gossip with pull gossip
StephenButtolph Nov 23, 2023
231f629
Add timestamps during accept
StephenButtolph Nov 23, 2023
d2365ba
lint
StephenButtolph Nov 23, 2023
cdf0406
Add metric for duration between block timestamp and acceptance time
StephenButtolph Nov 24, 2023
87bbdc1
merged
StephenButtolph Nov 24, 2023
05adad5
nit
StephenButtolph Nov 24, 2023
ca9e454
reduce diff
StephenButtolph Nov 24, 2023
edd6c67
adjust gossip breadth to account for increased frequency
StephenButtolph Nov 24, 2023
7cace9a
Separate new config
StephenButtolph Nov 24, 2023
4af40f4
cleanup
StephenButtolph Nov 24, 2023
25a7199
merged
StephenButtolph Nov 24, 2023
369134a
merged
StephenButtolph Nov 24, 2023
e2df3f0
merged
StephenButtolph Nov 24, 2023
3e9516d
fix tests
StephenButtolph Nov 24, 2023
008011c
increase number of queried nodes
StephenButtolph Nov 24, 2023
df92044
Send fetch preference rather than last accepted
StephenButtolph Nov 24, 2023
85cc98b
renames
StephenButtolph Nov 24, 2023
ceee9aa
nit
StephenButtolph Nov 24, 2023
77c035a
nit
StephenButtolph Nov 24, 2023
5891bba
nit skip gossip request when already polling
StephenButtolph Nov 24, 2023
23988e1
more gossip
StephenButtolph Nov 25, 2023
3918e21
nit
StephenButtolph Nov 25, 2023
380dde2
address nits
StephenButtolph Nov 26, 2023
f752ea8
Add metric to track the stake weight of block providers
StephenButtolph Nov 27, 2023
199848a
WIP add bimap
StephenButtolph Nov 27, 2023
618eac0
merged
StephenButtolph Nov 28, 2023
43faeb9
Implement generic bimap
StephenButtolph Nov 28, 2023
613c28b
docs
StephenButtolph Nov 28, 2023
332c70b
Merge branch 'dev' into bimap
StephenButtolph Nov 28, 2023
6cb48c4
tests
StephenButtolph Nov 28, 2023
01c77be
minimize diff
StephenButtolph Nov 28, 2023
cdf6f41
nit
StephenButtolph Nov 28, 2023
b69eb29
nit
StephenButtolph Nov 28, 2023
e5bd019
Unexport RequestID from snowman engine
StephenButtolph Nov 28, 2023
4f15756
Add block source metrics to monitor gossip
StephenButtolph Nov 28, 2023
86c4661
merged
StephenButtolph Nov 28, 2023
38f29fb
Merge branch 'dev' into add-stake-weight-metric
StephenButtolph Nov 28, 2023
de78ec8
Merge branch 'unexport-request-id' into track-block-request-origin
StephenButtolph Nov 28, 2023
66e9283
Merge branch 'track-block-request-origin' into gossip-source-metrics
StephenButtolph Nov 28, 2023
7cc241c
merged
StephenButtolph Nov 28, 2023
2f61dd1
fix
StephenButtolph Nov 28, 2023
81d6c00
Merge branch 'add-stake-weight-metric' of github.com:ava-labs/avalanc…
StephenButtolph Nov 28, 2023
3234dde
merged
StephenButtolph Nov 28, 2023
7b29a34
merged
StephenButtolph Nov 28, 2023
07058ea
merged
StephenButtolph Nov 28, 2023
285b645
fix merge
StephenButtolph Nov 28, 2023
bcdb9e9
nit
StephenButtolph Nov 28, 2023
43f4683
Merge branch 'track-block-request-origin' into gossip-source-metrics
StephenButtolph Nov 28, 2023
6b79747
Merge branch 'gossip-source-metrics' into replace-periodic-gossip-pre…
StephenButtolph Nov 28, 2023
6097f29
merged
StephenButtolph Nov 28, 2023
dab7a8f
Merge branch 'track-block-request-origin' into gossip-source-metrics
StephenButtolph Nov 28, 2023
6e254d3
Merge branch 'gossip-source-metrics' into replace-periodic-gossip-pre…
StephenButtolph Nov 28, 2023
06e791b
Use uniform pull gossip (#2388)
StephenButtolph Nov 29, 2023
ec35e27
merged
StephenButtolph Nov 29, 2023
83e5ec9
add comment
StephenButtolph Nov 29, 2023
b82b8e0
Merge branch 'dev' into replace-periodic-gossip-preference
StephenButtolph Nov 29, 2023
7ec8852
nit
StephenButtolph Nov 29, 2023
1db911f
Merge branch 'replace-periodic-gossip-preference' of github.com:ava-l…
StephenButtolph Nov 29, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions chains/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,8 @@ type ManagerConfig struct {
MeterVMEnabled bool // Should each VM be wrapped with a MeterVM
Metrics metrics.MultiGatherer

AcceptedFrontierGossipFrequency time.Duration
ConsensusAppConcurrency int
FrontierPollFrequency time.Duration
ConsensusAppConcurrency int

// Max Time to spend fetching a container and its
// ancestors when responding to a GetAncestors
Expand Down Expand Up @@ -824,7 +824,7 @@ func (m *manager) createAvalancheChain(
ctx,
vdrs,
msgChan,
m.AcceptedFrontierGossipFrequency,
m.FrontierPollFrequency,
m.ConsensusAppConcurrency,
m.ResourceTracker,
validators.UnhandledSubnetConnector, // avalanche chains don't use subnet connector
Expand Down Expand Up @@ -1166,7 +1166,7 @@ func (m *manager) createSnowmanChain(
ctx,
vdrs,
msgChan,
m.AcceptedFrontierGossipFrequency,
m.FrontierPollFrequency,
m.ConsensusAppConcurrency,
m.ResourceTracker,
subnetConnector,
Expand Down
17 changes: 12 additions & 5 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ const (
subnetConfigFileExt = ".json"
ipResolutionTimeout = 30 * time.Second

ipcDeprecationMsg = "IPC API is deprecated"
keystoreDeprecationMsg = "keystore API is deprecated"
ipcDeprecationMsg = "IPC API is deprecated"
keystoreDeprecationMsg = "keystore API is deprecated"
acceptedFrontierGossipDeprecationMsg = "push-based accepted frontier gossip is deprecated"
)

var (
Expand All @@ -72,6 +73,12 @@ var (
IpcsChainIDsKey: ipcDeprecationMsg,
IpcsPathKey: ipcDeprecationMsg,
KeystoreAPIEnabledKey: keystoreDeprecationMsg,
ConsensusGossipAcceptedFrontierValidatorSizeKey: acceptedFrontierGossipDeprecationMsg,
ConsensusGossipAcceptedFrontierNonValidatorSizeKey: acceptedFrontierGossipDeprecationMsg,
ConsensusGossipAcceptedFrontierPeerSizeKey: acceptedFrontierGossipDeprecationMsg,
ConsensusGossipOnAcceptValidatorSizeKey: acceptedFrontierGossipDeprecationMsg,
ConsensusGossipOnAcceptNonValidatorSizeKey: acceptedFrontierGossipDeprecationMsg,
ConsensusGossipOnAcceptPeerSizeKey: acceptedFrontierGossipDeprecationMsg,
}

errSybilProtectionDisabledStakerWeights = errors.New("sybil protection disabled weights must be positive")
Expand Down Expand Up @@ -1320,9 +1327,9 @@ func GetNodeConfig(v *viper.Viper) (node.Config, error) {
}

// Gossiping
nodeConfig.AcceptedFrontierGossipFrequency = v.GetDuration(ConsensusAcceptedFrontierGossipFrequencyKey)
if nodeConfig.AcceptedFrontierGossipFrequency < 0 {
return node.Config{}, fmt.Errorf("%s must be >= 0", ConsensusAcceptedFrontierGossipFrequencyKey)
nodeConfig.FrontierPollFrequency = v.GetDuration(ConsensusFrontierPollFrequencyKey)
if nodeConfig.FrontierPollFrequency < 0 {
return node.Config{}, fmt.Errorf("%s must be >= 0", ConsensusFrontierPollFrequencyKey)
}

// App handling
Expand Down
2 changes: 1 addition & 1 deletion config/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,9 @@ func addNodeFlags(fs *pflag.FlagSet) {
fs.Duration(BenchlistMinFailingDurationKey, constants.DefaultBenchlistMinFailingDuration, "Minimum amount of time messages to a peer must be failing before the peer is benched")

// Router
fs.Duration(ConsensusAcceptedFrontierGossipFrequencyKey, constants.DefaultAcceptedFrontierGossipFrequency, "Frequency of gossiping accepted frontiers")
fs.Uint(ConsensusAppConcurrencyKey, constants.DefaultConsensusAppConcurrency, "Maximum number of goroutines to use when handling App messages on a chain")
fs.Duration(ConsensusShutdownTimeoutKey, constants.DefaultConsensusShutdownTimeout, "Timeout before killing an unresponsive chain")
fs.Duration(ConsensusFrontierPollFrequencyKey, constants.DefaultFrontierPollFrequency, "Frequency of polling for new consensus frontiers")
fs.Uint(ConsensusGossipAcceptedFrontierValidatorSizeKey, constants.DefaultConsensusGossipAcceptedFrontierValidatorSize, "Number of validators to gossip to when gossiping accepted frontier")
fs.Uint(ConsensusGossipAcceptedFrontierNonValidatorSizeKey, constants.DefaultConsensusGossipAcceptedFrontierNonValidatorSize, "Number of non-validators to gossip to when gossiping accepted frontier")
fs.Uint(ConsensusGossipAcceptedFrontierPeerSizeKey, constants.DefaultConsensusGossipAcceptedFrontierPeerSize, "Number of peers to gossip to when gossiping accepted frontier")
Expand Down
4 changes: 2 additions & 2 deletions config/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,9 @@ const (
IpcsChainIDsKey = "ipcs-chain-ids"
IpcsPathKey = "ipcs-path"
MeterVMsEnabledKey = "meter-vms-enabled"
ConsensusAcceptedFrontierGossipFrequencyKey = "consensus-accepted-frontier-gossip-frequency"
ConsensusAppConcurrencyKey = "consensus-app-concurrency"
ConsensusShutdownTimeoutKey = "consensus-shutdown-timeout"
ConsensusFrontierPollFrequencyKey = "consensus-frontier-poll-frequency"
ConsensusGossipAcceptedFrontierValidatorSizeKey = "consensus-accepted-frontier-gossip-validator-size"
ConsensusGossipAcceptedFrontierNonValidatorSizeKey = "consensus-accepted-frontier-gossip-non-validator-size"
ConsensusGossipAcceptedFrontierPeerSizeKey = "consensus-accepted-frontier-gossip-peer-size"
Expand All @@ -154,7 +155,6 @@ const (
AppGossipValidatorSizeKey = "consensus-app-gossip-validator-size"
AppGossipNonValidatorSizeKey = "consensus-app-gossip-non-validator-size"
AppGossipPeerSizeKey = "consensus-app-gossip-peer-size"
ConsensusShutdownTimeoutKey = "consensus-shutdown-timeout"
ProposerVMUseCurrentHeightKey = "proposervm-use-current-height"
FdLimitKey = "fd-limit"
IndexEnabledKey = "index-enabled"
Expand Down
4 changes: 2 additions & 2 deletions node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,8 @@ type Config struct {
ConsensusRouter router.Router `json:"-"`
RouterHealthConfig router.HealthConfig `json:"routerHealthConfig"`
ConsensusShutdownTimeout time.Duration `json:"consensusShutdownTimeout"`
// Gossip a container in the accepted frontier every [AcceptedFrontierGossipFrequency]
AcceptedFrontierGossipFrequency time.Duration `json:"consensusGossipFreq"`
// Poll for new frontiers every [FrontierPollFrequency]
FrontierPollFrequency time.Duration `json:"consensusGossipFreq"`
// ConsensusAppConcurrency defines the maximum number of goroutines to
// handle App messages per chain.
ConsensusAppConcurrency int `json:"consensusAppConcurrency"`
Expand Down
2 changes: 1 addition & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1014,7 +1014,7 @@ func (n *Node) initChainManager(avaxAssetID ids.ID) error {
Metrics: n.MetricsGatherer,
SubnetConfigs: n.Config.SubnetConfigs,
ChainConfigs: n.Config.ChainConfigs,
AcceptedFrontierGossipFrequency: n.Config.AcceptedFrontierGossipFrequency,
FrontierPollFrequency: n.Config.FrontierPollFrequency,
ConsensusAppConcurrency: n.Config.ConsensusAppConcurrency,
BootstrapMaxTimeGetAncestors: n.Config.BootstrapMaxTimeGetAncestors,
BootstrapAncestorsMaxContainersSent: n.Config.BootstrapAncestorsMaxContainersSent,
Expand Down
4 changes: 4 additions & 0 deletions node/overridden_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ func (o *overriddenManager) Sample(_ ids.ID, size int) ([]ids.NodeID, error) {
return o.manager.Sample(o.subnetID, size)
}

// UniformSample disregards the subnetID supplied by the caller and instead
// asks the wrapped manager for a uniform sample of [size] validators from the
// overriding subnet, o.subnetID.
func (o *overriddenManager) UniformSample(_ ids.ID, size int) ([]ids.NodeID, error) {
	nodeIDs, err := o.manager.UniformSample(o.subnetID, size)
	return nodeIDs, err
}

// GetMap disregards the subnetID supplied by the caller and returns the
// wrapped manager's validator map for the overriding subnet, o.subnetID.
func (o *overriddenManager) GetMap(ids.ID) map[ids.NodeID]*validators.GetValidatorOutput {
	vdrs := o.manager.GetMap(o.subnetID)
	return vdrs
}
Expand Down
97 changes: 74 additions & 23 deletions snow/engine/snowman/transitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,14 @@ import (
"github.com/ava-labs/avalanchego/utils/wrappers"
)

const nonVerifiedCacheSize = 64 * units.MiB
const (
// nonVerifiedCacheSize is expressed in bytes (64 MiB).
nonVerifiedCacheSize = 64 * units.MiB

// putGossipPeriod is the number of Gossip calls per push (Put) gossip:
// Gossip keeps a counter modulo putGossipPeriod and only pushes the last
// accepted block when that counter wraps back to zero. This avoids
// splitting Gossip into multiple functions while still allowing pull
// gossip to run more frequently than push gossip.
putGossipPeriod = 10
)

var _ Engine = (*Transitive)(nil)

Expand Down Expand Up @@ -63,6 +70,8 @@ type Transitive struct {

requestID uint32

gossipCounter int

// track outstanding preference requests
polls poll.Set

Expand Down Expand Up @@ -151,6 +160,69 @@ func newTransitive(config Config) (*Transitive, error) {
return t, t.metrics.Initialize("", config.Ctx.Registerer)
}

// Gossip performs one round of block discovery gossip.
//
// Pull gossip: when consensus has no processing blocks, a single validator is
// sampled uniformly and sent a pull query for the current preference at the
// height following the last accepted block. If blocks are processing, the pull
// query is skipped.
//
// Push gossip: on every putGossipPeriod-th call that reaches the counter, the
// last accepted block is loaded and pushed to the network.
//
// Failures are logged rather than propagated, so Gossip always returns nil.
// Note that a failure in the pull path returns before the push counter is
// advanced.
func (t *Transitive) Gossip(ctx context.Context) error {
	lastAcceptedID, lastAcceptedHeight := t.Consensus.LastAccepted()

	numProcessing := t.Consensus.NumProcessing()
	if numProcessing != 0 {
		t.Ctx.Log.Debug("skipping block gossip",
			zap.String("reason", "blocks currently processing"),
			zap.Int("numProcessing", numProcessing),
		)
	} else {
		t.Ctx.Log.Verbo("sampling from validators",
			zap.Stringer("validators", t.Validators),
		)

		// Uniform (rather than stake-weighted) sampling keeps heavily staked
		// nodes from receiving a disproportionate share of these queries.
		sampledIDs, sampleErr := t.Validators.UniformSample(t.Ctx.SubnetID, 1)
		if sampleErr != nil {
			t.Ctx.Log.Error("skipping block gossip",
				zap.String("reason", "no validators"),
				zap.Error(sampleErr),
			)
			return nil
		}

		nextHeightToAccept, addErr := math.Add64(lastAcceptedHeight, 1)
		if addErr != nil {
			t.Ctx.Log.Error("skipping block gossip",
				zap.String("reason", "block height overflow"),
				zap.Stringer("blkID", lastAcceptedID),
				zap.Uint64("lastAcceptedHeight", lastAcceptedHeight),
				zap.Error(addErr),
			)
			return nil
		}

		t.requestID++
		recipients := set.Of(sampledIDs...)
		preferredID := t.Consensus.Preference()
		t.Sender.SendPullQuery(ctx, recipients, t.requestID, preferredID, nextHeightToAccept)
	}

	// TODO: Remove periodic push gossip after v1.11.x is activated
	t.gossipCounter = (t.gossipCounter + 1) % putGossipPeriod
	if t.gossipCounter > 0 {
		// Not yet time for a push gossip.
		return nil
	}

	lastAccepted, err := t.GetBlock(ctx, lastAcceptedID)
	if err == nil {
		t.Ctx.Log.Verbo("gossiping accepted block to the network",
			zap.Stringer("blkID", lastAcceptedID),
		)
		t.Sender.SendGossip(ctx, lastAccepted.Bytes())
		return nil
	}

	t.Ctx.Log.Warn("dropping gossip request",
		zap.String("reason", "block couldn't be loaded"),
		zap.Stringer("blkID", lastAcceptedID),
		zap.Error(err),
	)
	return nil
}

func (t *Transitive) Put(ctx context.Context, nodeID ids.NodeID, requestID uint32, blkBytes []byte) error {
blk, err := t.VM.ParseBlock(ctx, blkBytes)
if err != nil {
Expand Down Expand Up @@ -383,28 +455,6 @@ func (*Transitive) Timeout(context.Context) error {
return nil
}

// Gossip pushes the VM's last accepted block to the network.
//
// An error is returned only if the VM cannot report its last accepted block
// ID. If the block itself cannot be loaded, the gossip is dropped with a
// warning and nil is returned.
func (t *Transitive) Gossip(ctx context.Context) error {
	lastAcceptedID, err := t.VM.LastAccepted(ctx)
	if err != nil {
		return err
	}

	lastAccepted, err := t.GetBlock(ctx, lastAcceptedID)
	if err == nil {
		t.Ctx.Log.Verbo("gossiping accepted block to the network",
			zap.Stringer("blkID", lastAcceptedID),
		)
		t.Sender.SendGossip(ctx, lastAccepted.Bytes())
		return nil
	}

	t.Ctx.Log.Warn("dropping gossip request",
		zap.String("reason", "block couldn't be loaded"),
		zap.Stringer("blkID", lastAcceptedID),
		zap.Error(err),
	)
	return nil
}

// Halt is a no-op for this engine: the body is empty and the context is
// ignored.
func (*Transitive) Halt(context.Context) {}

func (t *Transitive) Shutdown(ctx context.Context) error {
Expand Down Expand Up @@ -873,6 +923,7 @@ func (t *Transitive) sendQuery(
t.Ctx.Log.Error("dropped query for block",
zap.String("reason", "insufficient number of validators"),
zap.Stringer("blkID", blkID),
zap.Int("size", t.Params.K),
)
return
}
Expand Down
12 changes: 6 additions & 6 deletions snow/engine/snowman/transitive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1382,7 +1382,7 @@ func TestEngineUndeclaredDependencyDeadlock(t *testing.T) {
func TestEngineGossip(t *testing.T) {
require := require.New(t)

_, _, sender, vm, te, gBlk := setupDefaultConfig(t)
nodeID, _, sender, vm, te, gBlk := setupDefaultConfig(t)

vm.LastAcceptedF = func(context.Context) (ids.ID, error) {
return gBlk.ID(), nil
Expand All @@ -1392,15 +1392,15 @@ func TestEngineGossip(t *testing.T) {
return gBlk, nil
}

called := new(bool)
sender.SendGossipF = func(_ context.Context, blkBytes []byte) {
*called = true
require.Equal(gBlk.Bytes(), blkBytes)
var calledSendPullQuery bool
sender.SendPullQueryF = func(_ context.Context, nodeIDs set.Set[ids.NodeID], _ uint32, _ ids.ID, _ uint64) {
calledSendPullQuery = true
require.Equal(set.Of(nodeID), nodeIDs)
}

require.NoError(te.Gossip(context.Background()))

require.True(*called)
require.True(calledSendPullQuery)
}

func TestEngineInvalidBlockIgnoredFromUnexpectedPeer(t *testing.T) {
Expand Down
19 changes: 19 additions & 0 deletions snow/validators/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ type Manager interface {
// If sampling the requested size isn't possible, an error will be returned.
Sample(subnetID ids.ID, size int) ([]ids.NodeID, error)

// UniformSample returns a collection of validatorIDs in the subnet, sampled
// uniformly at random rather than by stake weight.
// If sampling the requested size isn't possible, an error will be returned.
UniformSample(subnetID ids.ID, size int) ([]ids.NodeID, error)

// Map of the validators in this subnet
GetMap(subnetID ids.ID) map[ids.NodeID]*GetValidatorOutput

Expand Down Expand Up @@ -253,6 +257,21 @@ func (m *manager) Sample(subnetID ids.ID, size int) ([]ids.NodeID, error) {
return set.Sample(size)
}

// UniformSample returns [size] validator IDs drawn uniformly from the
// validator set of [subnetID].
//
// A request for zero validators short-circuits to (nil, nil) without looking
// up the subnet. If no validator set is registered for [subnetID],
// ErrMissingValidators is returned.
func (m *manager) UniformSample(subnetID ids.ID, size int) ([]ids.NodeID, error) {
	if size == 0 {
		return nil, nil
	}

	// Only the map lookup is guarded by the manager's lock.
	m.lock.RLock()
	vdrs, ok := m.subnetToVdrs[subnetID]
	m.lock.RUnlock()

	if ok {
		return vdrs.UniformSample(size)
	}
	return nil, ErrMissingValidators
}

func (m *manager) GetMap(subnetID ids.ID) map[ids.NodeID]*GetValidatorOutput {
m.lock.RLock()
set, exists := m.subnetToVdrs[subnetID]
Expand Down
23 changes: 23 additions & 0 deletions snow/validators/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,13 @@ func (s *vdrSet) Sample(size int) ([]ids.NodeID, error) {
return s.sample(size)
}

// UniformSample takes the set's read lock and delegates to uniformSample,
// returning [size] validator IDs or the error produced while sampling.
func (s *vdrSet) UniformSample(size int) ([]ids.NodeID, error) {
	s.lock.RLock()
	defer s.lock.RUnlock()

	nodeIDs, err := s.uniformSample(size)
	return nodeIDs, err
}

func (s *vdrSet) sample(size int) ([]ids.NodeID, error) {
if !s.samplerInitialized {
if err := s.sampler.Initialize(s.weights); err != nil {
Expand All @@ -263,6 +270,22 @@ func (s *vdrSet) sample(size int) ([]ids.NodeID, error) {
return list, nil
}

// uniformSample picks [size] validators by sampling indices uniformly over
// s.vdrSlice, so validator weights play no part in the selection. Any error
// from the index sampler is returned unchanged.
//
// This method does no locking of its own; the exported UniformSample wrapper
// acquires the read lock before calling it.
func (s *vdrSet) uniformSample(size int) ([]ids.NodeID, error) {
	indexSampler := sampler.NewUniform()
	indexSampler.Initialize(uint64(len(s.vdrSlice)))

	sampled, err := indexSampler.Sample(size)
	if err != nil {
		return nil, err
	}

	// Translate each sampled index into the NodeID stored at that position.
	nodeIDs := make([]ids.NodeID, size)
	for i := range sampled {
		nodeIDs[i] = s.vdrSlice[sampled[i]].NodeID
	}
	return nodeIDs, nil
}

func (s *vdrSet) TotalWeight() (uint64, error) {
s.lock.RLock()
defer s.lock.RUnlock()
Expand Down
4 changes: 2 additions & 2 deletions utils/constants/networking.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,12 @@ const (
DefaultBenchlistMinFailingDuration = 2*time.Minute + 30*time.Second

// Router
DefaultAcceptedFrontierGossipFrequency = 10 * time.Second
DefaultConsensusAppConcurrency = 2
DefaultConsensusShutdownTimeout = time.Minute
DefaultFrontierPollFrequency = 100 * time.Millisecond
DefaultConsensusGossipAcceptedFrontierValidatorSize = 0
DefaultConsensusGossipAcceptedFrontierNonValidatorSize = 0
DefaultConsensusGossipAcceptedFrontierPeerSize = 15
DefaultConsensusGossipAcceptedFrontierPeerSize = 1
DefaultConsensusGossipOnAcceptValidatorSize = 0
DefaultConsensusGossipOnAcceptNonValidatorSize = 0
DefaultConsensusGossipOnAcceptPeerSize = 10
Expand Down