fix: prevent silent message loss on zmq socket #951

Merged
willccbb merged 3 commits into main from fix/zmq-router-silent-drop
Feb 22, 2026

Conversation


@mikasenghaas mikasenghaas commented Feb 22, 2026

Summary

  • Set ZMQ_ROUTER_MANDATORY=1 on the server's ROUTER socket so full send pipes raise EAGAIN for automatic retries instead of silently dropping messages
  • Allow unlimited message buffering (SNDHWM=0, RCVHWM=0 on both sides); concurrency is bounded by the application itself (e.g. how many requests it runs concurrently)
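Concretely, the two settings amount to the following on the server socket (a pyzmq sketch; the inproc address is illustrative, not the PR's actual endpoint):

```python
import zmq

ctx = zmq.Context()
sock = ctx.socket(zmq.ROUTER)

# Fail loudly (EAGAIN / EHOSTUNREACH) instead of silently dropping
# when a peer's send pipe is full or the peer identity is unknown.
sock.setsockopt(zmq.ROUTER_MANDATORY, 1)

# 0 = no high-water mark: libzmq never drops buffered messages;
# memory is bounded by the application's own concurrency limits.
sock.setsockopt(zmq.SNDHWM, 0)
sock.setsockopt(zmq.RCVHWM, 0)

sock.bind("inproc://example")  # illustrative address
```

With HWM=0, libzmq buffers without limit, so peak memory is governed only by how much work the application admits at once.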

The bug

ZMQ ROUTER sockets silently drop messages when the per-peer send pipe is full. send_multipart() returns success, but the message is discarded by libzmq. The server logs no error. The client waits for a response that will never arrive (up to the 10h default timeout).

This happens under high concurrency when responses accumulate faster than the client can drain them — event loop lag, GC pauses, or large response payloads can all trigger it well below the nominal HWM message count. The buffer doesn't need to hit 10,000 messages; it just needs to fill faster than the client's event loop can call recv.

How ROUTER_MANDATORY fixes it

Without it:

send_multipart() → libzmq silently discards → returns SUCCESS → no error anywhere

With it, two cases:

  • Pipe full (peer connected but slow): libzmq returns EAGAIN → pyzmq catches internally, queues the send, retries when pipe drains → backpressure, no message loss, transparent to application code
  • Peer gone (disconnected): libzmq raises EHOSTUNREACH → our new try/except catches it → debug log, no crash
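The resulting send path can be sketched as a small wrapper (assumed names: `safe_send` and the module-level logger; the PR's actual code may differ):

```python
import logging

import zmq
import zmq.asyncio

logger = logging.getLogger(__name__)

async def safe_send(sock: zmq.asyncio.Socket, frames: list) -> bool:
    """Send on a ROUTER socket configured with ROUTER_MANDATORY=1.

    A full pipe (EAGAIN) never surfaces here: pyzmq's async layer
    retries the send when the pipe drains, so the await just applies
    backpressure. A vanished peer raises EHOSTUNREACH, which we log
    at debug level and swallow rather than crash the server.
    """
    try:
        await sock.send_multipart(frames)
        return True
    except zmq.ZMQError as e:
        if e.errno == zmq.EHOSTUNREACH:
            logger.debug("peer gone, dropping response: %s", e)
            return False
        raise
```

Any other ZMQError is re-raised, since silently swallowing unrelated socket failures would reintroduce the original problem in a different form.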

Minimal repro

import asyncio, uuid, zmq, zmq.asyncio

ADDR = "tcp://127.0.0.1:15557"
NUM_REQUESTS = 200
HWM = 3
USE_ROUTER_MANDATORY = False  # toggle to True to see the fix

async def server(ready, done):
    ctx = zmq.asyncio.Context()
    sock = ctx.socket(zmq.ROUTER)
    sock.setsockopt(zmq.SNDHWM, HWM)
    sock.setsockopt(zmq.RCVHWM, HWM)
    sock.setsockopt(zmq.LINGER, 0)
    if USE_ROUTER_MANDATORY:
        sock.setsockopt(zmq.ROUTER_MANDATORY, 1)
    sock.bind(ADDR)
    ready.set()

    requests = []
    while len(requests) < NUM_REQUESTS:
        client_id, request_id, _ = await sock.recv_multipart()
        requests.append((client_id, request_id))

    send_ok = send_fail = 0
    for client_id, request_id in requests:
        try:
            await sock.send_multipart([client_id, request_id, b"ok"])
            send_ok += 1
        except zmq.ZMQError:
            send_fail += 1

    print(f"[server] {send_ok} ok, {send_fail} failed")
    done.set()
    sock.close(); ctx.term()

async def client(ready, done):
    ctx = zmq.asyncio.Context()
    sock = ctx.socket(zmq.DEALER)
    sock.setsockopt(zmq.SNDHWM, HWM)
    sock.setsockopt(zmq.RCVHWM, HWM)
    sock.setsockopt(zmq.LINGER, 0)
    sock.connect(ADDR)
    await ready.wait()

    pending = {}
    for _ in range(NUM_REQUESTS):
        rid = uuid.uuid4().hex
        fut = asyncio.get_running_loop().create_future()
        pending[rid] = fut
        await sock.send_multipart([rid.encode(), b"req"])

    async def recv_loop():
        while True:
            rid_bytes, data = (await sock.recv_multipart())[:2]
            fut = pending.pop(rid_bytes.decode(), None)
            if fut and not fut.done(): fut.set_result(data)

    task = asyncio.create_task(recv_loop())
    await done.wait(); await asyncio.sleep(0.5)
    task.cancel()
    try: await task
    except asyncio.CancelledError: pass

    received = NUM_REQUESTS - len(pending) + sum(1 for f in pending.values() if f.done())
    print(f"[client] received {received}/{NUM_REQUESTS}")
    sock.close(); ctx.term()

async def main():
    ready, done = asyncio.Event(), asyncio.Event()
    await asyncio.gather(server(ready, done), client(ready, done))

asyncio.run(main())

Without fix:

[server] 200 ok, 0 failed       ← all sends "succeeded"
[client] received 3/200          ← 197 silently lost

With USE_ROUTER_MANDATORY = True:

[server] 200 ok, 0 failed
[client] received 200/200        ← backpressure, no loss

Eval Repro

Set the socket message buffer limits much lower (so they reach capacity faster), then run:

 uv run vf-eval gsm8k -n16 -r128 -c-1 -d -v -i -m Qwen/Qwen3-4B-Instruct-2507 -p local -t 128 -s

On main this never finishes because some responses are silently lost; on this branch it completes.

Test plan

  • Verified with repro script — 0 message loss with fix
  • Reproduced in real eval run with HWM=10
  • Run existing env server tests
  • Load test under high concurrency to confirm no regressions

🤖 Generated with Claude Code


Note

Medium Risk
Changes core IPC behavior (buffering/backpressure and error handling) which can affect throughput and memory under load, but is localized to ZMQ client/server socket configuration and send path.

Overview
Prevents silent response loss in the ZMQ env transport under high concurrency by enabling ROUTER_MANDATORY on the server and setting SNDHWM/RCVHWM to unlimited (0) on both client and server.

Server response sending is now wrapped in a try/except to log and ignore zmq.ZMQError (e.g., disconnected clients) instead of failing silently/crashing. Also tightens typing in Environment by casting callable dataset/eval_dataset inputs to DatasetBuilder.

Written by Cursor Bugbot for commit 9cd0052.

ZMQ ROUTER sockets without ROUTER_MANDATORY silently drop messages
when the per-peer send pipe is full — send_multipart() returns success
but the message is discarded by libzmq. With ROUTER_MANDATORY=1, a
full pipe raises EAGAIN, which pyzmq's async layer handles via
backpressure (queues the send and retries when the client drains).

Also unbounds the response path (server SNDHWM=0, client RCVHWM=0)
so completed responses are never dropped — the work is already done,
losing the result is pure waste.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

@cursor cursor bot left a comment


Cursor Bugbot has reviewed your changes and found 1 potential issue.


@mikasenghaas mikasenghaas changed the title fix: prevent silent message loss on ZMQ ROUTER socket fix: prevent silent message loss on zmq socket Feb 22, 2026
@willccbb willccbb merged commit 333c956 into main Feb 22, 2026
6 checks passed
