Skip to content

Permanent reconnecting async session #324

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
115 changes: 115 additions & 0 deletions docs/advanced/async_permanent_session.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
.. _async_permanent_session:

Async permanent session
=======================

Sometimes you want to have a single permanent reconnecting async session to a GraphQL backend,
and that can be `difficult to manage`_ manually with the :code:`async with client as session` syntax.

It is now possible to have a single reconnecting session using the
:meth:`connect_async <gql.Client.connect_async>` method of Client
with a :code:`reconnecting=True` argument.

.. code-block:: python

# Create a session from the client which will reconnect automatically.
# This session can be kept in a class for example to provide a way
# to execute GraphQL queries from many different places
session = await client.connect_async(reconnecting=True)

# You can run execute or subscribe method on this session
result = await session.execute(query)

# When you want the connection to close (for cleanup),
# you call close_async
await client.close_async()


When you use :code:`reconnecting=True`, gql will watch the exceptions generated
during the execute and subscribe calls and, if it detects a TransportClosed exception
(indicating that the link to the underlying transport is broken),
it will try to reconnect to the backend again.

Retries
-------

Connection retries
^^^^^^^^^^^^^^^^^^

With :code:`reconnecting=True`, gql will use the `backoff`_ module to repeatedly try to connect with
exponential backoff and jitter with a maximum delay of 60 seconds by default.

You can change the default reconnecting profile by providing your own
backoff decorator to the :code:`retry_connect` argument.

.. code-block:: python

# Here wait maximum 5 minutes between connection retries
retry_connect = backoff.on_exception(
backoff.expo, # wait generator (here: exponential backoff)
Exception, # which exceptions should cause a retry (here: everything)
max_value=300, # max wait time in seconds
)
session = await client.connect_async(
reconnecting=True,
retry_connect=retry_connect,
)

Execution retries
^^^^^^^^^^^^^^^^^

With :code:`reconnecting=True`, by default we will also retry up to 5 times
when an exception happens during an execute call (to manage a possible loss in the connection
to the transport).

There is no retry in case of a :code:`TransportQueryError` exception as it indicates that
the connection to the backend is working correctly.

You can change the default execute retry profile by providing your own
backoff decorator to the :code:`retry_execute` argument.

.. code-block:: python

# Here Only 3 tries for execute calls
retry_execute = backoff.on_exception(
backoff.expo,
Exception,
max_tries=3,
giveup=lambda e: isinstance(e, TransportQueryError),
)
session = await client.connect_async(
reconnecting=True,
retry_execute=retry_execute,
)

If you don't want any retry on the execute calls, you can disable the retries with :code:`retry_execute=False`

Subscription retries
^^^^^^^^^^^^^^^^^^^^

There is no :code:`retry_subscribe` as it is not feasible with async generators.
If you want retries for your subscriptions, then you can do it yourself
with backoff decorators on your methods.

.. code-block:: python

@backoff.on_exception(backoff.expo,
Exception,
max_tries=3,
giveup=lambda e: isinstance(e, TransportQueryError))
async def execute_subscription1(session):
async for result in session.subscribe(subscription1):
print(result)

FastAPI example
---------------

.. literalinclude:: ../code_examples/fastapi_async.py

Console example
---------------

.. literalinclude:: ../code_examples/console_async.py

.. _difficult to manage: https://github.com/graphql-python/gql/issues/179
.. _backoff: https://github.com/litl/backoff
1 change: 1 addition & 0 deletions docs/advanced/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ Advanced
:maxdepth: 2

async_advanced_usage
async_permanent_session
logging
error_handling
local_schema
Expand Down
73 changes: 73 additions & 0 deletions docs/code_examples/console_async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import asyncio
import logging

from aioconsole import ainput

from gql import Client, gql
from gql.transport.aiohttp import AIOHTTPTransport

logging.basicConfig(level=logging.INFO)

GET_CONTINENT_NAME = """
query getContinentName ($code: ID!) {
continent (code: $code) {
name
}
}
"""


class GraphQLContinentClient:
def __init__(self):
self._client = Client(
transport=AIOHTTPTransport(url="https://countries.trevorblades.com/")
)
self._session = None

self.get_continent_name_query = gql(GET_CONTINENT_NAME)

async def connect(self):
self._session = await self._client.connect_async(reconnecting=True)

async def close(self):
await self._client.close_async()

async def get_continent_name(self, code):
params = {"code": code}

answer = await self._session.execute(
self.get_continent_name_query, variable_values=params
)

return answer.get("continent").get("name")


