Skip to content

feat: manual deployment of an SLA-based planner for kubernetes #1587

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 24 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 76 additions & 14 deletions benchmarks/profiler/profile_sla.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import signal
import subprocess
import time
from typing import Literal
from typing import Literal, Optional

import matplotlib.pyplot as plt
import numpy as np
Expand Down Expand Up @@ -59,12 +59,32 @@
logger.addHandler(console_handler)


def get_dynamo_serve_cmd(config_file_path):
def get_dynamo_env():
    """Build the environment for launching the Dynamo runtime.

    Returns a copy of the current process environment, guaranteeing that
    ``ETCD_ENDPOINTS`` and ``NATS_SERVER`` are set; missing values fall
    back to localhost defaults (with a warning) so a bare container can
    still start the runtime.
    """
    runtime_env = dict(os.environ)

    # Fall back to localhost defaults when the caller has not configured
    # the ETCD / NATS endpoints the Dynamo runtime needs.
    if "ETCD_ENDPOINTS" not in runtime_env:
        logger.warning("ETCD_ENDPOINTS not set, using default localhost:2379")
        runtime_env["ETCD_ENDPOINTS"] = "localhost:2379"

    if "NATS_SERVER" not in runtime_env:
        logger.warning("NATS_SERVER not set, using default nats://localhost:4222")
        runtime_env["NATS_SERVER"] = "nats://localhost:4222"

    logger.info(f"Using ETCD_ENDPOINTS: {runtime_env['ETCD_ENDPOINTS']}")
    logger.info(f"Using NATS_SERVER: {runtime_env['NATS_SERVER']}")

    return runtime_env


def get_dynamo_serve_cmd(config_file_path, disaggregated=False):
    """Assemble the ``dynamo serve`` command line for a given config file.

    Args:
        config_file_path: Path to the Dynamo YAML config; resolved to an
            absolute path so the command works regardless of ``cwd``.
        disaggregated: When True, serve the disaggregated graph
            (``graphs.disagg:Frontend``); otherwise the aggregated one.

    Returns:
        The argv list suitable for ``subprocess.Popen``.
    """
    if disaggregated:
        graph_target = "graphs.disagg:Frontend"
    else:
        graph_target = "graphs.agg:Frontend"
    resolved_config = os.path.abspath(config_file_path)
    return ["dynamo", "serve", graph_target, "-f", resolved_config]
Expand Down Expand Up @@ -263,6 +283,8 @@ def get_model_name(config: dict) -> str:


def get_port(config: dict) -> int:
if "DYNAMO_PORT" in os.environ:
return int(os.environ["DYNAMO_PORT"])
if "Common" in config and "port" in config["Common"]:
return config["Common"]["port"]
else:
Expand Down Expand Up @@ -292,9 +314,14 @@ def shutdown_deployment(dynamo_process):
time.sleep(5)


def wait_for_server_ready(model_name: str, port: int, timeout: int = 300):
def wait_for_server_ready(
model_name: str, port: int, timeout: int = 300, url: Optional[str] = None
):
logger.info("Waiting for the server to be ready...")
endpoint_url = f"http://localhost:{port}/v1/chat/completions"
if url is not None:
endpoint_url = url
else:
endpoint_url = f"http://localhost:{port}/v1/chat/completions"
start_time = time.time()
server_ready = False

Expand Down Expand Up @@ -342,6 +369,7 @@ def get_kv_cache_size_from_dynamo_log(dynamo_log_fn: str) -> int:
return int(token_count * concurrency)
except Exception as e:
logger.warning(f"Failed to parse KV cache size from line: {line}. Error: {e}")
logger.warning("Failed to parse KV cache size from dynamo log, returning 0")
return 0


Expand Down Expand Up @@ -487,6 +515,18 @@ def benchmark_decode(isl, osl, num_request, genai_perf_artifact_dir, model_name,
default=6,
help="how many samples to benchmark to interpolate ITL under different active kv cache size and decode context length",
)
parser.add_argument(
"--url",
type=str,
default=None,
help="Override the endpoint URL for the LLM frontend (e.g. http://llm-agg-frontend:3000/v1/chat/completions)",
)
parser.add_argument(
"--disaggregated",
action="store_true",
default=False,
help="Use disaggregated mode (graphs.disagg:Frontend) instead of aggregated mode (graphs.agg:Frontend). Default is aggregated mode.",
)
args = parser.parse_args()

if args.example_dir is None:
Expand All @@ -500,14 +540,24 @@ def benchmark_decode(isl, osl, num_request, genai_perf_artifact_dir, model_name,
"Failed to infer example directory, please provide explicitly using --example-dir <path-to-example-dir>"
)
exit(1)
logger.info(f"Example directory: {args.example_dir}")

with open(args.config, "r") as f:
config = yaml.safe_load(f)

# Get the number of available GPUs
available_gpus = get_available_gpu_count()

profile_tp_size = [2**i for i in range(int(math.log2(available_gpus)) + 1)]
if available_gpus == 0:
logger.error("No GPUs detected. This could be due to:")
logger.error("1. NVML library not available in the container")
logger.error("2. No GPUs actually available")
logger.error("3. GPU access not properly configured")
logger.error("Using default TP sizes: [1, 2, 4]")
profile_tp_size = [1, 2, 4]
else:
profile_tp_size = [2**i for i in range(int(math.log2(available_gpus)) + 1)]

logger.info(f"Profiling TP sizes: {profile_tp_size}")

os.makedirs(args.output_dir, exist_ok=True)
Expand Down Expand Up @@ -536,18 +586,19 @@ def benchmark_decode(isl, osl, num_request, genai_perf_artifact_dir, model_name,

# Start the dynamo serve process
logger.info(f"Starting dynamo serve with TP size {tp_size}...")
dynamo_serve_cmd = get_dynamo_serve_cmd(prefill_config_fn)
dynamo_serve_cmd = get_dynamo_serve_cmd(prefill_config_fn, args.disaggregated)
with open(dynamo_log_fn, "w") as dynamo_log_f:
dynamo_process = subprocess.Popen(
dynamo_serve_cmd,
stdout=dynamo_log_f,
stderr=subprocess.STDOUT,
text=True,
cwd=args.example_dir,
env=get_dynamo_env(),
preexec_fn=os.setsid, # Use process group for clean termination
)

if not wait_for_server_ready(model_name, port):
if not wait_for_server_ready(model_name, port, url=args.url):
logger.error(f"Server did not become ready, skip profiling tp={tp_size}")
break

Expand Down Expand Up @@ -616,21 +667,27 @@ def benchmark_decode(isl, osl, num_request, genai_perf_artifact_dir, model_name,

# Start the dynamo serve process
logger.info(f"Starting dynamo serve with TP size {tp_size}...")
dynamo_serve_cmd = get_dynamo_serve_cmd(decode_config_fn)
dynamo_serve_cmd = get_dynamo_serve_cmd(decode_config_fn, args.disaggregated)
logger.info(f"Dynamo serve command: {dynamo_serve_cmd}")
with open(dynamo_log_fn, "w") as dynamo_log_f:
dynamo_process = subprocess.Popen(
dynamo_serve_cmd,
stdout=dynamo_log_f,
stderr=subprocess.STDOUT,
text=True,
cwd=args.example_dir,
env=get_dynamo_env(),
preexec_fn=os.setsid, # Use process group for clean termination
)

if not wait_for_server_ready(model_name, port):
if not wait_for_server_ready(model_name, port, url=args.url):
logger.error(f"Server did not become ready, skip profiling tp={tp_size}")
break

# Print out contents of dynamo_log_fn
with open(dynamo_log_fn, "r") as f:
logger.info(f"Dynamo log contents: {f.read()}")

max_kv_tokens = get_kv_cache_size_from_dynamo_log(dynamo_log_fn)
max_concurrency = max_kv_tokens // (args.isl + args.osl)
sweep_num_request = [
Expand All @@ -639,6 +696,9 @@ def benchmark_decode(isl, osl, num_request, genai_perf_artifact_dir, model_name,
logger.info(
f"Sweeping num_request range based on maximum number of kv tokens: {sweep_num_request}"
)
if len(sweep_num_request) == 0:
logger.error("No num_request to sweep") # TODO: add a potential fix
break

engine_decode_itl = []
engine_decode_thpt_per_gpu = []
Expand Down Expand Up @@ -761,18 +821,19 @@ def benchmark_decode(isl, osl, num_request, genai_perf_artifact_dir, model_name,

# Start the dynamo serve process
logger.info(f"Starting dynamo serve with TP size {tp_size}...")
dynamo_serve_cmd = get_dynamo_serve_cmd(prefill_config_fn)
dynamo_serve_cmd = get_dynamo_serve_cmd(prefill_config_fn, args.disaggregated)
with open(dynamo_log_fn, "w") as dynamo_log_f:
dynamo_process = subprocess.Popen(
dynamo_serve_cmd,
stdout=dynamo_log_f,
stderr=subprocess.STDOUT,
text=True,
cwd=args.example_dir,
env=get_dynamo_env(),
preexec_fn=os.setsid, # Use process group for clean termination
)

if not wait_for_server_ready(model_name, port):
if not wait_for_server_ready(model_name, port, url=args.url):
logger.error(f"Server did not become ready, skip profiling tp={tp_size}")
else:
for isl in range(
Expand Down Expand Up @@ -893,18 +954,19 @@ def benchmark_decode(isl, osl, num_request, genai_perf_artifact_dir, model_name,

# Start the dynamo serve process
logger.info(f"Starting dynamo serve with TP size {tp_size}...")
dynamo_serve_cmd = get_dynamo_serve_cmd(decode_config_fn)
dynamo_serve_cmd = get_dynamo_serve_cmd(decode_config_fn, args.disaggregated)
with open(dynamo_log_fn, "w") as dynamo_log_f:
dynamo_process = subprocess.Popen(
dynamo_serve_cmd,
stdout=dynamo_log_f,
stderr=subprocess.STDOUT,
text=True,
cwd=args.example_dir,
env=get_dynamo_env(),
preexec_fn=os.setsid, # Use process group for clean termination
)

if not wait_for_server_ready(model_name, port):
if not wait_for_server_ready(model_name, port, url=args.url):
logger.error(f"Server did not become ready, skip profiling tp={tp_size}")
else:
max_kv_tokens = get_kv_cache_size_from_dynamo_log(dynamo_log_fn)
Expand Down
4 changes: 2 additions & 2 deletions docs/architecture/sla_planner.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ Before using the SLA planner, you must profile the performance of the selected m

```bash
cd $DYNAMO_HOME/benchmarks/profiler/
python -m utils.profile_sla \
python -m profile_sla \
--config <path-to-dynamo-config-file> \
--output-dir <path-to-profile-results-dir> \
--isl <target-input-sequence-length> \
Expand Down Expand Up @@ -143,4 +143,4 @@ A `vllm_v0` example is available for reference:
```bash
cd $DYNAMO_HOME/examples/vllm_v0
dynamo serve graphs.disagg_planner:Frontend -f ./configs/disagg_planner.yaml
```
```
26 changes: 20 additions & 6 deletions examples/llm/components/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,19 +236,23 @@ async def _process_request(self, request_data: Dict[str, Any]):
# Create an async generator function to process this request
async def process_and_stream():
# TODO: queue request at processor when engines are full
router_mode = (await self.etcd_kv_cache.get("router")).decode()

self.use_router = router_mode in (RouterType.KV, RouterType.KV_LOAD)
logger.info(
f"DEBUG: self.use_router = {self.use_router}, self.engine_args.router = '{self.engine_args.router}'"
)

prefix_hit_rate = 0.0 # Default value
if self.use_router:
logger.info("DEBUG: Taking KV router path")
router_generator = await self.router_client.generate(
Tokens(
tokens=engine_prompt["prompt_token_ids"]
).model_dump_json()
)
decision = await router_generator.__anext__()
worker_id, prefix_hit_rate = decision.data()
logger.info(
f"DEBUG: KV router returned worker_id='{worker_id}' (type: {type(worker_id)}), prefix_hit_rate={prefix_hit_rate}"
)
prefix_hit_rate = float(prefix_hit_rate)

# Create request object once with default prefix_hit_rate
Expand All @@ -260,18 +264,28 @@ async def process_and_stream():
).model_dump_json()

if self.use_router:
if worker_id == "":
if worker_id is None or worker_id == "":
logger.info(
"DEBUG: KV router - using generate() because worker_id is None or empty"
)
engine_generator = await self.worker_client.generate(
request_obj
)
else:
logger.info(
f"DEBUG: KV router - using direct() with worker_id={worker_id}"
)
engine_generator = await self.worker_client.direct(
request_obj, int(worker_id)
)
elif router_mode == RouterType.RANDOM:
elif self.engine_args.router == RouterType.RANDOM:
logger.info("DEBUG: Using RANDOM router")
engine_generator = await self.worker_client.generate(request_obj)
elif router_mode == RouterType.ROUND_ROBIN:
elif self.engine_args.router == RouterType.ROUND_ROBIN:
logger.info("DEBUG: Using ROUND_ROBIN router")
engine_generator = await self.worker_client.round_robin(request_obj)
else:
logger.error(f"DEBUG: Unknown router: '{self.engine_args.router}'")

output_generator = self._generate_responses(
engine_generator, request_type
Expand Down
38 changes: 38 additions & 0 deletions manual-configs/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
Steps to manually deploy the SLA planner on Kubernetes:

Pre-requisite:

Deploy an LLM example (agg first). The [LLM deployment guide](../examples/llm/README.md#kubernetes-deployment) has detailed instructions.

1. Deploy Prometheus:

```bash
kubectl apply -f prometheus-config.yaml
kubectl apply -f prometheus-deployment.yaml
kubectl apply -f prometheus-service.yaml
```

2. Profiling PVC:

```bash
kubectl apply -f profiling-pvc.yaml
```

3. Run the job:

```bash
kubectl apply -f profile-sla-job.yaml
```

To build and push the image referenced in `profile-sla-job.yaml`:

```bash
# in the main dynamo directory
export DOCKER_SERVER=nvcr.io/nvidian/nim-llm-dev
export IMAGE_TAG=dep-178.1 # or whatever tag
./container/build.sh --target runtime
docker tag dynamo:latest-vllm-runtime $DOCKER_SERVER/dynamo-base-docker-llm:$IMAGE_TAG
docker push $DOCKER_SERVER/dynamo-base-docker-llm:$IMAGE_TAG
```

4. [Not tested yet] Deploy planner-sla and replace the existing LLM planner with it. Not sure whether we can directly apply a DynamoComponentDeployment (ideal) or whether we need to do planner-sla-config.yaml and planner-sla-deployment.yaml (but then how would we attach it to the existing graph?)
33 changes: 33 additions & 0 deletions manual-configs/planner-sla-component.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Manually-deployed SLA planner component, attached to the llm-disagg graph.
# NOTE(review): namespace and image tag are environment-specific (personal
# dev namespace / dated tag) — parameterize before wider reuse.
apiVersion: dynamo.nvidia.com/v1alpha1
kind: DynamoComponentDeployment
metadata:
  name: planner-sla
  namespace: hannahz-dynamo
spec:
  graph: llm-disagg
  component:
    name: Planner
    image: nvcr.io/nvidian/nim-llm-dev/dynamo-base-docker-llm:dep-178.2
    command: ["python", "/workspace/examples/common/components/planner_sla.py"]
    env:
      # Deployment config is injected from the pod annotation via the
      # downward API — assumes the operator sets this annotation; confirm.
      - name: DYN_DEPLOYMENT_CONFIG
        valueFrom:
          fieldRef:
            fieldPath: metadata.annotations['dynamo.nvidia.com/deployment-config']
      # Prometheus service deployed via manual-configs/prometheus-*.yaml.
      - name: PROMETHEUS_ENDPOINT
        value: "http://prometheus:9090"
      # Must match the mountPath of the profiling-results volume below.
      - name: PROFILE_RESULTS_DIR
        value: "/workspace/profiling_results"
    volumeMounts:
      - name: profiling-results
        mountPath: /workspace/profiling_results
    resources:
      requests:
        cpu: "1"
        memory: "2Gi"
  imagePullSecrets:
    - name: docker-imagepullsecret
  volumes:
    # PVC created by manual-configs/profiling-pvc.yaml; holds the output
    # of the profile-sla job consumed by the planner.
    - name: profiling-results
      persistentVolumeClaim:
        claimName: profiling-pvc
11 changes: 11 additions & 0 deletions manual-configs/planner-sla-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# ConfigMap carrying the SLA planner's YAML configuration.
# NOTE(review): namespace is a personal dev namespace — parameterize before reuse.
apiVersion: v1
kind: ConfigMap
metadata:
  name: planner-config
  namespace: hannahz-dynamo
data:
  planner.yaml: |
    Planner:
      environment: kubernetes
      # Must agree with PROMETHEUS_ENDPOINT / PROFILE_RESULTS_DIR in the
      # planner-sla component deployment.
      prometheus-endpoint: "http://prometheus:9090"
      profile-results-dir: "/workspace/profiling_results"
Loading
Loading