Skip to content

feat: optimize zmq receive#131

Merged
viraatc merged 2 commits intomainfrom
feat/viraatc-zmq-opt
Mar 9, 2026
Merged

feat: optimize zmq receive#131
viraatc merged 2 commits intomainfrom
feat/viraatc-zmq-opt

Conversation

@viraatc
Copy link
Copy Markdown
Collaborator

@viraatc viraatc commented Feb 18, 2026

What does this PR do?

ZMQ receiver optimization — use recv_into with a pre-allocated buffer + memoryview slice instead of per-message recv(copy=False), avoiding per-message bytes allocation.

before:

  Query            32 chars (  128 B): issue=  400,200 msg/s    51.2 MB/s, recv=  399,800 msg/s    51.2 MB/s
  QueryResult      32 chars (  113 B): issue=  403,800 msg/s    45.6 MB/s, recv=  403,400 msg/s    45.6 MB/s
  StreamChunk      32 chars (   96 B): issue=  463,200 msg/s    44.5 MB/s, recv=  462,800 msg/s    44.4 MB/s
  Query           128 chars (  224 B): issue=  408,800 msg/s    91.6 MB/s, recv=  408,400 msg/s    91.5 MB/s
  QueryResult     128 chars (  209 B): issue=  410,600 msg/s    85.8 MB/s, recv=  410,200 msg/s    85.7 MB/s
  StreamChunk     128 chars (  192 B): issue=  468,200 msg/s    89.9 MB/s, recv=  467,800 msg/s    89.8 MB/s
  Query           512 chars (  609 B): issue=  394,200 msg/s   240.1 MB/s, recv=  393,800 msg/s   239.8 MB/s
  QueryResult     512 chars (  594 B): issue=  389,600 msg/s   231.4 MB/s, recv=  389,200 msg/s   231.2 MB/s
  StreamChunk     512 chars (  577 B): issue=  441,000 msg/s   254.5 MB/s, recv=  440,644 msg/s   254.3 MB/s
  Query          1024 chars ( 1121 B): issue=  365,200 msg/s   409.4 MB/s, recv=  364,800 msg/s   408.9 MB/s
  QueryResult    1024 chars ( 1106 B): issue=  372,000 msg/s   411.4 MB/s, recv=  371,600 msg/s   411.0 MB/s
  StreamChunk    1024 chars ( 1089 B): issue=  391,200 msg/s   426.0 MB/s, recv=  390,800 msg/s   425.6 MB/s
  Query          4096 chars ( 4193 B): issue=  273,000 msg/s  1144.7 MB/s, recv=  272,600 msg/s  1143.0 MB/s
  QueryResult    4096 chars ( 4178 B): issue=  282,200 msg/s  1179.0 MB/s, recv=  281,895 msg/s  1177.8 MB/s
  StreamChunk    4096 chars ( 4161 B): issue=  290,800 msg/s  1210.0 MB/s, recv=  290,400 msg/s  1208.4 MB/s
  Query         16384 chars (16481 B): issue=  133,600 msg/s  2201.9 MB/s, recv=  129,417 msg/s  2132.9 MB/s
  QueryResult   16384 chars (16466 B): issue=  147,000 msg/s  2420.5 MB/s, recv=  146,600 msg/s  2413.9 MB/s
  StreamChunk   16384 chars (16449 B): issue=  142,000 msg/s  2335.8 MB/s, recv=  128,948 msg/s  2121.1 MB/s
  Query         32768 chars (32865 B): issue=   69,000 msg/s  2267.7 MB/s, recv=   68,600 msg/s  2254.5 MB/s
  QueryResult   32768 chars (32850 B): issue=   63,800 msg/s  2095.8 MB/s, recv=   63,400 msg/s  2082.7 MB/s
  StreamChunk   32768 chars (32833 B): issue=   73,200 msg/s  2403.4 MB/s, recv=   72,800 msg/s  2390.2 MB/s

