-
Notifications
You must be signed in to change notification settings - Fork 215
Description
This test failed!
To configure my behavior, see the Flaky Bot documentation.
If I'm commenting on this issue too often, add the `flakybot: quiet` label and I will stop commenting.
commit: 1dcff30
buildURL: Build Status, Sponge
status: failed
Test output
self = <tests.system.TestStreamingPull object at 0x7fd8dca2b520> publisher = <google.cloud.pubsub_v1.PublisherClient object at 0x7fd8b0700c70> topic_path = 'projects/precise-truck-742/topics/t-1678138055460' subscriber = <google.cloud.pubsub_v1.SubscriberClient object at 0x7fd8dcaf3850> subscription_path = 'projects/precise-truck-742/subscriptions/s-1678138055462' cleanup = [(<bound method PublisherClient.delete_topic of <google.cloud.pubsub_v1.PublisherClient object at 0x7fd8b0700c70>>, ()...erClient object at 0x7fd8dcaf3850>>, (), {'subscription': 'projects/precise-truck-742/subscriptions/s-1678138055462'})]def test_streaming_pull_blocking_shutdown( self, publisher, topic_path, subscriber, subscription_path, cleanup ): # Make sure the topic and subscription get deleted. cleanup.append((publisher.delete_topic, (), {"topic": topic_path})) cleanup.append( (subscriber.delete_subscription, (), {"subscription": subscription_path}) ) # The ACK-s are only persisted if *all* messages published in the same batch # are ACK-ed. We thus publish each message in its own batch so that the backend # treats all messages' ACKs independently of each other. publisher.create_topic(name=topic_path) subscriber.create_subscription(name=subscription_path, topic=topic_path) _publish_messages(publisher, topic_path, batch_sizes=[1] * 10) # Artificially delay message processing, gracefully shutdown the streaming pull # in the meantime, then verify that those messages were nevertheless processed. processed_messages = [] def callback(message): time.sleep(15) processed_messages.append(message.data) message.ack() # Flow control limits should exceed the number of worker threads, so that some # of the messages will be blocked on waiting for free scheduler threads. 
flow_control = pubsub_v1.types.FlowControl(max_messages=5) executor = concurrent.futures.ThreadPoolExecutor(max_workers=3) scheduler = pubsub_v1.subscriber.scheduler.ThreadScheduler(executor=executor) subscription_future = subscriber.subscribe( subscription_path, callback=callback, flow_control=flow_control, scheduler=scheduler, await_callbacks_on_shutdown=True, ) try: subscription_future.result(timeout=10) # less than the sleep in callback except exceptions.TimeoutError: subscription_future.cancel() subscription_future.result() # block until shutdown completes # Blocking om shutdown should have waited for the already executing # callbacks to finish. assert len(processed_messages) == 3 # The messages that were not processed should have been NACK-ed and we should # receive them again quite soon. all_done = threading.Barrier(7 + 1, timeout=5) # +1 because of the main thread remaining = [] def callback2(message): remaining.append(message.data) message.ack() all_done.wait() subscription_future = subscriber.subscribe( subscription_path, callback=callback2, await_callbacks_on_shutdown=False ) try:
all_done.wait()
tests/system.py:685:
self = <threading.Barrier object at 0x7fd8d898bf10>, timeout = 5
def wait(self, timeout=None): """Wait for the barrier. When the specified number of threads have started waiting, they are all simultaneously awoken. If an 'action' was provided for the barrier, one of the threads will have executed that callback prior to returning. Returns an individual index number from 0 to 'parties-1'. """ if timeout is None: timeout = self._timeout with self._cond: self._enter() # Block while the barrier drains. index = self._count self._count += 1 try: if index + 1 == self._parties: # We release the barrier self._release() else: # We wait until someone releases us
self._wait(timeout)
/usr/local/lib/python3.10/threading.py:668:
self = <threading.Barrier object at 0x7fd8d898bf10>, timeout = 5
def _wait(self, timeout): if not self._cond.wait_for(lambda : self._state != 0, timeout): #timed out. Break the barrier self._break()
raise BrokenBarrierError
E threading.BrokenBarrierError
/usr/local/lib/python3.10/threading.py:706: BrokenBarrierError
During handling of the above exception, another exception occurred:
self = <tests.system.TestStreamingPull object at 0x7fd8dca2b520>
publisher = <google.cloud.pubsub_v1.PublisherClient object at 0x7fd8b0700c70>
topic_path = 'projects/precise-truck-742/topics/t-1678138055460'
subscriber = <google.cloud.pubsub_v1.SubscriberClient object at 0x7fd8dcaf3850>
subscription_path = 'projects/precise-truck-742/subscriptions/s-1678138055462'
cleanup = [(<bound method PublisherClient.delete_topic of <google.cloud.pubsub_v1.PublisherClient object at 0x7fd8b0700c70>>, ()...erClient object at 0x7fd8dcaf3850>>, (), {'subscription': 'projects/precise-truck-742/subscriptions/s-1678138055462'})]def test_streaming_pull_blocking_shutdown( self, publisher, topic_path, subscriber, subscription_path, cleanup ): # Make sure the topic and subscription get deleted. cleanup.append((publisher.delete_topic, (), {"topic": topic_path})) cleanup.append( (subscriber.delete_subscription, (), {"subscription": subscription_path}) ) # The ACK-s are only persisted if *all* messages published in the same batch # are ACK-ed. We thus publish each message in its own batch so that the backend # treats all messages' ACKs independently of each other. publisher.create_topic(name=topic_path) subscriber.create_subscription(name=subscription_path, topic=topic_path) _publish_messages(publisher, topic_path, batch_sizes=[1] * 10) # Artificially delay message processing, gracefully shutdown the streaming pull # in the meantime, then verify that those messages were nevertheless processed. processed_messages = [] def callback(message): time.sleep(15) processed_messages.append(message.data) message.ack() # Flow control limits should exceed the number of worker threads, so that some # of the messages will be blocked on waiting for free scheduler threads. 
flow_control = pubsub_v1.types.FlowControl(max_messages=5) executor = concurrent.futures.ThreadPoolExecutor(max_workers=3) scheduler = pubsub_v1.subscriber.scheduler.ThreadScheduler(executor=executor) subscription_future = subscriber.subscribe( subscription_path, callback=callback, flow_control=flow_control, scheduler=scheduler, await_callbacks_on_shutdown=True, ) try: subscription_future.result(timeout=10) # less than the sleep in callback except exceptions.TimeoutError: subscription_future.cancel() subscription_future.result() # block until shutdown completes # Blocking om shutdown should have waited for the already executing # callbacks to finish. assert len(processed_messages) == 3 # The messages that were not processed should have been NACK-ed and we should # receive them again quite soon. all_done = threading.Barrier(7 + 1, timeout=5) # +1 because of the main thread remaining = [] def callback2(message): remaining.append(message.data) message.ack() all_done.wait() subscription_future = subscriber.subscribe( subscription_path, callback=callback2, await_callbacks_on_shutdown=False ) try: all_done.wait() except threading.BrokenBarrierError: # PRAGMA: no cover
pytest.fail("The remaining messages have not been re-delivered in time.")
E Failed: The remaining messages have not been re-delivered in time.
tests/system.py:687: Failed