8
8
"time"
9
9
)
10
10
11
- // PreemptionSignal signals when to preempt the pendingTimeout of the timeout handler.
11
+ // PreemptionSignal signals when to preempt the pendingTimeoutToken of the timeout handler.
12
12
type PreemptionSignal struct {
13
13
activateOnce sync.Once
14
14
initOnce sync.Once
@@ -37,38 +37,39 @@ func (ps *PreemptionSignal) Preempt() {
37
37
// Only a single timeout can be pending to be scheduled at any given time.
38
38
// Once a preemption signal is closed, all timeouts are immediately dispatched.
39
39
type timeoutScheduler struct {
40
- newTimer func (duration time.Duration ) * time.Timer
41
- onTimeout func ()
42
- preemptionSignal <- chan struct {}
43
- pendingTimeout chan struct {}
40
+ newTimer func (duration time.Duration ) * time.Timer
41
+ onTimeout func ()
42
+ preemptionSignal <- chan struct {}
43
+ pendingTimeoutToken chan struct {}
44
44
}
45
45
46
46
// NewTimeoutScheduler constructs a new timeout scheduler with the given function to be invoked upon a timeout,
47
47
// unless the preemptionSignal is closed and in which case it invokes the function immediately.
48
- func NewTimeoutScheduler (onTimeout func (), preemptionSignal <- chan struct {}, newTimer func ( duration time. Duration ) * time. Timer ) * timeoutScheduler {
48
+ func NewTimeoutScheduler (onTimeout func (), preemptionSignal <- chan struct {}) * timeoutScheduler {
49
49
pendingTimout := make (chan struct {}, 1 )
50
50
pendingTimout <- struct {}{}
51
51
return & timeoutScheduler {
52
- preemptionSignal : preemptionSignal ,
53
- newTimer : newTimer ,
54
- onTimeout : onTimeout ,
55
- pendingTimeout : pendingTimout ,
52
+ preemptionSignal : preemptionSignal ,
53
+ newTimer : time . NewTimer ,
54
+ onTimeout : onTimeout ,
55
+ pendingTimeoutToken : pendingTimout ,
56
56
}
57
57
}
58
58
59
59
// RegisterTimeout fires the function the timeout scheduler is initialized with no later than the given timeout.
60
60
func (th * timeoutScheduler ) RegisterTimeout (d time.Duration ) {
61
- acquiredToken := th .acquirePendingTimeoutToken ()
62
- preempted := th .preempted ()
63
-
64
- if ! preempted && ! acquiredToken {
61
+ // There can only be a single timeout pending at any time, and once a timeout is scheduled,
62
+ // we prevent future timeouts to be scheduled until the timeout triggers by taking the pendingTimeoutToken.
63
+ // Any subsequent attempt to register a timeout would fail obtaining the pendingTimeoutToken,
64
+ // and return.
65
+ if ! th .acquirePendingTimeoutToken () {
65
66
return
66
67
}
67
68
68
- go th .scheduleTimeout (d , acquiredToken )
69
+ go th .scheduleTimeout (d )
69
70
}
70
71
71
- func (th * timeoutScheduler ) scheduleTimeout (d time.Duration , acquiredToken bool ) {
72
+ func (th * timeoutScheduler ) scheduleTimeout (d time.Duration ) {
72
73
timer := th .newTimer (d )
73
74
defer timer .Stop ()
74
75
@@ -79,37 +80,32 @@ func (th *timeoutScheduler) scheduleTimeout(d time.Duration, acquiredToken bool)
79
80
case <- th .preemptionSignal :
80
81
}
81
82
82
- if acquiredToken {
83
- th .relinquishPendingTimeoutToken ()
84
- }
85
- }
86
-
87
- func (th * timeoutScheduler ) preempted () bool {
88
- select {
89
- case <- th .preemptionSignal :
90
- return true
91
- default :
92
- return false
93
- }
83
+ // Relinquish the pendingTimeoutToken.
84
+ // This is needed to be done before onTimeout() is invoked,
85
+ // and that's why onTimeout() is deferred to be called at the end of the function.
86
+ // If we trigger the timeout prematurely before we relinquish the pendingTimeoutToken,
87
+ // A subsequent timeout scheduling attempt that originates from the triggering of the current timeout
88
+ // will fail, as the pendingTimeoutToken is not yet available.
89
+ th .pendingTimeoutToken <- struct {}{}
94
90
}
95
91
96
92
func (th * timeoutScheduler ) acquirePendingTimeoutToken () bool {
97
93
select {
98
- case <- th .pendingTimeout :
94
+ case <- th .pendingTimeoutToken :
99
95
return true
100
96
default :
101
97
return false
102
98
}
103
99
}
104
100
105
- func (th * timeoutScheduler ) relinquishPendingTimeoutToken () {
106
- th .pendingTimeout <- struct {}{}
107
- }
108
-
109
101
// TimeoutRegistrar describes the standard interface for specifying a timeout
110
102
type TimeoutRegistrar interface {
111
- // RegisterTimeout specifies how much time to delay the next timeout message
112
- // by. If the subnet has been bootstrapped, the timeout will fire
113
- // immediately via calling Preempt().
103
+ // RegisterTimeout specifies how much time to delay the next timeout message by.
104
+ //
105
+ // If there is already a pending timeout message, this call is a no-op.
106
+ // However, it is guaranteed that the timeout will fire at least once after
107
+ // calling this function.
108
+ //
109
+ // If the subnet has been bootstrapped, the timeout will fire immediately via calling Preempt().
114
110
RegisterTimeout (time.Duration )
115
111
}
0 commit comments