async def main():
continent_client = GraphQLContinentClient()

continent_codes = ["AF", "AN", "AS", "EU", "NA", "OC", "SA"]

await continent_client.connect()

while True:

answer = await ainput("\nPlease enter a continent code or 'exit':")
answer = answer.strip()

if answer == "exit":
break
elif answer in continent_codes:

try:
continent_name = await continent_client.get_continent_name(answer)
print(f"The continent name is {continent_name}\n")
except Exception as exc:
print(f"Received exception {exc} while trying to get continent name")

else:
print(f"Please enter a valid continent code from {continent_codes}")

await continent_client.close()


asyncio.run(main())
101 changes: 101 additions & 0 deletions docs/code_examples/fastapi_async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# First install fastapi and uvicorn:
#
# pip install fastapi uvicorn
#
# then run:
#
# uvicorn fastapi_async:app --reload

import logging

from fastapi import FastAPI, HTTPException
from fastapi.responses import HTMLResponse

from gql import Client, gql
from gql.transport.aiohttp import AIOHTTPTransport

logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger(__name__)

transport = AIOHTTPTransport(url="https://countries.trevorblades.com/graphql")

client = Client(transport=transport)

query = gql(
"""
query getContinentInfo($code: ID!) {
continent(code:$code) {
name
code
countries {
name
capital
}
}
}
"""
)

app = FastAPI()


@app.on_event("startup")
async def startup_event():
print("Connecting to GraphQL backend")

await client.connect_async(reconnecting=True)
print("End of startup")


@app.on_event("shutdown")
async def shutdown_event():
print("Shutting down GraphQL permanent connection...")
await client.close_async()
print("Shutting down GraphQL permanent connection... done")


continent_codes = [
"AF",
"AN",
"AS",
"EU",
"NA",
"OC",
"SA",
]


@app.get("/", response_class=HTMLResponse)
def get_root():

continent_links = ", ".join(
[f'<a href="continent/{code}">{code}</a>' for code in continent_codes]
)

return f"""
<html>
<head>
<title>Continents</title>
</head>
<body>
Continents: {continent_links}
</body>
</html>
"""


@app.get("/continent/{continent_code}")
async def get_continent(continent_code):

if continent_code not in continent_codes:
raise HTTPException(status_code=404, detail="Continent not found")

try:
result = await client.session.execute(
query, variable_values={"code": continent_code}
)
except Exception as e:
log.debug(f"get_continent Error: {e}")
raise HTTPException(status_code=503, detail="GraphQL backend unavailable")

return result
47 changes: 47 additions & 0 deletions docs/code_examples/reconnecting_mutation_http.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import asyncio
import logging

import backoff

from gql import Client, gql
from gql.transport.aiohttp import AIOHTTPTransport

logging.basicConfig(level=logging.INFO)


async def main():

# Note: this example used the test backend from
# https://github.com/slothmanxyz/typegraphql-ws-apollo
transport = AIOHTTPTransport(url="ws://localhost:5000/graphql")

client = Client(transport=transport)

retry_connect = backoff.on_exception(
backoff.expo,
Exception,
max_value=10,
jitter=None,
)
session = await client.connect_async(reconnecting=True, retry_connect=retry_connect)

num = 0

while True:
num += 1

# Execute single query
query = gql("mutation ($message: String!) {sendMessage(message: $message)}")

params = {"message": f"test {num}"}

try:
result = await session.execute(query, variable_values=params)
print(result)
except Exception as e:
print(f"Received exception {e}")

await asyncio.sleep(1)


asyncio.run(main())
47 changes: 47 additions & 0 deletions docs/code_examples/reconnecting_mutation_ws.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import asyncio
import logging

import backoff

from gql import Client, gql
from gql.transport.websockets import WebsocketsTransport

logging.basicConfig(level=logging.INFO)


async def main():

# Note: this example used the test backend from
# https://github.com/slothmanxyz/typegraphql-ws-apollo
transport = WebsocketsTransport(url="ws://localhost:5000/graphql")

client = Client(transport=transport)

retry_connect = backoff.on_exception(
backoff.expo,
Exception,
max_value=10,
jitter=None,
)
session = await client.connect_async(reconnecting=True, retry_connect=retry_connect)

num = 0

while True:
num += 1

# Execute single query
query = gql("mutation ($message: String!) {sendMessage(message: $message)}")

params = {"message": f"test {num}"}

try:
result = await session.execute(query, variable_values=params)
print(result)
except Exception as e:
print(f"Received exception {e}")

await asyncio.sleep(1)


asyncio.run(main())
Loading