From db56d743b2f11a8889938da4f044e73c0ad4bd30 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Wed, 18 Dec 2024 08:55:12 -1000 Subject: [PATCH] [PR #10171/5185f93 backport][3.11] Stream unpauses protocol before releasing connection (#10179) Co-authored-by: Javier Torres --- CHANGES/10169.bugfix.rst | 3 +++ CONTRIBUTORS.txt | 1 + aiohttp/streams.py | 3 +++ tests/test_flowcontrol_streams.py | 23 +++++++++++++++++++++++ 4 files changed, 30 insertions(+) create mode 100644 CHANGES/10169.bugfix.rst diff --git a/CHANGES/10169.bugfix.rst b/CHANGES/10169.bugfix.rst new file mode 100644 index 00000000000..32e06783856 --- /dev/null +++ b/CHANGES/10169.bugfix.rst @@ -0,0 +1,3 @@ +Fixed a hang where a connection previously used for a streaming +download could be returned to the pool in a paused state. +-- by :user:`javitonino`. diff --git a/CONTRIBUTORS.txt b/CONTRIBUTORS.txt index 5acc4de44fc..589784b29cb 100644 --- a/CONTRIBUTORS.txt +++ b/CONTRIBUTORS.txt @@ -171,6 +171,7 @@ Jan Buchar Jan Gosmann Jarno Elonen Jashandeep Sohi +Javier Torres Jean-Baptiste Estival Jens Steinhauser Jeonghun Lee diff --git a/aiohttp/streams.py b/aiohttp/streams.py index 029d577b88c..6126fb5695d 100644 --- a/aiohttp/streams.py +++ b/aiohttp/streams.py @@ -220,6 +220,9 @@ def feed_eof(self) -> None: self._eof_waiter = None set_result(waiter, None) + if self._protocol._reading_paused: + self._protocol.resume_reading() + for cb in self._eof_callbacks: try: cb() diff --git a/tests/test_flowcontrol_streams.py b/tests/test_flowcontrol_streams.py index 68e623b6dd7..9874cc2511e 100644 --- a/tests/test_flowcontrol_streams.py +++ b/tests/test_flowcontrol_streams.py @@ -4,6 +4,7 @@ import pytest from aiohttp import streams +from aiohttp.base_protocol import BaseProtocol @pytest.fixture @@ -112,6 +113,15 @@ async def test_read_nowait(self, stream) -> None: assert res == b"" assert stream._protocol.resume_reading.call_count == 1 # type: ignore[attr-defined] + async def test_resumed_on_eof(self, stream: streams.StreamReader) -> None: + stream.feed_data(b"data") + assert stream._protocol.pause_reading.call_count == 1 # type: ignore[attr-defined] + assert stream._protocol.resume_reading.call_count == 0 # type: ignore[attr-defined] + stream._protocol._reading_paused = True + + stream.feed_eof() + assert stream._protocol.resume_reading.call_count == 1 # type: ignore[attr-defined] + async def test_flow_control_data_queue_waiter_cancelled( buffer: streams.FlowControlDataQueue, @@ -180,3 +190,16 @@ async def test_flow_control_data_queue_read_eof( buffer.feed_eof() with pytest.raises(streams.EofStream): await buffer.read() + + +async def test_stream_reader_eof_when_full() -> None: + loop = asyncio.get_event_loop() + protocol = BaseProtocol(loop=loop) + protocol.transport = asyncio.Transport() + stream = streams.StreamReader(protocol, 1024, loop=loop) + + data_len = stream._high_water + 1 + stream.feed_data(b"0" * data_len) + assert protocol._reading_paused + stream.feed_eof() + assert not protocol._reading_paused