Skip to content

New asyncio.exceptions.CancelledError from 4.5.2 #2633

Closed
@rra

Description

@rra

Version: 4.5.2
Platform: Python 3.11.2 on Debian unstable
Description:

Starting with 4.5.2, I'm seeing a new exception when attempting to write to Redis using the asyncio layer. The problem seems to be related to FastAPI: if I create a FastAPI app with any Starlette custom middlware, and then try to set a key in Redis using a separate connection pool created with redis.asyncio.from_url, the set request fails immediately with an uncaught asyncio.exceptions.CancelledError and the following backtrace:

.tox/py/lib/python3.11/site-packages/gafaelfawr/storage/base.py:140: in store
    await self._redis.set(key, encrypted_data, ex=lifetime)
.tox/py/lib/python3.11/site-packages/redis/asyncio/client.py:509: in execute_command
    conn = self.connection or await pool.get_connection(command_name, **options)
.tox/py/lib/python3.11/site-packages/redis/asyncio/connection.py:1408: in get_connection
    if await connection.can_read_destructive():
.tox/py/lib/python3.11/site-packages/redis/asyncio/connection.py:817: in can_read_destructive
    return await self._parser.can_read_destructive()
.tox/py/lib/python3.11/site-packages/redis/asyncio/connection.py:250: in can_read_destructive
    return await self._stream.read(1)
/usr/lib/python3.11/asyncio/streams.py:689: in read
    await self._wait_for_data('read')
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <StreamReader transport=<_SelectorSocketTransport fd=15 read=polling write=<idle, bufsize=0>>>
func_name = 'read'

    async def _wait_for_data(self, func_name):
        """Wait until feed_data() or feed_eof() is called.
    
        If stream was paused, automatically resume it.
        """
        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                f'{func_name}() called while another coroutine is '
                f'already waiting for incoming data')
    
        assert not self._eof, '_wait_for_data after EOF'
    
        # Waiting for data while paused will make deadlock, so prevent it.
        # This is essential for readexactly(n) for case when n > self._limit.
        if self._paused:
            self._paused = False
            self._transport.resume_reading()
    
        self._waiter = self._loop.create_future()
        try:
>           await self._waiter
E           asyncio.exceptions.CancelledError

/usr/lib/python3.11/asyncio/streams.py:522: CancelledError

GitHub Actions failure log: https://github.com/lsst-sqre/gafaelfawr/actions/runs/4473916874/jobs/7861846517

Reverting to redis 4.5.1 makes the problem disappear again, so it appears to be triggered by some change in 4.5.2.

Here's the shortest test case that I've come up with. It depends on FastAPI and Starlette but none of my code base. My guess is that something is happening during addition of the middleware that's breaking something about the asyncio work inside redis, but I'm not sure what that could be. Oddly, adding one of the standard Starlette middleware classes does not trigger this problem, only a custom middleware class derived from BaseHTTPMiddleware.

from collections.abc import Callable, Awaitable

import redis.asyncio
from fastapi import FastAPI, APIRouter, Request, Response
from httpx import AsyncClient
from starlette.middleware.base import BaseHTTPMiddleware


class DummyMiddleware(BaseHTTPMiddleware):
    async def dispatch(
        self,
        request: Request,
        call_next: Callable[[Request], Awaitable[Response]],
    ) -> Response:
        return await call_next(request)


@pytest.mark.asyncio
async def test_redis() -> None:
    app = FastAPI()
    app.add_middleware(DummyMiddleware)

    @app.get("/")
    async def index() -> dict[str, str]:
        return {}

    async with AsyncClient(app=app, base_url="https://example.com/") as client:
        await client.get("/")
        pool = redis.asyncio.from_url("redis://localhost:6379/0")
        await pool.set("key", "value")

(If I had to guess, it would be the switch to asyncio.timeout, but I can't find evidence to support that. I know asyncio.timeout does raise CancelledError and then converts it to TimeoutError, but I couldn't find any place where that machinery shouldn't work correctly. This error happens immediately, so it doesn't seem to be a timeout and therefore I may be barking up the wrong tree.)

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions