[bug] subscribe channel does not receive message, even after a barrier among publisher and subscriber #4713

@youkaichao

Issue description

Hi team, thanks for the great project! I'm using zmq to broadcast messages in the vLLM project, and I've run into a problem that I think might be related to zmq.

Environment

  • libzmq version (commit hash if unreleased): pyzmq 26.0.3
  • OS: Linux

Minimal test code / Steps to reproduce the issue

# test.py
from zmq import PUB, REP, REQ, SUB, SUBSCRIBE, Context, LAST_ENDPOINT  # type: ignore

import torch

torch.distributed.init_process_group(backend="gloo")
rank = torch.distributed.get_rank()
world_size = torch.distributed.get_world_size()

context = Context()

if rank == 0:

    local_socket = context.socket(PUB)
    # bind to a random port
    local_socket.bind("tcp://*:*")
    local_socket_port = local_socket.getsockopt(LAST_ENDPOINT).decode("utf-8").split(":")[-1]

    local_sync_socket = context.socket(REP)
    local_sync_socket.bind("tcp://*:*")
    local_sync_socket_port = local_sync_socket.getsockopt(LAST_ENDPOINT).decode("utf-8").split(":")[-1]

    torch.distributed.broadcast_object_list([local_socket_port, local_sync_socket_port], src=0)

    # local readers
    for i in range(world_size - 1):
        recv = local_sync_socket.recv()
        assert recv == b"READY"
        local_sync_socket.send(b"READY")
    
    local_socket.send(b"READY")

    local_socket.send(b"data")

else:
    data = [None, None]
    torch.distributed.broadcast_object_list(data, src=0)
    local_socket_port, local_sync_socket_port = data

    local_socket = context.socket(SUB)
    local_socket.connect(f"tcp://localhost:{local_socket_port}")
    local_socket.setsockopt_string(SUBSCRIBE, "")

    local_sync_socket = context.socket(REQ)
    local_sync_socket.connect(f"tcp://localhost:{local_sync_socket_port}")

    local_sync_socket.send(b"READY")
    assert local_sync_socket.recv() == b"READY"

    assert local_socket.recv() == b"READY"
    assert local_socket.recv() == b"data"

Run the script about 100 times:

success_count=0
for ((i=1; i<=100; i++)); do
  torchrun --nproc-per-node=8 test.py && success_count=$((success_count + 1)) && echo "Success count: $success_count"
done

About once in every 20 to 50 runs, it hangs. Even though I put a barrier between the publisher and all subscribers, some subscribers still do not receive the first message, so they wait forever at assert local_socket.recv() == b"READY".

I'm following https://zguide.zeromq.org/docs/chapter2/#Handling-Multiple-Sockets and added a synchronization point before publishing anything. It works most of the time, but it sometimes fails: the message is published before all subscribers are ready to receive it.

I found that adding a time.sleep(1) before local_socket.send(b"READY") helps, but that is not an elegant solution.

Is there a way to check this, e.g. for the publisher to know that it has enough subscribers, or for a subscriber to confirm that it is connected to the publisher and ready to receive messages?
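
One possible direction, sketched here as an assumption rather than a confirmed fix for this issue: an XPUB socket hands subscription frames to the application, so the publisher can count them before sending anything. The sketch below uses an in-process endpoint and threads instead of torchrun so that it is self-contained. The helper names (wait_for_subscribers, subscriber, demo) and the endpoint name are hypothetical. XPUB_VERBOSE is enabled so that identical empty-topic subscriptions from different subscribers are each delivered instead of being deduplicated.

```python
import threading

import zmq


def wait_for_subscribers(xpub, expected, timeout_ms=5000):
    """Block until `expected` subscribe frames have arrived on an XPUB socket."""
    seen = 0
    while seen < expected:
        if xpub.poll(timeout_ms) == 0:
            raise TimeoutError(f"only {seen}/{expected} subscribers arrived")
        frame = xpub.recv()
        # First byte is 1 for subscribe and 0 for unsubscribe; the rest is the topic.
        if frame[:1] == b"\x01":
            seen += 1
    return seen


def subscriber(ctx, endpoint, results, idx):
    """Hypothetical subscriber: subscribe to everything and read two messages."""
    sub = ctx.socket(zmq.SUB)
    sub.setsockopt(zmq.RCVTIMEO, 5000)  # raise zmq.Again instead of hanging forever
    sub.connect(endpoint)
    sub.setsockopt(zmq.SUBSCRIBE, b"")
    try:
        results[idx] = [sub.recv(), sub.recv()]
    finally:
        sub.close()


def demo(n_subscribers=3):
    ctx = zmq.Context()
    endpoint = "inproc://xpub-demo"  # hypothetical endpoint name

    pub = ctx.socket(zmq.XPUB)
    # Deliver every subscription frame, including duplicates of the same topic.
    pub.setsockopt(zmq.XPUB_VERBOSE, 1)
    pub.bind(endpoint)

    results = [None] * n_subscribers
    threads = [
        threading.Thread(target=subscriber, args=(ctx, endpoint, results, i))
        for i in range(n_subscribers)
    ]
    for t in threads:
        t.start()

    # Publish only after every subscription has reached the publisher side.
    wait_for_subscribers(pub, n_subscribers)
    pub.send(b"READY")
    pub.send(b"data")

    for t in threads:
        t.join()
    pub.close(linger=0)
    ctx.term()
    return results


if __name__ == "__main__":
    print(demo())
```

In the reproduction above, this would correspond to creating local_socket as XPUB on rank 0 and waiting for world_size - 1 subscription frames in place of the REQ/REP handshake. Whether this removes the hang in the torchrun setup has not been verified here.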

Thanks for the great project, and look forward to the solution!
