Skip to content

Commit 7e800e1

Browse files
committed
Fix the race on the timers
Formerly, we were doing a logic like the following: Check the timer in front of the priority queue of timers. If it is expired, call the callback and pop it from the queue. However, this logic is flawed because the timers queue was accessed from multiple threads. So, it may be the case that, we check the timer in front of the queue, run its callback, then, some other thread adds another timer to the queue concurrently and it is put in front of the queue. Now, when the reactor thread pops the item in front of the queue, it will be the newly added timer, before its callback being executed. To solve this, we use a double buffering approach. A thead-safe queue is used to store newly added timers which can be efficiently modified on both ends on multiple threads. Then, the reactor thread periodically pops items from that queue in FIFO order and maintains a min heap using a list with the help of heappush and heappop functions. Only if the min heap has some elements, timers are popped from that and executed. Also, the unneeded `timer_cancelled_cb` is field is removed. We were removing the timer from the queue after canceling it. That was the only usage of this field. We do the same with the new approach, if it is canceled, it will return `True` on `check_timer` call without executing its `timer_ended_cb` and will be removed from the heap.
1 parent aecabc9 commit 7e800e1

File tree

1 file changed

+42
-36
lines changed

1 file changed

+42
-36
lines changed

hazelcast/reactor.py

Lines changed: 42 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,14 @@
99

1010
from collections import deque
1111
from functools import total_ordering
12+
from heapq import heappush, heappop
1213

1314
from hazelcast import six
1415
from hazelcast.config import SSLProtocol
1516
from hazelcast.connection import Connection
1617
from hazelcast.core import Address
1718
from hazelcast.errors import HazelcastError
1819
from hazelcast.future import Future
19-
from hazelcast.six.moves import queue
2020

2121
try:
2222
import ssl
@@ -31,7 +31,8 @@ class AsyncoreReactor(object):
3131
_is_live = False
3232

3333
def __init__(self):
34-
self._timers = queue.PriorityQueue()
34+
self._timers = [] # Accessed only from the reactor thread
35+
self._new_timers = deque() # Popped only from the reactor thread
3536
self._map = {}
3637

3738
def start(self):
@@ -55,28 +56,36 @@ def _loop(self):
5556
_logger.exception("Error in Reactor Thread")
5657
# TODO: shutdown client
5758
return
58-
_logger.debug("Reactor Thread exited. %s" % self._timers.qsize())
59+
_logger.debug("Reactor Thread exited")
5960
self._cleanup_all_timers()
6061

6162
def _check_timers(self):
62-
now = time.time()
63-
while not self._timers.empty():
64-
try:
65-
timer = self._timers.queue[0][1]
66-
except IndexError:
67-
return
68-
69-
if timer.check_timer(now):
70-
try:
71-
self._timers.get_nowait()
72-
except queue.Empty:
73-
pass
74-
else:
75-
return
63+
timers = self._timers
64+
65+
if self._new_timers:
66+
new_timers = self._new_timers
67+
while new_timers:
68+
# There is no need to check for exception here,
69+
# reactor thread is the only one popping from
70+
# the deque. So, if the we pass the size check
71+
# above, there should be at least one element
72+
heappush(timers, new_timers.popleft())
73+
74+
if timers:
75+
now = time.time()
76+
while timers:
77+
timer = timers[0][1]
78+
if timer.check_timer(now):
79+
heappop(timers)
80+
else:
81+
# Timer in the root of the min heap is not expired
82+
# Therefore, there should not be any expired
83+
# timers in the heap.
84+
return
7685

7786
def add_timer_absolute(self, timeout, callback):
78-
timer = Timer(timeout, callback, self._cleanup_timer)
79-
self._timers.put_nowait((timer.end, timer))
87+
timer = Timer(timeout, callback)
88+
self._new_timers.append((timer.end, timer))
8089
return timer
8190

8291
def add_timer(self, delay, callback):
@@ -105,19 +114,18 @@ def connection_factory(self, connection_manager, connection_id, address, network
105114
return AsyncoreConnection(self._map, connection_manager, connection_id,
106115
address, network_config, message_callback)
107116

108-
def _cleanup_timer(self, timer):
109-
try:
110-
self._timers.queue.remove((timer.end, timer))
111-
except ValueError:
112-
pass
113-
114117
def _cleanup_all_timers(self):
115-
while not self._timers.empty():
116-
try:
117-
_, timer = self._timers.get_nowait()
118-
timer.timer_ended_cb()
119-
except queue.Empty:
120-
return
118+
self._check_timers()
119+
120+
timers = self._timers
121+
while timers:
122+
_, timer = timers.pop()
123+
timer.timer_ended_cb()
124+
125+
# Although it is not the case with the current code base,
126+
# the timers ended above may add new timers.
127+
if self._new_timers:
128+
self._cleanup_all_timers()
121129

122130

123131
_BUFFER_SIZE = 128000
@@ -274,26 +282,24 @@ def __str__(self):
274282

275283
@total_ordering
276284
class Timer(object):
277-
__slots__ = ("end", "timer_ended_cb", "timer_canceled_cb", "canceled")
285+
__slots__ = ("end", "timer_ended_cb", "canceled")
278286

279-
def __init__(self, end, timer_ended_cb, timer_canceled_cb):
287+
def __init__(self, end, timer_ended_cb):
280288
self.end = end
281289
self.timer_ended_cb = timer_ended_cb
282-
self.timer_canceled_cb = timer_canceled_cb
283290
self.canceled = False
284291

285292
def __eq__(self, other):
286293
return self.end == other.end
287294

288295
def __ne__(self, other):
289-
return not (self == other)
296+
return self.end != other.end
290297

291298
def __lt__(self, other):
292299
return self.end < other.end
293300

294301
def cancel(self):
295302
self.canceled = True
296-
self.timer_canceled_cb(self)
297303

298304
def check_timer(self, now):
299305
if self.canceled:

0 commit comments

Comments
 (0)