Skip to content

Convert search endpoints to asynchronous #3449

Open

Description

Problem

Since we have converted the application to ASGI, we can now benefit from the async Elasticsearch client. Requests to Elasticsearch are some of the longest running blocking operations in our application. By using the async client, we can remove that block when waiting for queries to come back from Elasticsearch.

Description

The primary thing to convert in the search route is the Elasticsearch client usage. The asynchronous client swaps the underlying "node" (the request engine the ES Client uses) with aiohttp. We can't just swap out all usages of the Elasticsearch client for the async client, however, so we'll need to maintain both the synchronous and asynchronous for a period of time while we switch over from sync to async in all our usage of the client.

To do this, update the _elasticsearch_connect function to return both a synchronous and asynchronous client. Rename the existing synchronous ES client to SYNC_ES and add a new ASYNC_ES assigned to the asynchronous client. Update all usages of settings.ES to settings.SYNC_ES.

Now for the complex part of this issue: the Elasticsearch DSL library, which we use for our normal search routine, does not yet support the async Elasticsearch client. We can work around this, however, by not using Search::execute and changing the get_es_response function to use ASYNC_ES.search directly instead.

search_response = s.execute()

Something like this might work:

@log_timing_info
async def get_es_response(s: Search, *args, **kwargs):
    if settings.VERBOSE_ES_RESPONSE:
        log.info(pprint.pprint(s.to_dict()))

    if not hasattr(s, "_response"):
        try:
            raw_response = await settings.ASYNC_ES.search(
                index=s._index,
                body=s.to_dict(),
                **self._params
            )

            s._response = s._response_class(
                s,
                raw_response.body
            )

            if settings.VERBOSE_ES_RESPONSE:
                log.info(pprint.pprint(s._response.to_dict()))
        except (BadRequestError, NotFoundError) as e:
            raise ValueError(e)

    return s._response

That's basically an adaptation of the Elasticsearch DSL's Search::execute method into our get_es_response function.

All functions that call get_es_response.

After updating get_es_response, we'll also want to update check_dead_links to be an asynchronous function rather than having it call an async_to_sync wrapped function. We'll need to follow the chain of functions all the way up to the route endpoints until the route endpoints and all functions that interact with Elasticsearch they use are async def.

Additional context

Marked staff only due to complexity.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    • Status

      📋 Backlog

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions