Skip to content

Commit d76d982

Browse files
committed
Fix KeyboardInterrupt handling logic.
When uvloop is run in the main thread we *always* want to set up a self-pipe and a signal wakeup FD. That's the only way how libuv can be notified that a ^C happened and break away from selecting on sockets. asyncio does not need to do that, as the 'selectors' module it uses is already aware of the way Python implements ^C handling. This translates to a slightly different behavior between asyncio & uvloop: 1. uvloop needs to always call signal.set_wakeup_fd() when run in the main thread; 2. asyncio only needs to call signal.set_wakeup_fd() when a user registers a signal handler. (2) means that if the user had not set up any signals, the signal wakeup FD stays the same between different asyncio runs. This commit fixes uvloop signal implementation to make sure that uvloop behaves the same way as asyncio in regards to signal wakeup FD between the loop runs. It also ensures that uvloop always have a proper self-pipe set up so that ^C is always supported when it is run in the main thread. Issue #295.
1 parent 679fde6 commit d76d982

File tree

5 files changed

+146
-95
lines changed

5 files changed

+146
-95
lines changed

tests/test_dealloc.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ async def foo():
3636
return 42
3737
3838
def main():
39-
asyncio.set_event_loop(uvloop.new_event_loop())
39+
uvloop.install()
4040
loop = asyncio.get_event_loop()
4141
loop.set_debug(True)
4242
loop.run_until_complete(foo())

tests/test_regr1.py

Lines changed: 27 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -74,36 +74,33 @@ def on_alarm(self, sig, fr):
7474
raise FailedTestError
7575

7676
def run_test(self):
77-
try:
78-
for i in range(10):
79-
for threaded in [True, False]:
80-
if threaded:
81-
qin, qout = queue.Queue(), queue.Queue()
82-
threading.Thread(
83-
target=run_server,
84-
args=(qin, qout),
85-
daemon=True).start()
86-
else:
87-
qin = multiprocessing.Queue()
88-
qout = multiprocessing.Queue()
89-
multiprocessing.Process(
90-
target=run_server,
91-
args=(qin, qout),
92-
daemon=True).start()
93-
94-
addr = qout.get()
95-
loop = self.new_loop()
96-
asyncio.set_event_loop(loop)
97-
loop.create_task(
98-
loop.create_connection(
99-
lambda: EchoClientProtocol(loop),
100-
host=addr[0], port=addr[1]))
101-
loop.run_forever()
102-
loop.close()
103-
qin.put('stop')
104-
qout.get()
105-
finally:
106-
loop.close()
77+
for i in range(10):
78+
for threaded in [True, False]:
79+
if threaded:
80+
qin, qout = queue.Queue(), queue.Queue()
81+
threading.Thread(
82+
target=run_server,
83+
args=(qin, qout),
84+
daemon=True).start()
85+
else:
86+
qin = multiprocessing.Queue()
87+
qout = multiprocessing.Queue()
88+
multiprocessing.Process(
89+
target=run_server,
90+
args=(qin, qout),
91+
daemon=True).start()
92+
93+
addr = qout.get()
94+
loop = self.new_loop()
95+
asyncio.set_event_loop(loop)
96+
loop.create_task(
97+
loop.create_connection(
98+
lambda: EchoClientProtocol(loop),
99+
host=addr[0], port=addr[1]))
100+
loop.run_forever()
101+
loop.close()
102+
qin.put('stop')
103+
qout.get()
107104

