@@ -99,6 +99,14 @@ def request_stop(self) -> None:
9999 self ._output .close ()
100100
101101 async def wait_until_stopped (self ) -> None :
102+ # In atexit scenarios, the original event loop might be closed.
103+ # If so, we can't wait for _stopped_future (it's tied to the closed loop).
104+ if self ._loop .is_closed ():
105+ # Loop is closed. The process is being terminated by run() already.
106+ # Just wait for it directly without asyncio (it will self-clean in time).
107+ return
108+
109+ # Normal case: original loop still exists, wait for the stopped signal
102110 await self ._stopped_future
103111
104112 async def connect (self ) -> None :
@@ -165,10 +173,25 @@ async def run(self) -> None:
165173 Exception ("Connection closed while reading from the driver" )
166174 )
167175 break
176+ except asyncio .CancelledError :
177+ break
168178 await asyncio .sleep (0 )
169-
170- await self ._proc .communicate ()
171- self ._stopped_future .set_result (None )
179+
180+ # Graceful shutdown: only if event loop is still running
181+ try :
182+ asyncio .get_running_loop ()
183+ except RuntimeError :
184+ # No running loop, OS will clean up the process during exit
185+ return
186+
187+ # Process is still running and we have an event loop
188+ if self ._proc .returncode is None :
189+ self ._proc .terminate ()
190+ # Let OS clean up if process doesn't respond to SIGTERM
191+
192+ # Notify anyone waiting that the transport has fully stopped
193+ if not self ._stopped_future .done ():
194+ self ._stopped_future .set_result (None )
172195
173196 def send (self , message : Dict ) -> None :
174197 assert self ._output
0 commit comments