Skip to content

Commit 79adfa4

Browse files
committed
Use histogram to set default stream ACK deadline
With all the messages lease-managed (even those on hold), there is no need to have a fixed default value.
1 parent a0cf284 commit 79adfa4

File tree

3 files changed

+8
-31
lines changed

3 files changed

+8
-31
lines changed

pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,6 @@
5151
_RESUME_THRESHOLD = 0.8
5252
"""The load threshold below which to resume the incoming message stream."""
5353

54-
_DEFAULT_STREAM_ACK_DEADLINE = 60
55-
"""The default message acknowledge deadline in seconds for incoming message stream.
56-
57-
This default deadline is dynamically modified for the messages that are added
58-
to the lease management.
59-
"""
60-
6154

6255
def _maybe_wrap_exception(exception):
6356
"""Wraps a gRPC exception class, if needed."""
@@ -412,17 +405,7 @@ def open(self, callback, on_callback_error):
412405
)
413406

414407
# Create the RPC
415-
416-
# We must use a fixed value for the ACK deadline, as we cannot read it
417-
# from the subscription. The latter would require `pubsub.subscriptions.get`
418-
# permission, which is not granted to the default subscriber role
419-
# `roles/pubsub.subscriber`.
420-
# See also https://github.com/googleapis/google-cloud-python/issues/9339
421-
#
422-
# When dynamic lease management is enabled for the "on hold" messages,
423-
# the default stream ACK deadline should again be set based on the
424-
# historic ACK timing data, i.e. `self.ack_histogram.percentile(99)`.
425-
stream_ack_deadline_seconds = _DEFAULT_STREAM_ACK_DEADLINE
408+
stream_ack_deadline_seconds = self.ack_histogram.percentile(99)
426409

427410
get_initial_request = functools.partial(
428411
self._get_initial_request, stream_ack_deadline_seconds

pubsub/tests/system.py

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -382,10 +382,6 @@ class CallbackError(Exception):
382382
with pytest.raises(CallbackError):
383383
future.result(timeout=30)
384384

385-
@pytest.mark.xfail(
386-
reason="The default stream ACK deadline is static and received messages "
387-
"exceeding FlowControl.max_messages are currently not lease managed."
388-
)
389385
def test_streaming_pull_ack_deadline(
390386
self, publisher, subscriber, project, topic_path, subscription_path, cleanup
391387
):
@@ -400,29 +396,29 @@ def test_streaming_pull_ack_deadline(
400396
# Subscribe to the topic. This must happen before the messages
401397
# are published.
402398
subscriber.create_subscription(
403-
subscription_path, topic_path, ack_deadline_seconds=240
399+
subscription_path, topic_path, ack_deadline_seconds=45
404400
)
405401

406402
# publish some messages and wait for completion
407403
self._publish_messages(publisher, topic_path, batch_sizes=[2])
408404

409405
# subscribe to the topic
410406
callback = StreamingPullCallback(
411-
processing_time=70, # more than the default stream ACK deadline (60s)
407+
processing_time=13, # more than the default stream ACK deadline (10s)
412408
resolve_at_msg_count=3, # one more than the published messages count
413409
)
414410
flow_control = types.FlowControl(max_messages=1)
415411
subscription_future = subscriber.subscribe(
416412
subscription_path, callback, flow_control=flow_control
417413
)
418414

419-
# We expect to process the first two messages in 2 * 70 seconds, and
415+
# We expect to process the first two messages in 2 * 13 seconds, and
420416
# any duplicate message that is re-sent by the backend in additional
421-
# 70 seconds, totalling 210 seconds (+ overhead) --> if there have been
422-
# no duplicates in 240 seconds, we can reasonably assume that there
417+
# 13 seconds, totalling 39 seconds (+ overhead) --> if there have been
418+
# no duplicates in 60 seconds, we can reasonably assume that there
423419
# won't be any.
424420
try:
425-
callback.done_future.result(timeout=240)
421+
callback.done_future.result(timeout=60)
426422
except exceptions.TimeoutError:
427423
# future timed out, because we received no excessive messages
428424
assert sorted(callback.seen_message_ids) == [1, 2]

pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -429,8 +429,6 @@ def test_heartbeat_inactive():
429429
"google.cloud.pubsub_v1.subscriber._protocol.heartbeater.Heartbeater", autospec=True
430430
)
431431
def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bidi_rpc):
432-
stream_ack_deadline = streaming_pull_manager._DEFAULT_STREAM_ACK_DEADLINE
433-
434432
manager = make_manager()
435433

436434
manager.open(mock.sentinel.callback, mock.sentinel.on_callback_error)
@@ -460,7 +458,7 @@ def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bi
460458
)
461459
initial_request_arg = resumable_bidi_rpc.call_args.kwargs["initial_request"]
462460
assert initial_request_arg.func == manager._get_initial_request
463-
assert initial_request_arg.args[0] == stream_ack_deadline
461+
assert initial_request_arg.args[0] == 10 # the default stream ACK timeout
464462
assert not manager._client.api.get_subscription.called
465463

466464
resumable_bidi_rpc.return_value.add_done_callback.assert_called_once_with(

0 commit comments

Comments
 (0)