Skip to content

Implement loop.shutdown_default_executor() #353

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions tests/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,17 @@ async def foo():
asyncio.wait_for(foo(), timeout=float('inf')))
self.assertEqual(res, 123)

def test_shutdown_default_executor(self):
if not hasattr(self.loop, "shutdown_default_executor"):
raise unittest.SkipTest()

async def foo():
await self.loop.run_in_executor(None, time.sleep, .1)

self.loop.run_until_complete(foo())
self.loop.run_until_complete(
self.loop.shutdown_default_executor())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the only test we have in cpython/asyncio/tests?

Copy link
Contributor Author

@aeros aeros Jul 10, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, it looks like this is the only test that uses it in CPython:

    def test_run_in_executor_cancel(self):
        called = False

        def patched_call_soon(*args):
            nonlocal called
            called = True

        def run():
            time.sleep(0.05)

        f2 = self.loop.run_in_executor(None, run)
        f2.cancel()
        self.loop.run_until_complete(
                self.loop.shutdown_default_executor())
        self.loop.close()
        self.loop.call_soon = patched_call_soon
        self.loop.call_soon_threadsafe = patched_call_soon
        time.sleep(0.4)
        self.assertFalse(called)

In retrospect, I probably should have added some dedicated test(s) for shutdown_default_executor() in the PR that added it in the first place, but at the time, I didn't have a decent understanding of asyncio unit tests. Should I add this one to the CPython regression tests and maybe a couple more if I can think of some that could be useful?

Note: It's also used in test_threads.py for a tearDown() and utils.py for close_loop(). But those are hardly dedicated tests, so I didn't include it above.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should I add this one to the CPython regression tests and maybe a couple more if I can think of some that could be useful?

The more tests the better :) If they are not redundant, of course. I'll merge this PR now, huge thanks!



class TestBaseUV(_TestBase, UVTestCase):

Expand Down
1 change: 1 addition & 0 deletions uvloop/includes/stdlib.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ cdef int ssl_SSL_ERROR_WANT_WRITE = ssl.SSL_ERROR_WANT_WRITE
cdef int ssl_SSL_ERROR_SYSCALL = ssl.SSL_ERROR_SYSCALL

cdef uint64_t MAIN_THREAD_ID = <uint64_t><int64_t>threading.main_thread().ident
cdef threading_Thread = threading.Thread

cdef int subprocess_PIPE = subprocess.PIPE
cdef int subprocess_STDOUT = subprocess.STDOUT
Expand Down
2 changes: 2 additions & 0 deletions uvloop/loop.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ cdef class Loop:
object _asyncgens
bint _asyncgens_shutdown_called

bint _executor_shutdown_called

char _recv_buffer[UV_STREAM_RECV_BUF_SIZE]
bint _recv_buffer_in_use

Expand Down
30 changes: 30 additions & 0 deletions uvloop/loop.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ cdef class Loop:

# Set to True when `loop.shutdown_asyncgens` is called.
self._asyncgens_shutdown_called = False
# Set to True when `loop.shutdown_default_executor` is called.
self._executor_shutdown_called = False

self._servers = set()

Expand Down Expand Up @@ -591,6 +593,7 @@ cdef class Loop:
self.handler_idle = None
self.handler_check__exec_writes = None

self._executor_shutdown_called = True
executor = self._default_executor
if executor is not None:
self._default_executor = None
Expand Down Expand Up @@ -2669,6 +2672,8 @@ cdef class Loop:

if executor is None:
executor = self._default_executor
# Only check when the default executor is being used
self._check_default_executor()
if executor is None:
executor = cc_ThreadPoolExecutor()
self._default_executor = executor
Expand Down Expand Up @@ -3090,6 +3095,10 @@ cdef class Loop:
await waiter
return udp, protocol

def _check_default_executor(self):
if self._executor_shutdown_called:
raise RuntimeError('Executor shutdown has been called')

def _asyncgen_finalizer_hook(self, agen):
self._asyncgens.discard(agen)
if not self.is_closed():
Expand Down Expand Up @@ -3131,6 +3140,27 @@ cdef class Loop:
'asyncgen': agen
})

@cython.iterable_coroutine
async def shutdown_default_executor(self):
"""Schedule the shutdown of the default executor."""
self._executor_shutdown_called = True
if self._default_executor is None:
return
future = self.create_future()
thread = threading_Thread(target=self._do_shutdown, args=(future,))
thread.start()
try:
await future
finally:
thread.join()

def _do_shutdown(self, future):
try:
self._default_executor.shutdown(wait=True)
self.call_soon_threadsafe(future.set_result, None)
except Exception as ex:
self.call_soon_threadsafe(future.set_exception, ex)


cdef void __loop_alloc_buffer(uv.uv_handle_t* uvhandle,
size_t suggested_size,
Expand Down