Skip to content

Commit bdf0063

Browse files
committed
Make bootstrapping handle its own timeouts
Currently, an engine registers timeouts into the handler, which schedules the timeouts on behalf of the the engine. The handler then notifies the engine when the timeout expired. However, the only engine that uses this mechanism is the bootstrapping engine, and not the other engine types such as the snowman and state sync engines. It therefore makes sense to consolidate the timeout handling instead of delegating them to the handler. By moving the timeout handling closer to the bootstrapper, we can make the API of the common.Engine be slimmer by removing the Timeout() method from it. Signed-off-by: Yacov Manevich <yacov.manevich@avalabs.org>
1 parent 1b6288f commit bdf0063

File tree

18 files changed

+271
-134
lines changed

18 files changed

+271
-134
lines changed

chains/manager.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -962,7 +962,6 @@ func (m *manager) createAvalancheChain(
962962
StartupTracker: startupTracker,
963963
Sender: snowmanMessageSender,
964964
BootstrapTracker: sb,
965-
Timer: h,
966965
PeerTracker: peerTracker,
967966
AncestorsMaxContainersReceived: m.BootstrapAncestorsMaxContainersReceived,
968967
DB: blockBootstrappingDB,
@@ -1357,7 +1356,6 @@ func (m *manager) createSnowmanChain(
13571356
StartupTracker: startupTracker,
13581357
Sender: messageSender,
13591358
BootstrapTracker: sb,
1360-
Timer: h,
13611359
PeerTracker: peerTracker,
13621360
AncestorsMaxContainersReceived: m.BootstrapAncestorsMaxContainersReceived,
13631361
DB: bootstrappingDB,

message/internal_msg_builder.go

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import (
1616
var (
1717
disconnected = &Disconnected{}
1818
gossipRequest = &GossipRequest{}
19-
timeout = &Timeout{}
2019

2120
_ fmt.Stringer = (*GetStateSummaryFrontierFailed)(nil)
2221
_ chainIDGetter = (*GetStateSummaryFrontierFailed)(nil)
@@ -50,8 +49,6 @@ var (
5049
_ fmt.Stringer = (*Disconnected)(nil)
5150

5251
_ fmt.Stringer = (*GossipRequest)(nil)
53-
54-
_ fmt.Stringer = (*Timeout)(nil)
5552
)
5653

5754
type GetStateSummaryFrontierFailed struct {
@@ -391,18 +388,3 @@ func InternalGossipRequest(
391388
expiration: mockable.MaxTime,
392389
}
393390
}
394-
395-
type Timeout struct{}
396-
397-
func (Timeout) String() string {
398-
return ""
399-
}
400-
401-
func InternalTimeout(nodeID ids.NodeID) InboundMessage {
402-
return &inboundMessage{
403-
nodeID: nodeID,
404-
op: TimeoutOp,
405-
message: timeout,
406-
expiration: mockable.MaxTime,
407-
}
408-
}

snow/engine/common/bootstrap_tracker.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,7 @@ type BootstrapTracker interface {
1414
// Bootstrapped marks the named chain as being bootstrapped
1515
Bootstrapped(chainID ids.ID)
1616

17-
OnBootstrapCompleted() chan struct{}
17+
// AllBootstrapped returns a channel that is closed when all chains in this
18+
// subnet have been bootstrapped
19+
AllBootstrapped() <-chan struct{}
1820
}

snow/engine/common/engine.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -415,9 +415,6 @@ type InternalHandler interface {
415415
// Notify this engine of peer changes.
416416
validators.Connector
417417

418-
// Notify this engine that a registered timeout has fired.
419-
Timeout(context.Context) error
420-
421418
// Gossip to the network a container on the accepted frontier
422419
Gossip(context.Context) error
423420

snow/engine/common/timer.go

Lines changed: 105 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,113 @@
33

44
package common
55

6-
import "time"
6+
import (
7+
"sync"
8+
"time"
9+
)
710

8-
// Timer describes the standard interface for specifying a timeout
9-
type Timer interface {
11+
// PreemptionSignal signals when to preempt the pendingTimeout of the timeout handler.
12+
type PreemptionSignal struct {
13+
activateOnce sync.Once
14+
initOnce sync.Once
15+
signal chan struct{}
16+
}
17+
18+
func (ps *PreemptionSignal) init() {
19+
ps.signal = make(chan struct{})
20+
}
21+
22+
// Listen returns a read-only channel that is closed when Preempt() is invoked.
23+
func (ps *PreemptionSignal) Listen() <-chan struct{} {
24+
ps.initOnce.Do(ps.init)
25+
return ps.signal
26+
}
27+
28+
// Preempt causes any past and future calls of Listen to return a closed channel.
29+
func (ps *PreemptionSignal) Preempt() {
30+
ps.initOnce.Do(ps.init)
31+
ps.activateOnce.Do(func() {
32+
close(ps.signal)
33+
})
34+
}
35+
36+
// timeoutScheduler schedules timeouts to be dispatched in the future.
37+
// Only a single timeout can be pending to be scheduled at any given time.
38+
// Once a preemption signal is closed, all timeouts are immediately dispatched.
39+
type timeoutScheduler struct {
40+
newTimer func(duration time.Duration) *time.Timer
41+
onTimeout func()
42+
preemptionSignal <-chan struct{}
43+
pendingTimeout chan struct{}
44+
}
45+
46+
// NewTimeoutScheduler constructs a new timeout scheduler with the given function to be invoked upon a timeout,
47+
// unless the preemptionSignal is closed and in which case it invokes the function immediately.
48+
func NewTimeoutScheduler(onTimeout func(), preemptionSignal <-chan struct{}, newTimer func(duration time.Duration) *time.Timer) *timeoutScheduler {
49+
pendingTimout := make(chan struct{}, 1)
50+
pendingTimout <- struct{}{}
51+
return &timeoutScheduler{
52+
preemptionSignal: preemptionSignal,
53+
newTimer: newTimer,
54+
onTimeout: onTimeout,
55+
pendingTimeout: pendingTimout,
56+
}
57+
}
58+
59+
// RegisterTimeout fires the function the timeout scheduler is initialized with no later than the given timeout.
60+
func (th *timeoutScheduler) RegisterTimeout(d time.Duration) {
61+
acquiredToken := th.acquirePendingTimeoutToken()
62+
preempted := th.preempted()
63+
64+
if !preempted && !acquiredToken {
65+
return
66+
}
67+
68+
go th.scheduleTimeout(d, acquiredToken)
69+
}
70+
71+
func (th *timeoutScheduler) scheduleTimeout(d time.Duration, acquiredToken bool) {
72+
timer := th.newTimer(d)
73+
defer timer.Stop()
74+
75+
defer th.onTimeout()
76+
77+
select {
78+
case <-timer.C:
79+
case <-th.preemptionSignal:
80+
}
81+
82+
if acquiredToken {
83+
th.relinquishPendingTimeoutToken()
84+
}
85+
}
86+
87+
func (th *timeoutScheduler) preempted() bool {
88+
select {
89+
case <-th.preemptionSignal:
90+
return true
91+
default:
92+
return false
93+
}
94+
}
95+
96+
func (th *timeoutScheduler) acquirePendingTimeoutToken() bool {
97+
select {
98+
case <-th.pendingTimeout:
99+
return true
100+
default:
101+
return false
102+
}
103+
}
104+
105+
func (th *timeoutScheduler) relinquishPendingTimeoutToken() {
106+
th.pendingTimeout <- struct{}{}
107+
}
108+
109+
// TimeoutRegistrar describes the standard interface for specifying a timeout
110+
type TimeoutRegistrar interface {
10111
// RegisterTimeout specifies how much time to delay the next timeout message
11112
// by. If the subnet has been bootstrapped, the timeout will fire
12-
// immediately.
113+
// immediately via calling Preempt().
13114
RegisterTimeout(time.Duration)
14115
}

snow/engine/common/timer_test.go

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved.
2+
// See the file LICENSE for licensing terms.
3+
4+
package common
5+
6+
import (
7+
"sync"
8+
"testing"
9+
"time"
10+
)
11+
12+
func TestTimeoutScheduler(t *testing.T) {
13+
for _, testCase := range []struct {
14+
expectedInvocationCount int
15+
desc string
16+
shouldPreempt bool
17+
clock chan time.Time
18+
initClock func(chan time.Time)
19+
advanceTime func(chan time.Time)
20+
}{
21+
{
22+
desc: "multiple pendingTimeout one after the other with preemption",
23+
expectedInvocationCount: 10,
24+
shouldPreempt: true,
25+
clock: make(chan time.Time, 1),
26+
initClock: func(_ chan time.Time) {},
27+
advanceTime: func(_ chan time.Time) {},
28+
},
29+
{
30+
desc: "multiple pendingTimeout one after the other",
31+
expectedInvocationCount: 10,
32+
clock: make(chan time.Time, 1),
33+
initClock: func(clock chan time.Time) {
34+
clock <- time.Now()
35+
},
36+
advanceTime: func(clock chan time.Time) {
37+
clock <- time.Now()
38+
},
39+
},
40+
} {
41+
t.Run(testCase.desc, func(_ *testing.T) {
42+
// Not enough invocations means the test would stall.
43+
// Too many invocations means a negative counter panic.
44+
var wg sync.WaitGroup
45+
wg.Add(testCase.expectedInvocationCount)
46+
47+
testCase.initClock(testCase.clock)
48+
49+
var preemptionSignal PreemptionSignal
50+
ps := preemptionSignal.Listen()
51+
52+
if testCase.shouldPreempt {
53+
preemptionSignal.Preempt()
54+
}
55+
56+
// Order enforces timeouts to be registered once after another,
57+
// in order to make the tests deterministic.
58+
order := make(chan struct{})
59+
60+
newTimer := func(_ time.Duration) *time.Timer {
61+
// We use a duration of 0 to not leave a lingering timer
62+
// after the test finishes.
63+
// Then we replace the time channel to have control over the timer.
64+
timer := time.NewTimer(0)
65+
timer.C = testCase.clock
66+
return timer
67+
}
68+
69+
onTimeout := func() {
70+
order <- struct{}{}
71+
wg.Done()
72+
testCase.advanceTime(testCase.clock)
73+
}
74+
75+
ts := NewTimeoutScheduler(onTimeout, ps, newTimer)
76+
77+
for i := 0; i < testCase.expectedInvocationCount; i++ {
78+
ts.RegisterTimeout(time.Hour)
79+
<-order
80+
}
81+
82+
wg.Wait()
83+
})
84+
}
85+
}
86+
87+
func TestTimeoutSchedulerConcurrentRegister(_ *testing.T) {
88+
clock := make(chan time.Time, 2)
89+
newTimer := func(_ time.Duration) *time.Timer {
90+
// We use a duration of 0 to not leave a lingering timer
91+
// after the test finishes.
92+
// Then we replace the time channel to have control over the timer.
93+
timer := time.NewTimer(0)
94+
timer.C = clock
95+
return timer
96+
}
97+
98+
var wg sync.WaitGroup
99+
wg.Add(1)
100+
101+
onTimeout := func() {
102+
wg.Done()
103+
}
104+
105+
roChan := make(<-chan struct{})
106+
107+
ts := NewTimeoutScheduler(onTimeout, roChan, newTimer)
108+
109+
ts.RegisterTimeout(time.Hour) // First timeout is registered
110+
ts.RegisterTimeout(time.Hour) // Second should not
111+
112+
// Clock ticks are after registering, in order to ensure onTimeout() isn't fired until second registration is invoked.
113+
clock <- time.Now()
114+
clock <- time.Now()
115+
116+
wg.Wait()
117+
}

snow/engine/common/traced_engine.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -329,13 +329,6 @@ func (e *tracedEngine) Disconnected(ctx context.Context, nodeID ids.NodeID) erro
329329
return e.engine.Disconnected(ctx, nodeID)
330330
}
331331

332-
func (e *tracedEngine) Timeout(ctx context.Context) error {
333-
ctx, span := e.tracer.Start(ctx, "tracedEngine.Timeout")
334-
defer span.End()
335-
336-
return e.engine.Timeout(ctx)
337-
}
338-
339332
func (e *tracedEngine) Gossip(ctx context.Context) error {
340333
ctx, span := e.tracer.Start(ctx, "tracedEngine.Gossip")
341334
defer span.End()

snow/engine/enginetest/bootstrap_tracker.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ func (s *BootstrapTracker) Bootstrapped(chainID ids.ID) {
5454
}
5555
}
5656

57-
func (s *BootstrapTracker) OnBootstrapCompleted() chan struct{} {
57+
func (s *BootstrapTracker) AllBootstrapped() <-chan struct{} {
5858
if s.OnBootstrapCompletedF != nil {
5959
return s.OnBootstrapCompletedF()
6060
} else if s.CantOnBootstrapCompleted && s.T != nil {

snow/engine/enginetest/engine.go

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import (
1919
)
2020

2121
var (
22-
errTimeout = errors.New("unexpectedly called Timeout")
2322
errGossip = errors.New("unexpectedly called Gossip")
2423
errNotify = errors.New("unexpectedly called Notify")
2524
errGetStateSummaryFrontier = errors.New("unexpectedly called GetStateSummaryFrontier")
@@ -189,19 +188,6 @@ func (e *Engine) Start(ctx context.Context, startReqID uint32) error {
189188
return errStart
190189
}
191190

192-
func (e *Engine) Timeout(ctx context.Context) error {
193-
if e.TimeoutF != nil {
194-
return e.TimeoutF(ctx)
195-
}
196-
if !e.CantTimeout {
197-
return nil
198-
}
199-
if e.T != nil {
200-
require.FailNow(e.T, errTimeout.Error())
201-
}
202-
return errTimeout
203-
}
204-
205191
func (e *Engine) Gossip(ctx context.Context) error {
206192
if e.GossipF != nil {
207193
return e.GossipF(ctx)

snow/engine/enginetest/timer.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,8 @@ import (
88
"time"
99

1010
"github.com/stretchr/testify/require"
11-
12-
"github.com/ava-labs/avalanchego/snow/engine/common"
1311
)
1412

15-
var _ common.Timer = (*Timer)(nil)
16-
1713
// Timer is a test timer
1814
type Timer struct {
1915
T *testing.T
@@ -23,11 +19,6 @@ type Timer struct {
2319
RegisterTimeoutF func(time.Duration)
2420
}
2521

26-
// Default set the default callable value to [cant]
27-
func (t *Timer) Default(cant bool) {
28-
t.CantRegisterTimout = cant
29-
}
30-
3122
func (t *Timer) RegisterTimeout(delay time.Duration) {
3223
if t.RegisterTimeoutF != nil {
3324
t.RegisterTimeoutF(delay)

0 commit comments

Comments
 (0)