Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions ci/docker/serve.build.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,43 @@ SHELL ["/bin/bash", "-ice"]

COPY . .

# Install HAProxy from source for serve tests (requires 2.2+ for http-request return)
RUN <<EOF
#!/bin/bash
set -euo pipefail

# Install HAProxy build dependencies
sudo apt-get update && sudo apt-get install -y \
build-essential \
curl \
libc6-dev \
liblua5.3-dev \
libpcre3-dev \
libssl-dev \
socat \
wget \
zlib1g-dev \
&& sudo rm -rf /var/lib/apt/lists/*

# Create haproxy user and group
sudo groupadd -r haproxy || true
sudo useradd -r -g haproxy haproxy || true

# Download and compile HAProxy from official source
HAPROXY_VERSION="2.8.12"
HAPROXY_BUILD_DIR="$(mktemp -d)"
wget -O "${HAPROXY_BUILD_DIR}/haproxy.tar.gz" "https://www.haproxy.org/download/2.8/src/haproxy-${HAPROXY_VERSION}.tar.gz"
tar -xzf "${HAPROXY_BUILD_DIR}/haproxy.tar.gz" -C "${HAPROXY_BUILD_DIR}" --strip-components=1
make -C "${HAPROXY_BUILD_DIR}" TARGET=linux-glibc USE_OPENSSL=1 USE_ZLIB=1 USE_PCRE=1 USE_LUA=1 USE_PROMEX=1
sudo make -C "${HAPROXY_BUILD_DIR}" install
rm -rf "${HAPROXY_BUILD_DIR}"

# Create HAProxy directories
sudo mkdir -p /etc/haproxy /run/haproxy /var/log/haproxy
sudo chown -R haproxy:haproxy /run/haproxy

EOF

RUN <<EOF
#!/bin/bash

Expand Down
52 changes: 52 additions & 0 deletions docker/ray/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,60 @@

ARG BASE_IMAGE
ARG FULL_BASE_IMAGE=rayproject/ray-deps:nightly"$BASE_IMAGE"

# --- HAProxy Build Stage ---
FROM ubuntu:22.04 AS haproxy-builder

RUN <<EOF
#!/bin/bash
set -euo pipefail

apt-get update -y
apt-get install -y --no-install-recommends \
build-essential \
ca-certificates \
curl \
libc6-dev \
liblua5.3-dev \
libpcre3-dev \
libssl-dev \
zlib1g-dev

rm -rf /var/lib/apt/lists/*

# Install HAProxy from source
HAPROXY_VERSION="2.8.12"
HAPROXY_BUILD_DIR=$(mktemp -d)
curl -sSfL -o "${HAPROXY_BUILD_DIR}/haproxy.tar.gz" "https://www.haproxy.org/download/2.8/src/haproxy-${HAPROXY_VERSION}.tar.gz"
tar -xzf "${HAPROXY_BUILD_DIR}/haproxy.tar.gz" -C "${HAPROXY_BUILD_DIR}" --strip-components=1
make -C "${HAPROXY_BUILD_DIR}" TARGET=linux-glibc USE_OPENSSL=1 USE_ZLIB=1 USE_PCRE=1 USE_LUA=1 USE_PROMEX=1 -j$(nproc)
make -C "${HAPROXY_BUILD_DIR}" install SBINDIR=/usr/local/bin
rm -rf "${HAPROXY_BUILD_DIR}"

EOF

# --- Main Stage ---
FROM $FULL_BASE_IMAGE

# Copy HAProxy binary from builder stage
COPY --from=haproxy-builder /usr/local/bin/haproxy /usr/local/bin/haproxy

# Install HAProxy runtime dependencies and setup directories
USER root
RUN <<EOF
#!/bin/bash
set -euo pipefail

apt-get update -y
apt-get install -y --no-install-recommends socat liblua5.3-0

mkdir -p /etc/haproxy /run/haproxy /var/log/haproxy
chown -R ray:"$(id -gn ray)" /run/haproxy

rm -rf /var/lib/apt/lists/*
EOF
USER ray

ARG WHEEL_PATH
ARG FIND_LINKS_PATH=".whl"
ARG CONSTRAINTS_FILE="requirements_compiled.txt"
Expand Down
72 changes: 72 additions & 0 deletions python/ray/serve/_private/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,9 @@
# rechecking whether the proxy actor is drained or not.
PROXY_DRAIN_CHECK_PERIOD_S = 5

# Message returned by proxy health check when draining.
DRAINING_MESSAGE = "This node is being drained."

#: Number of times in a row that a replica must fail the health check before
#: being marked unhealthy.
REPLICA_HEALTH_CHECK_UNHEALTHY_THRESHOLD = 3
Expand Down Expand Up @@ -570,6 +573,12 @@
# The message to return when the replica is healthy.
HEALTHY_MESSAGE = "success"

# The message to return when the route table is not populated yet.
NO_ROUTES_MESSAGE = "Route table is not populated yet."

# The message to return when no replicas are available yet.
NO_REPLICAS_MESSAGE = "No replicas are available yet."

# Feature flag to enable a limited form of direct ingress where ingress applications
# listen on port 8000 (HTTP) and 9000 (gRPC). No proxies will be started.
RAY_SERVE_ENABLE_DIRECT_INGRESS = (
Expand Down Expand Up @@ -656,3 +665,66 @@
5000, # 5s
10000, # 10s
]

# Feature flag to use HAProxy
RAY_SERVE_ENABLE_HAPROXY = os.environ.get("RAY_SERVE_ENABLE_HAPROXY", "0") == "1"

# HAProxy configuration defaults
RAY_SERVE_HAPROXY_MAXCONN = int(os.environ.get("RAY_SERVE_HAPROXY_MAXCONN", "20000"))
RAY_SERVE_HAPROXY_NBTHREAD = int(os.environ.get("RAY_SERVE_HAPROXY_NBTHREAD", "4"))
RAY_SERVE_HAPROXY_CONFIG_FILE_LOC = os.environ.get(
"RAY_SERVE_HAPROXY_CONFIG_FILE_LOC", "/tmp/haproxy-serve/haproxy.cfg"
)
RAY_SERVE_HAPROXY_SOCKET_PATH = os.environ.get(
"RAY_SERVE_HAPROXY_SOCKET_PATH", "/tmp/haproxy-serve/admin.sock"
)
RAY_SERVE_ENABLE_HAPROXY_OPTIMIZED_CONFIG = (
os.environ.get("RAY_SERVE_ENABLE_HAPROXY_OPTIMIZED_CONFIG", "1") == "1"
)
RAY_SERVE_HAPROXY_SERVER_STATE_BASE = os.environ.get(
"RAY_SERVE_HAPROXY_SERVER_STATE_BASE", "/tmp/haproxy-serve"
)
RAY_SERVE_HAPROXY_SERVER_STATE_FILE = os.environ.get(
"RAY_SERVE_HAPROXY_SERVER_STATE_FILE", "/tmp/haproxy-serve/server-state"
)
RAY_SERVE_HAPROXY_HARD_STOP_AFTER_S = int(
os.environ.get("RAY_SERVE_HAPROXY_HARD_STOP_AFTER_S", "120")
)
RAY_SERVE_HAPROXY_METRICS_PORT = int(
os.environ.get("RAY_SERVE_HAPROXY_METRICS_PORT", "9101")
)
RAY_SERVE_HAPROXY_SYSLOG_PORT = int(
os.environ.get("RAY_SERVE_HAPROXY_SYSLOG_PORT", "514")
)
RAY_SERVE_HAPROXY_TIMEOUT_SERVER_S = (
int(os.environ.get("RAY_SERVE_HAPROXY_TIMEOUT_SERVER_S"))
if os.environ.get("RAY_SERVE_HAPROXY_TIMEOUT_SERVER_S")
else None
)
Comment on lines +699 to +703
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This code calls os.environ.get("RAY_SERVE_HAPROXY_TIMEOUT_SERVER_S") twice, which is inefficient. You can use the walrus operator := to store the result of the first call and reuse it, making the code more efficient and readable.

Suggested change
RAY_SERVE_HAPROXY_TIMEOUT_SERVER_S = (
int(os.environ.get("RAY_SERVE_HAPROXY_TIMEOUT_SERVER_S"))
if os.environ.get("RAY_SERVE_HAPROXY_TIMEOUT_SERVER_S")
else None
)
RAY_SERVE_HAPROXY_TIMEOUT_SERVER_S = (
int(val)
if (val := os.environ.get("RAY_SERVE_HAPROXY_TIMEOUT_SERVER_S"))
else None
)

RAY_SERVE_HAPROXY_TIMEOUT_CONNECT_S = (
int(os.environ.get("RAY_SERVE_HAPROXY_TIMEOUT_CONNECT_S"))
if os.environ.get("RAY_SERVE_HAPROXY_TIMEOUT_CONNECT_S")
else None
)
Comment on lines +704 to +708
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This code calls os.environ.get("RAY_SERVE_HAPROXY_TIMEOUT_CONNECT_S") twice, which is inefficient. You can use the walrus operator := to store the result of the first call and reuse it, making the code more efficient and readable.

Suggested change
RAY_SERVE_HAPROXY_TIMEOUT_CONNECT_S = (
int(os.environ.get("RAY_SERVE_HAPROXY_TIMEOUT_CONNECT_S"))
if os.environ.get("RAY_SERVE_HAPROXY_TIMEOUT_CONNECT_S")
else None
)
RAY_SERVE_HAPROXY_TIMEOUT_CONNECT_S = (
int(val)
if (val := os.environ.get("RAY_SERVE_HAPROXY_TIMEOUT_CONNECT_S"))
else None
)

RAY_SERVE_HAPROXY_TIMEOUT_CLIENT_S = int(
os.environ.get("RAY_SERVE_HAPROXY_TIMEOUT_CLIENT_S", "3600")
)
RAY_SERVE_HAPROXY_HEALTH_CHECK_FALL = int(
os.environ.get("RAY_SERVE_HAPROXY_HEALTH_CHECK_FALL", "2")
)
RAY_SERVE_HAPROXY_HEALTH_CHECK_RISE = int(
os.environ.get("RAY_SERVE_HAPROXY_HEALTH_CHECK_RISE", "2")
)
RAY_SERVE_HAPROXY_HEALTH_CHECK_INTER = os.environ.get(
"RAY_SERVE_HAPROXY_HEALTH_CHECK_INTER", "5s"
)
RAY_SERVE_HAPROXY_HEALTH_CHECK_FASTINTER = os.environ.get(
"RAY_SERVE_HAPROXY_HEALTH_CHECK_FASTINTER", "250ms"
)
RAY_SERVE_HAPROXY_HEALTH_CHECK_DOWNINTER = os.environ.get(
"RAY_SERVE_HAPROXY_HEALTH_CHECK_DOWNINTER", "250ms"
)

# Direct ingress must be enabled if HAProxy is enabled
if RAY_SERVE_ENABLE_HAPROXY:
RAY_SERVE_ENABLE_DIRECT_INGRESS = True
59 changes: 53 additions & 6 deletions python/ray/serve/_private/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
CONTROL_LOOP_INTERVAL_S,
RAY_SERVE_CONTROLLER_CALLBACK_IMPORT_PATH,
RAY_SERVE_ENABLE_DIRECT_INGRESS,
RAY_SERVE_ENABLE_HAPROXY,
RAY_SERVE_RPC_LATENCY_WARNING_THRESHOLD_MS,
RECOVERING_LONG_POLL_BROADCAST_TIMEOUT_S,
SERVE_CONTROLLER_NAME,
Expand All @@ -48,7 +49,10 @@
from ray.serve._private.controller_health_metrics_tracker import (
ControllerHealthMetricsTracker,
)
from ray.serve._private.default_impl import create_cluster_node_info_cache
from ray.serve._private.default_impl import (
create_cluster_node_info_cache,
get_proxy_actor_class,
)
from ray.serve._private.deployment_info import DeploymentInfo
from ray.serve._private.deployment_state import (
DeploymentReplica,
Expand Down Expand Up @@ -187,8 +191,14 @@ async def __init__(
self.cluster_node_info_cache = create_cluster_node_info_cache(self.gcs_client)
self.cluster_node_info_cache.update()

self._ha_proxy_enabled = RAY_SERVE_ENABLE_HAPROXY
self._direct_ingress_enabled = RAY_SERVE_ENABLE_DIRECT_INGRESS
if self._direct_ingress_enabled:
if self._ha_proxy_enabled:
logger.info(
"HAProxy is enabled in ServeController, replacing Serve proxy "
"with HAProxy."
)
elif self._direct_ingress_enabled:
logger.info(
"Direct ingress is enabled in ServeController, enabling proxy "
"on head node only."
Expand All @@ -203,6 +213,7 @@ async def __init__(
cluster_node_info_cache=self.cluster_node_info_cache,
logging_config=self.global_logging_config,
grpc_options=set_proxy_default_grpc_options(grpc_options),
proxy_actor_class=get_proxy_actor_class(),
)
# We modify the HTTP and gRPC options above, so delete them to avoid
del http_options, grpc_options
Expand Down Expand Up @@ -275,7 +286,9 @@ async def __init__(
] = []
self._refresh_autoscaling_deployments_cache()

self._last_broadcasted_target_groups: List[TargetGroup] = []
# Initialize to None (not []) to ensure the first broadcast always happens,
# even if target_groups is empty (e.g., route_prefix=None deployments).
self._last_broadcasted_target_groups: Optional[List[TargetGroup]] = None

def reconfigure_global_logging_config(self, global_logging_config: LoggingConfig):
if (
Expand Down Expand Up @@ -659,6 +672,29 @@ async def run_control_loop_step(
# get all alive replica ids and their node ids.
NodePortManager.prune(self._get_node_id_to_alive_replica_ids())

# HAProxy target group broadcasting
if self._ha_proxy_enabled:
self.broadcast_target_groups_if_changed()

def broadcast_target_groups_if_changed(self) -> None:
"""Broadcast target groups over long poll if they have changed.

Keeps an in-memory record of the last target groups that were broadcast
to determine if they have changed.
"""
target_groups: List[TargetGroup] = self.get_target_groups(
from_proxy_manager=True,
)

# Check if target groups have changed by comparing the objects directly
if self._last_broadcasted_target_groups == target_groups:
return

self.long_poll_host.notify_changed(
{LongPollNamespace.TARGET_GROUPS: target_groups}
)
self._last_broadcasted_target_groups = target_groups

def _create_control_loop_metrics(self):
self.node_update_duration_gauge_s = metrics.Gauge(
"serve_controller_node_update_duration_s",
Expand Down Expand Up @@ -1296,9 +1332,16 @@ def get_target_groups(
that have running replicas, we return target groups for direct ingress.
If there are multiple applications with no running replicas, we return
one target group per application with unique route prefix.
5. HAProxy is enabled and the caller is not an internal proxy manager. In
this case, we return target groups containing the proxies (e.g. haproxy).
6. HAProxy is enabled and the caller is an internal proxy manager (e.g.
haproxy manager). In this case, we return target groups containing the
ingress replicas and possibly the Serve proxies.
"""
proxy_target_groups = self._get_proxy_target_groups()
if not self._direct_ingress_enabled:
if not self._direct_ingress_enabled or (
self._ha_proxy_enabled and not from_proxy_manager
):
return proxy_target_groups

# Get all applications and their metadata
Expand All @@ -1319,6 +1362,10 @@ def get_target_groups(
]

if not apps:
# When HAProxy is enabled and there are no apps, return empty target groups
# so that all requests fall through to the default_backend (404)
if self._ha_proxy_enabled and from_proxy_manager:
return []
return proxy_target_groups

# Create target groups for each application
Expand Down Expand Up @@ -1428,7 +1475,7 @@ def _get_target_groups_for_app_with_no_running_replicas(
TargetGroup(
protocol=RequestProtocol.HTTP,
route_prefix=route_prefix,
targets=http_targets,
targets=[] if self._ha_proxy_enabled else http_targets,
app_name=app_name,
)
)
Expand All @@ -1437,7 +1484,7 @@ def _get_target_groups_for_app_with_no_running_replicas(
TargetGroup(
protocol=RequestProtocol.GRPC,
route_prefix=route_prefix,
targets=grpc_targets,
targets=[] if self._ha_proxy_enabled else grpc_targets,
app_name=app_name,
)
)
Expand Down
12 changes: 12 additions & 0 deletions python/ray/serve/_private/default_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,3 +255,15 @@ def get_controller_impl():
)(ServeController)

return controller_impl


def get_proxy_actor_class():
from ray.serve._private.constants import RAY_SERVE_ENABLE_HAPROXY
from ray.serve._private.proxy import ProxyActor

if RAY_SERVE_ENABLE_HAPROXY:
from ray.serve._private.haproxy import HAProxyManager

return HAProxyManager
else:
return ProxyActor
Loading