tests.system.TestStreamingPull: test_streaming_pull_blocking_shutdown[grpc-grpc] failed #879

@flaky-bot

Description

This test failed!

To configure my behavior, see the Flaky Bot documentation.

If I'm commenting on this issue too often, add the flakybot: quiet label and
I will stop commenting.


commit: 1dcff30
buildURL: Build Status, Sponge
status: failed
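
This system test exercises the subscriber's graceful blocking shutdown: when a
streaming pull is cancelled with await_callbacks_on_shutdown=True, callbacks
that are already running must be allowed to finish before the future resolves.
A minimal sketch of that pattern, using the same public API calls as the
failing test below (PROJECT and SUB_ID are placeholders for an existing
subscription):

    from concurrent import futures

    from google.cloud import pubsub_v1

    # Placeholders; substitute a real project and subscription.
    subscription_path = "projects/PROJECT/subscriptions/SUB_ID"

    def callback(message):
        message.ack()

    subscriber = pubsub_v1.SubscriberClient()
    streaming_pull_future = subscriber.subscribe(
        subscription_path,
        callback=callback,
        # Let in-flight callbacks finish before the streaming pull
        # shuts down, instead of dropping them.
        await_callbacks_on_shutdown=True,
    )

    try:
        streaming_pull_future.result(timeout=10)
    except futures.TimeoutError:
        streaming_pull_future.cancel()  # begin the graceful shutdown
        streaming_pull_future.result()  # block until shutdown completes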

Test output
self = <tests.system.TestStreamingPull object at 0x7fd8dca2b520>
publisher = <google.cloud.pubsub_v1.PublisherClient object at 0x7fd8b0700c70>
topic_path = 'projects/precise-truck-742/topics/t-1678138055460'
subscriber = <google.cloud.pubsub_v1.SubscriberClient object at 0x7fd8dcaf3850>
subscription_path = 'projects/precise-truck-742/subscriptions/s-1678138055462'
cleanup = [(<bound method PublisherClient.delete_topic of <google.cloud.pubsub_v1.PublisherClient object at 0x7fd8b0700c70>>, ()...erClient object at 0x7fd8dcaf3850>>, (), {'subscription': 'projects/precise-truck-742/subscriptions/s-1678138055462'})]
def test_streaming_pull_blocking_shutdown(
    self, publisher, topic_path, subscriber, subscription_path, cleanup
):
    # Make sure the topic and subscription get deleted.
    cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
    cleanup.append(
        (subscriber.delete_subscription, (), {"subscription": subscription_path})
    )

    # The ACKs are only persisted if *all* messages published in the same batch
    # are ACK-ed. We thus publish each message in its own batch so that the backend
    # treats all messages' ACKs independently of each other.
    publisher.create_topic(name=topic_path)
    subscriber.create_subscription(name=subscription_path, topic=topic_path)
    _publish_messages(publisher, topic_path, batch_sizes=[1] * 10)

    # Artificially delay message processing, gracefully shutdown the streaming pull
    # in the meantime, then verify that those messages were nevertheless processed.
    processed_messages = []

    def callback(message):
        time.sleep(15)
        processed_messages.append(message.data)
        message.ack()

    # Flow control limits should exceed the number of worker threads, so that some
    # of the messages will be blocked on waiting for free scheduler threads.
    flow_control = pubsub_v1.types.FlowControl(max_messages=5)
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
    scheduler = pubsub_v1.subscriber.scheduler.ThreadScheduler(executor=executor)
    subscription_future = subscriber.subscribe(
        subscription_path,
        callback=callback,
        flow_control=flow_control,
        scheduler=scheduler,
        await_callbacks_on_shutdown=True,
    )

    try:
        subscription_future.result(timeout=10)  # less than the sleep in callback
    except exceptions.TimeoutError:
        subscription_future.cancel()
        subscription_future.result()  # block until shutdown completes

    # Blocking on shutdown should have waited for the already executing
    # callbacks to finish.
    assert len(processed_messages) == 3

    # The messages that were not processed should have been NACK-ed and we should
    # receive them again quite soon.
    all_done = threading.Barrier(7 + 1, timeout=5)  # +1 because of the main thread
    remaining = []

    def callback2(message):
        remaining.append(message.data)
        message.ack()
        all_done.wait()

    subscription_future = subscriber.subscribe(
        subscription_path, callback=callback2, await_callbacks_on_shutdown=False
    )

    try:
        all_done.wait()

tests/system.py:685:


self = <threading.Barrier object at 0x7fd8d898bf10>, timeout = 5

def wait(self, timeout=None):
    """Wait for the barrier.

    When the specified number of threads have started waiting, they are all
    simultaneously awoken. If an 'action' was provided for the barrier, one
    of the threads will have executed that callback prior to returning.
    Returns an individual index number from 0 to 'parties-1'.

    """
    if timeout is None:
        timeout = self._timeout
    with self._cond:
        self._enter() # Block while the barrier drains.
        index = self._count
        self._count += 1
        try:
            if index + 1 == self._parties:
                # We release the barrier
                self._release()
            else:
                # We wait until someone releases us
                self._wait(timeout)

/usr/local/lib/python3.10/threading.py:668:


self = <threading.Barrier object at 0x7fd8d898bf10>, timeout = 5

def _wait(self, timeout):
    if not self._cond.wait_for(lambda : self._state != 0, timeout):
        #timed out.  Break the barrier
        self._break()
        raise BrokenBarrierError

E threading.BrokenBarrierError

/usr/local/lib/python3.10/threading.py:706: BrokenBarrierError

During handling of the above exception, another exception occurred:

self = <tests.system.TestStreamingPull object at 0x7fd8dca2b520>
publisher = <google.cloud.pubsub_v1.PublisherClient object at 0x7fd8b0700c70>
topic_path = 'projects/precise-truck-742/topics/t-1678138055460'
subscriber = <google.cloud.pubsub_v1.SubscriberClient object at 0x7fd8dcaf3850>
subscription_path = 'projects/precise-truck-742/subscriptions/s-1678138055462'
cleanup = [(<bound method PublisherClient.delete_topic of <google.cloud.pubsub_v1.PublisherClient object at 0x7fd8b0700c70>>, ()...erClient object at 0x7fd8dcaf3850>>, (), {'subscription': 'projects/precise-truck-742/subscriptions/s-1678138055462'})]

def test_streaming_pull_blocking_shutdown(
    self, publisher, topic_path, subscriber, subscription_path, cleanup
):
    # Make sure the topic and subscription get deleted.
    cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
    cleanup.append(
        (subscriber.delete_subscription, (), {"subscription": subscription_path})
    )

    # The ACKs are only persisted if *all* messages published in the same batch
    # are ACK-ed. We thus publish each message in its own batch so that the backend
    # treats all messages' ACKs independently of each other.
    publisher.create_topic(name=topic_path)
    subscriber.create_subscription(name=subscription_path, topic=topic_path)
    _publish_messages(publisher, topic_path, batch_sizes=[1] * 10)

    # Artificially delay message processing, gracefully shutdown the streaming pull
    # in the meantime, then verify that those messages were nevertheless processed.
    processed_messages = []

    def callback(message):
        time.sleep(15)
        processed_messages.append(message.data)
        message.ack()

    # Flow control limits should exceed the number of worker threads, so that some
    # of the messages will be blocked on waiting for free scheduler threads.
    flow_control = pubsub_v1.types.FlowControl(max_messages=5)
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
    scheduler = pubsub_v1.subscriber.scheduler.ThreadScheduler(executor=executor)
    subscription_future = subscriber.subscribe(
        subscription_path,
        callback=callback,
        flow_control=flow_control,
        scheduler=scheduler,
        await_callbacks_on_shutdown=True,
    )

    try:
        subscription_future.result(timeout=10)  # less than the sleep in callback
    except exceptions.TimeoutError:
        subscription_future.cancel()
        subscription_future.result()  # block until shutdown completes

    # Blocking on shutdown should have waited for the already executing
    # callbacks to finish.
    assert len(processed_messages) == 3

    # The messages that were not processed should have been NACK-ed and we should
    # receive them again quite soon.
    all_done = threading.Barrier(7 + 1, timeout=5)  # +1 because of the main thread
    remaining = []

    def callback2(message):
        remaining.append(message.data)
        message.ack()
        all_done.wait()

    subscription_future = subscriber.subscribe(
        subscription_path, callback=callback2, await_callbacks_on_shutdown=False
    )

    try:
        all_done.wait()
    except threading.BrokenBarrierError:  # PRAGMA: no cover
        pytest.fail("The remaining messages have not been re-delivered in time.")

E Failed: The remaining messages have not been re-delivered in time.

tests/system.py:687: Failed
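
The terminal failure is the BrokenBarrierError raised inside all_done.wait():
the second subscribe call expects the 7 unprocessed messages to be redelivered
within the Barrier's 5-second timeout, and once any single party misses that
window the barrier breaks for every waiter, including the main thread. A
standalone sketch of that mechanism, standard library only and with
hypothetical delays, reproduces the same error:

    import threading
    import time

    # Mirrors the test's setup: 7 callback threads plus the main thread.
    barrier = threading.Barrier(7 + 1, timeout=5)

    def worker(delay):
        time.sleep(delay)
        try:
            barrier.wait()
        except threading.BrokenBarrierError:
            pass  # once one wait() times out, every waiting party gets this

    # A single hypothetical straggler arriving after the 5 s window breaks
    # the barrier for all parties, just as slow redelivery does in the test.
    for i in range(7):
        threading.Thread(target=worker, args=(6 if i == 0 else 0.1,)).start()

    try:
        barrier.wait()
    except threading.BrokenBarrierError:
        print("Barrier broken: not all parties arrived before the timeout.")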

Labels

api: pubsub - Issues related to the googleapis/python-pubsub API.
flakybot: flaky - Tells the Flaky Bot not to close or comment on this issue.
flakybot: issue - An issue filed by the Flaky Bot. Should not be added manually.
priority: p2 - Moderately-important priority. Fix may not be included in next release.
type: bug - Error or flaw in code with unintended results or allowing sub-optimal usage patterns.
