Skip to content

Commit

Permalink
Copy ListenerQueue in aiohttp_websockets.py
Browse files Browse the repository at this point in the history
It is necessary to avoid needing the websockets dependency when using the AIOHTTPWebsockets transport
  • Loading branch information
leszekhanusz committed Jul 13, 2024
1 parent f1fb25d commit 3a762ae
Showing 1 changed file with 54 additions and 1 deletion.
55 changes: 54 additions & 1 deletion gql/transport/aiohttp_websockets.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,63 @@
TransportQueryError,
TransportServerError,
)
from gql.transport.websockets_base import ListenerQueue

log = logging.getLogger("gql.transport.aiohttp_websockets")

ParsedAnswer = Tuple[str, Optional[ExecutionResult]]


class ListenerQueue:
"""Special queue used for each query waiting for server answers
If the server is stopped while the listener is still waiting,
Then we send an exception to the queue and this exception will be raised
to the consumer once all the previous messages have been consumed from the queue
"""

def __init__(self, query_id: int, send_stop: bool) -> None:
self.query_id: int = query_id
self.send_stop: bool = send_stop
self._queue: asyncio.Queue = asyncio.Queue()
self._closed: bool = False

async def get(self) -> ParsedAnswer:

try:
item = self._queue.get_nowait()
except asyncio.QueueEmpty:
item = await self._queue.get()

self._queue.task_done()

# If we receive an exception when reading the queue, we raise it
if isinstance(item, Exception):
self._closed = True
raise item

# Don't need to save new answers or
# send the stop message if we already received the complete message
answer_type, execution_result = item
if answer_type == "complete":
self.send_stop = False
self._closed = True

return item

async def put(self, item: ParsedAnswer) -> None:

if not self._closed:
await self._queue.put(item)

async def set_exception(self, exception: Exception) -> None:

# Put the exception in the queue
await self._queue.put(exception)

# Don't need to send stop messages in case of error
self.send_stop = False
self._closed = True


class AIOHTTPWebsocketsTransport(AsyncTransport):

Expand Down

0 comments on commit 3a762ae

Please sign in to comment.