Skip to content

Commit

Permalink
Refactor FlowControlDataQueue to improve performances (#9659)
Browse files Browse the repository at this point in the history
  • Loading branch information
bdraco authored Nov 4, 2024
1 parent 6c932dd commit 1bb146a
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 33 deletions.
1 change: 1 addition & 0 deletions CHANGES/9659.misc.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improved performance of the internal ``DataQueue`` -- by :user:`bdraco`.
65 changes: 32 additions & 33 deletions aiohttp/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,6 @@ def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
self._eof = False
self._waiter: Optional[asyncio.Future[None]] = None
self._exception: Union[Type[BaseException], BaseException, None] = None
self._size = 0
self._buffer: Deque[_SizedT] = collections.deque()

def __len__(self) -> int:
Expand All @@ -630,48 +629,39 @@ def set_exception(
) -> None:
self._eof = True
self._exception = exc

waiter = self._waiter
if waiter is not None:
if (waiter := self._waiter) is not None:
self._waiter = None
set_exception(waiter, exc, exc_cause)

def feed_data(self, data: _SizedT) -> None:
self._size += len(data)
self._buffer.append(data)

waiter = self._waiter
if waiter is not None:
if (waiter := self._waiter) is not None:
self._waiter = None
set_result(waiter, None)

def feed_eof(self) -> None:
self._eof = True

waiter = self._waiter
if waiter is not None:
if (waiter := self._waiter) is not None:
self._waiter = None
set_result(waiter, None)

async def _wait_for_data(self) -> None:
assert not self._waiter
self._waiter = self._loop.create_future()
try:
await self._waiter
except (asyncio.CancelledError, asyncio.TimeoutError):
self._waiter = None
raise

async def read(self) -> _SizedT:
if not self._buffer and not self._eof:
assert not self._waiter
self._waiter = self._loop.create_future()
try:
await self._waiter
except (asyncio.CancelledError, asyncio.TimeoutError):
self._waiter = None
raise

await self._wait_for_data()
if self._buffer:
data = self._buffer.popleft()
self._size -= len(data)
return data
else:
if self._exception is not None:
raise self._exception
else:
raise EofStream
return self._buffer.popleft()
if self._exception is not None:
raise self._exception
raise EofStream

def __aiter__(self) -> AsyncStreamIterator[_SizedT]:
return AsyncStreamIterator(self.read)
Expand All @@ -687,19 +677,28 @@ def __init__(
self, protocol: BaseProtocol, limit: int, *, loop: asyncio.AbstractEventLoop
) -> None:
super().__init__(loop=loop)

self._size = 0
self._protocol = protocol
self._limit = limit * 2

def feed_data(self, data: _SizedT) -> None:
super().feed_data(data)

self._size += len(data)
self._buffer.append(data)
if (waiter := self._waiter) is not None:
self._waiter = None
set_result(waiter, None)
if self._size > self._limit and not self._protocol._reading_paused:
self._protocol.pause_reading()

async def read(self) -> _SizedT:
try:
return await super().read()
finally:
if not self._buffer and not self._eof:
await self._wait_for_data()
if self._buffer:
data = self._buffer.popleft()
self._size -= len(data)
if self._size < self._limit and self._protocol._reading_paused:
self._protocol.resume_reading()
return data
if self._exception is not None:
raise self._exception
raise EofStream

0 comments on commit 1bb146a

Please sign in to comment.