Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions fastdeploy/envs.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@
"FD_HPU_CHUNK_SIZE": lambda: int(os.getenv("FD_HPU_CHUNK_SIZE", "64")),
"FD_PREFILL_WAIT_DECODE_RESOURCE_SECONDS": lambda: int(os.getenv("FD_PREFILL_WAIT_DECODE_RESOURCE_SECONDS", "30")),
"FMQ_CONFIG_JSON": lambda: os.getenv("FMQ_CONFIG_JSON", None),
Comment thread
rainyfly marked this conversation as resolved.
"FD_TOKEN_PROCESSOR_HEALTH_TIMEOUT": lambda: int(os.getenv("FD_TOKEN_PROCESSOR_HEALTH_TIMEOUT", "120")),
}


Expand Down
31 changes: 31 additions & 0 deletions fastdeploy/output/token_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,30 @@ def __init__(self, cfg, cached_generated_tokens, engine_worker_queue, split_conn
self.accept_token_num_per_head_per_request = {}
self.accept_token_num_per_head = [0] * MAX_DRAFT_TOKENS

# health monitor
self.timestamp_for_alive_before_handle_batch = None
self.timestamp_for_alive_after_handle_batch = None
self.health_lock = threading.Lock()
self.engine_output_token_hang = False
Comment thread
rainyfly marked this conversation as resolved.

def healthy(self):
    """
    Report whether the token processor is healthy.

    Returns:
        bool: ``False`` if no batch-finished timestamp is recorded and the
        recorded batch-start timestamp is older than
        ``envs.FD_TOKEN_PROCESSOR_HEALTH_TIMEOUT`` seconds, or if a
        batch-finished timestamp is recorded and
        ``engine_output_token_hang`` is set; ``True`` otherwise.
    """
    # Snapshot both timestamps together so a concurrent update cannot be
    # observed half-applied.
    with self.health_lock:
        started_at = self.timestamp_for_alive_before_handle_batch
        finished_at = self.timestamp_for_alive_after_handle_batch

    if finished_at is None:
        # No completion timestamp: either nothing has been recorded yet
        # (start is None as well) or a batch is still being handled.
        if started_at is None:
            return True
        # A batch that has been in flight longer than the timeout counts
        # as stuck.
        return time.time() - started_at <= envs.FD_TOKEN_PROCESSOR_HEALTH_TIMEOUT

    # The last batch completed; health now depends only on the hang flag.
    return not self.engine_output_token_hang
Comment thread
xiaoxiaohehe001 marked this conversation as resolved.

def _cleanup_resources(self):
"""Cleaning up shared memory resources"""
if hasattr(self, "executor"):
Expand Down Expand Up @@ -404,7 +428,14 @@ def process_sampling_results(self):
if self.output_tokens[0, 0] == -2:
continue
llm_logger.debug(f"rank_id {rank_id} self.output_tokens[0, 0] {self.output_tokens[0, 0]}")
with self.health_lock:
self.timestamp_for_alive_before_handle_batch = time.time()
self.timestamp_for_alive_after_handle_batch = None
self._process_batch_output()
with self.health_lock:
self.timestamp_for_alive_before_handle_batch = None
self.timestamp_for_alive_after_handle_batch = time.time()

Comment thread
rainyfly marked this conversation as resolved.
except Exception as e:
llm_logger.info(f"while get input_data error: {e} {traceback.format_exc()!s}")

Expand Down
6 changes: 6 additions & 0 deletions fastdeploy/splitwise/internal_adapter_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ def _recv_external_module_control_instruct(self):
self.recv_control_cmd_server.response_for_control_cmd(task_id_str, result)
elif task["cmd"] == "connect_rdma":
self.engine.engine_worker_queue.put_connect_rdma_task(task)
elif task["cmd"] == "check_health":
Comment thread
xiaoxiaohehe001 marked this conversation as resolved.
is_health = self.engine.token_processor.healthy()
result = {"task_id": task_id_str, "result": is_health}
logger.debug(f"Response for task: {task_id_str}: is_health {is_health}")
with self.response_lock:
self.recv_control_cmd_server.response_for_control_cmd(task_id_str, result)

except Exception as e:
logger.error(f"handle_control_cmd got error: {e}, {traceback.format_exc()!s}")
Expand Down
Loading