
BUG: PUBHandler is not thread-safe when used with an async Context #1967

Open
@rmorshea

Description


This is a pyzmq bug

  • This is a pyzmq-specific bug, not an issue of zmq socket behavior. Don't worry if you're not sure! We'll figure it out together.

What pyzmq version?

22.3.0

What libzmq version?

4.3.4

Python version (and how it was installed)

Python 3.10 via conda-forge

OS

macOS 14

What happened?

If you send a message from a thread that has no running event loop while using zmq.asyncio.Context, you get an error complaining about a missing event loop, because Socket._Future() looks up the current event loop by default.
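A minimal, zmq-free sketch of the underlying condition: a coroutine running on the loop can see its event loop, but a function dispatched with asyncio.to_thread runs in a worker thread that has none. It uses asyncio.get_running_loop as a stand-in for whatever lookup Socket._Future() performs internally (the traceback shows asyncio.get_event_loop); the helper name loop_visible_from_this_thread is hypothetical.

```python
import asyncio


def loop_visible_from_this_thread() -> bool:
    """Hypothetical helper: True if the calling thread has a running event loop."""
    try:
        asyncio.get_running_loop()
        return True
    except RuntimeError:
        # Raised when no event loop is running in the current thread.
        return False


async def main() -> tuple[bool, bool]:
    # Called directly from a coroutine, so we are on the loop's thread.
    on_loop_thread = loop_visible_from_this_thread()
    # asyncio.to_thread runs the function in the default executor's worker
    # thread, which has no event loop of its own.
    in_worker_thread = await asyncio.to_thread(loop_visible_from_this_thread)
    return on_loop_thread, in_worker_thread


if __name__ == "__main__":
    print(asyncio.run(main()))  # (True, False)
```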

In most cases this would be user error, but in the context of logging it's quite hard to avoid. For example, when calling sync library code it's common to use asyncio.to_thread so that blocking calls don't stall the event loop. If the library you call in that thread then logs, you get an error, because the worker thread spawned to do the work has no event loop.
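The reason logging is hard to guard: the logging module calls a handler's emit() synchronously in whichever thread produced the record. The sketch below shows this with a hypothetical stdlib-only handler (ThreadRecordingHandler) and a hypothetical library_call standing in for sync library code; no zmq is involved.

```python
import asyncio
import logging
import threading


class ThreadRecordingHandler(logging.Handler):
    """Hypothetical handler that records which thread called emit()."""

    def __init__(self) -> None:
        super().__init__()
        self.thread_names: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        # emit() runs in the thread that logged the record.
        self.thread_names.append(threading.current_thread().name)


def library_call(log: logging.Logger) -> None:
    # Stand-in for blocking sync library code that happens to log.
    log.warning("inside library code")


async def main() -> list[str]:
    log = logging.getLogger("to_thread_demo")
    log.propagate = False  # keep the demo quiet (no root/lastResort output)
    handler = ThreadRecordingHandler()
    log.addHandler(handler)
    try:
        log.warning("on the event loop thread")
        await asyncio.to_thread(library_call, log)
    finally:
        log.removeHandler(handler)
    return handler.thread_names


if __name__ == "__main__":
    print(asyncio.run(main()))
```

A PUBHandler wrapping a zmq.asyncio socket hits exactly this path: its emit() runs in the worker thread, which is where the missing event loop is detected.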

This has come up in several different ways for me: first when using ddtrace (a Datadog client library), and later in my own library code.

Code to reproduce bug

import asyncio
import logging

import zmq
from zmq.asyncio import Context, Socket
from zmq.log.handlers import PUBHandler

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


def make_sockets(ctx: Context) -> tuple[Socket, Socket]:
    addr = 'tcp://127.0.0.1'

    first = ctx.socket(zmq.PAIR)
    first.linger = 0
    port = first.bind_to_random_port(addr)

    second = ctx.socket(zmq.PAIR)
    second.linger = 0
    second.connect(f'{addr}:{port}')

    return first, second


async def print_messages(sub: Socket, stop: asyncio.Event):
    # create the stop task once so a new one isn't leaked on every message
    stop_task = asyncio.create_task(stop.wait())
    while True:
        msg_task = sub.recv_multipart()
        done, _ = await asyncio.wait({msg_task, stop_task}, return_when=asyncio.FIRST_COMPLETED)
        if stop_task in done:
            msg_task.cancel()  # don't leave a pending recv behind
            break
        print(await msg_task)


async def main():
    with Context() as ctx:
        pub, sub = make_sockets(ctx)

        # background task to read sent messages
        stop_printing = asyncio.Event()
        print_task = asyncio.create_task(print_messages(sub, stop_printing))

        # create some logs
        logger.addHandler(PUBHandler(pub))
        logger.info("hello")  # log outside thread
        await asyncio.to_thread(logger.info, "world")  # log in thread

        # wait for messages to be sent
        await asyncio.sleep(1)

        # stop the background task
        stop_printing.set()
        await print_task


if __name__ == "__main__":
    asyncio.run(main())

Traceback, if applicable

Traceback (most recent call last):
  File "main.py", line 58, in <module>
    asyncio.run(main())
  File "...venv/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "...venv/lib/python3.10/asyncio/base_events.py", line 646, in run_until_complete
    return future.result()
  File "...venv.py", line 47, in main
    await asyncio.to_thread(logger.info, "world")  # log in thread
  File "...venv/lib/python3.10/asyncio/threads.py", line 25, in to_thread
    return await loop.run_in_executor(None, func_call)
  File "...venv/lib/python3.10/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
  File "...venv/lib/python3.10/logging/__init__.py", line 1477, in info
    self._log(INFO, msg, args, **kwargs)
  File "...venv/lib/python3.10/logging/__init__.py", line 1624, in _log
    self.handle(record)
  File "...venv/lib/python3.10/logging/__init__.py", line 1634, in handle
    self.callHandlers(record)
  File "...venv/lib/python3.10/logging/__init__.py", line 1696, in callHandlers
    hdlr.handle(record)
  File "...venv/lib/python3.10/logging/__init__.py", line 968, in handle
    self.emit(record)
  File "...venv/lib/python3.10/site-packages/zmq/log/handlers.py", line 186, in emit
    self.socket.send_multipart([btopic, bmsg])
  File "...venv/lib/python3.10/site-packages/zmq/_future.py", line 321, in send_multipart
    return self._add_send_event('send_multipart', msg=msg_parts, kwargs=kwargs)
  File "...venv/lib/python3.10/site-packages/zmq/_future.py", line 509, in _add_send_event
    f = future or self._Future()
  File "...venv/lib/python3.10/asyncio/events.py", line 656, in get_event_loop
    raise RuntimeError('There is no current event loop in thread %r.'
RuntimeError: There is no current event loop in thread 'asyncio_0'.

More info

I'm currently using the following as a workaround:

import asyncio
import logging
from typing import Any

from zmq.log.handlers import PUBHandler as _PUBHandler


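class PUBHandler(_PUBHandler):
    """PUBHandler that hands emit() to the event loop's thread when a loop is known."""

    def __init__(self, *args: Any, loop: asyncio.AbstractEventLoop | None = None, **kwargs: Any) -> None:
        # capture the loop at construction time (None if no loop is running here)
        self.event_loop = loop or _try_get_running_loop()
        super().__init__(*args, **kwargs)

    def emit(self, record: logging.LogRecord) -> None:
        if self.event_loop is None:
            # no loop known: send directly from the calling thread
            super().emit(record)
        else:
            # run the send on the loop thread, where the async socket can create its Future
            self.event_loop.call_soon_threadsafe(super().emit, record)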


def _try_get_running_loop() -> asyncio.AbstractEventLoop | None:
    try:
        return asyncio.get_running_loop()
    except RuntimeError:
        return None
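The workaround above always defers through call_soon_threadsafe, even for records logged on the loop thread itself. A variant that only hops threads when needed is sketched below as a hypothetical wrapper, LoopMarshallingHandler, that forwards records to any target handler. To keep it testable without zmq, ThreadRecordingHandler (also hypothetical) stands in for PUBHandler; this has not been checked against pyzmq.

```python
import asyncio
import logging
import threading


class LoopMarshallingHandler(logging.Handler):
    """Hypothetical wrapper: make `target` handle every record on `loop`'s thread."""

    def __init__(self, target: logging.Handler, loop: asyncio.AbstractEventLoop) -> None:
        super().__init__()
        self.target = target
        self.loop = loop

    def emit(self, record: logging.LogRecord) -> None:
        try:
            running = asyncio.get_running_loop()
        except RuntimeError:
            running = None  # no loop in this thread, e.g. an asyncio.to_thread worker
        if running is self.loop:
            # already on the loop thread: handle immediately, no extra hop
            self.target.handle(record)
        else:
            try:
                # any other thread: schedule the call on the loop thread
                self.loop.call_soon_threadsafe(self.target.handle, record)
            except RuntimeError:
                # the loop is closed; report through logging's standard error hook
                self.handleError(record)


class ThreadRecordingHandler(logging.Handler):
    """Stand-in for PUBHandler that records which thread handled each record."""

    def __init__(self) -> None:
        super().__init__()
        self.thread_names: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.thread_names.append(threading.current_thread().name)


async def main() -> tuple[str, list[str]]:
    target = ThreadRecordingHandler()
    wrapper = LoopMarshallingHandler(target, asyncio.get_running_loop())
    log = logging.getLogger("marshalling_demo")
    log.propagate = False
    log.addHandler(wrapper)
    try:
        log.warning("hello")  # logged on the loop thread
        await asyncio.to_thread(log.warning, "world")  # logged in a worker thread
        await asyncio.sleep(0)  # let any scheduled handle() callback run
    finally:
        log.removeHandler(wrapper)
    return threading.current_thread().name, target.thread_names


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Under these assumptions, the zmq version would be logger.addHandler(LoopMarshallingHandler(PUBHandler(pub), asyncio.get_running_loop())), keeping the stock PUBHandler unmodified. Note the loop must still be running when a worker thread logs; after it closes, records are reported via handleError instead of being sent.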
