Description
Version: 4.2.0 (the only version with asyncio)
Platform: Mac / Docker / Python 3.7.12
My code builds off of this repo, but the code in the repo isn't relevant to the minimal code sample; I'm just using it to provide the Redis instance at the moment: https://github.com/RedisGears/EdgeRealtimeVideoAnalytics
Description:
I'm wrapping Redis with FastAPI to handle streaming video in and out. It runs fine for the first few connections, but if you refresh the page enough times, you end up with a broken connection and ALL subsequent Redis calls hang.
tl;dr: a solution
Make sure we always discard the writer instance when disconnecting.
Replace here: https://github.com/redis/redis-py/blob/master/redis/asyncio/connection.py#L822-L831
```python
try:
    if os.getpid() == self.pid:
        self._writer.close()  # type: ignore[union-attr]
        # py3.6 doesn't have this method
        if hasattr(self._writer, "wait_closed"):
            await self._writer.wait_closed()  # type: ignore[union-attr]
except OSError:
    pass
finally:  # +++ MAKE SURE THIS ALWAYS GETS CALLED
    self._reader = None
    self._writer = None
```
I tested by patching in this change and it does seem to fix the error.
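To illustrate why the `finally` matters: if the task running the disconnect is cancelled while awaiting `wait_closed`, the `CancelledError` flies past the `except OSError` and any cleanup placed after the `try` block never runs. A minimal sketch of the difference (the `Conn` class below is a toy stand-in for redis-py's `Connection`, and `asyncio.sleep` stands in for the `await self._writer.wait_closed()` call; none of this is redis-py's actual code):

```python
import asyncio


class Conn:
    """Toy stand-in for redis-py's Connection (hypothetical, for illustration)."""

    def __init__(self):
        self._writer = object()  # pretend this is a StreamWriter

    async def disconnect_buggy(self):
        try:
            await asyncio.sleep(10)  # stands in for `await self._writer.wait_closed()`
        except OSError:
            pass
        # cleanup only runs if we fall through normally -- a CancelledError
        # raised inside the try skips it entirely
        self._writer = None

    async def disconnect_fixed(self):
        try:
            await asyncio.sleep(10)
        except OSError:
            pass
        finally:  # always runs, even when the task is cancelled
            self._writer = None


async def main():
    for name in ("disconnect_buggy", "disconnect_fixed"):
        conn = Conn()
        task = asyncio.create_task(getattr(conn, name)())
        await asyncio.sleep(0)  # let the task start and block on its await
        task.cancel()           # simulate the interrupted disconnect
        try:
            await task
        except asyncio.CancelledError:
            pass
        print(name, "left _writer =", conn._writer)


asyncio.run(main())
```

The buggy variant leaves `_writer` holding the stale object after cancellation; the fixed variant always clears it, which is exactly what moving the assignments into `finally` buys you.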
Digging under the hood: why is this happening?
It seems like there is a race condition combined with incomplete error handling (e.g. the task being cancelled with asyncio's `CancelledError`? not 100% sure), and we end up closing the writer while hanging on to the instance of the now-closed writer.
Once that happens, on the next call to Redis, `execute` raises this exception (the error output is suppressed, but you can print it out manually) at redis-py/redis/asyncio/connection.py, line 861 in ef4caf5:

RuntimeError: unable to perform operation on <TCPTransport closed=True reading=False>; the handler is closed
This triggers `conn.disconnect`, where the writer is supposedly closed and cleaned up. But if the writer is already closed, then for some reason `_writer.wait_closed` never returns, which is why it hangs.
So there seem to be two bugs here:
- If closing the `StreamWriter` is interrupted, we'll end up hanging onto a closed writer, and that connection object will be totally broken (and by extension the connection pool, because it tries to reuse connections).
- Since we hang onto the closed writer, the next call to `wait_closed` will hang indefinitely. This may be a bug in Python's asyncio, but we should try to address it here.
Solution to the infinite hang
Fixing the first bug should prevent the second from ever happening. However, if you want to be extra safe, you could add this guard to prevent the second bug in case it ever did slip through:

Fix: only call `self._writer.close()` and `self._writer.wait_closed()` if `not getattr(self._writer.transport, '_closing', False)`.
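As a rough sketch of that guard in isolation (the `safe_close` helper name is hypothetical; redis-py would inline this logic in `Connection.disconnect`, and `_closing` is a private attribute of CPython's transports, hence the defensive `getattr`):

```python
import asyncio


async def safe_close(writer: asyncio.StreamWriter) -> bool:
    """Close a StreamWriter only if its transport isn't already tearing down.

    Returns True if we initiated the close, False if it was skipped because
    the transport was already closing (in which case wait_closed() could
    block on a future that never resolves).
    """
    if getattr(writer.transport, "_closing", False):
        return False
    writer.close()
    await writer.wait_closed()
    return True


async def main():
    # Spin up a throwaway local server so we have a real StreamWriter.
    async def handler(reader, writer):
        pass

    server = await asyncio.start_server(handler, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    _, writer = await asyncio.open_connection("127.0.0.1", port)

    print(await safe_close(writer))  # first close goes through
    print(await safe_close(writer))  # transport now closing: skipped

    server.close()
    await server.wait_closed()


asyncio.run(main())
```

The second call returns without touching `close()`/`wait_closed()` at all, which is the point: once the transport is already closing, waiting on it again is what risks the indefinite hang.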
Bonus info: tracing the indefinite hang back through CPython
`StreamWriter.wait_closed` (hangs forever):
https://github.com/python/cpython/blob/48b3ae9e29545891bece874b4c0c0e394fe0f048/Lib/asyncio/streams.py#L344
which calls `protocol._get_close_waiter`:
https://github.com/python/cpython/blob/f4c03484da59049eb62a9bf7777b963e2267d187/Lib/asyncio/streams.py#L280
which returns `protocol._closed`, a future defined here:
https://github.com/python/cpython/blob/f4c03484da59049eb62a9bf7777b963e2267d187/Lib/asyncio/streams.py#L212
The future is resolved here (`protocol._closed.set_result()` in `protocol.connection_lost`):
https://github.com/python/cpython/blob/f4c03484da59049eb62a9bf7777b963e2267d187/Lib/asyncio/streams.py#L247
`protocol.connection_lost` is called in `transport._call_connection_lost`:
https://github.com/python/cpython/blob/6927632492cbad86a250aa006c1847e03b03e70b/Lib/asyncio/proactor_events.py#L153
which is called in `transport.close()`:
https://github.com/python/cpython/blob/6927632492cbad86a250aa006c1847e03b03e70b/Lib/asyncio/proactor_events.py#L108
But if `transport._closing` is already true, then `_call_connection_lost` is never called and the future is never resolved:
https://github.com/python/cpython/blob/6927632492cbad86a250aa006c1847e03b03e70b/Lib/asyncio/proactor_events.py#L103
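The end state of that chain can be modeled without any sockets at all: `wait_closed` is ultimately just an `await` on a plain future, so if `connection_lost` never fires, nothing ever resolves it. A toy demonstration (using `asyncio.wait_for` with a timeout purely so the example terminates; the real hang has no timeout):

```python
import asyncio


async def main():
    # Toy model of StreamReaderProtocol's close future: it is only ever
    # resolved by connection_lost(). If transport.close() returns early
    # because _closing is already True, connection_lost is never called
    # again, and anyone awaiting this future waits forever.
    closed = asyncio.get_running_loop().create_future()

    # Nobody ever calls closed.set_result(None)...
    try:
        await asyncio.wait_for(closed, timeout=0.1)
        print("resolved")
    except asyncio.TimeoutError:
        print("hung: future never resolved")


asyncio.run(main())
```

This is why the hang is indefinite rather than slow: there is no retry or error path on the awaiting side, just a future with no remaining code path that sets its result.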
Example code
The problem can be reproduced with this: go to `/` and refresh a bunch of times fairly quickly, and you should see the console hang on 'before execute' and the page show a broken image.
```python
import asyncio

from fastapi import FastAPI
from fastapi.responses import HTMLResponse, StreamingResponse
from redis import asyncio as aioredis

app = FastAPI()
red = aioredis.from_url('redis://redisedge:6379')

with open('data/random.jpg', 'rb') as f:
    placeholder_image = f.read()


@app.get('/')
def index():
    return HTMLResponse('<img src="/video" width="600" height="400" />')


@app.get('/push')
async def add_image():
    return await red.xadd('camera:bug-test', {'image': placeholder_image})


@app.get('/video')
def video_feed(name='camera:bug-test', field='image'):
    async def stream():
        while True:
            # query the latest image
            p = red.pipeline(transaction=True).xrevrange(name, count=1)
            print('before execute')
            cmsg, = await p.execute()  # hangs here forever
            print('after execute', len(cmsg))
            # serve the image
            if not cmsg:
                break
            yield format_frame(cmsg[0][1][field.encode('utf-8')])
            await asyncio.sleep(0.01)
    return StreamingResponse(stream(), media_type='multipart/x-mixed-replace; boundary=frame')


def format_frame(frame):
    return (
        b'--frame\r\n'
        b'Pragma-directive: no-cache\r\n'
        b'Cache-directive: no-cache\r\n'
        b'Cache-control: no-cache\r\n'
        b'Pragma: no-cache\r\n'
        b'Expires: 0\r\n'
        b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n\r\n')
```
requirements.txt:

```
fastapi
uvicorn[standard]
redis>=4.2.0
```

To run:

```
uvicorn redis_fastapi_bug_test:app --reload
```
Personally, I just ran it in a docker container using the repo I mentioned at the top of the page.
Items needed:
- a Redis instance
- a test image at data/random.jpg