Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions fastdeploy/engine/common_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,18 @@ def start_cache_service(self, device_ids, ipc_signal_suffix, create_cache_tensor
def check_and_free_block_tables(self):
self.resource_manager.check_and_free_block_tables()

def clear_data(self):
try:
llm_logger.info("Clear Data: Start")
self.token_processor.clear_data()
self.engine_worker_queue.clear_data()
self.zmq_server.req_dict.clear()
llm_logger.info("Clear Data: Successfully")
return True
except Exception as e:
llm_logger.error(f"Clear data error: {e}")
return False

def _exit_sub_services(self):
"""
exit sub services
Expand Down
4 changes: 4 additions & 0 deletions fastdeploy/engine/sched/resource_manager_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,10 @@ def _free_blocks(self, request: Request):
def finish_requests_async(self, request_ids: Union[str, Iterable[str]]):
return self.finish_execution_pool.submit(self.finish_requests, request_ids)

def clear_data(self):
self.waiting: deque[Request] = deque()
self.to_be_rescheduled_request_id_set = set()

def finish_requests(self, request_ids: Union[str, Iterable[str]]):
llm_logger.info(f"recycle resources for requests: {request_ids}")
try:
Expand Down
3 changes: 3 additions & 0 deletions fastdeploy/entrypoints/engine_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ def create_zmq_client(self, model, mode):
self.zmq_client = ZmqIpcClient(model, mode)
self.zmq_client.connect()

def check_model_weight_status(self):
return self.model_weights_status_signal.value[0] < 0

async def format_and_add_data(self, prompts: dict):
"""
Format the request data and send the request to the server.
Expand Down
1 change: 1 addition & 0 deletions fastdeploy/entrypoints/openai/api_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,7 @@ def reset_scheduler():

if llm_engine is None:
return Response("Engine not loaded", status_code=500)
llm_engine.engine.clear_data()
llm_engine.engine.scheduler.reset()
return Response("Scheduler Reset Successfully", status_code=200)

Expand Down
4 changes: 4 additions & 0 deletions fastdeploy/entrypoints/openai/serving_chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ async def chat_completion_stream_generator(
decoder_base_url=self.tokenizer_base_url,
)
while num_choices > 0:
if self.engine_client.check_model_weight_status():
raise ValueError("Engine is clearing model weight")
try:
response = await asyncio.wait_for(response_queue.get(), timeout=10)
current_waiting_time = 0
Expand Down Expand Up @@ -425,6 +427,8 @@ async def chat_completion_full_generator(
decoder_base_url=self.tokenizer_base_url,
)
while True:
if self.engine_client.check_model_weight_status():
raise ValueError("Engine is clearing model weight")
try:
response = await asyncio.wait_for(response_queue.get(), timeout=10)
current_waiting_time = 0
Expand Down
4 changes: 4 additions & 0 deletions fastdeploy/entrypoints/openai/serving_completion.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,8 @@ async def completion_full_generator(
completion_batched_token_ids = [[] for _ in range(num_choices)]
current_waiting_time = 0
while num_choices > 0:
if self.engine_client.check_model_weight_status():
raise ValueError("Engine is clearing model weight")
try:
response = await asyncio.wait_for(response_queue.get(), timeout=10)
current_waiting_time = 0
Expand Down Expand Up @@ -333,6 +335,8 @@ async def completion_stream_generator(
)
current_waiting_time = 0
while num_choices > 0:
if self.engine_client.check_model_weight_status():
raise ValueError("Engine is clearing model weight")
try:
response = await asyncio.wait_for(response_queue.get(), timeout=10)
current_waiting_time = 0
Expand Down
7 changes: 7 additions & 0 deletions fastdeploy/inter_communicator/engine_worker_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,13 @@ def get_disaggregated_tasks(self):
llm_logger.debug("get tasks from queue success")
return item

def clear_data(self):
self.lock.acquire()
self.tasks[:] = list()
self.client_read_flag[:] = [1] * self.num_client
self.lock.release()
llm_logger.info("clear data for engine worker queue")

def cleanup(self):
"""
Exit the worker queue gracefully.
Expand Down
25 changes: 25 additions & 0 deletions fastdeploy/output/token_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,31 @@ def _record_completion_metrics(self, task, current_time):
main_process_metrics.request_inference_time.observe(current_time - task.inference_start_time)
main_process_metrics.request_generation_tokens.observe(self.tokens_counter[task.request_id])

def clear_data(self):
if envs.ENABLE_V1_KVCACHE_SCHEDULER:
self.resource_manager.clear_data()
for i in range(self.cfg.max_num_seqs):
if self.resource_manager.stop_flags[i]:
continue
task = self.resource_manager.tasks_list[i]
result = RequestOutput(
request_id=task.request_id,
outputs=CompletionOutput(
index=i,
send_idx=self.tokens_counter[task.request_id],
token_ids=task.eos_token_ids,
draft_token_ids=[],
),
finished=True,
metrics=RequestMetrics(
arrival_time=time.time(),
request_start_time=task.arrival_time,
),
)
is_prefill = task.disaggregate_info is not None and task.disaggregate_info["role"] == "prefill"
self._recycle_resources(task.request_id, i, task, result, is_prefill)
llm_logger.warning(f"clear data for task {task.request_id}")

def _record_speculative_decoding_mertics(self, accept_num):
"""Record metrics of speculative decoding"""
if not hasattr(main_process_metrics, "spec_decode_draft_acceptance_rate"):
Expand Down
1 change: 1 addition & 0 deletions fastdeploy/rl/dynamic_weight_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ def check_model_weights_status(model_weights_status, model_runner, pid):
logger.info("finished loading new checkpoint")
elif model_weights_status.value[0] == ModelWeightsStatus.CLEARING:
logger.info("infer engine stopped! start to clear checkpoint...")
model_runner.clear_requests()
model_runner.clear_parameters(pid)
while model_weights_status.value[0] != ModelWeightsStatus.CLEARED:
time.sleep(0.01)
Expand Down
4 changes: 4 additions & 0 deletions fastdeploy/worker/gpu_model_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1704,6 +1704,10 @@ def clear_cache(self):
self.forward_meta.clear_caches()
paddle.device.cuda.empty_cache()

def clear_requests(self):
"""Dynamic model loader use to clear requests use for RL"""
self.share_inputs["stop_flags"][:] = True

def clear_parameters(self, pid):
"""Dynamic model loader use to clear parameters use for RL"""
# Clear CUDAGraph
Expand Down
Loading