2323from vllm .logger import init_logger
2424from vllm .lora .request import LoRARequest
2525from vllm .tasks import SupportedTask
26- from vllm .utils import (cancel_task_threadsafe , get_open_port ,
27- get_open_zmq_inproc_path , make_zmq_socket )
26+ from vllm .utils import (close_sockets , get_open_port , get_open_zmq_inproc_path ,
27+ in_loop , make_zmq_socket )
2828from vllm .v1 .engine import (EngineCoreOutputs , EngineCoreRequest ,
2929 EngineCoreRequestType ,
3030 ReconfigureDistributedRequest , ReconfigureRankType ,
@@ -317,7 +317,7 @@ class BackgroundResources:
317317 """Used as a finalizer for clean shutdown, avoiding
318318 circular reference back to the client object."""
319319
320- ctx : Union [ zmq .Context ]
320+ ctx : zmq .Context
321321 # If CoreEngineProcManager, it manages local engines;
322322 # if CoreEngineActorManager, it manages all engines.
323323 engine_manager : Optional [Union [CoreEngineProcManager ,
@@ -326,6 +326,8 @@ class BackgroundResources:
326326 output_socket : Optional [Union [zmq .Socket , zmq .asyncio .Socket ]] = None
327327 input_socket : Optional [Union [zmq .Socket , zmq .asyncio .Socket ]] = None
328328 first_req_send_socket : Optional [zmq .asyncio .Socket ] = None
329+ first_req_rcv_socket : Optional [zmq .asyncio .Socket ] = None
330+ stats_update_socket : Optional [zmq .asyncio .Socket ] = None
329331 output_queue_task : Optional [asyncio .Task ] = None
330332 stats_update_task : Optional [asyncio .Task ] = None
331333 shutdown_path : Optional [str ] = None
@@ -343,23 +345,47 @@ def __call__(self):
343345 if self .coordinator is not None :
344346 self .coordinator .close ()
345347
346- cancel_task_threadsafe (self .output_queue_task )
347- cancel_task_threadsafe (self .stats_update_task )
348+ if isinstance (self .output_socket , zmq .asyncio .Socket ):
349+ # Async case.
350+ loop = self .output_socket ._get_loop ()
351+ asyncio .get_running_loop ()
352+ sockets = (self .output_socket , self .input_socket ,
353+ self .first_req_send_socket , self .first_req_rcv_socket ,
354+ self .stats_update_socket )
355+
356+ tasks = (self .output_queue_task , self .stats_update_task )
357+
358+ def close_sockets_and_tasks ():
359+ close_sockets (sockets )
360+ for task in tasks :
361+ if task is not None and not task .done ():
362+ task .cancel ()
363+
364+ if in_loop (loop ):
365+ close_sockets_and_tasks ()
366+ elif not loop .is_closed ():
367+ loop .call_soon_threadsafe (close_sockets_and_tasks )
368+ else :
369+ # Loop has been closed, try to clean up directly.
370+ del tasks
371+ del close_sockets_and_tasks
372+ close_sockets (sockets )
373+ del self .output_queue_task
374+ del self .stats_update_task
375+ else :
376+ # Sync case.
348377
349- # ZMQ context termination can hang if the sockets
350- # aren't explicitly closed first.
351- for socket in (self .output_socket , self .input_socket ,
352- self .first_req_send_socket ):
353- if socket is not None :
354- socket .close (linger = 0 )
378+ # ZMQ context termination can hang if the sockets
379+ # aren't explicitly closed first.
380+ close_sockets ((self .output_socket , self .input_socket ))
355381
356- if self .shutdown_path is not None :
357- # We must ensure that the sync output socket is
358- # closed cleanly in its own thread.
359- with self .ctx .socket (zmq .PAIR ) as shutdown_sender :
360- shutdown_sender .connect (self .shutdown_path )
361- # Send shutdown signal.
362- shutdown_sender .send (b'' )
382+ if self .shutdown_path is not None :
383+ # We must ensure that the sync output socket is
384+ # closed cleanly in its own thread.
385+ with self .ctx .socket (zmq .PAIR ) as shutdown_sender :
386+ shutdown_sender .connect (self .shutdown_path )
387+ # Send shutdown signal.
388+ shutdown_sender .send (b'' )
363389
364390 def validate_alive (self , frames : Sequence [zmq .Frame ]):
365391 if len (frames ) == 1 and (frames [0 ].buffer
@@ -969,14 +995,19 @@ def _ensure_stats_update_task(self):
969995 self .engine_ranks_managed [- 1 ] + 1 )
970996
971997 async def run_engine_stats_update_task ():
972- with make_zmq_socket (self .ctx , self .stats_update_address ,
973- zmq .XSUB ) as socket , make_zmq_socket (
974- self .ctx ,
975- self .first_req_sock_addr ,
976- zmq .PAIR ,
977- bind = False ) as first_req_rcv_socket :
998+ with (make_zmq_socket (self .ctx ,
999+ self .stats_update_address ,
1000+ zmq .XSUB ,
1001+ linger = 0 ) as socket ,
1002+ make_zmq_socket (self .ctx ,
1003+ self .first_req_sock_addr ,
1004+ zmq .PAIR ,
1005+ bind = False ,
1006+ linger = 0 ) as first_req_rcv_socket ):
9781007 assert isinstance (socket , zmq .asyncio .Socket )
9791008 assert isinstance (first_req_rcv_socket , zmq .asyncio .Socket )
1009+ self .resources .stats_update_socket = socket
1010+ self .resources .first_req_rcv_socket = first_req_rcv_socket
9801011 # Send subscription message.
9811012 await socket .send (b'\x01 ' )
9821013
0 commit comments