after:

  Query            32 chars (  128 B): issue=  460,000 msg/s    58.9 MB/s, recv=  459,600 msg/s    58.8 MB/s
  QueryResult      32 chars (  113 B): issue=  450,800 msg/s    50.9 MB/s, recv=  450,400 msg/s    50.9 MB/s
  StreamChunk      32 chars (   96 B): issue=  541,600 msg/s    52.0 MB/s, recv=  541,200 msg/s    52.0 MB/s
  Query           128 chars (  224 B): issue=  467,800 msg/s   104.8 MB/s, recv=  467,400 msg/s   104.7 MB/s
  QueryResult     128 chars (  209 B): issue=  467,200 msg/s    97.6 MB/s, recv=  466,800 msg/s    97.6 MB/s
  StreamChunk     128 chars (  192 B): issue=  519,400 msg/s    99.7 MB/s, recv=  519,000 msg/s    99.6 MB/s
  Query           512 chars (  609 B): issue=  448,400 msg/s   273.1 MB/s, recv=  448,000 msg/s   272.8 MB/s
  QueryResult     512 chars (  594 B): issue=  439,800 msg/s   261.2 MB/s, recv=  439,400 msg/s   261.0 MB/s
  StreamChunk     512 chars (  577 B): issue=  485,200 msg/s   280.0 MB/s, recv=  484,800 msg/s   279.7 MB/s
  Query          1024 chars ( 1121 B): issue=  410,800 msg/s   460.5 MB/s, recv=  410,400 msg/s   460.1 MB/s
  QueryResult    1024 chars ( 1106 B): issue=  414,800 msg/s   458.8 MB/s, recv=  414,400 msg/s   458.3 MB/s
  StreamChunk    1024 chars ( 1089 B): issue=  443,600 msg/s   483.1 MB/s, recv=  443,200 msg/s   482.6 MB/s
  Query          4096 chars ( 4193 B): issue=  301,800 msg/s  1265.4 MB/s, recv=  301,400 msg/s  1263.8 MB/s
  QueryResult    4096 chars ( 4178 B): issue=  297,600 msg/s  1243.4 MB/s, recv=  297,200 msg/s  1241.7 MB/s
  StreamChunk    4096 chars ( 4161 B): issue=  330,600 msg/s  1375.6 MB/s, recv=  330,200 msg/s  1374.0 MB/s
  Query         16384 chars (16481 B): issue=  148,000 msg/s  2439.2 MB/s, recv=  147,600 msg/s  2432.6 MB/s
  QueryResult   16384 chars (16466 B): issue=  145,600 msg/s  2397.4 MB/s, recv=  145,089 msg/s  2389.0 MB/s
  StreamChunk   16384 chars (16449 B): issue=  159,200 msg/s  2618.7 MB/s, recv=  158,800 msg/s  2612.1 MB/s
  Query         32768 chars (32865 B): issue=   73,600 msg/s  2418.9 MB/s, recv=   73,200 msg/s  2405.7 MB/s
  QueryResult   32768 chars (32850 B): issue=   80,200 msg/s  2634.6 MB/s, recv=   79,800 msg/s  2621.4 MB/s
  StreamChunk   32768 chars (32833 B): issue=   72,800 msg/s  2390.2 MB/s, recv=   72,400 msg/s  2377.1 MB/s

Type of change

  • Bug fix
  • New feature
  • Documentation update
  • Refactor/cleanup

Related issues

Testing

  • Tests added/updated
  • All tests pass locally
  • Manual testing completed

Checklist

  • Code follows project style
  • Pre-commit hooks pass
  • Documentation updated (if needed)

Copilot AI review requested due to automatic review settings February 18, 2026 23:51
@viraatc viraatc requested a review from a team as a code owner February 18, 2026 23:51
@github-actions
Copy link
Copy Markdown

github-actions bot commented Feb 18, 2026

