Skip to content

Commit 95edd86

Browse files
authored
Merge branch 'dev' into e2e-add-bootstrap-check
2 parents a0eb69c + 3811802 commit 95edd86

12 files changed

+469
-59
lines changed

network/p2p/throttler.go

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
2+
// See the file LICENSE for licensing terms.
3+
4+
package p2p
5+
6+
import (
7+
"sync"
8+
"time"
9+
10+
"github.com/ava-labs/avalanchego/ids"
11+
"github.com/ava-labs/avalanchego/utils/timer/mockable"
12+
)
13+
14+
var _ Throttler = (*SlidingWindowThrottler)(nil)
15+
16+
type Throttler interface {
17+
// Handle returns true if a message from [nodeID] should be handled.
18+
Handle(nodeID ids.NodeID) bool
19+
}
20+
21+
// NewSlidingWindowThrottler returns a new instance of SlidingWindowThrottler.
22+
// Nodes are throttled if they exceed [limit] messages during an interval of
23+
// time over [period].
24+
// [period] and [limit] should both be > 0.
25+
func NewSlidingWindowThrottler(period time.Duration, limit int) *SlidingWindowThrottler {
26+
now := time.Now()
27+
return &SlidingWindowThrottler{
28+
period: period,
29+
limit: float64(limit),
30+
windows: [2]window{
31+
{
32+
start: now,
33+
hits: make(map[ids.NodeID]float64),
34+
},
35+
{
36+
start: now.Add(-period),
37+
hits: make(map[ids.NodeID]float64),
38+
},
39+
},
40+
}
41+
}
42+
43+
// window is used internally by SlidingWindowThrottler to represent the amount
44+
// of hits from a node in the evaluation period beginning at [start]
45+
type window struct {
46+
start time.Time
47+
hits map[ids.NodeID]float64
48+
}
49+
50+
// SlidingWindowThrottler is an implementation of the sliding window throttling
51+
// algorithm.
52+
type SlidingWindowThrottler struct {
53+
period time.Duration
54+
limit float64
55+
clock mockable.Clock
56+
57+
lock sync.Mutex
58+
current int
59+
windows [2]window
60+
}
61+
62+
// Handle returns true if the amount of calls received in the last [s.period]
63+
// time is less than [s.limit]
64+
//
65+
// This is calculated by adding the current period's count to a weighted count
66+
// of the previous period.
67+
func (s *SlidingWindowThrottler) Handle(nodeID ids.NodeID) bool {
68+
s.lock.Lock()
69+
defer s.lock.Unlock()
70+
71+
// The current window becomes the previous window if the current evaluation
72+
// period is over
73+
now := s.clock.Time()
74+
sinceUpdate := now.Sub(s.windows[s.current].start)
75+
if sinceUpdate >= 2*s.period {
76+
s.rotate(now.Add(-s.period))
77+
}
78+
if sinceUpdate >= s.period {
79+
s.rotate(now)
80+
sinceUpdate = 0
81+
}
82+
83+
currentHits := s.windows[s.current].hits
84+
current := currentHits[nodeID]
85+
previousFraction := float64(s.period-sinceUpdate) / float64(s.period)
86+
previous := s.windows[1-s.current].hits[nodeID]
87+
estimatedHits := current + previousFraction*previous
88+
if estimatedHits >= s.limit {
89+
// The peer has sent too many requests, drop this request.
90+
return false
91+
}
92+
93+
currentHits[nodeID]++
94+
return true
95+
}
96+
97+
func (s *SlidingWindowThrottler) rotate(t time.Time) {
98+
s.current = 1 - s.current
99+
s.windows[s.current] = window{
100+
start: t,
101+
hits: make(map[ids.NodeID]float64),
102+
}
103+
}

