Description
This is a pyzmq bug
- This is a pyzmq-specific bug, not an issue of zmq socket behavior. Don't worry if you're not sure! We'll figure it out together.
What pyzmq version?
22.3.0
What libzmq version?
4.3.4
Python version (and how it was installed)
Python 3.10 via conda-forge
OS
macOS 14
What happened?
If you attempt to send a message from a different thread that doesn't have an event loop running when using the zqm.asyncio.Context
you'll get an error complain about missing event loop because Socket._Future()
requests access to the currently running event loop by default.
In most cases this would be user error, but in the context of logging, it's quite hard to avoid. For example, when interacting with sync library code it's quite common to use asyncio.to_thread
to prevent blocking calls. If the library you're calling in a thread then logs, you'll get an error because the thread spawned to do the work doesn't have an event loop.
This has came up in several different ways for me. First when using ddtrace
(a Datadog client library) and then later in my own library code.
Code to reproduce bug
import asyncio
import logging
import zmq
from zmq.asyncio import Context, Socket
from zmq.log.handlers import PUBHandler
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
def make_sockets(ctx: Context) -> tuple[Socket, Socket]:
addr = 'tcp://127.0.0.1'
first = ctx.socket(zmq.PAIR)
first.linger = 0
port = first.bind_to_random_port(addr)
second = ctx.socket(zmq.PAIR)
second.linger = 0
second.connect(f'{addr}:{port}')
return first, second
async def print_messages(sub: Socket, stop: asyncio.Event):
while True:
msg_task = sub.recv_multipart()
stop_task = asyncio.create_task(stop.wait())
done, _ = await asyncio.wait({msg_task, stop_task}, return_when=asyncio.FIRST_COMPLETED)
if stop_task in done:
break
print(await msg_task)
async def main():
with Context() as ctx:
pub, sub = make_sockets(ctx)
# backgound task to read sent message
stop_printing = asyncio.Event()
print_task = asyncio.create_task(print_messages(sub, stop_printing))
# create some logs
logger.addHandler(PUBHandler(pub))
logger.info("hello") # log outside thread
await asyncio.to_thread(logger.info, "world") # log in thread
# wait for messages to tbe sent
await asyncio.sleep(1)
# stop the background task
stop_printing.set()
await print_task
if __name__ == "__main__":
asyncio.run(main())
Traceback, if applicable
Traceback (most recent call last):
File "main.py", line 58, in <module>
asyncio.run(main())
File "...venv/lib/python3.10/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "...venv/lib/python3.10/asyncio/base_events.py", line 646, in run_until_complete
return future.result()
File "...venv.py", line 47, in main
await asyncio.to_thread(logger.info, "world") # log in thread
File "...venv/lib/python3.10/asyncio/threads.py", line 25, in to_thread
return await loop.run_in_executor(None, func_call)
File "...venv/lib/python3.10/concurrent/futures/thread.py", line 58, in run
result = self.fn(*self.args, **self.kwargs)
File "...venv/lib/python3.10/logging/__init__.py", line 1477, in info
self._log(INFO, msg, args, **kwargs)
File "...venv/lib/python3.10/logging/__init__.py", line 1624, in _log
self.handle(record)
File "...venv/lib/python3.10/logging/__init__.py", line 1634, in handle
self.callHandlers(record)
File "...venv/lib/python3.10/logging/__init__.py", line 1696, in callHandlers
hdlr.handle(record)
File "...venv/lib/python3.10/logging/__init__.py", line 968, in handle
self.emit(record)
File "...venv/lib/python3.10/site-packages/zmq/log/handlers.py", line 186, in emit
self.socket.send_multipart([btopic, bmsg])
File "...venv/lib/python3.10/site-packages/zmq/_future.py", line 321, in send_multipart
return self._add_send_event('send_multipart', msg=msg_parts, kwargs=kwargs)
File "...venv/lib/python3.10/site-packages/zmq/_future.py", line 509, in _add_send_event
f = future or self._Future()
File "...venv/lib/python3.10/asyncio/events.py", line 656, in get_event_loop
raise RuntimeError('There is no current event loop in thread %r.'
RuntimeError: There is no current event loop in thread 'asyncio_0'.
More info
I'm currently using the following as a workaround:
import asyncio
import logging
from typing import Any
from zmq.log.handlers import PUBHandler as _PUBHandler
class PUBHandler(_PUBHandler):
def __init__(self, *args: Any, loop: asyncio.AbstractEventLoop = None, **kwargs: Any) -> None:
self.event_loop = loop or _try_get_running_loop()
super().__init__(*args, **kwargs)
def emit(self, record: logging.LogRecord) -> None:
if self.event_loop is None:
super().emit(record)
else:
self.event_loop.call_soon_threadsafe(super().emit, record)
def _try_get_running_loop() -> asyncio.AbstractEventLoop | None:
try:
return asyncio.get_running_loop()
except RuntimeError:
return None