Description
Please use this template for reporting suspected bugs or requests for help.
Issue description
Hi team, thanks for the great project! I'm using zmq to broadcast messages in the vLLM project, and I've run into a bug that I think might be related to zmq.
Environment
- libzmq version (commit hash if unreleased): pyzmq 26.0.3
- OS: Linux
Minimal test code / Steps to reproduce the issue
# test.py
from zmq import PUB, REP, REQ, SUB, SUBSCRIBE, Context, LAST_ENDPOINT # type: ignore
import torch
torch.distributed.init_process_group(backend="gloo")
rank = torch.distributed.get_rank()
world_size = torch.distributed.get_world_size()
context = Context()
if rank == 0:
    local_socket = context.socket(PUB)
    # bind to a random port
    local_socket.bind("tcp://*:*")
    local_socket_port = local_socket.getsockopt(LAST_ENDPOINT).decode("utf-8").split(":")[-1]
    local_sync_socket = context.socket(REP)
    local_sync_socket.bind("tcp://*:*")
    local_sync_socket_port = local_sync_socket.getsockopt(LAST_ENDPOINT).decode("utf-8").split(":")[-1]
    torch.distributed.broadcast_object_list([local_socket_port, local_sync_socket_port], src=0)
    # local readers
    for i in range(world_size - 1):
        recv = local_sync_socket.recv()
        assert recv == b"READY"
        local_sync_socket.send(b"READY")
    local_socket.send(b"READY")
    local_socket.send(b"data")
else:
    data = [None, None]
    torch.distributed.broadcast_object_list(data, src=0)
    local_socket_port, local_sync_socket_port = data
    local_socket = context.socket(SUB)
    local_socket.connect(f"tcp://localhost:{local_socket_port}")
    local_socket.setsockopt_string(SUBSCRIBE, "")
    local_sync_socket = context.socket(REQ)
    local_sync_socket.connect(f"tcp://localhost:{local_sync_socket_port}")
    local_sync_socket.send(b"READY")
    assert local_sync_socket.recv() == b"READY"
    assert local_socket.recv() == b"READY"
    assert local_socket.recv() == b"data"

Run the code about 100 times:
success_count=0
for ((i=1; i<=100; i++)); do
    # pre-increment so the arithmetic command does not return a failure status on the first success
    torchrun --nproc-per-node=8 test.py && ((++success_count)) && echo "Success count: $success_count"
done

About once in every 20 to 50 runs, it hangs. The reason is that even though I put a barrier between the publisher and all subscribers, some subscribers still don't get the message, so they wait forever at assert local_socket.recv() == b"READY".
I'm following https://zguide.zeromq.org/docs/chapter2/#Handling-Multiple-Sockets to add a synchronization point before I publish anything. It works most of the time, but sometimes it fails, i.e. the message is published before all subscribers are ready to receive it.
I find that adding a time.sleep(1) before local_socket.send(b"READY") helps, but that's not an elegant solution.
Is there any way to check this, e.g. for the publisher to check whether it has enough subscribers, or for a subscriber to check whether it is connected to the publisher and ready to receive messages?
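One possible direction, sketched here under assumptions rather than as a confirmed fix: replace the PUB socket with an XPUB socket, which delivers each subscription to the publisher as a message whose first byte is 0x01. With XPUB_VERBOSE enabled so that repeated subscriptions to the same topic are not filtered out, the publisher can count subscriptions and publish only once the expected number has arrived. The function names, the thread-based subscribers (standing in for the torchrun ranks), and the 5-second timeouts below are all hypothetical choices for illustration.

```python
import threading

import zmq


def subscriber(ctx, endpoint, results, index):
    """Hypothetical subscriber: subscribe to everything, then read two messages."""
    sock = ctx.socket(zmq.SUB)
    sock.setsockopt(zmq.RCVTIMEO, 5000)  # assumed timeout so a failure does not hang
    sock.setsockopt(zmq.SUBSCRIBE, b"")
    sock.connect(endpoint)
    try:
        results[index] = [sock.recv(), sock.recv()]
    finally:
        sock.close(linger=0)


def publish_when_subscribed(n_subscribers):
    """Publish only after n_subscribers subscription messages reach the XPUB socket."""
    ctx = zmq.Context()
    pub = ctx.socket(zmq.XPUB)
    # Without XPUB_VERBOSE, only the first subscription per topic is passed up.
    pub.setsockopt(zmq.XPUB_VERBOSE, 1)
    pub.setsockopt(zmq.RCVTIMEO, 5000)  # assumed timeout; raises zmq.Again on expiry
    pub.bind("tcp://127.0.0.1:*")
    endpoint = pub.getsockopt(zmq.LAST_ENDPOINT).decode("utf-8")

    results = [None] * n_subscribers
    threads = [
        threading.Thread(target=subscriber, args=(ctx, endpoint, results, i))
        for i in range(n_subscribers)
    ]
    for t in threads:
        t.start()

    # Each subscribe event arrives as b"\x01" + topic (here the topic is empty).
    subscribed = 0
    while subscribed < n_subscribers:
        event = pub.recv()
        if event[:1] == b"\x01":
            subscribed += 1

    pub.send(b"READY")
    pub.send(b"data")

    for t in threads:
        t.join()
    pub.close(linger=0)
    ctx.term()
    return results


if __name__ == "__main__":
    print(publish_when_subscribed(3))
```

In this sketch the subscription messages themselves act as the barrier, so the separate REQ/REP handshake is not needed; whether that holds in the multi-process torchrun setup above has not been verified here.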
Thanks for the great project, and I look forward to a solution!