Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 44 additions & 36 deletions hazelcast/reactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@

from collections import deque
from functools import total_ordering
from heapq import heappush, heappop

from hazelcast import six
from hazelcast.config import SSLProtocol
from hazelcast.connection import Connection
from hazelcast.core import Address
from hazelcast.errors import HazelcastError
from hazelcast.future import Future
from hazelcast.six.moves import queue

try:
import ssl
Expand All @@ -31,7 +31,8 @@ class AsyncoreReactor(object):
_is_live = False

def __init__(self):
self._timers = queue.PriorityQueue()
self._timers = [] # Accessed only from the reactor thread
self._new_timers = deque() # Popped only from the reactor thread
self._map = {}

def start(self):
Expand All @@ -55,28 +56,36 @@ def _loop(self):
_logger.exception("Error in Reactor Thread")
# TODO: shutdown client
return
_logger.debug("Reactor Thread exited. %s", self._timers.qsize())
_logger.debug("Reactor Thread exited")
self._cleanup_all_timers()

def _check_timers(self):
now = time.time()
while not self._timers.empty():
try:
timer = self._timers.queue[0][1]
except IndexError:
return

if timer.check_timer(now):
try:
self._timers.get_nowait()
except queue.Empty:
pass
else:
return
timers = self._timers

if self._new_timers:
new_timers = self._new_timers
while new_timers:
# There is no need to check for exception here,
# reactor thread is the only one popping from
# the deque. So, if the we pass the size check
# above, there should be at least one element
heappush(timers, new_timers.popleft())

if timers:
now = time.time()
while timers:
timer = timers[0][1]
if timer.check_timer(now):
heappop(timers)
else:
# Timer in the root of the min heap is not expired.
# Therefore, there should be no expired
# timers in the heap.
return

def add_timer_absolute(self, timeout, callback):
timer = Timer(timeout, callback, self._cleanup_timer)
self._timers.put_nowait((timer.end, timer))
timer = Timer(timeout, callback)
self._new_timers.append((timer.end, timer))
return timer

def add_timer(self, delay, callback):
Expand Down Expand Up @@ -105,19 +114,20 @@ def connection_factory(self, connection_manager, connection_id, address, network
return AsyncoreConnection(self._map, connection_manager, connection_id,
address, network_config, message_callback)

def _cleanup_timer(self, timer):
try:
self._timers.queue.remove((timer.end, timer))
except ValueError:
pass

def _cleanup_all_timers(self):
while not self._timers.empty():
try:
_, timer = self._timers.get_nowait()
timer.timer_ended_cb()
except queue.Empty:
return
timers = self._timers
new_timers = self._new_timers

while timers:
_, timer = timers.pop()
timer.timer_ended_cb()

# Although it is not the case with the current code base,
# the timers ended above may add new timers. So, the order
# is important.
while new_timers:
_, timer = new_timers.popleft()
timer.timer_ended_cb()


_BUFFER_SIZE = 128000
Expand Down Expand Up @@ -274,26 +284,24 @@ def __str__(self):

@total_ordering
class Timer(object):
__slots__ = ("end", "timer_ended_cb", "timer_canceled_cb", "canceled")
__slots__ = ("end", "timer_ended_cb", "canceled")

def __init__(self, end, timer_ended_cb, timer_canceled_cb):
def __init__(self, end, timer_ended_cb):
self.end = end
self.timer_ended_cb = timer_ended_cb
self.timer_canceled_cb = timer_canceled_cb
self.canceled = False

def __eq__(self, other):
return self.end == other.end

def __ne__(self, other):
return not (self == other)
return self.end != other.end

def __lt__(self, other):
return self.end < other.end

def cancel(self):
self.canceled = True
self.timer_canceled_cb(self)

def check_timer(self, now):
if self.canceled:
Expand Down