Skip to content

uasyncio fast_io EventLoop option reduces I/O scheduling latency #287

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 37 additions & 8 deletions uasyncio.core/uasyncio/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,14 @@ class TimeoutError(CancelledError):

class EventLoop:

def __init__(self, runq_len=16, waitq_len=16):
def __init__(self, runq_len=16, waitq_len=16, ioq_len=0):
self.runq = ucollections.deque((), runq_len, True)
self.ioq_len = ioq_len
if ioq_len:
self.ioq = ucollections.deque((), ioq_len, True)
self._call_io = self._call_now
else:
self._call_io = self.call_soon
self.waitq = utimeq.utimeq(waitq_len)
# Current task being run. Task is a top-level coroutine scheduled
# in the event loop (sub-coroutines executed transparently by
Expand All @@ -42,6 +48,13 @@ def create_task(self, coro):
self.call_later_ms(0, coro)
# CPython asyncio incompatibility: we don't return Task object

def _call_now(self, callback, *args):
if __debug__ and DEBUG:
log.debug("Scheduling in ioq: %s", (callback, args))
self.ioq.append(callback)
if not isinstance(callback, type_gen):
self.ioq.append(args)

def call_soon(self, callback, *args):
if __debug__ and DEBUG:
log.debug("Scheduling in runq: %s", (callback, args))
Expand Down Expand Up @@ -88,13 +101,25 @@ def run_forever(self):
l = len(self.runq)
if __debug__ and DEBUG:
log.debug("Entries in runq: %d", l)
while l:
cb = self.runq.popleft()
l -= 1
cur_q = self.runq # Default: always get tasks from runq
dl = 1
while l or self.ioq_len:
if self.ioq_len:
self.wait(0) # Schedule I/O. Can append to ioq.
if self.ioq:
cur_q = self.ioq
dl = 0
elif l == 0:
break
else:
cur_q = self.runq
dl = 1
l -= dl
cb = cur_q.popleft()
args = ()
if not isinstance(cb, type_gen):
args = self.runq.popleft()
l -= 1
args = cur_q.popleft()
l -= dl
if __debug__ and DEBUG:
log.info("Next callback to run: %s", (cb, args))
cb(*args)
Expand Down Expand Up @@ -125,8 +150,12 @@ def run_forever(self):
continue
elif isinstance(ret, IOReadDone):
self.remove_reader(arg)
self._call_io(cb, args) # Next call produces StopIteration
continue
elif isinstance(ret, IOWriteDone):
self.remove_writer(arg)
self._call_io(cb, args)
continue
elif isinstance(ret, StopLoop):
return arg
else:
Expand Down Expand Up @@ -218,10 +247,10 @@ class IOWriteDone(SysCall1):

_event_loop = None
_event_loop_class = EventLoop
def get_event_loop(runq_len=16, waitq_len=16):
def get_event_loop(runq_len=16, waitq_len=16, ioq_len=0):
global _event_loop
if _event_loop is None:
_event_loop = _event_loop_class(runq_len, waitq_len)
_event_loop = _event_loop_class(runq_len, waitq_len, ioq_len)
return _event_loop

def sleep(secs):
Expand Down
39 changes: 18 additions & 21 deletions uasyncio/uasyncio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ def set_debug(val):

class PollEventLoop(EventLoop):

def __init__(self, runq_len=16, waitq_len=16):
EventLoop.__init__(self, runq_len, waitq_len)
def __init__(self, runq_len=16, waitq_len=16, fast_io=0):
EventLoop.__init__(self, runq_len, waitq_len, fast_io)
self.poller = select.poll()
self.objmap = {}

Expand Down Expand Up @@ -68,25 +68,22 @@ def wait(self, delay):
# We need one-shot behavior (second arg of 1 to .poll())
res = self.poller.ipoll(delay, 1)
#log.debug("poll result: %s", res)
# Remove "if res" workaround after
# https://github.com/micropython/micropython/issues/2716 fixed.
if res:
for sock, ev in res:
cb = self.objmap[id(sock)]
if ev & (select.POLLHUP | select.POLLERR):
# These events are returned even if not requested, and
# are sticky, i.e. will be returned again and again.
# If the caller doesn't do proper error handling and
# unregister this sock, we'll busy-loop on it, so we
# as well can unregister it now "just in case".
self.remove_reader(sock)
if DEBUG and __debug__:
log.debug("Calling IO callback: %r", cb)
if isinstance(cb, tuple):
cb[0](*cb[1])
else:
cb.pend_throw(None)
self.call_soon(cb)
for sock, ev in res:
cb = self.objmap[id(sock)]
if ev & (select.POLLHUP | select.POLLERR):
# These events are returned even if not requested, and
# are sticky, i.e. will be returned again and again.
# If the caller doesn't do proper error handling and
# unregister this sock, we'll busy-loop on it, so we
# as well can unregister it now "just in case".
self.remove_reader(sock)
if DEBUG and __debug__:
log.debug("Calling IO callback: %r", cb)
if isinstance(cb, tuple):
cb[0](*cb[1])
else:
cb.pend_throw(None)
self._call_io(cb)


class StreamReader:
Expand Down