Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 40 additions & 23 deletions vllm/v1/engine/core_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,31 +347,48 @@ def __call__(self):

if isinstance(self.output_socket, zmq.asyncio.Socket):
# Async case.
loop = self.output_socket._get_loop()
asyncio.get_running_loop()
sockets = (self.output_socket, self.input_socket,
self.first_req_send_socket, self.first_req_rcv_socket,
self.stats_update_socket)

tasks = (self.output_queue_task, self.stats_update_task)

def close_sockets_and_tasks():
try:
loop = self.output_socket._get_loop()
asyncio.get_running_loop()
except RuntimeError:
# There is no running loop

# This can happen if the finalizer is called from a thread
# without an event loop (e.g., the engine monitor).
# In this case, we can't schedule async cleanup.
# Do a best-effort synchronous cleanup of sockets.
logger.warning(
"Could not get event loop for async cleanup. "
"Tasks may not be cancelled, sockets will be closed.")
sockets = (self.output_socket, self.input_socket,
self.first_req_send_socket,
self.first_req_rcv_socket, self.stats_update_socket)
close_sockets(sockets)
for task in tasks:
if task is not None and not task.done():
task.cancel()

if in_loop(loop):
close_sockets_and_tasks()
elif not loop.is_closed():
loop.call_soon_threadsafe(close_sockets_and_tasks)
else:
# Loop has been closed, try to clean up directly.
del tasks
del close_sockets_and_tasks
close_sockets(sockets)
del self.output_queue_task
del self.stats_update_task
# There is a running loop.
sockets = (self.output_socket, self.input_socket,
self.first_req_send_socket,
self.first_req_rcv_socket, self.stats_update_socket)

tasks = (self.output_queue_task, self.stats_update_task)

def close_sockets_and_tasks():
close_sockets(sockets)
for task in tasks:
if task is not None and not task.done():
task.cancel()

if in_loop(loop):
close_sockets_and_tasks()
elif not loop.is_closed():
loop.call_soon_threadsafe(close_sockets_and_tasks)
else:
# Loop has been closed, try to clean up directly.
del tasks
del close_sockets_and_tasks
close_sockets(sockets)
del self.output_queue_task
del self.stats_update_task
else:
# Sync case.

Expand Down