Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gh-122881: Reduce asyncio heapq scheduling overhead #122882

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions Lib/asyncio/base_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,10 @@ def call_at(self, when, callback, *args, context=None):
timer = events.TimerHandle(when, callback, args, self, context)
if timer._source_traceback:
del timer._source_traceback[-1]
heapq.heappush(self._scheduled, timer)
# The `TimerHandle` is wrapped in a tuple to avoid calling the
# `TimerHandle.__lt__` method since the `heapq` operations
# will need to compare against many other `TimerHandler` objects.
heapq.heappush(self._scheduled, (when, timer))
bdraco marked this conversation as resolved.
Show resolved Hide resolved
bdraco marked this conversation as resolved.
Show resolved Hide resolved
timer._scheduled = True
return timer

Expand Down Expand Up @@ -1959,28 +1962,29 @@ def _run_once(self):
# Remove delayed calls that were cancelled if their number
# is too high
new_scheduled = []
for handle in self._scheduled:
for when_handle in self._scheduled:
handle = when_handle[1]
if handle._cancelled:
handle._scheduled = False
else:
new_scheduled.append(handle)
new_scheduled.append(when_handle)

heapq.heapify(new_scheduled)
self._scheduled = new_scheduled
self._timer_cancelled_count = 0
else:
# Remove delayed calls that were cancelled from head of queue.
while self._scheduled and self._scheduled[0]._cancelled:
while self._scheduled and self._scheduled[0][1]._cancelled:
self._timer_cancelled_count -= 1
handle = heapq.heappop(self._scheduled)
_, handle = heapq.heappop(self._scheduled)
handle._scheduled = False

timeout = None
if self._ready or self._stopping:
timeout = 0
elif self._scheduled:
# Compute the desired timeout.
timeout = self._scheduled[0]._when - self.time()
timeout = self._scheduled[0][0] - self.time()
if timeout > MAXIMUM_SELECT_TIMEOUT:
timeout = MAXIMUM_SELECT_TIMEOUT
elif timeout < 0:
Expand All @@ -1993,11 +1997,8 @@ def _run_once(self):

# Handle 'later' callbacks that are ready.
end_time = self.time() + self._clock_resolution
while self._scheduled:
handle = self._scheduled[0]
if handle._when >= end_time:
break
handle = heapq.heappop(self._scheduled)
while self._scheduled and self._scheduled[0][0] < end_time:
_, handle = heapq.heappop(self._scheduled)
handle._scheduled = False
self._ready.append(handle)

Expand Down
12 changes: 6 additions & 6 deletions Lib/test/test_asyncio/test_base_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ def cb():

h = self.loop.call_later(10.0, cb)
self.assertIsInstance(h, asyncio.TimerHandle)
self.assertIn(h, self.loop._scheduled)
self.assertIn((h.when(), h), self.loop._scheduled)
self.assertNotIn(h, self.loop._ready)
with self.assertRaises(TypeError, msg="delay must not be None"):
self.loop.call_later(None, cb)
Expand Down Expand Up @@ -378,13 +378,13 @@ def test__run_once(self):
h1.cancel()

self.loop._process_events = mock.Mock()
self.loop._scheduled.append(h1)
self.loop._scheduled.append(h2)
self.loop._scheduled.append((h1.when(), h1))
self.loop._scheduled.append((h2.when(), h2))
self.loop._run_once()

t = self.loop._selector.select.call_args[0][0]
self.assertTrue(9.5 < t < 10.5, t)
self.assertEqual([h2], self.loop._scheduled)
self.assertEqual([(h2.when(), h2)], self.loop._scheduled)
self.assertTrue(self.loop._process_events.called)

def test_set_debug(self):
Expand All @@ -406,7 +406,7 @@ def cb(loop):
self.loop, None)

self.loop._process_events = mock.Mock()
self.loop._scheduled.append(h)
self.loop._scheduled.append((h.when(), h))
self.loop._run_once()

self.assertTrue(processed)
Expand Down Expand Up @@ -486,7 +486,7 @@ def cb():
self.assertEqual(len(self.loop._scheduled), not_cancelled_count)

# Ensure only uncancelled events remain scheduled
self.assertTrue(all([not x._cancelled for x in self.loop._scheduled]))
self.assertTrue(all([not x._cancelled for _, x in self.loop._scheduled]))

def test_run_until_complete_type_error(self):
self.assertRaises(TypeError,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Reduced :mod:`asyncio` :mod:`heapq` scheduling overhead. Scheduling and running an :class:`asyncio.TimerHandle` is now roughly 9.5% faster.
Loading