108105
@unittest.skipIf(
109106
multiprocessing.get_start_method(False) == 'spawn',

tests/test_signals.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,45 @@ async def worker():
117117
loop = """ + self.NEW_LOOP + """
118118
asyncio.set_event_loop(loop)
119119
loop.create_task(worker())
120+
try:
121+
loop.run_forever()
122+
finally:
123+
srv.close()
124+
loop.run_until_complete(srv.wait_closed())
125+
loop.close()
126+
"""
127+
128+
proc = await asyncio.create_subprocess_exec(
129+
sys.executable, b'-W', b'ignore', b'-c', PROG,
130+
stdout=subprocess.PIPE,
131+
stderr=subprocess.PIPE)
132+
133+
await proc.stdout.readline()
134+
time.sleep(DELAY)
135+
proc.send_signal(signal.SIGINT)
136+
out, err = await proc.communicate()
137+
self.assertIn(b'KeyboardInterrupt', err)
138+
139+
self.loop.run_until_complete(runner())
140+
141+
@tb.silence_long_exec_warning()
142+
def test_signals_sigint_uvcode_two_loop_runs(self):
143+
async def runner():
144+
PROG = R"""\
145+
import asyncio
146+
import uvloop
147+
148+
srv = None
149+
150+
async def worker():
151+
global srv
152+
cb = lambda *args: None
153+
srv = await asyncio.start_server(cb, '127.0.0.1', 0)
154+
155+
loop = """ + self.NEW_LOOP + """
156+
asyncio.set_event_loop(loop)
157+
loop.run_until_complete(worker())
158+
print('READY', flush=True)
120159
try:
121160
loop.run_forever()
122161
finally:

uvloop/loop.pxd

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ cdef class Loop:
6565
object _ssock
6666
object _csock
6767
bint _listening_signals
68+
int _old_signal_wakeup_id
6869

6970
set _timers
7071
dict _polls
@@ -149,6 +150,8 @@ cdef class Loop:
149150

150151
cdef void _handle_exception(self, object ex)
151152

153+
cdef inline _is_main_thread(self)
154+
152155
cdef inline _new_future(self)
153156
cdef inline _check_signal(self, sig)
154157
cdef inline _check_closed(self)
@@ -186,10 +189,9 @@ cdef class Loop:
186189

187190
cdef _sock_set_reuseport(self, int fd)
188191

189-
cdef _setup_signals(self)
192+
cdef _setup_or_resume_signals(self)
190193
cdef _shutdown_signals(self)
191-
cdef _recv_signals_start(self)
192-
cdef _recv_signals_stop(self)
194+
cdef _pause_signals(self)
193195

194196
cdef _handle_signal(self, sig)
195197
cdef _read_from_self(self)

uvloop/loop.pyx

Lines changed: 74 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ cdef class Loop:
167167
self._ssock = self._csock = None
168168
self._signal_handlers = {}
169169
self._listening_signals = False
170+
self._old_signal_wakeup_id = -1
170171

171172
self._coroutine_debug_set = False
172173

@@ -183,6 +184,9 @@ cdef class Loop:
183184

184185
self._servers = set()
185186

187+
cdef inline _is_main_thread(self):
188+
return MAIN_THREAD_ID == PyThread_get_thread_ident()
189+
186190
def __init__(self):
187191
self.set_debug((not sys_ignore_environment
188192
and bool(os_environ.get('PYTHONASYNCIODEBUG'))))
@@ -241,34 +245,40 @@ cdef class Loop:
241245

242246
self._debug_exception_handler_cnt = 0
243247

244-
cdef _setup_signals(self):
245-
cdef int old_wakeup_fd
248+
cdef _setup_or_resume_signals(self):
249+
if not self._is_main_thread():
250+
return
246251

247252
if self._listening_signals:
248-
return
253+
raise RuntimeError('signals handling has been already setup')
254+
255+
if self._ssock is not None:
256+
raise RuntimeError('self-pipe exists before loop run')
257+
258+
# Create a self-pipe and call set_signal_wakeup_fd() with one
259+
# of its ends. This is needed so that libuv knows that it needs
260+
# to wakeup on ^C (no matter if the SIGINT handler is still the
261+
# standard Python's one or or user set their own.)
249262

250263
self._ssock, self._csock = socket_socketpair()
251-
self._ssock.setblocking(False)
252-
self._csock.setblocking(False)
253264
try:
254-
old_wakeup_fd = _set_signal_wakeup_fd(self._csock.fileno())
255-
except (OSError, ValueError):
256-
# Not the main thread
265+
self._ssock.setblocking(False)
266+
self._csock.setblocking(False)
267+
268+
fileno = self._csock.fileno()
269+
270+
self._old_signal_wakeup_id = _set_signal_wakeup_fd(fileno)
271+
except Exception:
272+
# Out of all statements in the try block, only the
273+
# "_set_signal_wakeup_fd()" call can fail, but it shouldn't,
274+
# as we ensure that the current thread is the main thread.
275+
# Still, if something goes horribly wrong we want to clean up
276+
# the socket pair.
257277
self._ssock.close()
258278
self._csock.close()
259-
self._ssock = self._csock = None
260-
return
261-
262-
self._listening_signals = True
263-
return old_wakeup_fd
264-
265-
cdef _recv_signals_start(self):
266-
cdef object old_wakeup_fd = None
267-
if self._ssock is None:
268-
old_wakeup_fd = self._setup_signals()
269-
if self._ssock is None:
270-
# Not the main thread.
271-
return
279+
self._ssock = None
280+
self._csock = None
281+
raise
272282

273283
self._add_reader(
274284
self._ssock,
@@ -277,38 +287,49 @@ cdef class Loop:
277287
"Loop._read_from_self",
278288
<method_t>self._read_from_self,
279289
self))
280-
return old_wakeup_fd
281290

282-
cdef _recv_signals_stop(self):
283-
if self._ssock is None:
284-
return
291+
self._listening_signals = True
285292

286-
self._remove_reader(self._ssock)
293+
cdef _pause_signals(self):
294+
if not self._is_main_thread():
295+
if self._listening_signals:
296+
raise RuntimeError(
297+
'cannot pause signals handling; no longer running in '
298+
'the main thread')
299+
else:
300+
return
287301

