Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions vllm/v1/engine/core_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,8 @@ class BackgroundResources:
output_socket: Optional[Union[zmq.Socket, zmq.asyncio.Socket]] = None
input_socket: Optional[Union[zmq.Socket, zmq.asyncio.Socket]] = None
first_req_send_socket: Optional[zmq.asyncio.Socket] = None
first_req_rcv_socket: Optional[zmq.asyncio.Socket] = None
stats_update_socket: Optional[zmq.asyncio.Socket] = None
output_queue_task: Optional[asyncio.Task] = None
stats_update_task: Optional[asyncio.Task] = None
shutdown_path: Optional[str] = None
Expand All @@ -344,6 +346,12 @@ def __call__(self):
self.coordinator.close()

cancel_task_threadsafe(self.output_queue_task)

# ZMQ context termination can hang if the sockets
# aren't explicitly closed first.
for socket in (self.first_req_rcv_socket, self.stats_update_socket):
if socket is not None:
socket.close(linger=0)
cancel_task_threadsafe(self.stats_update_task)

# ZMQ context termination can hang if the sockets
Expand Down Expand Up @@ -977,6 +985,8 @@ async def run_engine_stats_update_task():
bind=False) as first_req_rcv_socket:
assert isinstance(socket, zmq.asyncio.Socket)
assert isinstance(first_req_rcv_socket, zmq.asyncio.Socket)
self.resources.stats_update_socket = socket
self.resources.first_req_rcv_socket = first_req_rcv_socket
# Send subscription message.
await socket.send(b'\x01')

Expand Down