MLCommons CLA bot All contributors have signed the MLCommons CLA ✍️ ✅

@gemini-code-assist
Copy link
Copy Markdown

Summary of Changes

Hello @viraatc, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces significant performance enhancements and code quality improvements, primarily by optimizing ZMQ message reception through pre-allocated buffers and migrating JSON handling to msgspec.json. It also refines the HTTP client's response retrieval mechanisms with new non-blocking methods and strengthens connection management, resulting in a more efficient and robust system.

Highlights

  • ZMQ Receive Optimization: Implemented a significant optimization for ZMQ message reception by utilizing pre-allocated bytearray buffers and recv_into to avoid per-message memory allocations and data copying, leading to substantial performance improvements as demonstrated by the provided benchmarks.
  • JSON Library Migration: Replaced the orjson library with msgspec.json across various modules for JSON serialization and deserialization, standardizing on msgspec which is already used for MessagePack.
  • HTTP Client Refinements: Enhanced the HTTPEndpointClient with new non-blocking poll() and drain() methods for response retrieval, made the shutdown() method idempotent, and removed several redundant assert statements for improved code robustness and clarity.
  • Improved Connection Stale Check: Updated the PooledConnection's is_stale method to use select.poll() instead of select.select(), addressing limitations with file descriptors greater than 1024 and improving reliability in high-concurrency scenarios.

🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Changelog
  • pyproject.toml
    • Removed 'orjson' dependency.
  • src/inference_endpoint/async_utils/transport/zmq/transport.py
    • Added _recv_buf and _recv_view attributes to _ZmqReceiverTransport for pre-allocated receive buffers.
    • Introduced recv_buf_size parameter to _ZmqReceiverTransport constructor, defaulting to 4MB.
    • Modified _on_readable to use self._sock.recv_into with the pre-allocated buffer and memoryview slicing for decoding.
    • Added logging for truncated ZMQ messages when nbytes exceeds buffer length.
    • Passed config.recv_buffer_size to _ZmqReceiverTransport during receiver creation.
  • src/inference_endpoint/endpoint_client/config.py
    • Updated type hints for adapter, accumulator, and worker_pool_transport to use type directly with None assignment and type: ignore.
  • src/inference_endpoint/endpoint_client/http.py
    • Added _poller attribute to PooledConnection to store a select.poll object.
    • Modified is_stale method to use select.poll() for checking connection readability, addressing select() limitations with high file descriptor counts.
  • src/inference_endpoint/endpoint_client/http_client.py
    • Updated docstring example for AsyncHttpEndpointClient to reflect new poll(), recv(), and drain() methods.
    • Added a TODO comment regarding _zmq_context ownership.
    • Removed redundant assert statements for config.adapter, config.accumulator, and config.worker_pool_transport.
    • Refactored drain() method to use list(iter(self.poll, None)) for more concise code.
    • Made shutdown() method idempotent by adding a check for self._shutdown.
  • src/inference_endpoint/endpoint_client/http_sample_issuer.py
    • Removed redundant assert self.http_client.loop is not None.
  • src/inference_endpoint/endpoint_client/worker.py
    • Changed initialization of _pool, _http_template, _loop, _requests, and _responses to occur within the run() method instead of __init__.
    • Removed redundant assert statements for self._loop, self._http_template, self._pool, self._responses, and self.http_config.accumulator.
    • Updated _fire_request to return PooledConnection | None instead of bool.
    • Modified _process_response, _handle_streaming_body, and _handle_non_streaming_body to directly accept PooledConnection as an argument.
    • Added an explicit assertion message for event_logs_dir when record_worker_events is enabled.
  • src/inference_endpoint/evaluation/scoring.py
    • Replaced orjson import with msgspec.json.
    • Updated _load_sample_index_map to use msgspec.json.decode.
    • Updated get_outputs to use msgspec.json.decode for event parsing.
    • Updated _evaluate_via_websocket to use msgspec.json.encode for sending requests and msgspec.json.decode for receiving data.
    • Updated _evaluate_via_subprocess to use msgspec.json.decode for parsing stdout and changed exception handling to msgspec.DecodeError.
  • src/inference_endpoint/load_generator/session.py
    • Replaced orjson import with msgspec.json.
    • Updated _run_test to use msgspec.json.format and msgspec.json.encode for writing runtime settings and sample index map.
  • src/inference_endpoint/metrics/recorder.py
    • Replaced orjson import with msgspec.json.
    • Updated record_event to use msgspec.json.encode for data serialization and changed exception handling to msgspec.EncodeError.
  • src/inference_endpoint/metrics/reporter.py
    • Replaced orjson import with msgspec.json.
    • Updated comment regarding JSON library support for numpy dtypes to msgspec.
    • Updated to_json to use msgspec.json.format and msgspec.json.encode for report serialization.
    • Updated output_sequence_from_data to use msgspec.json.decode and changed exception handling to msgspec.DecodeError.
    • Updated dump_to_json to use msgspec.json.decode for event data and msgspec.json.encode for writing JSON objects to file.
  • src/inference_endpoint/openai/openai_adapter.py
    • Removed orjson import.
    • Updated encode_request to use msgspec.json.encode.
    • Updated decode_endpoint_response to use msgspec.json.decode.
  • src/inference_endpoint/utils/benchmark_httpclient.py
    • Imported ManagedZMQContext.
    • Modified _create_client to initialize AsyncHttpEndpointClient with a ManagedZMQContext.
  • tests/conftest.py
    • Replaced orjson import with msgspec.json.
    • Updated events_db fixture to use msgspec.json.encode for event data.
  • tests/integration/endpoint_client/test_http_client.py
    • Added helper functions _create_client and _make_query.
    • Introduced http_client fixture for testing.
    • Added TestPoll class with tests for non-blocking poll() behavior.
    • Added TestRecv class with tests for blocking recv() behavior.
    • Added TestDrain class with tests for non-blocking drain() bulk retrieval.
    • Added TestShutdown class with tests for idempotent shutdown and post-shutdown request handling.
  • tests/unit/metrics/test_recorder.py
    • Replaced orjson import with msgspec.json.
    • Updated test_event_row_to_insert_params and test_event_row_integration_with_sqlite to use msgspec.json.encode and msgspec.json.decode for JSON data.
  • tests/unit/metrics/test_reporter.py
    • Replaced orjson import with msgspec.json.
    • Updated test_get_output_sequence_lengths_with_stop_performance_tracking and test_create_report_with_stop_performance_tracking to use msgspec.json.encode for event data.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@viraatc viraatc changed the base branch from main to feat/viraatc-drop-orjson February 18, 2026 23:51
@viraatc viraatc changed the base branch from feat/viraatc-drop-orjson to feat/viraatc-cleanup-asserts February 18, 2026 23:52
@viraatc viraatc force-pushed the feat/viraatc-zmq-opt branch from f08032d to 470c930 Compare February 18, 2026 23:53
Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces several significant optimizations and refactorings. The switch to recv_into for ZMQ message receiving is a solid performance improvement, reducing memory allocations. Migrating from orjson to msgspec for JSON handling should also provide a noticeable speed boost. Additionally, the codebase has been cleaned up by removing redundant assertions and improving code style, such as in the drain method. The fix in is_stale to use poll() instead of select() correctly addresses limitations with high file descriptor numbers.

However, I've identified a critical regression regarding thread-safety in the HTTPEndpointClient. The removal of the issue method from this synchronous client wrapper makes it unsafe to call from multiple threads, which was a key feature of its previous design.

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This pull request optimizes ZMQ receive performance and migrates JSON serialization from orjson to msgspec.json. The main optimization replaces ZMQ's recv() with recv_into() using a pre-allocated 4MB buffer to eliminate per-message allocations. Performance benchmarks in the PR description show 10-20% throughput improvements across various message sizes. Additionally, the PR removes redundant assert statements, improves type hints, adds comprehensive test coverage for poll/recv/drain methods, and makes various code quality improvements.

