redis.asyncio trying to reuse a closed connection writer #2065

Closed
@beasteers

Version: 4.2.0 (the only version with asyncio support)

Platform: Mac / Docker / Python 3.7.12

My code builds off of this repo, but the code in the repo isn't relevant to the minimal code sample; I'm just using it to provide the Redis instance at the moment. https://github.com/RedisGears/EdgeRealtimeVideoAnalytics

Description:

I'm wrapping Redis with FastAPI to handle streaming video in and out. It runs fine for the first few connections, but if you refresh the page enough times, you end up with a broken connection and ALL Redis calls hang.

tl;dr a solution

Make sure we always discard the writer instance when disconnecting

Replace here: https://github.com/redis/redis-py/blob/master/redis/asyncio/connection.py#L822-L831

                try:
                    if os.getpid() == self.pid:
                        self._writer.close()  # type: ignore[union-attr]
                        # py3.6 doesn't have this method
                        if hasattr(self._writer, "wait_closed"):
                            await self._writer.wait_closed()  # type: ignore[union-attr]
                except OSError:
                    pass
                finally:  # +++ MAKE SURE THIS ALWAYS GETS CALLED
                    self._reader = None
                    self._writer = None

I tested by patching in this change and it does seem to fix the error.
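
In case it's useful to anyone hitting this before a fixed release lands, the same change can be approximated from application code with a monkey-patch along these lines (a rough sketch against the redis-py 4.2.x internals; _reader/_writer are private attributes, so treat it as best-effort):

from redis.asyncio.connection import Connection

_orig_disconnect = Connection.disconnect

async def _patched_disconnect(self, *args, **kwargs):
    try:
        await _orig_disconnect(self, *args, **kwargs)
    finally:
        # always drop the (possibly closed) reader/writer so the pool
        # never hands out a connection that holds a dead transport
        self._reader = None
        self._writer = None

Connection.disconnect = _patched_disconnect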

Digging under the hood: why this is happening

It seems like there is a race condition combined with incomplete error handling (e.g. if the task is cancelled with an asyncio CancelledError? I'm not 100% sure): the writer ends up closed, but we keep holding on to the instance of the closed writer.

Once that happens, the next call to Redis execute hits this line, which raises the exception below (the error output is suppressed, but you can print it out manually):

self._writer.writelines(command)

RuntimeError: unable to perform operation on <TCPTransport closed=True reading=False>; the handler is closed

This triggers conn.disconnect(), where the writer is supposed to be closed and cleaned up.

But if the writer is already closed, then for some reason _writer.wait_closed() never returns, which is why it hangs.

So there seem to be two bugs here:

  1. If closing the StreamWriter is interrupted, we end up hanging onto a closed writer, and that connection object is totally broken (and by extension the connection pool, because it tries to reuse connections); see the sketch after this list.
  2. Since we hang onto the closed writer, the next call to wait_closed hangs indefinitely. This may be a bug in Python's asyncio, but we should try to address it here.
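
To make bug 1 concrete, the flow without a finally has roughly this shape (a simplified sketch, not the actual redis-py source):

try:
    self._writer.close()
    await self._writer.wait_closed()  # a CancelledError can land on this await
except OSError:
    pass
self._reader = None  # never reached if the task was cancelled above...
self._writer = None  # ...so the connection keeps a reference to the closed writer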

Solution to infinite hang

Fixing the first error should prevent the second from ever happening. However, if you want to be extra safe, you could add the following to prevent the second error in case it ever did slip through:

Fix: only call self._writer.close() and self._writer.wait_closed() if not getattr(self._writer.transport, '_closing', False)
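
In context, the guarded shutdown would look roughly like this (just a sketch; _closing is a private attribute of the transport, hence the defensive getattr):

try:
    if os.getpid() == self.pid:
        # skip close()/wait_closed() entirely if the transport is already
        # shutting down, otherwise wait_closed() can hang forever
        if not getattr(self._writer.transport, "_closing", False):
            self._writer.close()
            # py3.6 doesn't have this method
            if hasattr(self._writer, "wait_closed"):
                await self._writer.wait_closed()
except OSError:
    pass
finally:
    self._reader = None
    self._writer = None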

Bonus info - tracing the indefinite hang back through cpython

StreamWriter.wait_closed (hangs forever)
https://github.com/python/cpython/blob/48b3ae9e29545891bece874b4c0c0e394fe0f048/Lib/asyncio/streams.py#L344

which calls protocol._get_close_waiter
https://github.com/python/cpython/blob/f4c03484da59049eb62a9bf7777b963e2267d187/Lib/asyncio/streams.py#L280

which returns protocol._closed which is a future defined here
https://github.com/python/cpython/blob/f4c03484da59049eb62a9bf7777b963e2267d187/Lib/asyncio/streams.py#L212

the future is resolved here (protocol._closed.set_result() in protocol.connection_lost)
https://github.com/python/cpython/blob/f4c03484da59049eb62a9bf7777b963e2267d187/Lib/asyncio/streams.py#L247

protocol.connection_lost is called in transport._call_connection_lost
https://github.com/python/cpython/blob/6927632492cbad86a250aa006c1847e03b03e70b/Lib/asyncio/proactor_events.py#L153

which is called in transport.close()
https://github.com/python/cpython/blob/6927632492cbad86a250aa006c1847e03b03e70b/Lib/asyncio/proactor_events.py#L108

But if transport._closing is already True, then _call_connection_lost is never called and the future is never resolved
https://github.com/python/cpython/blob/6927632492cbad86a250aa006c1847e03b03e70b/Lib/asyncio/proactor_events.py#L103
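
A toy model of that chain shows why the second close() can leave wait_closed() hanging (hypothetical names, not the actual CPython code; the real versions are linked above):

import asyncio

class ToyTransport:
    def __init__(self, loop):
        self._loop = loop
        self._closing = False
        self._closed_fut = loop.create_future()  # stands in for protocol._closed

    def close(self):
        if self._closing:
            return  # early return: _call_connection_lost is never scheduled
        self._closing = True
        self._loop.call_soon(self._closed_fut.set_result, None)

    async def wait_closed(self):
        await self._closed_fut  # resolved only via close() above

async def main():
    t = ToyTransport(asyncio.get_running_loop())
    # transport already marked as closing, e.g. by an earlier shutdown that
    # never got as far as connection_lost
    t._closing = True
    t.close()  # returns immediately, schedules nothing
    try:
        await asyncio.wait_for(t.wait_closed(), timeout=1)
    except asyncio.TimeoutError:
        print("wait_closed() never returns: the close waiter is never resolved")

asyncio.run(main())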

Example code

The problem can be reproduced with the code below: just go to / and refresh a bunch of times pretty quickly, and you should see it hang in the console on 'before execute' and show a broken image.

import asyncio
from fastapi import FastAPI
from fastapi.responses import HTMLResponse, StreamingResponse
from redis import asyncio as aioredis

app = FastAPI()
red = aioredis.from_url('redis://redisedge:6379')

with open('data/random.jpg', 'rb') as f:
    placeholder_image = f.read()

@app.get('/')
def index():
    return HTMLResponse('<img src="/video" width="600" height="400" />')

@app.get('/push')
async def add_image():
    # call this at least once so the stream has a frame for /video to serve
    return await red.xadd('camera:bug-test', { 'image': placeholder_image })

@app.get('/video')
def video_feed(name='camera:bug-test', field='image'):
    async def stream():
        while True:
            # query image
            p = red.pipeline(transaction=True).xrevrange(name, count=1)
            print('before execute')
            cmsg, = await p.execute()  # hangs here forever
            print('after execute', len(cmsg))

            # serve image
            if not cmsg:
                break
            yield format_frame(cmsg[0][1][field.encode('utf-8')])
            await asyncio.sleep(0.01)

    return StreamingResponse(stream(), media_type='multipart/x-mixed-replace; boundary=frame')


def format_frame(frame):
    return (
        b'--frame\r\n'
        b'Pragma-directive: no-cache\r\n'
        b'Cache-directive: no-cache\r\n'
        b'Cache-control: no-cache\r\n'
        b'Pragma: no-cache\r\n'
        b'Expires: 0\r\n'
        b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n\r\n')

requirements.txt

fastapi
uvicorn[standard]
redis>=4.2.0

to run:

uvicorn redis_fastapi_bug_test:app --reload

Personally, I just ran it in a Docker container using the repo I mentioned at the top of the page.

Items needed:

  • a Redis instance
  • a test image at data/random.jpg
