-
Notifications
You must be signed in to change notification settings - Fork 7.2k
[Serve] Add HAProxy support for Ray Serve #60586
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces HAProxy support to Ray Serve, which is a significant and valuable feature. The implementation is comprehensive, covering HAProxy process management, dynamic configuration generation, and extensive testing. The code is well-structured, with a clear separation of concerns. I have a few suggestions to improve robustness and efficiency, such as replacing an external socat dependency with native Python asyncio for socket communication and refining how environment variables are read. Overall, this is a solid contribution.
| async def _send_socket_command(self, command: str) -> str: | ||
| """Send a command to the HAProxy stats socket via subprocess.""" | ||
| try: | ||
| # Check if a socket file exists | ||
| if not os.path.exists(self.cfg.socket_path): | ||
| raise RuntimeError( | ||
| f"HAProxy socket file does not exist: {self.cfg.socket_path}." | ||
| ) | ||
|
|
||
| proc = await asyncio.create_subprocess_exec( | ||
| "socat", | ||
| "-", | ||
| f"UNIX-CONNECT:{self.cfg.socket_path}", | ||
| stdin=asyncio.subprocess.PIPE, | ||
| stdout=asyncio.subprocess.PIPE, | ||
| stderr=asyncio.subprocess.PIPE, | ||
| ) | ||
|
|
||
| try: | ||
| stdout, stderr = await asyncio.wait_for( | ||
| proc.communicate(f"{command}\n".encode("utf-8")), timeout=5.0 | ||
| ) | ||
| except asyncio.TimeoutError: | ||
| proc.kill() | ||
| await proc.wait() | ||
| raise RuntimeError( | ||
| f"Timeout while sending command '{command}' to HAProxy socket" | ||
| ) | ||
|
|
||
| if proc.returncode != 0: | ||
| err = stderr.decode("utf-8", errors="ignore").strip() | ||
| raise RuntimeError( | ||
| f"Command '{command}' failed with code {proc.returncode}: {err}" | ||
| ) | ||
|
|
||
| result = stdout.decode("utf-8", errors="ignore") | ||
| logger.debug(f"Socket command '{command}' returned {len(result)} chars.") | ||
| return result | ||
| except Exception as e: | ||
| raise RuntimeError(f"Failed to send socket command '{command}': {e}") | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The _send_socket_command method currently uses socat as a subprocess to communicate with the HAProxy admin socket. This introduces a dependency on an external command-line tool (socat) which might not be available in all environments, and is less efficient than using Python's native socket library.
As noted in the TODO on line 748, this can be improved by using Python's asyncio library to interact with the UNIX domain socket directly. This removes the external dependency and is more robust and performant.
Here's a suggested implementation:
async def _send_socket_command(self, command: str) -> str:
"""Send a command to the HAProxy stats socket."""
try:
reader, writer = await asyncio.wait_for(
asyncio.open_unix_connection(self.cfg.socket_path),
timeout=5.0
)
try:
writer.write(f"{command}\n".encode("utf-8"))
await writer.drain()
response_bytes = await asyncio.wait_for(reader.read(), timeout=5.0)
result = response_bytes.decode("utf-8", errors="ignore")
logger.debug(f"Socket command '{command}' returned {len(result)} chars.")
return result
finally:
writer.close()
await writer.wait_closed()
except FileNotFoundError:
raise RuntimeError(
f"HAProxy socket file does not exist: {self.cfg.socket_path}."
)
except asyncio.TimeoutError:
raise RuntimeError(
f"Timeout while sending command '{command}' to HAProxy socket"
)
except Exception as e:
raise RuntimeError(f"Failed to send socket command '{command}': {e}")| RAY_SERVE_HAPROXY_TIMEOUT_SERVER_S = ( | ||
| int(os.environ.get("RAY_SERVE_HAPROXY_TIMEOUT_SERVER_S")) | ||
| if os.environ.get("RAY_SERVE_HAPROXY_TIMEOUT_SERVER_S") | ||
| else None | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code calls os.environ.get("RAY_SERVE_HAPROXY_TIMEOUT_SERVER_S") twice, which is inefficient. You can use the walrus operator := to store the result of the first call and reuse it, making the code more efficient and readable.
| RAY_SERVE_HAPROXY_TIMEOUT_SERVER_S = ( | |
| int(os.environ.get("RAY_SERVE_HAPROXY_TIMEOUT_SERVER_S")) | |
| if os.environ.get("RAY_SERVE_HAPROXY_TIMEOUT_SERVER_S") | |
| else None | |
| ) | |
| RAY_SERVE_HAPROXY_TIMEOUT_SERVER_S = ( | |
| int(val) | |
| if (val := os.environ.get("RAY_SERVE_HAPROXY_TIMEOUT_SERVER_S")) | |
| else None | |
| ) |
| RAY_SERVE_HAPROXY_TIMEOUT_CONNECT_S = ( | ||
| int(os.environ.get("RAY_SERVE_HAPROXY_TIMEOUT_CONNECT_S")) | ||
| if os.environ.get("RAY_SERVE_HAPROXY_TIMEOUT_CONNECT_S") | ||
| else None | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code calls os.environ.get("RAY_SERVE_HAPROXY_TIMEOUT_CONNECT_S") twice, which is inefficient. You can use the walrus operator := to store the result of the first call and reuse it, making the code more efficient and readable.
| RAY_SERVE_HAPROXY_TIMEOUT_CONNECT_S = ( | |
| int(os.environ.get("RAY_SERVE_HAPROXY_TIMEOUT_CONNECT_S")) | |
| if os.environ.get("RAY_SERVE_HAPROXY_TIMEOUT_CONNECT_S") | |
| else None | |
| ) | |
| RAY_SERVE_HAPROXY_TIMEOUT_CONNECT_S = ( | |
| int(val) | |
| if (val := os.environ.get("RAY_SERVE_HAPROXY_TIMEOUT_CONNECT_S")) | |
| else None | |
| ) |
| ) -> None: | ||
| start_time = time.time() | ||
|
|
||
| # TODO: update this to use health checks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As this TODO suggests, using an HTTP health check would be a more robust way to verify HAProxy's availability. The current implementation checks if the admin socket is responsive, but an HTTP check would verify that the frontend is up and correctly configured to serve traffic. You could use an HTTP client like httpx to make a request to http://127.0.0.1:{self.cfg.frontend_port}/-/healthz and check for a successful response.
This PR adds HAProxy support to OSS Ray Serve. HAProxy can be used as an alternative to the default Serve HTTP proxy, providing load balancing with features like graceful reloads and health checks. Key changes: - Add haproxy.py and haproxy_templates.py for HAProxy management - Add HAProxy-related constants to constants.py (RAY_SERVE_ENABLE_HAPROXY, etc.) - Add TARGET_GROUPS to LongPollNamespace for broadcasting target updates - Add get_proxy_actor_class() to default_impl.py for proxy selection - Modify controller.py to support HAProxy mode with target group broadcasting - Add comprehensive test coverage The feature is disabled by default and can be enabled by setting the RAY_SERVE_ENABLE_HAPROXY=1 environment variable. Signed-off-by: Seiji Eicher <seiji@anyscale.com>
…ests - Add DRAINING_MESSAGE constant to constants.py (used by haproxy.py) - Add BUILD.bazel targets for HAProxy integration tests - Add BUILD.bazel target for HAProxy controller unit test Signed-off-by: Seiji Eicher <seiji@anyscale.com>
- Change test size to 'large' - Add RAY_SERVE_DIRECT_INGRESS_MIN_DRAINING_PERIOD_S env variable - Add 'haproxy' tag to test targets Signed-off-by: Seiji Eicher <seiji@anyscale.com>
These constants are needed by haproxy.py but were not defined in constants.py. The same constants are also defined in proxy_router.py for the default proxy. Signed-off-by: Seiji Eicher <seiji@anyscale.com>
The controller.py was missing: 1. Import of get_proxy_actor_class from default_impl 2. Passing proxy_actor_class=get_proxy_actor_class() to ProxyStateManager Without this, the default ProxyActor was always used instead of HAProxyManager even when HAProxy was enabled. Signed-off-by: Seiji Eicher <seiji@anyscale.com>
f0e8a87 to
9d7a87e
Compare
- Add HAProxy build stage to docker/ray/Dockerfile with multi-stage build - Install HAProxy 2.8.12 from source with Prometheus exporter support - Add _dump_ingress_cache_for_testing method to HAProxyManager for test parity Signed-off-by: Seiji Eicher <seiji@anyscale.com>
Signed-off-by: Seiji Eicher <seiji@anyscale.com>
The apt-get installed HAProxy 2.0.x doesn't support directives needed by the tests (option idle-close-on-response and http-request return require HAProxy 2.2+). Build from source to get version 2.8.12. Signed-off-by: Seiji Eicher <seiji@anyscale.com>
Summary
RAY_SERVE_ENABLE_HAPROXY=1environment variableChanges
haproxy.py(1209 lines) andhaproxy_templates.py(132 lines) for HAProxy managementconstants.py(RAY_SERVE_ENABLE_HAPROXY, timeouts, health check settings, etc.)TARGET_GROUPStoLongPollNamespacefor broadcasting target group updatesget_proxy_actor_class()todefault_impl.pyfor proxy selection based on feature flagcontroller.pyto support HAProxy mode with target group broadcastingTest plan
RAY_SERVE_ENABLE_HAPROXY=1 pytest python/ray/serve/tests/test_haproxy.py -vRAY_SERVE_ENABLE_HAPROXY=1 pytest python/ray/serve/tests/test_haproxy_api.py -vRAY_SERVE_ENABLE_HAPROXY=1 pytest python/ray/serve/tests/test_metrics_haproxy.py -vRAY_SERVE_ENABLE_HAPROXY=1 pytest python/ray/serve/tests/unit/test_controller_haproxy.py -vRAY_SERVE_ENABLE_HAPROXY=0, behavior should be unchangedpython -c "from ray.serve._private.haproxy import HAProxyManager; print('OK')"