288-
cdef _shutdown_signals(self):
289302
if not self._listening_signals:
290-
return
303+
raise RuntimeError('signals handling has not been setup')
291304

292-
for sig in list(self._signal_handlers):
293-
self.remove_signal_handler(sig)
294-
295-
if not self._listening_signals:
296-
# `remove_signal_handler` will call `_shutdown_signals` when
297-
# removing last signal handler.
298-
return
305+
self._listening_signals = False
299306

300-
try:
301-
signal_set_wakeup_fd(-1)
302-
except (ValueError, OSError) as exc:
303-
aio_logger.info('set_wakeup_fd(-1) failed: %s', exc)
307+
_set_signal_wakeup_fd(self._old_signal_wakeup_id)
304308

305309
self._remove_reader(self._ssock)
306310
self._ssock.close()
307311
self._csock.close()
308312
self._ssock = None
309313
self._csock = None
310314

311-
self._listening_signals = False
315+
cdef _shutdown_signals(self):
316+
if not self._is_main_thread():
317+
if self._signal_handlers:
318+
aio_logger.warning(
319+
'cannot cleanup signal handlers: closing the event loop '
320+
'in a non-main OS thread')
321+
return
322+
323+
if self._listening_signals:
324+
raise RuntimeError(
325+
'cannot shutdown signals handling as it has not been paused')
326+
327+
if self._ssock:
328+
raise RuntimeError(
329+
'self-pipe was not cleaned up after loop was run')
330+
331+
for sig in list(self._signal_handlers):
332+
self.remove_signal_handler(sig)
312333

313334
def __sighandler(self, signum, frame):
314335
self._signals.add(signum)
@@ -451,7 +472,6 @@ cdef class Loop:
451472

452473
cdef _run(self, uv.uv_run_mode mode):
453474
cdef int err
454-
cdef object old_wakeup_fd
455475

456476
if self._closed == 1:
457477
raise RuntimeError('unable to start the loop; it was closed')
@@ -474,7 +494,7 @@ cdef class Loop:
474494
self.handler_check__exec_writes.start()
475495
self.handler_idle.start()
476496

477-
old_wakeup_fd = self._recv_signals_start()
497+
self._setup_or_resume_signals()
478498

479499
if aio_set_running_loop is not None:
480500
aio_set_running_loop(self)
@@ -484,13 +504,11 @@ cdef class Loop:
484504
if aio_set_running_loop is not None:
485505
aio_set_running_loop(None)
486506

487-
self._recv_signals_stop()
488-
if old_wakeup_fd is not None:
489-
signal_set_wakeup_fd(old_wakeup_fd)
490-
491507
self.handler_check__exec_writes.stop()
492508
self.handler_idle.stop()
493509

510+
self._pause_signals()
511+
494512
self._thread_is_main = 0
495513
self._thread_id = 0
496514
self._running = 0
@@ -2794,10 +2812,10 @@ cdef class Loop:
27942812
cdef:
27952813
Handle h
27962814

2797-
if not self._listening_signals:
2798-
self._setup_signals()
2799-
if not self._listening_signals:
2800-
raise ValueError('set_wakeup_fd only works in main thread')
2815+
if not self._is_main_thread():
2816+
raise ValueError(
2817+
'add_signal_handler() can only be called from '
2818+
'the main thread')
28012819

28022820
if (aio_iscoroutine(callback)
28032821
or aio_iscoroutinefunction(callback)):
@@ -2829,14 +2847,6 @@ cdef class Loop:
28292847

28302848
self._check_signal(sig)
28312849
self._check_closed()
2832-
try:
2833-
# set_wakeup_fd() raises ValueError if this is not the
2834-
# main thread. By calling it early we ensure that an
2835-
# event loop running in another thread cannot add a signal
2836-
# handler.
2837-
_set_signal_wakeup_fd(self._csock.fileno())
2838-
except (ValueError, OSError) as exc:
2839-
raise RuntimeError(str(exc))
28402850

28412851
h = new_Handle(self, callback, args or None, None)
28422852
self._signal_handlers[sig] = h
@@ -2866,6 +2876,12 @@ cdef class Loop:
28662876
28672877
Return True if a signal handler was removed, False if not.
28682878
"""
2879+
2880+
if not self._is_main_thread():
2881+
raise ValueError(
2882+
'remove_signal_handler() can only be called from '
2883+
'the main thread')
2884+
28692885
self._check_signal(sig)
28702886

28712887
if not self._listening_signals:
@@ -2889,9 +2905,6 @@ cdef class Loop:
28892905
else:
28902906
raise
28912907

2892-
if not self._signal_handlers:
2893-
self._shutdown_signals()
2894-
28952908
return True
28962909

28972910
@cython.iterable_coroutine

0 commit comments

Comments
 (0)