Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ go 1.21
require (
github.com/DataDog/zstd v1.5.2
github.com/NYTimes/gziphandler v1.1.1
github.com/ava-labs/coreth v0.13.1-rc.5
github.com/ava-labs/coreth v0.13.2-stake-sampling
github.com/ava-labs/ledger-avalanche/go v0.0.0-20231102202641-ae2ebdaeac34
github.com/btcsuite/btcd/btcutil v1.1.3
github.com/cockroachdb/pebble v0.0.0-20230209160836-829675f94811
Expand Down Expand Up @@ -68,7 +68,6 @@ require (
)

require (
github.com/BurntSushi/toml v1.2.1 // indirect
github.com/FactomProject/basen v0.0.0-20150613233007-fe3947df716e // indirect
github.com/FactomProject/btcutilecc v0.0.0-20130527213604-d3a63a5752ec // indirect
github.com/VictoriaMetrics/fastcache v1.10.0 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3f
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/toml v1.2.1 h1:9F2/+DoOYIOksmaJFPw1tGFy1eDnIJXg+UHjuD8lTak=
github.com/BurntSushi/toml v1.2.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8=
github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/CloudyKit/fastprinter v0.0.0-20200109182630-33d98a066a53/go.mod h1:+3IMCy2vIlbG1XG/0ggNQv0SvxCAIpPM5b1nCz56Xno=
github.com/CloudyKit/jet/v3 v3.0.0/go.mod h1:HKQPgSJmdK8hdoAbKUUWajkHyHo4RaU5rMdUywE7VMo=
Expand All @@ -63,8 +63,8 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8=
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/ava-labs/coreth v0.13.1-rc.5 h1:YcTs9nryZLkf4gPmMyFx1TREFpDTPdg/VCNGGHSF2TY=
github.com/ava-labs/coreth v0.13.1-rc.5/go.mod h1:4y1igTe/sFOIrpAtXoY+AdmfftNHrmrhBBRVfGCAPcw=
github.com/ava-labs/coreth v0.13.2-stake-sampling h1:sCJtRFjvAO384lXKknhSStHDxayLSIOMqoXZ23lc7Rg=
github.com/ava-labs/coreth v0.13.2-stake-sampling/go.mod h1:Q0dtRO3MPa5mDeYXwYdT7600utgR27u16oQTsWXOeiE=
github.com/ava-labs/ledger-avalanche/go v0.0.0-20231102202641-ae2ebdaeac34 h1:mg9Uw6oZFJKytJxgxnl3uxZOs/SB8CVHg6Io4Tf99Zc=
github.com/ava-labs/ledger-avalanche/go v0.0.0-20231102202641-ae2ebdaeac34/go.mod h1:pJxaT9bUgeRNVmNRgtCHb7sFDIRKy7CzTQVi8gGNT6g=
github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g=
Expand Down
105 changes: 48 additions & 57 deletions network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/ava-labs/avalanchego/network/dialer"
"github.com/ava-labs/avalanchego/network/peer"
"github.com/ava-labs/avalanchego/network/throttling"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/snow/networking/router"
"github.com/ava-labs/avalanchego/snow/networking/sender"
"github.com/ava-labs/avalanchego/subnets"
Expand All @@ -47,8 +48,7 @@ const (
)

var (
_ sender.ExternalSender = (*network)(nil)
_ Network = (*network)(nil)
_ Network = (*network)(nil)

errNotValidator = errors.New("node is not a validator")
errNotTracked = errors.New("subnet is not tracked")
Expand Down Expand Up @@ -310,25 +310,40 @@ func NewNetwork(
return n, nil
}

func (n *network) Send(msg message.OutboundMessage, nodeIDs set.Set[ids.NodeID], subnetID ids.ID, allower subnets.Allower) set.Set[ids.NodeID] {
peers := n.getPeers(nodeIDs, subnetID, allower)
n.peerConfig.Metrics.MultipleSendsFailed(
msg.Op(),
nodeIDs.Len()-len(peers),
)
return n.send(msg, peers)
}

func (n *network) Gossip(
func (n *network) Send(
msg message.OutboundMessage,
config common.SendConfig,
subnetID ids.ID,
numValidatorsToSend int,
numNonValidatorsToSend int,
numPeersToSend int,
allower subnets.Allower,
) set.Set[ids.NodeID] {
peers := n.samplePeers(subnetID, numValidatorsToSend, numNonValidatorsToSend, numPeersToSend, allower)
return n.send(msg, peers)
namedPeers := n.getPeers(config.NodeIDs, subnetID, allower)
n.peerConfig.Metrics.MultipleSendsFailed(
msg.Op(),
config.NodeIDs.Len()-len(namedPeers),
)

var (
sampledPeers = n.samplePeers(config, subnetID, allower)
sentTo = set.NewSet[ids.NodeID](len(namedPeers) + len(sampledPeers))
now = n.peerConfig.Clock.Time()
)

// send to peer and update metrics
for _, peers := range [][]peer.Peer{namedPeers, sampledPeers} {
for _, peer := range peers {
if peer.Send(n.onCloseCtx, msg) {
sentTo.Add(peer.ID())

// TODO: move send fail rate calculations into the peer metrics
// record metrics for success
n.sendFailRateCalculator.Observe(0, now)
} else {
// record metrics for failure
n.sendFailRateCalculator.Observe(1, now)
}
}
}
return sentTo
}

// HealthCheck returns information about several network layer health checks.
Expand Down Expand Up @@ -696,24 +711,19 @@ func (n *network) getPeers(
}

func (n *network) samplePeers(
config common.SendConfig,
subnetID ids.ID,
numValidatorsToSample,
numNonValidatorsToSample int,
numPeersToSample int,
allower subnets.Allower,
) []peer.Peer {
// If there are fewer validators than [numValidatorsToSample], then only
// sample [numValidatorsToSample] validators.
subnetValidatorsLen := n.config.Validators.Count(subnetID)
if subnetValidatorsLen < numValidatorsToSample {
numValidatorsToSample = subnetValidatorsLen
}
numValidatorsToSample := min(config.Validators, n.config.Validators.Count(subnetID))

n.peersLock.RLock()
defer n.peersLock.RUnlock()

return n.connectedPeers.Sample(
numValidatorsToSample+numNonValidatorsToSample+numPeersToSample,
numValidatorsToSample+config.NonValidators+config.Peers,
func(p peer.Peer) bool {
// Only return peers that are tracking [subnetID]
trackedSubnets := p.TrackedSubnets()
Expand All @@ -722,14 +732,20 @@ func (n *network) samplePeers(
}

peerID := p.ID()
// if the peer was already explicitly included, don't include in the
// sample
if config.NodeIDs.Contains(peerID) {
return false
}

_, isValidator := n.config.Validators.GetValidator(subnetID, peerID)
// check if the peer is allowed to connect to the subnet
if !allower.IsAllowed(peerID, isValidator) {
return false
}

if numPeersToSample > 0 {
numPeersToSample--
if config.Peers > 0 {
config.Peers--
return true
}

Expand All @@ -738,37 +754,12 @@ func (n *network) samplePeers(
return numValidatorsToSample >= 0
}

numNonValidatorsToSample--
return numNonValidatorsToSample >= 0
config.NonValidators--
return config.NonValidators >= 0
},
)
}

// send the message to the provided peers.
//
// send takes ownership of the provided message reference. So, the provided
// message should only be inspected if the reference has been externally
// increased.
func (n *network) send(msg message.OutboundMessage, peers []peer.Peer) set.Set[ids.NodeID] {
sentTo := set.NewSet[ids.NodeID](len(peers))
now := n.peerConfig.Clock.Time()

// send to peer and update metrics
for _, peer := range peers {
if peer.Send(n.onCloseCtx, msg) {
sentTo.Add(peer.ID())

// TODO: move send fail rate calculations into the peer metrics
// record metrics for success
n.sendFailRateCalculator.Observe(0, now)
} else {
// record metrics for failure
n.sendFailRateCalculator.Observe(1, now)
}
}
return sentTo
}

func (n *network) disconnectedFromConnecting(nodeID ids.NodeID) {
n.peersLock.Lock()
defer n.peersLock.Unlock()
Expand Down Expand Up @@ -1208,10 +1199,10 @@ func (n *network) runTimers() {
// pullGossipPeerLists requests validators from peers in the network
func (n *network) pullGossipPeerLists() {
peers := n.samplePeers(
common.SendConfig{
Validators: 1,
},
constants.PrimaryNetworkID,
1, // numValidatorsToSample
0, // numNonValidatorsToSample
0, // numPeersToSample
subnets.NoOpAllower,
)

Expand Down
29 changes: 18 additions & 11 deletions network/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/ava-labs/avalanchego/network/peer"
"github.com/ava-labs/avalanchego/network/throttling"
"github.com/ava-labs/avalanchego/proto/pb/p2p"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/snow/networking/router"
"github.com/ava-labs/avalanchego/snow/networking/tracker"
"github.com/ava-labs/avalanchego/snow/uptime"
Expand Down Expand Up @@ -327,7 +328,14 @@ func TestSend(t *testing.T) {
require.NoError(err)

toSend := set.Of(nodeIDs[1])
sentTo := net0.Send(outboundGetMsg, toSend, constants.PrimaryNetworkID, subnets.NoOpAllower)
sentTo := net0.Send(
outboundGetMsg,
common.SendConfig{
NodeIDs: toSend,
},
constants.PrimaryNetworkID,
subnets.NoOpAllower,
)
require.Equal(toSend, sentTo)

inboundGetMsg := <-received
Expand All @@ -339,7 +347,7 @@ func TestSend(t *testing.T) {
wg.Wait()
}

func TestSendAndGossipWithFilter(t *testing.T) {
func TestSendWithFilter(t *testing.T) {
require := require.New(t)

received := make(chan message.InboundMessage)
Expand All @@ -366,21 +374,20 @@ func TestSendAndGossipWithFilter(t *testing.T) {

toSend := set.Of(nodeIDs...)
validNodeID := nodeIDs[1]
sentTo := net0.Send(outboundGetMsg, toSend, constants.PrimaryNetworkID, newNodeIDConnector(validNodeID))
sentTo := net0.Send(
outboundGetMsg,
common.SendConfig{
NodeIDs: toSend,
},
constants.PrimaryNetworkID,
newNodeIDConnector(validNodeID),
)
require.Len(sentTo, 1)
require.Contains(sentTo, validNodeID)

inboundGetMsg := <-received
require.Equal(message.GetOp, inboundGetMsg.Op())

// Test Gossip now
sentTo = net0.Gossip(outboundGetMsg, constants.PrimaryNetworkID, 0, 0, len(nodeIDs), newNodeIDConnector(validNodeID))
require.Len(sentTo, 1)
require.Contains(sentTo, validNodeID)

inboundGetMsg = <-received
require.Equal(message.GetOp, inboundGetMsg.Op())

for _, net := range networks {
net.StartClose()
}
Expand Down
21 changes: 2 additions & 19 deletions network/p2p/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,29 +108,12 @@ func (c *Client) AppRequest(
// AppGossip sends a gossip message to a random set of peers.
func (c *Client) AppGossip(
ctx context.Context,
config common.SendConfig,
appGossipBytes []byte,
numValidators int,
numNonValidators int,
numPeers int,
) error {
return c.sender.SendAppGossip(
ctx,
PrefixMessage(c.handlerPrefix, appGossipBytes),
numValidators,
numNonValidators,
numPeers,
)
}

// AppGossipSpecific sends a gossip message to a predetermined set of peers.
func (c *Client) AppGossipSpecific(
ctx context.Context,
nodeIDs set.Set[ids.NodeID],
appGossipBytes []byte,
) error {
return c.sender.SendAppGossipSpecific(
ctx,
nodeIDs,
config,
PrefixMessage(c.handlerPrefix, appGossipBytes),
)
}
Expand Down
30 changes: 25 additions & 5 deletions network/p2p/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ import (
"github.com/ava-labs/avalanchego/cache"
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/network/p2p"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/utils"
"github.com/ava-labs/avalanchego/utils/bloom"
"github.com/ava-labs/avalanchego/utils/buffer"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/set"
)

const (
Expand Down Expand Up @@ -261,6 +263,7 @@ func (p *PullGossiper[_]) handleResponse(
func NewPushGossiper[T Gossipable](
marshaller Marshaller[T],
mempool Set[T],
validators p2p.ValidatorPortion,
client *p2p.Client,
metrics Metrics,
gossipParams BranchingFactor,
Expand All @@ -287,6 +290,7 @@ func NewPushGossiper[T Gossipable](
return &PushGossiper[T]{
marshaller: marshaller,
set: mempool,
validators: validators,
client: client,
metrics: metrics,
gossipParams: gossipParams,
Expand All @@ -305,6 +309,7 @@ func NewPushGossiper[T Gossipable](
type PushGossiper[T Gossipable] struct {
marshaller Marshaller[T]
set Set[T]
validators p2p.ValidatorPortion
client *p2p.Client
metrics Metrics

Expand All @@ -322,9 +327,20 @@ type PushGossiper[T Gossipable] struct {
}

type BranchingFactor struct {
Validators int
// StakePercentage determines the percentage of stake that should have
// gossip sent to based on the inverse CDF of stake weights. This value does
// not account for the connectivity of the nodes.
StakePercentage float64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: wonder if it makes sense to take the num strategy here like we do for warp?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose we don't need to because we don't require determinism.

// Validators specifies the number of connected validators, in addition to
// any validators sent from the StakePercentage parameter, to send gossip
// to. These validators are sampled uniformly rather than by stake.
Validators int
// NonValidators specifies the number of connected non-validators to send
// gossip to.
NonValidators int
Peers int
// Peers specifies the number of connected validators or non-validators, in
// addition to the number sent due to other configs, to send gossip to.
Peers int
}

func (b *BranchingFactor) Verify() error {
Expand Down Expand Up @@ -460,12 +476,16 @@ func (p *PushGossiper[T]) gossip(
sentCountMetric.Add(float64(len(gossip)))
sentBytesMetric.Add(float64(sentBytes))

validatorsByStake := p.validators.Top(ctx, gossipParams.StakePercentage)
return p.client.AppGossip(
ctx,
common.SendConfig{
NodeIDs: set.Of(validatorsByStake...),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Validators: gossipParams.Validators,
NonValidators: gossipParams.NonValidators,
Peers: gossipParams.Peers,
},
msgBytes,
gossipParams.Validators,
gossipParams.NonValidators,
gossipParams.Peers,
)
}

Expand Down
Loading