Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gh-122881: Reduce asyncio heapq scheduling overhead #122882

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 21 additions & 16 deletions Lib/asyncio/base_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,10 @@ def call_at(self, when, callback, *args, context=None):
timer = events.TimerHandle(when, callback, args, self, context)
if timer._source_traceback:
del timer._source_traceback[-1]
heapq.heappush(self._scheduled, timer)
# The `TimerHandle` is wrapped in a `(when, timer)` tuple so that the
# `heapq` operations, which compare against many other entries, order
# by `when` first and only fall back to `TimerHandle.__lt__` on ties.
heapq.heappush(self._scheduled, (when, timer))
bdraco marked this conversation as resolved.
Show resolved Hide resolved
bdraco marked this conversation as resolved.
Show resolved Hide resolved
timer._scheduled = True
return timer

Expand Down Expand Up @@ -1959,28 +1962,32 @@ def _run_once(self):
# Remove delayed calls that were cancelled if their number
# is too high
new_scheduled = []
for handle in self._scheduled:
for when_handle in self._scheduled:
handle = when_handle[1]
if handle._cancelled:
handle._scheduled = False
else:
new_scheduled.append(handle)
new_scheduled.append(when_handle)

heapq.heapify(new_scheduled)
self._scheduled = new_scheduled
self._timer_cancelled_count = 0
else:
# Remove delayed calls that were cancelled from head of queue.
while self._scheduled and self._scheduled[0]._cancelled:
while self._scheduled and self._scheduled[0][1]._cancelled:
self._timer_cancelled_count -= 1
handle = heapq.heappop(self._scheduled)
_, handle = heapq.heappop(self._scheduled)
handle._scheduled = False

timeout = None
if self._ready or self._stopping:
ready = self._ready
scheduled = self._scheduled
picnixz marked this conversation as resolved.
Show resolved Hide resolved

bdraco marked this conversation as resolved.
Show resolved Hide resolved
if ready or self._stopping:
timeout = 0
elif self._scheduled:
elif scheduled:
# Compute the desired timeout.
timeout = self._scheduled[0]._when - self.time()
timeout = scheduled[0][0] - self.time()
if timeout > MAXIMUM_SELECT_TIMEOUT:
timeout = MAXIMUM_SELECT_TIMEOUT
elif timeout < 0:
Expand All @@ -1993,23 +2000,21 @@ def _run_once(self):

# Handle 'later' callbacks that are ready.
end_time = self.time() + self._clock_resolution
while self._scheduled:
handle = self._scheduled[0]
if handle._when >= end_time:
break
handle = heapq.heappop(self._scheduled)
while scheduled and scheduled[0][0] < end_time:
_, handle = heapq.heappop(scheduled)
handle._scheduled = False
self._ready.append(handle)
ready.append(handle)

# This is the only place where callbacks are actually *called*.
# All other places just add them to ready.
# Note: We run all currently scheduled callbacks, but not any
# callbacks scheduled by callbacks run this time around --
# they will be run the next time (after another I/O poll).
# Use an idiom that is thread-safe without using locks.
ntodo = len(self._ready)
ntodo = len(ready)
ready_popleft = ready.popleft
for i in range(ntodo):
handle = self._ready.popleft()
handle = ready_popleft()
if handle._cancelled:
continue
if self._debug:
Expand Down
12 changes: 6 additions & 6 deletions Lib/test/test_asyncio/test_base_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ def cb():

h = self.loop.call_later(10.0, cb)
self.assertIsInstance(h, asyncio.TimerHandle)
self.assertIn(h, self.loop._scheduled)
self.assertIn((h.when(), h), self.loop._scheduled)
self.assertNotIn(h, self.loop._ready)
with self.assertRaises(TypeError, msg="delay must not be None"):
self.loop.call_later(None, cb)
Expand Down Expand Up @@ -378,13 +378,13 @@ def test__run_once(self):
h1.cancel()

self.loop._process_events = mock.Mock()
self.loop._scheduled.append(h1)
self.loop._scheduled.append(h2)
self.loop._scheduled.append((h1.when(), h1))
self.loop._scheduled.append((h2.when(), h2))
self.loop._run_once()

t = self.loop._selector.select.call_args[0][0]
self.assertTrue(9.5 < t < 10.5, t)
self.assertEqual([h2], self.loop._scheduled)
self.assertEqual([(h2.when(), h2)], self.loop._scheduled)
self.assertTrue(self.loop._process_events.called)

def test_set_debug(self):
Expand All @@ -406,7 +406,7 @@ def cb(loop):
self.loop, None)

self.loop._process_events = mock.Mock()
self.loop._scheduled.append(h)
self.loop._scheduled.append((h.when(), h))
self.loop._run_once()

self.assertTrue(processed)
Expand Down Expand Up @@ -486,7 +486,7 @@ def cb():
self.assertEqual(len(self.loop._scheduled), not_cancelled_count)

# Ensure only uncancelled events remain scheduled
self.assertTrue(all([not x._cancelled for x in self.loop._scheduled]))
self.assertTrue(all([not x._cancelled for _, x in self.loop._scheduled]))

def test_run_until_complete_type_error(self):
self.assertRaises(TypeError,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Reduced the :mod:`heapq` scheduling overhead in :mod:`asyncio`.
Loading