Changes:

  • Optimize ZMQ receive with pre-allocated buffer and recv_into() for zero-copy deserialization
  • Replace orjson with msgspec.json throughout the codebase
  • Add comprehensive integration tests for HTTPEndpointClient poll/recv/drain methods
  • Remove redundant assertions and improve type hints in worker.py, config.py, and http_client.py

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.

Show a summary per file
File Description
pyproject.toml Remove orjson dependency
src/inference_endpoint/async_utils/transport/zmq/transport.py Add pre-allocated buffer and recv_into() optimization for ZMQ receiver
src/inference_endpoint/endpoint_client/http_client.py Remove issue() override, simplify drain() implementation, add shutdown idempotency
src/inference_endpoint/endpoint_client/http.py Replace select() with poll() to handle file descriptors >= 1024
src/inference_endpoint/endpoint_client/worker.py Remove redundant assertions, pass connection explicitly to handlers
src/inference_endpoint/endpoint_client/config.py Improve type hints for post_init initialized fields
src/inference_endpoint/endpoint_client/http_sample_issuer.py Remove redundant assertion
src/inference_endpoint/utils/benchmark_httpclient.py Add ManagedZMQContext with 4 I/O threads for benchmark client
src/inference_endpoint/openai/openai_adapter.py Migrate from orjson to msgspec.json
src/inference_endpoint/metrics/reporter.py Migrate from orjson to msgspec.json
src/inference_endpoint/metrics/recorder.py Migrate from orjson to msgspec.json
src/inference_endpoint/load_generator/session.py Migrate from orjson to msgspec.json with msgspec.json.format for pretty printing
src/inference_endpoint/evaluation/scoring.py Migrate from orjson to msgspec.json, update exception handling
tests/integration/endpoint_client/test_http_client.py Add comprehensive tests for poll/recv/drain methods and shutdown behavior
tests/unit/metrics/test_reporter.py Update tests to use msgspec.json
tests/unit/metrics/test_recorder.py Update tests to use msgspec.json
tests/conftest.py Update fixtures to use msgspec.json

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Copy link
Copy Markdown
Collaborator

@arekay-nv arekay-nv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

@viraatc viraatc force-pushed the feat/viraatc-zmq-opt branch from 470c930 to db8aabb Compare March 6, 2026 22:49
@viraatc viraatc changed the base branch from feat/viraatc-cleanup-asserts to main March 6, 2026 22:54
Copilot AI review requested due to automatic review settings March 6, 2026 23:02
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@viraatc viraatc force-pushed the feat/viraatc-zmq-opt branch from 9eb3d06 to b82ad3d Compare March 9, 2026 21:57
Copilot AI review requested due to automatic review settings March 9, 2026 22:06
@viraatc viraatc force-pushed the feat/viraatc-zmq-opt branch from b82ad3d to ec21306 Compare March 9, 2026 22:06
@viraatc viraatc force-pushed the feat/viraatc-zmq-opt branch from ec21306 to 545436b Compare March 9, 2026 22:09
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@viraatc viraatc force-pushed the feat/viraatc-zmq-opt branch from 545436b to a6b211c Compare March 9, 2026 22:11
Copilot AI review requested due to automatic review settings March 9, 2026 22:18
@viraatc viraatc force-pushed the feat/viraatc-zmq-opt branch from a6b211c to b172ed8 Compare March 9, 2026 22:18
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@viraatc viraatc merged commit cb804d7 into main Mar 9, 2026
4 checks passed
@viraatc viraatc deleted the feat/viraatc-zmq-opt branch March 9, 2026 22:54
@github-actions github-actions bot locked and limited conversation to collaborators Mar 9, 2026
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants