Skip to content

Commit

Permalink
More resilient _close_coro cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
leszekhanusz committed Jul 17, 2024
1 parent db9f5db commit 9eaafa4
Showing 1 changed file with 50 additions and 23 deletions.
73 changes: 50 additions & 23 deletions gql/transport/aiohttp_websockets.py
Original file line number Diff line number Diff line change
Expand Up @@ -967,18 +967,23 @@ async def _close_coro(self, e: Exception, clean_close: bool = True) -> None:

try:

# We should always have an active websocket connection here
assert self.websocket is not None

# Properly shut down liveness checker if enabled
if self.check_keep_alive_task is not None:
# More info: https://stackoverflow.com/a/43810272/1113207
self.check_keep_alive_task.cancel()
with suppress(asyncio.CancelledError):
await self.check_keep_alive_task
try:
# Properly shut down liveness checker if enabled
if self.check_keep_alive_task is not None:
# More info: https://stackoverflow.com/a/43810272/1113207
self.check_keep_alive_task.cancel()
with suppress(asyncio.CancelledError):
await self.check_keep_alive_task
except Exception as exc: # pragma: no cover
log.warning(
"_close_coro cancel keep alive task exception: " + repr(exc)
)

# Calling the subclass close hook
await self._close_hook()
try:
# Calling the subclass close hook
await self._close_hook()
except Exception as exc: # pragma: no cover
log.warning("_close_coro close_hook exception: " + repr(exc))

# Saving exception to raise it later if trying to use the transport
# after it has already closed.
Expand All @@ -999,8 +1004,13 @@ async def _close_coro(self, e: Exception, clean_close: bool = True) -> None:

log.debug("_close_coro: close websocket connection")

await self.websocket.close()
self.websocket = None
try:
assert self.websocket is not None

await self.websocket.close()
self.websocket = None
except Exception as exc:
log.warning("_close_coro websocket close exception: " + repr(exc))

log.debug("_close_coro: close aiohttp session")

Expand All @@ -1012,31 +1022,48 @@ async def _close_coro(self, e: Exception, clean_close: bool = True) -> None:
log.debug("connector_owner is False -> not closing connector")

else:
assert self.session is not None

closed_event = AIOHTTPTransport.create_aiohttp_closed_event(
self.session
)
await self.session.close()
try:
await asyncio.wait_for(closed_event.wait(), self.ssl_close_timeout)
except asyncio.TimeoutError:
pass
assert self.session is not None

closed_event = AIOHTTPTransport.create_aiohttp_closed_event(
self.session
)
await self.session.close()
try:
await asyncio.wait_for(
closed_event.wait(), self.ssl_close_timeout
)
except asyncio.TimeoutError:
pass
except Exception as exc: # pragma: no cover
log.warning("_close_coro session close exception: " + repr(exc))

self.session = None

log.debug("_close_coro: aiohttp session closed")

try:
assert self.receive_data_task is not None

self.receive_data_task.cancel()
with suppress(asyncio.CancelledError):
await self.receive_data_task
except Exception as exc: # pragma: no cover
log.warning(
"_close_coro cancel receive data task exception: " + repr(exc)
)

except Exception as exc: # pragma: no cover
log.warning("Exception catched in _close_coro: " + repr(exc))

finally:

log.debug("_close_coro: start cleanup")
log.debug("_close_coro: final cleanup")

self.websocket = None
self.close_task = None
self.check_keep_alive_task = None
self.receive_data_task = None
self._wait_closed.set()

log.debug("_close_coro: exiting")
Expand Down

0 comments on commit 9eaafa4

Please sign in to comment.