Closed
Description
Presently if you subscribe and an error occurs in the consumer's thread there's no way to respond to the error on the main thread.
It just logs the exception and hangs:
Exception in thread Consumer helper: consume bidirectional stream:
Traceback (most recent call last):
File "/Users/jonwayne/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 916, in _bootstrap_inner
self.run()
File "/Users/jonwayne/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 864, in run
self._target(*self._args, **self._kwargs)
File "/Users/jonwayne/workspace/python-docs-samples/pubsub/cloud-client/env/lib/python3.6/site-packages/google/cloud/pubsub_v1/subscriber/_consumer.py", line 248, in _blocking_consume
self._policy.on_exception(exc)
File "/Users/jonwayne/workspace/python-docs-samples/pubsub/cloud-client/env/lib/python3.6/site-packages/google/cloud/pubsub_v1/subscriber/policy/thread.py", line 135, in on_exception
raise exception
File "/Users/jonwayne/workspace/python-docs-samples/pubsub/cloud-client/env/lib/python3.6/site-packages/google/cloud/pubsub_v1/subscriber/_consumer.py", line 234, in _blocking_consume
for response in response_generator:
File "/Users/jonwayne/workspace/python-docs-samples/pubsub/cloud-client/env/lib/python3.6/site-packages/grpc/_channel.py", line 363, in __next__
return self._next()
File "/Users/jonwayne/workspace/python-docs-samples/pubsub/cloud-client/env/lib/python3.6/site-packages/grpc/_channel.py", line 357, in _next
raise self
grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with (StatusCode.NOT_FOUND, Resource not found (resource=my-subs).)>
It seems that consumer.open
should return a future. This future could be used to both block the main thread to consume message and pass through exceptions.
So the current sample:
def receive_messages(project, subscription_name):
"""Receives messages from a pull subscription."""
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
project, subscription_name)
def callback(message):
print('Received message: {}'.format(message))
message.ack()
subscriber.subscribe(subscription_path, callback=callback)
# The subscriber is non-blocking, so we must keep the main thread from
# exiting to allow it to process messages in the background.
print('Listening for messages on {}'.format(subscription_path))
while True:
time.sleep(60)
Could instead be:
def receive_messages(project, subscription_name):
"""Receives messages from a pull subscription."""
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
project, subscription_name)
def callback(message):
print('Received message: {}'.format(message))
message.ack()
future = subscriber.subscribe(subscription_path, callback=callback)
print('Listening for messages on {}'.format(subscription_path))
# The subscriber is non-blocking but returns a future that allows us
# to block the main thread and allow messages to process in the
# background. This effectively blocks forever unless an error
# occurs in the subscriber thread in which case it'll be
# re-raised here.
future.result()