Skip to content

Commit b12bf6d

Browse files
committed
Correctly ignore all incoming messages after the Terminate message
Once Terminate (`b'X'`) has been sent, there is no point in processing any stray messages in the buffer, so discard them explicitly.
1 parent 8fe4ccf commit b12bf6d

File tree

4 files changed

+20
-4
lines changed

4 files changed

+20
-4
lines changed

asyncpg/exceptions/_base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
__all__ = ('PostgresError', 'FatalPostgresError', 'UnknownPostgresError',
1414
'InterfaceError', 'InterfaceWarning', 'PostgresLogMessage',
15-
'InternalClientError', 'OutdatedSchemaCacheError')
15+
'InternalClientError', 'OutdatedSchemaCacheError', 'ProtocolError')
1616

1717

1818
def _is_asyncpg_class(cls):

asyncpg/protocol/coreproto.pxd

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ cdef enum ProtocolState:
1717
PROTOCOL_FAILED = 1
1818
PROTOCOL_ERROR_CONSUME = 2
1919
PROTOCOL_CANCELLED = 3
20+
PROTOCOL_TERMINATING = 4
2021

2122
PROTOCOL_AUTH = 10
2223
PROTOCOL_PREPARE = 11

asyncpg/protocol/coreproto.pyx

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,9 +118,16 @@ cdef class CoreProtocol:
118118
else:
119119
self.buffer.discard_message()
120120

121+
elif state == PROTOCOL_TERMINATING:
122+
# The connection is being terminated.
123+
# discard all messages until connection
124+
# termination.
125+
self.buffer.discard_message()
126+
121127
else:
122128
raise apg_exc.InternalClientError(
123-
'protocol is in an unknown state {}'.format(state))
129+
f'cannot process message {chr(mtype)!r}: '
130+
f'protocol is in an unexpected state {state!r}.')
124131

125132
except Exception as ex:
126133
self.result_type = RESULT_FAILED
@@ -637,8 +644,7 @@ cdef class CoreProtocol:
637644
'cannot switch to "idle" state; '
638645
'protocol is in the "failed" state')
639646
elif self.state == PROTOCOL_IDLE:
640-
raise apg_exc.InternalClientError(
641-
'protocol is already in the "idle" state')
647+
pass
642648
else:
643649
self.state = new_state
644650

@@ -648,6 +654,9 @@ cdef class CoreProtocol:
648654
elif new_state == PROTOCOL_CANCELLED:
649655
self.state = PROTOCOL_CANCELLED
650656

657+
elif new_state == PROTOCOL_TERMINATING:
658+
self.state = PROTOCOL_TERMINATING
659+
651660
else:
652661
if self.state == PROTOCOL_IDLE:
653662
self.state = new_state
@@ -903,6 +912,7 @@ cdef class CoreProtocol:
903912
cdef _terminate(self):
904913
cdef WriteBuffer buf
905914
self._ensure_connected()
915+
self._set_state(PROTOCOL_TERMINATING)
906916
buf = WriteBuffer.new_message(b'X')
907917
buf.end_message()
908918
self._write(buf)

asyncpg/protocol/protocol.pyx

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -805,6 +805,11 @@ cdef class BaseProtocol(CoreProtocol):
805805
elif self.state == PROTOCOL_COPY_IN_DATA:
806806
self._on_result__copy_in(waiter)
807807

808+
elif self.state == PROTOCOL_TERMINATING:
809+
# We are waiting for the connection to drop, so
810+
# ignore any stray results at this point.
811+
pass
812+
808813
else:
809814
raise apg_exc.InternalClientError(
810815
'got result for unknown protocol state {}'.

0 commit comments

Comments
 (0)