-
-
Notifications
You must be signed in to change notification settings - Fork 32.2k
bpo-32251: Implement asyncio.BufferedProtocol. #4755
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
058bb33
f9c75e7
b3c1869
f70b6f4
3afe68b
9819394
dab5212
856b31e
43648fc
0ba5eb4
6d1222f
7feb5bd
41f64ce
6bb46d9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,6 +12,7 @@ | |
from . import base_events | ||
from . import constants | ||
from . import futures | ||
from . import protocols | ||
from . import sslproto | ||
from . import transports | ||
from .log import logger | ||
|
@@ -91,17 +92,19 @@ def __del__(self): | |
self.close() | ||
|
||
def _fatal_error(self, exc, message='Fatal error on pipe transport'): | ||
if isinstance(exc, base_events._FATAL_ERROR_IGNORE): | ||
if self._loop.get_debug(): | ||
logger.debug("%r: %s", self, message, exc_info=True) | ||
else: | ||
self._loop.call_exception_handler({ | ||
'message': message, | ||
'exception': exc, | ||
'transport': self, | ||
'protocol': self._protocol, | ||
}) | ||
self._force_close(exc) | ||
try: | ||
if isinstance(exc, base_events._FATAL_ERROR_IGNORE): | ||
if self._loop.get_debug(): | ||
logger.debug("%r: %s", self, message, exc_info=True) | ||
else: | ||
self._loop.call_exception_handler({ | ||
'message': message, | ||
'exception': exc, | ||
'transport': self, | ||
'protocol': self._protocol, | ||
}) | ||
finally: | ||
self._force_close(exc) | ||
|
||
def _force_close(self, exc): | ||
if self._closing: | ||
|
@@ -150,6 +153,12 @@ def __init__(self, loop, sock, protocol, waiter=None, | |
extra=None, server=None): | ||
super().__init__(loop, sock, protocol, waiter, extra, server) | ||
self._paused = False | ||
|
||
if protocols._is_buffered_protocol(protocol): | ||
self._loop_reading = self._loop_reading__get_buffer | ||
else: | ||
self._loop_reading = self._loop_reading__data_received | ||
|
||
self._loop.call_soon(self._loop_reading) | ||
|
||
def is_reading(self): | ||
|
@@ -159,6 +168,11 @@ def pause_reading(self): | |
if self._closing or self._paused: | ||
return | ||
self._paused = True | ||
|
||
if self._read_fut is not None and not self._read_fut.done(): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Proactor tricks again? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, I think this is just a bug. The problem is that when you call There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looks like you are right |
||
self._read_fut.cancel() | ||
self._read_fut = None | ||
|
||
if self._loop.get_debug(): | ||
logger.debug("%r pauses reading", self) | ||
|
||
|
@@ -170,11 +184,25 @@ def resume_reading(self): | |
if self._loop.get_debug(): | ||
logger.debug("%r resumes reading", self) | ||
|
||
def _loop_reading(self, fut=None): | ||
def _loop_reading__on_eof(self): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is two underscore in name intentional? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's just a bit more readable this way, since it's one "family" of functions (all three of them are combined in one "mega" function). I can replace with one underscore, but I think it's OK this way too. Up to you. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, agree |
||
if self._loop.get_debug(): | ||
logger.debug("%r received EOF", self) | ||
|
||
try: | ||
keep_open = self._protocol.eof_received() | ||
except Exception as exc: | ||
self._fatal_error( | ||
exc, 'Fatal error: protocol.eof_received() call failed.') | ||
return | ||
|
||
if not keep_open: | ||
self.close() | ||
|
||
def _loop_reading__data_received(self, fut=None): | ||
if self._paused: | ||
return | ||
data = None | ||
|
||
data = None | ||
try: | ||
if fut is not None: | ||
assert self._read_fut is fut or (self._read_fut is None and | ||
|
@@ -197,7 +225,7 @@ def _loop_reading(self, fut=None): | |
return | ||
|
||
# reschedule a new read | ||
self._read_fut = self._loop._proactor.recv(self._sock, 4096) | ||
self._read_fut = self._loop._proactor.recv(self._sock, 32768) | ||
except ConnectionAbortedError as exc: | ||
if not self._closing: | ||
self._fatal_error(exc, 'Fatal read error on pipe transport') | ||
|
@@ -216,12 +244,81 @@ def _loop_reading(self, fut=None): | |
finally: | ||
if data: | ||
self._protocol.data_received(data) | ||
elif data is not None: | ||
if self._loop.get_debug(): | ||
logger.debug("%r received EOF", self) | ||
keep_open = self._protocol.eof_received() | ||
if not keep_open: | ||
self.close() | ||
elif data == b'': | ||
self._loop_reading__on_eof() | ||
|
||
def _loop_reading__get_buffer(self, fut=None): | ||
if self._paused: | ||
return | ||
|
||
nbytes = None | ||
if fut is not None: | ||
assert self._read_fut is fut or (self._read_fut is None and | ||
self._closing) | ||
self._read_fut = None | ||
try: | ||
if fut.done(): | ||
nbytes = fut.result() | ||
else: | ||
# the future will be replaced by next proactor.recv call | ||
fut.cancel() | ||
except ConnectionAbortedError as exc: | ||
if not self._closing: | ||
self._fatal_error( | ||
exc, 'Fatal read error on pipe transport') | ||
elif self._loop.get_debug(): | ||
logger.debug("Read error on pipe transport while closing", | ||
exc_info=True) | ||
except ConnectionResetError as exc: | ||
self._force_close(exc) | ||
except OSError as exc: | ||
self._fatal_error(exc, 'Fatal read error on pipe transport') | ||
except futures.CancelledError: | ||
if not self._closing: | ||
raise | ||
|
||
if nbytes is not None: | ||
if nbytes == 0: | ||
# we got end-of-file so no need to reschedule a new read | ||
self._loop_reading__on_eof() | ||
else: | ||
try: | ||
self._protocol.buffer_updated(nbytes) | ||
except Exception as exc: | ||
self._fatal_error( | ||
exc, | ||
'Fatal error: ' | ||
'protocol.buffer_updated() call failed.') | ||
return | ||
|
||
if self._closing or nbytes == 0: | ||
# since close() has been called we ignore any read data | ||
return | ||
|
||
try: | ||
buf = self._protocol.get_buffer() | ||
except Exception as exc: | ||
self._fatal_error( | ||
exc, 'Fatal error: protocol.get_buffer() call failed.') | ||
return | ||
|
||
try: | ||
# schedule a new read | ||
self._read_fut = self._loop._proactor.recv_into(self._sock, buf) | ||
self._read_fut.add_done_callback(self._loop_reading) | ||
except ConnectionAbortedError as exc: | ||
if not self._closing: | ||
self._fatal_error(exc, 'Fatal read error on pipe transport') | ||
elif self._loop.get_debug(): | ||
logger.debug("Read error on pipe transport while closing", | ||
exc_info=True) | ||
except ConnectionResetError as exc: | ||
self._force_close(exc) | ||
except OSError as exc: | ||
self._fatal_error(exc, 'Fatal read error on pipe transport') | ||
except futures.CancelledError: | ||
if not self._closing: | ||
raise | ||
|
||
|
||
class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like not related to PR subject but I love the change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that the proactor code hasn't received any love recently, so there're many things we can improve here. I'm just fixing things I see slightly broken.