network/p2p/throttler_handler.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
2+
// See the file LICENSE for licensing terms.
3+
4+
package p2p
5+
6+
import (
7+
"context"
8+
"errors"
9+
"fmt"
10+
"time"
11+
12+
"github.com/ava-labs/avalanchego/ids"
13+
)
14+
15+
var (
16+
ErrThrottled = errors.New("throttled")
17+
_ Handler = (*ThrottlerHandler)(nil)
18+
)
19+
20+
type ThrottlerHandler struct {
21+
Handler
22+
Throttler Throttler
23+
}
24+
25+
func (t ThrottlerHandler) AppGossip(ctx context.Context, nodeID ids.NodeID, gossipBytes []byte) error {
26+
if !t.Throttler.Handle(nodeID) {
27+
return fmt.Errorf("dropping message from %s: %w", nodeID, ErrThrottled)
28+
}
29+
30+
return t.Handler.AppGossip(ctx, nodeID, gossipBytes)
31+
}
32+
33+
func (t ThrottlerHandler) AppRequest(ctx context.Context, nodeID ids.NodeID, deadline time.Time, requestBytes []byte) ([]byte, error) {
34+
if !t.Throttler.Handle(nodeID) {
35+
return nil, fmt.Errorf("dropping message from %s: %w", nodeID, ErrThrottled)
36+
}
37+
38+
return t.Handler.AppRequest(ctx, nodeID, deadline, requestBytes)
39+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
2+
// See the file LICENSE for licensing terms.
3+
4+
package p2p
5+
6+
import (
7+
"context"
8+
"testing"
9+
"time"
10+
11+
"github.com/stretchr/testify/require"
12+
13+
"github.com/ava-labs/avalanchego/ids"
14+
)
15+
16+
func TestThrottlerHandlerAppGossip(t *testing.T) {
17+
tests := []struct {
18+
name string
19+
Throttler Throttler
20+
expectedErr error
21+
}{
22+
{
23+
name: "throttled",
24+
Throttler: NewSlidingWindowThrottler(time.Second, 1),
25+
},
26+
{
27+
name: "throttler errors",
28+
Throttler: NewSlidingWindowThrottler(time.Second, 0),
29+
expectedErr: ErrThrottled,
30+
},
31+
}
32+
for _, tt := range tests {
33+
t.Run(tt.name, func(t *testing.T) {
34+
require := require.New(t)
35+
36+
handler := ThrottlerHandler{
37+
Handler: NoOpHandler{},
38+
Throttler: tt.Throttler,
39+
}
40+
err := handler.AppGossip(context.Background(), ids.GenerateTestNodeID(), []byte("foobar"))
41+
require.ErrorIs(err, tt.expectedErr)
42+
})
43+
}
44+
}
45+
46+
func TestThrottlerHandlerAppRequest(t *testing.T) {
47+
tests := []struct {
48+
name string
49+
Throttler Throttler
50+
expectedErr error
51+
}{
52+
{
53+
name: "throttled",
54+
Throttler: NewSlidingWindowThrottler(time.Second, 1),
55+
},
56+
{
57+
name: "throttler errors",
58+
Throttler: NewSlidingWindowThrottler(time.Second, 0),
59+
expectedErr: ErrThrottled,
60+
},
61+
}
62+
for _, tt := range tests {
63+
t.Run(tt.name, func(t *testing.T) {
64+
require := require.New(t)
65+
66+
handler := ThrottlerHandler{
67+
Handler: NoOpHandler{},
68+
Throttler: tt.Throttler,
69+
}
70+
_, err := handler.AppRequest(context.Background(), ids.GenerateTestNodeID(), time.Time{}, []byte("foobar"))
71+
require.ErrorIs(err, tt.expectedErr)
72+
})
73+
}
74+
}

network/p2p/throttler_test.go

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
2+
// See the file LICENSE for licensing terms.
3+
4+
package p2p
5+
6+
import (
7+
"testing"
8+
"time"
9+
10+
"github.com/stretchr/testify/require"
11+
12+
"github.com/ava-labs/avalanchego/ids"
13+
)
14+
15+
func TestSlidingWindowThrottlerHandle(t *testing.T) {
16+
period := time.Minute
17+
previousWindowStartTime := time.Time{}
18+
currentWindowStartTime := previousWindowStartTime.Add(period)
19+
20+
nodeID := ids.GenerateTestNodeID()
21+
22+
type call struct {
23+
time time.Time
24+
throttled bool
25+
}
26+
27+
tests := []struct {
28+
name string
29+
limit int
30+
calls []call
31+
}{
32+
{
33+
name: "throttled in current window",
34+
limit: 1,
35+
calls: []call{
36+
{
37+
time: currentWindowStartTime,
38+
},
39+
{
40+
time: currentWindowStartTime,
41+
throttled: true,
42+
},
43+
},
44+
},
45+
{
46+
name: "throttled from previous window",
47+
limit: 1,
48+
calls: []call{
49+
{
50+
time: previousWindowStartTime,
51+
},
52+
{
53+
time: currentWindowStartTime,
54+
throttled: true,
55+
},
56+
},
57+
},
58+
{
59+
name: "throttled over multiple evaluation periods",
60+
limit: 5,
61+
calls: []call{
62+
{
63+
time: currentWindowStartTime.Add(30 * time.Second),
64+
},
65+
{
66+
time: currentWindowStartTime.Add(period).Add(1 * time.Second),
67+
},
68+
{
69+
time: currentWindowStartTime.Add(period).Add(2 * time.Second),
70+
},
71+
{
72+
time: currentWindowStartTime.Add(period).Add(3 * time.Second),
73+
},
74+
{
75+
time: currentWindowStartTime.Add(period).Add(4 * time.Second),
76+
},
77+
{
78+
time: currentWindowStartTime.Add(period).Add(30 * time.Second),
79+
},
80+
{
81+
time: currentWindowStartTime.Add(period).Add(30 * time.Second),
82+
throttled: true,
83+
},
84+
{
85+
time: currentWindowStartTime.Add(5 * period),
86+
},
87+
},
88+
},
89+
{
90+
name: "one hit per period",
91+
limit: 2,
92+
calls: []call{
93+
{
94+
time: currentWindowStartTime,
95+
},
96+
{
97+
time: currentWindowStartTime.Add(period).Add(time.Second),
98+
},
99+
{
100+
time: currentWindowStartTime.Add(2 * period).Add(time.Second),
101+
},
102+
{
103+
time: currentWindowStartTime.Add(3 * period).Add(time.Second),
104+
},
105+
{
106+
time: currentWindowStartTime.Add(4 * period).Add(time.Second),
107+
},
108+
},
109+
},
110+
{
111+
// if too much time passes by, a current window might not be a
112+
// valid previous window.
113+
name: "current window needs to be reset",
114+
limit: 1,
115+
calls: []call{
116+
{
117+
time: currentWindowStartTime,
118+
},
119+
{
120+
time: currentWindowStartTime.Add(10 * period),
121+
},
122+
},
123+
},
124+
}
125+
126+
for _, tt := range tests {
127+
t.Run(tt.name, func(t *testing.T) {
128+
require := require.New(t)
129+
throttler := NewSlidingWindowThrottler(period, tt.limit)
130+
throttler.windows[throttler.current].start = currentWindowStartTime
131+
throttler.windows[1-throttler.current].start = previousWindowStartTime
132+
133+
for _, call := range tt.calls {
134+
throttler.clock.Set(call.time)
135+
require.Equal(call.throttled, !throttler.Handle(nodeID))
136+
}
137+
})
138+
}
139+
}

0 commit comments

Comments
 (0)