Skip to content

Commit eeae17e

Browse files
committed
Add warning if internal bytes count is negative
This should not happen, but if it does, it is a bug in the StreamingPullManager logic, and we should know about it.
1 parent b132c3a commit eeae17e

File tree

2 files changed

+35
-0
lines changed

2 files changed

+35
-0
lines changed

pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,13 @@ def _maybe_release_messages(self):
307307
break
308308

309309
self._on_hold_bytes -= msg.size
310+
311+
if self._on_hold_bytes < 0:
312+
_LOGGER.warning(
313+
"On hold bytes was unexpectedly negative: %s", self._on_hold_bytes
314+
)
315+
self._on_hold_bytes = 0
316+
310317
_LOGGER.debug(
311318
"Released held message, scheduling callback for it, "
312319
"still on hold %s (bytes %s).",

pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,34 @@ def test__maybe_release_messages_below_overload():
286286
assert call_args[1].ack_id in ("ack_foo", "ack_bar")
287287

288288

289+
def test__maybe_release_messages_negative_on_hold_bytes_warning(caplog):
290+
manager = make_manager(
291+
flow_control=types.FlowControl(max_messages=10, max_bytes=1000)
292+
)
293+
294+
msg = mock.create_autospec(message.Message, instance=True, ack_id="ack", size=17)
295+
manager._messages_on_hold.put(msg)
296+
manager._on_hold_bytes = 5 # too low for some reason
297+
298+
_leaser = manager._leaser = mock.create_autospec(leaser.Leaser)
299+
_leaser.message_count = 3
300+
_leaser.bytes = 150
301+
302+
with caplog.at_level(logging.WARNING):
303+
manager._maybe_release_messages()
304+
305+
expected_warnings = [
306+
record.message.lower()
307+
for record in caplog.records
308+
if "unexpectedly negative" in record.message
309+
]
310+
assert len(expected_warnings) == 1
311+
assert "on hold bytes" in expected_warnings[0]
312+
assert "-12" in expected_warnings[0]
313+
314+
assert manager._on_hold_bytes == 0 # should be auto-corrected
315+
316+
289317
def test_send_unary():
290318
manager = make_manager()
291319
manager._UNARY_REQUESTS = True

0 commit comments

Comments
 (0)