
Commit a1036cb

Add ray docs for custom autoscaling in serve (#57600)
1. Add docs under advanced autoscaling. 2. Promote `autoscaling_context` to public API. Signed-off-by: abrar <abrar@anyscale.com>
1 parent d7ac83e commit a1036cb

File tree

10 files changed (+265, -46 lines)


doc/source/serve/advanced-guides/advanced-autoscaling.md

Lines changed: 76 additions & 0 deletions
@@ -439,3 +439,79 @@ makes more conservative downscaling decisions.
| `downscaling_factor = 1` | `downscaling_factor = 0.5` |
| ------------------------------------------------ | ----------------------------------------------- |
| ![downscale-smooth-before](https://raw.githubusercontent.com/ray-project/images/master/docs/serve/autoscaling-guide/downscale_smoothing_factor_before.png) | ![downscale-smooth-after](https://raw.githubusercontent.com/ray-project/images/master/docs/serve/autoscaling-guide/downscale_smoothing_factor_after.png) |


(serve-custom-autoscaling-policies)=
## Custom autoscaling policies

:::{warning}
Custom autoscaling policies are experimental and may change in future releases.
:::

Ray Serve’s built-in, request-driven autoscaling works well for most apps. Use **custom autoscaling policies** when you need more control: for example, scaling on external metrics (CloudWatch, Prometheus), anticipating predictable traffic (scheduled batch jobs), or applying business logic that goes beyond queue thresholds.

Custom policies let you implement scaling logic based on any metrics or rules you choose.

### Custom policy for a deployment

A custom autoscaling policy is a user-provided Python function that takes an [`AutoscalingContext`](../api/doc/ray.serve.config.AutoscalingContext.rst) and returns a tuple `(target_replicas, policy_state)` for a single deployment. The context exposes:

* **Current state:** Current replica count and deployment metadata.
* **Built-in metrics:** Total requests, queued requests, per-replica counts.
* **Custom metrics:** Values your deployment reports via `record_autoscaling_stats()` (see below).
* **Capacity bounds:** `min`/`max` replica limits adjusted for current cluster capacity.
* **Policy state:** A `dict` you can use to persist arbitrary state across control-loop iterations.
* **Timing:** Timestamps of the last scale actions and the current time.
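
A minimal sketch of the expected function shape (the `noop_policy` name is illustrative; it holds the replica count steady and carries its state forward unchanged):

```python
from typing import Any, Dict, Tuple

from ray.serve.config import AutoscalingContext


def noop_policy(ctx: AutoscalingContext) -> Tuple[int, Dict[str, Any]]:
    # Inspect any of the context fields listed above, then return the
    # desired replica count plus state for the next control-loop iteration.
    return ctx.current_num_replicas, ctx.policy_state
```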

The following example shows a policy that scales up during business hours and evening batch processing, and scales down during off-peak hours:

```{literalinclude} ../doc_code/autoscaling_policy.py
:language: python
:start-after: __begin_scheduled_batch_processing_policy__
:end-before: __end_scheduled_batch_processing_policy__
```

```{literalinclude} ../doc_code/scheduled_batch_processing.py
:language: python
:start-after: __serve_example_begin__
:end-before: __serve_example_end__
```

Policies are defined **per deployment**. If you don’t provide one, Ray Serve falls back to its built-in request-based policy.

The Ray Serve controller invokes the policy function every `RAY_SERVE_CONTROL_LOOP_INTERVAL_S` seconds (default **0.1s**), so your logic runs against near-real-time state.

:::{warning}
Keep policy functions **fast and lightweight**. Slow logic can block the Serve controller and degrade cluster responsiveness.
:::


### Custom metrics

You can make richer decisions by emitting your own metrics from the deployment. Implement `record_autoscaling_stats()` to return a `dict[str, float]`; Ray Serve surfaces these values in the [`AutoscalingContext`](../api/doc/ray.serve.config.AutoscalingContext.rst).

This example demonstrates how a deployment can report its own metrics (CPU usage, memory usage) and how an autoscaling policy can use those metrics to make scaling decisions:

```{literalinclude} ../doc_code/autoscaling_policy.py
:language: python
:start-after: __begin_custom_metrics_autoscaling_policy__
:end-before: __end_custom_metrics_autoscaling_policy__
```

```{literalinclude} ../doc_code/custom_metrics_autoscaling.py
:language: python
:start-after: __serve_example_begin__
:end-before: __serve_example_end__
```

:::{note}
The `record_autoscaling_stats()` method can be either synchronous or asynchronous. It must complete within the timeout specified by `RAY_SERVE_RECORD_AUTOSCALING_STATS_TIMEOUT_S` (default 30 seconds).
:::
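
For example, a sketch of an asynchronous variant (the `psutil` sampling and the `AsyncStatsDeployment` name are illustrative assumptions, not part of the Serve API):

```python
import asyncio
from typing import Dict

import psutil  # Assumed to be installed in the environment.

from ray import serve


@serve.deployment
class AsyncStatsDeployment:
    async def __call__(self) -> str:
        return "Hello, world!"

    async def record_autoscaling_stats(self) -> Dict[str, float]:
        # Run the blocking sampling call in a thread so the replica's
        # event loop stays responsive while the sample is taken.
        cpu = await asyncio.to_thread(psutil.cpu_percent, 0.1)
        return {"cpu_usage": cpu}
```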

In your policy, access custom metrics via:

* **`ctx.raw_metrics[metric_name]`**: A mapping of replica IDs to lists of raw metric values. The number of data points stored for each replica depends on [`look_back_period_s`](../api/doc/ray.serve.config.AutoscalingConfig.look_back_period_s.rst) (the sliding-window size) and `RAY_SERVE_REPLICA_AUTOSCALING_METRIC_RECORD_INTERVAL_S` (the metric recording interval).
* **`ctx.aggregated_metrics[metric_name]`**: A time-weighted average computed from the raw metric values for each replica.

> Today, aggregation is a time-weighted average. Future releases may support additional aggregation options.
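
For example, a sketch of a policy that computes its own tail statistic from the raw window instead of relying on the built-in average (the `cpu_usage` metric name and the thresholds are assumptions):

```python
from typing import Any, Dict, Tuple

from ray.serve.config import AutoscalingContext


def tail_aware_policy(ctx: AutoscalingContext) -> Tuple[int, Dict[str, Any]]:
    # Aggregated view: one time-weighted average per replica.
    avg_cpu = ctx.aggregated_metrics.get("cpu_usage", {})

    # Raw view: the full sliding window of samples per replica, which
    # allows custom aggregations such as an approximate p95.
    raw_cpu = ctx.raw_metrics.get("cpu_usage", {})
    p95_cpu = {
        replica_id: sorted(samples)[int(0.95 * (len(samples) - 1))]
        for replica_id, samples in raw_cpu.items()
        if samples
    }

    # Scale up if any replica's tail is hot, even when averages look fine.
    if p95_cpu and max(p95_cpu.values()) > 90:
        return (
            min(ctx.capacity_adjusted_max_replicas, ctx.current_num_replicas + 1),
            {},
        )
    # Scale down only when every replica is cool on average.
    if avg_cpu and max(avg_cpu.values()) < 20:
        return (
            max(ctx.capacity_adjusted_min_replicas, ctx.current_num_replicas - 1),
            {},
        )
    return ctx.current_num_replicas, {}
```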

doc/source/serve/api/index.md

Lines changed: 1 addition & 0 deletions
@@ -85,6 +85,7 @@ See the [model composition guide](serve-model-composition) for how to update cod
 serve.config.HTTPOptions
 serve.config.AutoscalingConfig
 serve.config.AutoscalingPolicy
+serve.config.AutoscalingContext
 serve.config.AggregationFunction
 serve.config.RequestRouterConfig
 ```
doc/source/serve/doc_code/autoscaling_policy.py

Lines changed: 43 additions & 0 deletions
@@ -0,0 +1,43 @@
# __begin_scheduled_batch_processing_policy__
from datetime import datetime
from typing import Any, Dict

from ray.serve.config import AutoscalingContext


def scheduled_batch_processing_policy(
    ctx: AutoscalingContext,
) -> tuple[int, Dict[str, Any]]:
    current_time = datetime.now()
    current_hour = current_time.hour
    # Scale up during business hours (9 AM - 5 PM)
    if 9 <= current_hour < 17:
        return 2, {"reason": "Business hours"}
    # Scale up for evening batch processing (6 PM - 8 PM)
    elif 18 <= current_hour < 20:
        return 4, {"reason": "Evening batch processing"}
    # Minimal scaling during off-peak hours
    else:
        return 1, {"reason": "Off-peak hours"}


# __end_scheduled_batch_processing_policy__


# __begin_custom_metrics_autoscaling_policy__
def custom_metrics_autoscaling_policy(
    ctx: AutoscalingContext,
) -> tuple[int, Dict[str, Any]]:
    cpu_usage_metric = ctx.aggregated_metrics.get("cpu_usage", {})
    memory_usage_metric = ctx.aggregated_metrics.get("memory_usage", {})
    # Default to 0 so the policy doesn't crash before the first metrics arrive.
    max_cpu_usage = max(cpu_usage_metric.values(), default=0.0)
    max_memory_usage = max(memory_usage_metric.values(), default=0.0)

    if max_cpu_usage > 80 or max_memory_usage > 85:
        return min(ctx.capacity_adjusted_max_replicas, ctx.current_num_replicas + 1), {}
    elif max_cpu_usage < 30 and max_memory_usage < 40:
        return max(ctx.capacity_adjusted_min_replicas, ctx.current_num_replicas - 1), {}
    else:
        return ctx.current_num_replicas, {}


# __end_custom_metrics_autoscaling_policy__
doc/source/serve/doc_code/custom_metrics_autoscaling.py

Lines changed: 48 additions & 0 deletions
@@ -0,0 +1,48 @@
# __serve_example_begin__
import time
from typing import Dict

from ray import serve


@serve.deployment(
    autoscaling_config={
        "min_replicas": 1,
        "max_replicas": 5,
        "policy": {
            "policy_function": "autoscaling_policy:custom_metrics_autoscaling_policy"
        },
    },
    max_ongoing_requests=5,
)
class CustomMetricsDeployment:
    def __init__(self):
        self.cpu_usage = 50.0
        self.memory_usage = 60.0

    def __call__(self) -> str:
        # Simulate work that drives resource usage up.
        time.sleep(0.1)
        self.cpu_usage = min(100, self.cpu_usage + 5)
        self.memory_usage = min(100, self.memory_usage + 3)
        return "Hello, world!"

    def record_autoscaling_stats(self) -> Dict[str, float]:
        # Simulate usage decaying between requests.
        self.cpu_usage = max(20, self.cpu_usage - 2)
        self.memory_usage = max(30, self.memory_usage - 1)
        return {
            "cpu_usage": self.cpu_usage,
            "memory_usage": self.memory_usage,
        }


# Create the app
app = CustomMetricsDeployment.bind()
# __serve_example_end__

# TODO: uncomment after autoscaling context is populated with all metrics
# if __name__ == "__main__":
#     import requests  # noqa

#     serve.run(app)
#     resp = requests.get("http://localhost:8000/")
#     assert resp.text == "Hello, world!"
doc/source/serve/doc_code/scheduled_batch_processing.py

Lines changed: 33 additions & 0 deletions
@@ -0,0 +1,33 @@
# __serve_example_begin__
import asyncio

from ray import serve
from ray.serve.config import AutoscalingConfig, AutoscalingPolicy


@serve.deployment(
    autoscaling_config=AutoscalingConfig(
        min_replicas=1,
        max_replicas=12,
        policy=AutoscalingPolicy(
            policy_function="autoscaling_policy:scheduled_batch_processing_policy"
        ),
    ),
    max_ongoing_requests=3,
)
class BatchProcessingDeployment:
    async def __call__(self) -> str:
        # Simulate batch processing work
        await asyncio.sleep(0.5)
        return "Hello, world!"


app = BatchProcessingDeployment.bind()
# __serve_example_end__

if __name__ == "__main__":
    import requests  # noqa

    serve.run(app)
    resp = requests.get("http://localhost:8000/")
    assert resp.text == "Hello, world!"

python/ray/serve/_private/autoscaling_state.py

Lines changed: 1 addition & 40 deletions
@@ -1,7 +1,6 @@
 import logging
 import time
 from collections import defaultdict
-from dataclasses import dataclass
 from typing import Any, Dict, List, Optional, Set

 from ray.serve._private.common import (
@@ -25,49 +24,11 @@
     merge_timeseries_dicts,
 )
 from ray.serve._private.utils import get_capacity_adjusted_num_replicas
+from ray.serve.config import AutoscalingContext

 logger = logging.getLogger(SERVE_LOGGER_NAME)


-@dataclass
-class AutoscalingContext:
-    """Rich context provided to custom autoscaling policies."""
-
-    # Deployment information
-    deployment_id: DeploymentID
-    deployment_name: str
-    app_name: Optional[str]
-
-    # Current state
-    current_num_replicas: int
-    target_num_replicas: int
-    running_replicas: List[ReplicaID]
-
-    # Built-in metrics
-    total_num_requests: float
-    queued_requests: Optional[float]
-    requests_per_replica: Dict[ReplicaID, float]
-
-    # Custom metrics
-    aggregated_metrics: Dict[str, Dict[ReplicaID, float]]
-    raw_metrics: Dict[str, Dict[ReplicaID, List[float]]]
-
-    # Capacity and bounds
-    capacity_adjusted_min_replicas: int
-    capacity_adjusted_max_replicas: int
-
-    # Policy state
-    policy_state: Dict[str, Any]
-
-    # Timing
-    last_scale_up_time: Optional[float]
-    last_scale_down_time: Optional[float]
-    current_time: Optional[float]
-
-    # Config
-    config: Optional[Any]
-
-
 class AutoscalingState:
     """Manages autoscaling for a single deployment."""
python/ray/serve/autoscaling_policy.py

Lines changed: 1 addition & 2 deletions
@@ -2,9 +2,8 @@
 import math
 from typing import Any, Dict, Optional, Tuple

-from ray.serve._private.autoscaling_state import AutoscalingContext
 from ray.serve._private.constants import CONTROL_LOOP_INTERVAL_S, SERVE_LOGGER_NAME
-from ray.serve.config import AutoscalingConfig
+from ray.serve.config import AutoscalingConfig, AutoscalingContext
 from ray.util.annotations import PublicAPI

 logger = logging.getLogger(SERVE_LOGGER_NAME)

python/ray/serve/config.py

Lines changed: 60 additions & 0 deletions
@@ -1,6 +1,7 @@
 import json
 import logging
 import warnings
+from dataclasses import dataclass
 from enum import Enum
 from typing import Any, Callable, Dict, List, Optional, Union

@@ -16,6 +17,9 @@
     validator,
 )
 from ray._common.utils import import_attr
+
+# Import types needed for AutoscalingContext
+from ray.serve._private.common import DeploymentID, ReplicaID
 from ray.serve._private.constants import (
     DEFAULT_AUTOSCALING_POLICY_NAME,
     DEFAULT_GRPC_PORT,
@@ -34,6 +38,62 @@
 logger = logging.getLogger(SERVE_LOGGER_NAME)


+@PublicAPI(stability="alpha")
+@dataclass
+class AutoscalingContext:
+    """Rich context provided to custom autoscaling policies.
+
+    This class provides comprehensive information about a deployment's current state,
+    metrics, and configuration that can be used by custom autoscaling policies to
+    make intelligent scaling decisions.
+
+    The context includes deployment metadata, current replica state, built-in and
+    custom metrics, capacity bounds, policy state, and timing information.
+    """
+
+    # Deployment information
+    deployment_id: DeploymentID  #: Unique identifier for the deployment.
+    deployment_name: str  #: Name of the deployment.
+    app_name: Optional[str]  #: Name of the application containing this deployment.
+
+    # Current state
+    current_num_replicas: int  #: Current number of running replicas.
+    target_num_replicas: int  #: Target number of replicas set by the autoscaler.
+    running_replicas: List[ReplicaID]  #: List of currently running replica IDs.
+
+    # Built-in metrics
+    total_num_requests: float  #: Total number of requests across all replicas.
+    queued_requests: Optional[float]  #: Number of requests currently queued.
+    requests_per_replica: Dict[
+        ReplicaID, float
+    ]  #: Mapping of replica ID to number of requests.
+
+    # Custom metrics
+    aggregated_metrics: Dict[
+        str, Dict[ReplicaID, float]
+    ]  #: Time-weighted averages of custom metrics per replica.
+    raw_metrics: Dict[
+        str, Dict[ReplicaID, List[float]]
+    ]  #: Raw custom metric values per replica.
+
+    # Capacity and bounds
+    capacity_adjusted_min_replicas: int  #: Minimum replicas adjusted for cluster capacity.
+    capacity_adjusted_max_replicas: int  #: Maximum replicas adjusted for cluster capacity.
+
+    # Policy state
+    policy_state: Dict[
+        str, Any
+    ]  #: Persistent state dictionary for the autoscaling policy.
+
+    # Timing
+    last_scale_up_time: Optional[float]  #: Timestamp of last scale-up action.
+    last_scale_down_time: Optional[float]  #: Timestamp of last scale-down action.
+    current_time: Optional[float]  #: Current timestamp.
+
+    # Config
+    config: Optional[Any]  #: Autoscaling configuration for this deployment.
+
+
 @PublicAPI(stability="alpha")
 class RequestRouterConfig(BaseModel):
     """Config for the Serve request router.

python/ray/serve/tests/test_autoscaling_policy.py

Lines changed: 1 addition & 2 deletions
@@ -14,7 +14,6 @@
 import ray
 from ray import serve
 from ray._common.test_utils import SignalActor, wait_for_condition
-from ray.serve._private.autoscaling_state import AutoscalingContext
 from ray.serve._private.common import (
     DeploymentID,
     DeploymentStatus,
@@ -36,7 +35,7 @@
     get_num_alive_replicas,
     tlog,
 )
-from ray.serve.config import AutoscalingConfig, AutoscalingPolicy
+from ray.serve.config import AutoscalingConfig, AutoscalingContext, AutoscalingPolicy
 from ray.serve.handle import DeploymentHandle
 from ray.serve.schema import ApplicationStatus, ServeDeploySchema
 from ray.util.state import list_actors

python/ray/serve/tests/unit/test_autoscaling_policy.py

Lines changed: 1 addition & 2 deletions
@@ -2,13 +2,12 @@

 import pytest

-from ray.serve._private.autoscaling_state import AutoscalingContext
 from ray.serve._private.constants import CONTROL_LOOP_INTERVAL_S
 from ray.serve.autoscaling_policy import (
     _calculate_desired_num_replicas,
     replica_queue_length_autoscaling_policy,
 )
-from ray.serve.config import AutoscalingConfig
+from ray.serve.config import AutoscalingConfig, AutoscalingContext


 class TestCalculateDesiredNumReplicas:
