Skip to content
50 changes: 26 additions & 24 deletions fastdeploy/engine/sched/resource_manager_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -1110,36 +1110,38 @@ def has_resource_for_prefilled_req(self, request_id: str):
Check whether there are enough slot and gpu resource for the prefilled request,
of which the cache is saved in cpu buffer.
"""
assert self.config.scheduler_config.splitwise_role == "decode", "Only D instance can call this method"
assert request_id in self.preallocated_reqs, "request_id must be in preallocate"
need_blocks_num = len(self.preallocated_reqs[request_id].disaggregate_info["block_tables"])
return self.available_batch() > 0 and self.cache_manager.can_allocate_gpu_blocks(need_blocks_num)
with self.lock:
assert self.config.scheduler_config.splitwise_role == "decode", "Only D instance can call this method"
assert request_id in self.preallocated_reqs, "request_id must be in preallocate"
need_blocks_num = len(self.preallocated_reqs[request_id].disaggregate_info["block_tables"])
return self.available_batch() > 0 and self.cache_manager.can_allocate_gpu_blocks(need_blocks_num)

def add_prefilled_request(self, request_output: RequestOutput):
"""
In P/D aggregated deployment, D should continue to decode after receiving first token and cache from P.
NOTE: GPU resources should be checked in advance to ensure they are sufficient for the prefilled request.
"""
assert self.config.scheduler_config.splitwise_role == "decode", "Only D instance can call this method"
if request_output.request_id not in self.requests:
llm_logger.error(f"Request {request_output.request_id} not found in requests")
return
request = self.requests[request_output.request_id]

# update request and insert to running
request.output_token_ids.append(request_output.outputs.token_ids[0])
request.num_cached_tokens = request_output.num_cached_tokens
if (
self.config.speculative_config.method in ["mtp"]
and self.config.scheduler_config.splitwise_role == "decode"
):
request.draft_token_ids = copy.deepcopy(request_output.outputs.draft_token_ids)
request.need_prefill_tokens = len(request.prompt_token_ids) + 1

request_output.metrics.decode_recv_req_time = request.metrics.decode_recv_req_time
request_output.metrics.decode_preallocate_req_time = request.metrics.decode_preallocate_req_time
request.metrics = request_output.metrics
self.running.append(request)
with self.lock:
assert self.config.scheduler_config.splitwise_role == "decode", "Only D instance can call this method"
if request_output.request_id not in self.requests:
llm_logger.error(f"Request {request_output.request_id} not found in requests")
return
request = self.requests[request_output.request_id]

# update request and insert to running
request.output_token_ids.append(request_output.outputs.token_ids[0])
request.num_cached_tokens = request_output.num_cached_tokens
if (
self.config.speculative_config.method in ["mtp"]
and self.config.scheduler_config.splitwise_role == "decode"
):
request.draft_token_ids = copy.deepcopy(request_output.outputs.draft_token_ids)
request.need_prefill_tokens = len(request.prompt_token_ids) + 1

request_output.metrics.decode_recv_req_time = request.metrics.decode_recv_req_time
request_output.metrics.decode_preallocate_req_time = request.metrics.decode_preallocate_req_time
request.metrics = request_output.metrics
self.running.append(request)

def _free_blocks(self, request: Request):
if self.config.cache_config.enable_prefix_caching:
Expand Down
Loading