Skip to content

Canceling async Redis command leaves connection open, in unsafe state for future commands #2665

Closed
@drago-balto

Description

@drago-balto

Version: 4.5.3

Platform: Python 3.8 on Ubuntu / Generic

Description: Canceling async Redis command leaves connection open, in unsafe state for future commands

This is a reincarnation of #2624, which was closed with an incomplete fix and a possibly unreliable test case. This is the same issue that recently got a lot of attention due to ChatGPT outage, and that remains only partially fixed. The cancellation shielding introduced in #2641 addressed only the cancellation of Redis pipeline operation, but non-pipelined ops are still vulnerable.

This time I am attaching a script that reproduces the issue reliably without relying on an external, slow Redis server. This is achieved by inserting a small TCP socket proxy between the Redis client and local Redis server, with the proxy introducing a 0.1 second delay when sending data in either direction.

Running this script with a Redis server running locally on port 6379 produces the following output:

$ python redis_cancel.py 
managed to cancel the task, connection is left open with unread response
bar: b'foo'
ping: False
foo: b'PONG'
import asyncio

from redis.asyncio import Redis


async def pipe(reader: asyncio.StreamReader, writer: asyncio.StreamWriter, delay: float, name=''):
    while data := await reader.read(1000):
        # print(name, 'received:', data)
        await asyncio.sleep(delay)
        writer.write(data)
        await writer.drain()


class DelayProxy:

    def __init__(self, addr, redis_addr, delay: float):
        self.addr = addr
        self.redis_addr = redis_addr
        self.delay = delay

    async def start(self):
        server = await asyncio.start_server(self.handle, *self.addr)
        asyncio.create_task(server.serve_forever())

    async def handle(self, reader, writer):
        # establish connection to redis
        redis_reader, redis_writer = await asyncio.open_connection(*self.redis_addr)
        pipe1 = asyncio.create_task(pipe(reader, redis_writer, self.delay, 'to redis:'))
        pipe2 = asyncio.create_task(pipe(redis_reader, writer, self.delay, 'from redis:'))
        await asyncio.gather(pipe1, pipe2)


async def main():

    # create a tcp socket proxy that relays data to Redis and back, inserting 0.1 seconds of delay
    dp = DelayProxy(addr=('localhost', 6380), redis_addr=('localhost', 6379), delay=0.1)
    await dp.start()

    # note that we connect to proxy, rather than to Redis directly
    async with Redis(host='localhost', port=6380) as r:

        await r.set('foo', 'foo')
        await r.set('bar', 'bar')

        t = asyncio.create_task(r.get('foo'))
        await asyncio.sleep(0.050)
        t.cancel()
        try:
            await t
            print('try again, we did not cancel the task in time')
        except asyncio.CancelledError:
            print('managed to cancel the task, connection is left open with unread response')

        print('bar:', await r.get('bar'))
        print('ping:', await r.ping())
        print('foo:', await r.get('foo'))

if __name__ == '__main__':
    asyncio.run(main())

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions