Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Frontend] MQLLMEngine supports profiling. #8761

Merged
merged 8 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion vllm/engine/multiprocessing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,12 @@ class RPCStartupRequest(Enum):
class RPCStartupResponse:
tracing_enabled: bool

class RPCUProfileRequest(Enum):
    """One-way RPC sent by the client to toggle profiling on the engine.

    Handled in MQLLMEngine.handle_new_input, which dispatches to the
    engine's start_profile()/stop_profile().
    """
    # Explicit values (not auto()) so the wire encoding stays stable
    # across client/server versions.
    START_PROFILE = 1
    STOP_PROFILE = 2

RPC_REQUEST_T = Union[RPCProcessRequest, RPCAbortRequest, RPCHealthRequest,
RPCStartupRequest]
RPCStartupRequest, RPCUProfileRequest]

REQUEST_OUTPUTS_T = Union[List[RequestOutput], RPCError]

Expand Down
28 changes: 21 additions & 7 deletions vllm/engine/multiprocessing/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
VLLM_RPC_SUCCESS_STR, RPCAbortRequest,
RPCError, RPCHealthRequest,
RPCProcessRequest, RPCStartupRequest,
RPCStartupResponse)
RPCStartupResponse, RPCUProfileRequest)
# yapf: enable
from vllm.envs import VLLM_RPC_TIMEOUT
from vllm.inputs import PromptType
Expand All @@ -38,10 +38,10 @@

class MQClientClosedError(Exception):
    """Exception class raised when the client is used post-close.

    The client can be closed, which closes the ZMQ context. This normally
    happens on server shutdown. In some cases, methods like abort and
    do_log_stats will still be called and then try to open a socket, which
    causes a ZMQError and creates a huge stack trace.
    So, we throw this error such that we can suppress it.
    """
Expand Down Expand Up @@ -126,11 +126,11 @@ def get_data_socket(self) -> Iterator[Socket]:

async def run_check_health_loop(self, timeout: int):
"""Background loop that continually probes the RPCServer for health.

The loop sends CHECK_HEALTH requests to the INPUT_SOCKET, which
the MQLLMEngine server is blocking on.

The Server replies on the HEALTH_SOCKET (rather than on the
The Server replies on the HEALTH_SOCKET (rather than on the
OUTPUT_SOCKET such that the messages are not intermingled with
output streaming).
"""
Expand Down Expand Up @@ -351,7 +351,7 @@ async def do_log_stats(self):
async def check_health(self):
"""
The check health loop probes the health status of the
Engine's health every N seconds and sets _errored_with
Engine's health every N seconds and sets _errored_with
if the engine is unhealthy.
"""
if self._errored_with is not None:
Expand Down Expand Up @@ -497,3 +497,17 @@ async def _process_request(
await self.abort(request_id)
finally:
self.output_queues.pop(request_id)

async def start_profile(self) -> None:
    """Ask the remote MQLLMEngine to start profiling.

    Fire-and-forget: sends RPCUProfileRequest.START_PROFILE on the input
    socket and does not wait for a reply beyond the one-way-RPC ack.
    Raises MQClientClosedError if the client has been closed.
    """
    await self._send_one_way_rpc_request(
        request=RPCUProfileRequest.START_PROFILE,
        socket=self.input_socket)

async def stop_profile(self) -> None:
    """Ask the remote MQLLMEngine to stop profiling.

    Fire-and-forget: sends RPCUProfileRequest.STOP_PROFILE on the input
    socket and does not wait for a reply beyond the one-way-RPC ack.
    Raises MQClientClosedError if the client has been closed.
    """
    await self._send_one_way_rpc_request(
        request=RPCUProfileRequest.STOP_PROFILE,
        socket=self.input_socket)
23 changes: 22 additions & 1 deletion vllm/engine/multiprocessing/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@
VLLM_RPC_SUCCESS_STR, RPCAbortRequest,
RPCError, RPCHealthRequest,
RPCProcessRequest, RPCStartupRequest,
RPCStartupResponse)
RPCStartupResponse, RPCUProfileRequest)
# yapf: enable
from vllm.executor.gpu_executor import GPUExecutor, GPUExecutorAsync
from vllm.logger import init_logger
from vllm.outputs import RequestOutput
from vllm.usage.usage_lib import UsageContext

CONFIG_TYPE = Union[ModelConfig, DecodingConfig, ParallelConfig,
SchedulerConfig, LoRAConfig]
Expand Down Expand Up @@ -231,6 +232,11 @@ def handle_new_input(self):
self._handle_abort_request(request)
elif isinstance(request, RPCHealthRequest):
self._handle_health_request()
elif isinstance(request, RPCUProfileRequest):
if request == RPCUProfileRequest.START_PROFILE:
self.start_profile()
else:
self.stop_profile()
else:
raise ValueError("Unknown RPCRequest Type: {request}")

Expand Down Expand Up @@ -313,6 +319,21 @@ def _set_errored(self, e: BaseException):
if self._errored_with is None:
self._errored_with = e

def start_profile(self) -> None:
    """Start torch profiling on the engine's worker(s)."""
    # MQLLMEngine wraps the *sync* LLMEngine, so a single-GPU setup uses
    # GPUExecutor (not GPUExecutorAsync — that is the async engine's
    # executor, so the original check could never match and single-GPU
    # runs fell through to _run_workers, which GPUExecutor lacks).
    # Exact-type check (`is`, not isinstance) is deliberate: subclasses
    # are distributed executors and must go through _run_workers.
    if type(self.engine.model_executor) is GPUExecutor:
        self.engine.model_executor.start_profile()
    else:
        self.engine.model_executor._run_workers("start_profile")

def stop_profile(self) -> None:
    """Stop torch profiling on the engine's worker(s)."""
    # MQLLMEngine wraps the *sync* LLMEngine, so a single-GPU setup uses
    # GPUExecutor (not GPUExecutorAsync — that is the async engine's
    # executor, so the original check could never match and single-GPU
    # runs fell through to _run_workers, which GPUExecutor lacks).
    # Exact-type check (`is`, not isinstance) is deliberate: subclasses
    # are distributed executors and must go through _run_workers.
    if type(self.engine.model_executor) is GPUExecutor:
        self.engine.model_executor.stop_profile()
    else:
        self.engine.model_executor._run_workers("stop_profile")

def run_mp_engine(engine_args: AsyncEngineArgs, usage_context: UsageContext,
ipc_path: str):
Expand Down
Loading