- 
                Notifications
    You must be signed in to change notification settings 
- Fork 92
Scheduler refactor [utils]: multiprocessing encoding and messaging #293
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Scheduler refactor [utils]: multiprocessing encoding and messaging #293
Conversation
d463e77    to
    02f97ff      
    Compare
  
    8831554    to
    da5effc      
    Compare
  
    There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR introduces comprehensive multiprocessing utilities for the scheduler refactor, focusing on encoding and messaging capabilities to optimize data transfer between processes. The implementation provides configurable serialization strategies, binary encoding options, and high-performance interprocess communication mechanisms.
- Adds robust message encoding system with support for Pydantic models, configurable serialization (dict/sequence), and binary encoding (msgpack/msgspec)
- Implements abstract messaging framework with concrete implementations for queues, manager queues, and pipes
- Provides comprehensive test coverage for all new utility modules including smoke, sanity, and regression scenarios
Reviewed Changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description | 
|---|---|
| tests/unit/utils/test_text.py | Fixes spelling error in test docstring (puncutation → punctuation) | 
| tests/unit/utils/test_messaging.py | Comprehensive test suite for messaging utilities with multiprocessing scenarios | 
| tests/unit/utils/test_encoding.py | Complete test coverage for encoding utilities with various data types and serialization strategies | 
| src/guidellm/utils/messaging.py | Core messaging abstractions and implementations for interprocess communication | 
| src/guidellm/utils/encoding.py | Message encoding utilities with Pydantic support and configurable serialization/encoding | 
| src/guidellm/utils/init.py | Exports new encoding and messaging utilities from utils module | 
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
7b74f29    to
    e8300a9      
    Compare
  
    fc47105    to
    b70ee98      
    Compare
  
    Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Mark Kurtz <mark.j.kurtz@gmail.com>
… for the scheduler refactor
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Mark Kurtz <mark.j.kurtz@gmail.com>
1ecc54a    to
    cff8d91      
    Compare
  
    Co-authored-by: Samuel Monson <smonson@redhat.com> Signed-off-by: Mark Kurtz <mark.j.kurtz@gmail.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good. Just one minor comment.
| async def get(self, timeout: float | None = None) -> ReceiveMessageT: | ||
| """ | ||
| Retrieve message from receive buffer with optional timeout. | ||
|  | ||
| :param timeout: Maximum time to wait for a message | ||
| :return: Decoded message from the receive buffer | ||
| """ | ||
| return await asyncio.wait_for( | ||
| self.buffer_receive_queue.async_get(), timeout=timeout | ||
| ) | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this function return a TimeoutError when it times out? If so I think it would be beneficial to document that on all of these functions that have a timeout.
| Resetting the refactor state with new PRs, closing this one out | 
Summary
This PR introduces four new utility modules that provide improved capabilities for transferring information across processes to maximize performance. This is done through encoding.py to best serialize the data into a transferrable format and minimize the size of the data as well as through messaging.py which supports high performance multiprocessing queues and pipes for transfer that works with the encoding.
Details
MessageEncoding.pyInterProcessMessaging,InterProcessMessagingManagerQueue,InterProcessMessagingPipe,InterProcessMessagingQueueTest Plan
Related Issues
Use of AI
## WRITTEN BY AI ##)