Skip to content

Commit cc3242f

Browse files
Dynamically update mempool gossip request rate limit (#4162)
Signed-off-by: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Co-authored-by: Stephen Buttolph <stephen@avalabs.org>
1 parent f2e3273 commit cc3242f

File tree

21 files changed

+1108
-264
lines changed

21 files changed

+1108
-264
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ require (
1515
github.com/DataDog/zstd v1.5.2
1616
github.com/StephenButtolph/canoto v0.17.1
1717
github.com/antithesishq/antithesis-sdk-go v0.3.8
18-
github.com/ava-labs/coreth v0.15.3-rc.5
18+
github.com/ava-labs/coreth v0.15.4-0.20250822180419-87fa386ffea9
1919
github.com/ava-labs/ledger-avalanche/go v0.0.0-20241009183145-e6f90a8a1a60
2020
github.com/ava-labs/libevm v1.13.14-0.3.0.rc.6
2121
github.com/btcsuite/btcd/btcutil v1.1.3

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,8 @@ github.com/antithesishq/antithesis-sdk-go v0.3.8/go.mod h1:IUpT2DPAKh6i/YhSbt6Gl
7070
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
7171
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
7272
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
73-
github.com/ava-labs/coreth v0.15.3-rc.5 h1:zGk1LaFeZOEkA6uPF1g9BNCjbZOrONx9fm5WtiH5NWg=
74-
github.com/ava-labs/coreth v0.15.3-rc.5/go.mod h1:sEqzSu2f4FJEGFL7CP3zNOQtQ0MupWJdzTp7W65EDf8=
73+
github.com/ava-labs/coreth v0.15.4-0.20250822180419-87fa386ffea9 h1:lg7DWh3WJIVmc8uGsDgsgiUkOWOz5/qRtPJLckhoHuY=
74+
github.com/ava-labs/coreth v0.15.4-0.20250822180419-87fa386ffea9/go.mod h1:ZMzhjHxfW1j0jAajxD7e/VdiTfFe2Ftnir0EOLalZfU=
7575
github.com/ava-labs/firewood-go-ethhash/ffi v0.0.9 h1:zw0g+cUbZDsGdWx1PKmBChkpy+ixL3QgiI86DUOuXvo=
7676
github.com/ava-labs/firewood-go-ethhash/ffi v0.0.9/go.mod h1:cq89ua3iiZ5wPBALTEQS5eG8DIZcs7ov6OiL4YR1BVY=
7777
github.com/ava-labs/ledger-avalanche/go v0.0.0-20241009183145-e6f90a8a1a60 h1:EL66gtXOAwR/4KYBjOV03LTWgkEXvLePribLlJNu4g0=

network/p2p/client.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ type Client struct {
3636
handlerPrefix []byte
3737
router *router
3838
sender common.AppSender
39-
options *clientOptions
39+
// nodeSampler is used to select nodes to route Client.AppRequestAny to
40+
nodeSampler NodeSampler
4041
}
4142

4243
// AppRequestAny issues an AppRequest to an arbitrary node decided by Client.
@@ -47,7 +48,7 @@ func (c *Client) AppRequestAny(
4748
appRequestBytes []byte,
4849
onResponse AppResponseCallback,
4950
) error {
50-
sampled := c.options.nodeSampler.Sample(ctx, 1)
51+
sampled := c.nodeSampler.Sample(ctx, 1)
5152
if len(sampled) != 1 {
5253
return ErrNoPeers
5354
}

network/p2p/gossip/gossip_test.go

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,12 @@ func TestGossiperGossip(t *testing.T) {
113113
responseSender := &enginetest.SenderStub{
114114
SentAppResponse: make(chan []byte, 1),
115115
}
116-
responseNetwork, err := p2p.NewNetwork(logging.NoLog{}, responseSender, prometheus.NewRegistry(), "")
116+
responseNetwork, err := p2p.NewNetwork(
117+
logging.NoLog{},
118+
responseSender,
119+
prometheus.NewRegistry(),
120+
"",
121+
)
117122
require.NoError(err)
118123

119124
responseBloom, err := NewBloomFilter(prometheus.NewRegistry(), "", 1000, 0.01, 0.05)
@@ -149,7 +154,14 @@ func TestGossiperGossip(t *testing.T) {
149154
SentAppRequest: make(chan []byte, 1),
150155
}
151156

152-
requestNetwork, err := p2p.NewNetwork(logging.NoLog{}, requestSender, prometheus.NewRegistry(), "")
157+
peers := &p2p.Peers{}
158+
requestNetwork, err := p2p.NewNetwork(
159+
logging.NoLog{},
160+
requestSender,
161+
prometheus.NewRegistry(),
162+
"",
163+
peers,
164+
)
153165
require.NoError(err)
154166
require.NoError(requestNetwork.Connected(context.Background(), ids.EmptyNodeID, nil))
155167

@@ -163,7 +175,10 @@ func TestGossiperGossip(t *testing.T) {
163175
require.NoError(requestSet.Add(item))
164176
}
165177

166-
requestClient := requestNetwork.NewClient(0x0)
178+
requestClient := requestNetwork.NewClient(
179+
0x0,
180+
p2p.PeerSampler{Peers: peers},
181+
)
167182

168183
require.NoError(err)
169184
gossiper := NewPullGossiper[*testTx](
@@ -526,16 +541,7 @@ func TestPushGossiper(t *testing.T) {
526541
sender := &enginetest.SenderStub{
527542
SentAppGossip: make(chan []byte, 2),
528543
}
529-
network, err := p2p.NewNetwork(
530-
logging.NoLog{},
531-
sender,
532-
prometheus.NewRegistry(),
533-
"",
534-
)
535-
require.NoError(err)
536-
client := network.NewClient(0)
537544
validators := p2p.NewValidators(
538-
&p2p.Peers{},
539545
logging.NoLog{},
540546
constants.PrimaryNetworkID,
541547
&validatorstest.State{
@@ -548,6 +554,15 @@ func TestPushGossiper(t *testing.T) {
548554
},
549555
time.Hour,
550556
)
557+
network, err := p2p.NewNetwork(
558+
logging.NoLog{},
559+
sender,
560+
prometheus.NewRegistry(),
561+
"",
562+
validators,
563+
)
564+
require.NoError(err)
565+
client := network.NewClient(0, p2p.PeerSampler{Peers: &p2p.Peers{}})
551566
metrics, err := NewMetrics(prometheus.NewRegistry(), "")
552567
require.NoError(err)
553568
marshaller := testMarshaller{}
@@ -621,6 +636,10 @@ type testValidatorSet struct {
621636
validators set.Set[ids.NodeID]
622637
}
623638

639+
// Len returns the number of node IDs held in t.validators. The context
// argument is ignored.
func (t testValidatorSet) Len(context.Context) int {
640+
return len(t.validators)
641+
}
642+
624643
// Has reports whether nodeID is contained in t.validators. The context
// argument is ignored.
func (t testValidatorSet) Has(_ context.Context, nodeID ids.NodeID) bool {
625644
return t.validators.Contains(nodeID)
626645
}

network/p2p/handler.go

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,13 @@ package p2p
55

66
import (
77
"context"
8+
"errors"
9+
"fmt"
10+
"math"
11+
"sync"
812
"time"
913

14+
"github.com/prometheus/client_golang/prometheus"
1015
"go.uber.org/zap"
1116

1217
"github.com/ava-labs/avalanchego/ids"
@@ -27,6 +32,9 @@ var (
2732
_ Handler = (*NoOpHandler)(nil)
2833
_ Handler = (*TestHandler)(nil)
2934
_ Handler = (*ValidatorHandler)(nil)
35+
36+
errPeriodMustBePositive = errors.New("period must be positive")
37+
errRequestsPerPeerMustBeNonNegative = errors.New("requests-per-peer must be non-negative")
3038
)
3139

3240
// Handler is the server-side logic for virtual machine application protocols.
@@ -57,6 +65,117 @@ func (NoOpHandler) AppRequest(context.Context, ids.NodeID, time.Time, []byte) ([
5765
return nil, nil
5866
}
5967

68+
// DynamicThrottlerHandler forwards AppGossip and AppRequest messages to a
// wrapped ThrottlerHandler, re-evaluating the throttling limit from the size
// of validatorSet before each message is forwarded.
type DynamicThrottlerHandler struct {
69+
// handler is the throttled handler that every message is delegated to.
handler *ThrottlerHandler
70+
// validatorSet supplies the validator count (via Len) from which the
// throttling limit is derived.
validatorSet ValidatorSet
71+
// requestsPerPeer is the number of requests each peer is expected to make
// to the network per throttling period.
requestsPerPeer float64
72+
73+
// throttler is the same throttler instance that NewDynamicThrottlerHandler
// passes to NewThrottlerHandler; its limit is adjusted through setLimit.
throttler *SlidingWindowThrottler
74+
// throttleLimitMetric is set to the limit each time the limit is changed.
throttleLimitMetric prometheus.Gauge
75+
// lock is held for the whole of checkUpdateThrottlingLimit, serializing
// access to prevNumConnectedValidators and the limit updates.
lock sync.Mutex
76+
// prevNumConnectedValidators is the validatorSet.Len value recorded at the
// last limit update; the limit is only recomputed when Len differs from it.
// NOTE(review): the name says "connected" but only validatorSet.Len is
// consulted here — confirm that Len counts connected validators only.
prevNumConnectedValidators int
77+
}
78+
79+
// AppGossip refreshes the throttling limit if the validator count has changed
// and then delegates the gossip message to the wrapped ThrottlerHandler.
func (d *DynamicThrottlerHandler) AppGossip(
80+
ctx context.Context,
81+
nodeID ids.NodeID,
82+
gossipBytes []byte,
83+
) {
84+
// Update the limit first so that this message is evaluated against it.
d.checkUpdateThrottlingLimit(ctx)
85+
86+
d.handler.AppGossip(ctx, nodeID, gossipBytes)
87+
}
88+
89+
// AppRequest refreshes the throttling limit if the validator count has changed
// and then delegates the request to the wrapped ThrottlerHandler, returning
// that handler's response bytes and error unchanged.
func (d *DynamicThrottlerHandler) AppRequest(
90+
ctx context.Context,
91+
nodeID ids.NodeID,
92+
deadline time.Time,
93+
requestBytes []byte,
94+
) ([]byte, *common.AppError) {
95+
// Update the limit first so that this request is evaluated against it.
d.checkUpdateThrottlingLimit(ctx)
96+
97+
return d.handler.AppRequest(ctx, nodeID, deadline, requestBytes)
98+
}
99+
100+
// checkUpdateThrottlingLimit recomputes the throttling limit when the value
// returned by validatorSet.Len differs from the one recorded at the previous
// update. d.lock is held for the whole call.
func (d *DynamicThrottlerHandler) checkUpdateThrottlingLimit(ctx context.Context) {
101+
d.lock.Lock()
102+
defer d.lock.Unlock()
103+
104+
numValidators := d.validatorSet.Len(ctx)
105+
106+
// Nothing to do while the validator count is unchanged. Because
// prevNumConnectedValidators starts at its zero value, a first observation
// of 0 validators also returns here without calling setLimit.
if numValidators == d.prevNumConnectedValidators {
107+
return
108+
}
109+
110+
d.prevNumConnectedValidators = numValidators
111+
112+
// With no validators the limit is set to 0; this also avoids the division
// by n below.
if numValidators == 0 {
113+
d.setLimit(0)
114+
return
115+
}
116+
117+
n := float64(numValidators)
118+
119+
// guaranteed to not overflow an int
// NOTE(review): the computation below is done entirely in float64, so it is
// unclear which int the line above refers to — confirm or reword.
//
// expectedSamples and variance match the mean (r/n) and variance
// (r*(n-1)/n^2) of a binomial distribution with r = requestsPerPeer trials
// and success probability 1/n; presumably this models each of a peer's
// requests being sent to one of n validators chosen uniformly — confirm.
120+
expectedSamples := d.requestsPerPeer / n
121+
variance := d.requestsPerPeer * (n - 1) / (n * n)
122+
stdDeviation := math.Sqrt(variance)
123+
124+
// Throttle anything beyond 4 standard deviations which should throttle
125+
// anything beyond the 99.994 percentile of expected requests.
126+
limit := expectedSamples + 4*stdDeviation
127+
d.setLimit(limit)
128+
}
129+
130+
// setLimit applies limit to the throttler and records the same value in the
// throttle limit gauge. Its caller, checkUpdateThrottlingLimit, invokes it
// while holding d.lock.
func (d *DynamicThrottlerHandler) setLimit(limit float64) {
131+
d.throttler.setLimit(limit)
132+
d.throttleLimitMetric.Set(limit)
133+
}
134+
135+
// NewDynamicThrottlerHandler wraps handler in a ThrottlerHandler whose
// throttling limit is recomputed from validatorSet.Len whenever that count
// changes.
136+
// Period is the throttling evaluation period during which this node is
137+
// expecting each peer to make requestsPerPeer requests to the network. The
138+
// throttling limit is not computed here; it is derived from requestsPerPeer
// and the number of validators reported by validatorSet when messages are
139+
// handled (see checkUpdateThrottlingLimit).
//
// It returns errPeriodMustBePositive if period <= 0,
// errRequestsPerPeerMustBeNonNegative if requestsPerPeer is NaN or negative,
// and a wrapped error if the throttle_limit gauge cannot be registered with
// metrics.
140+
func NewDynamicThrottlerHandler(
141+
log logging.Logger,
142+
handler Handler,
143+
validatorSet ValidatorSet,
144+
period time.Duration,
145+
requestsPerPeer float64,
146+
metrics prometheus.Registerer,
147+
namespace string,
148+
) (*DynamicThrottlerHandler, error) {
149+
if period <= 0 {
150+
return nil, errPeriodMustBePositive
151+
}
152+
153+
// NaN is checked explicitly because NaN < 0 is false.
// NOTE(review): +Inf passes this check and would produce an infinite limit
// — confirm whether the throttler tolerates that.
if math.IsNaN(requestsPerPeer) || requestsPerPeer < 0 {
154+
return nil, errRequestsPerPeerMustBeNonNegative
155+
}
156+
157+
// Throttling limit will be initialized when a request is handled
158+
throttler := NewSlidingWindowThrottler(period, 0)
159+
160+
throttleLimitMetric := prometheus.NewGauge(prometheus.GaugeOpts{
161+
Namespace: namespace,
162+
Name: "throttle_limit",
163+
Help: "maximum number of requests per peer for a single throttling period",
164+
})
165+
166+
if err := metrics.Register(throttleLimitMetric); err != nil {
167+
return nil, fmt.Errorf("failed to register throttle limit metric: %w", err)
168+
}
169+
170+
// The same throttler is shared by the wrapped ThrottlerHandler and by the
// returned handler so that setLimit affects the requests being throttled.
return &DynamicThrottlerHandler{
171+
handler: NewThrottlerHandler(handler, throttler, log),
172+
validatorSet: validatorSet,
173+
requestsPerPeer: requestsPerPeer,
174+
throttler: throttler,
175+
throttleLimitMetric: throttleLimitMetric,
176+
}, nil
177+
}
178+
60179
func NewValidatorHandler(
61180
handler Handler,
62181
validatorSet ValidatorSet,

0 commit comments

Comments
 (0)