Pub/Sub has no way to track errors from the subscriber thread. #3888

Closed
@theacodes

Presently, if you subscribe and an error occurs in the consumer's thread, there's no way to respond to that error on the main thread.

It just logs the exception and hangs:

Exception in thread Consumer helper: consume bidirectional stream:
Traceback (most recent call last):
  File "/Users/jonwayne/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/Users/jonwayne/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/jonwayne/workspace/python-docs-samples/pubsub/cloud-client/env/lib/python3.6/site-packages/google/cloud/pubsub_v1/subscriber/_consumer.py", line 248, in _blocking_consume
    self._policy.on_exception(exc)
  File "/Users/jonwayne/workspace/python-docs-samples/pubsub/cloud-client/env/lib/python3.6/site-packages/google/cloud/pubsub_v1/subscriber/policy/thread.py", line 135, in on_exception
    raise exception
  File "/Users/jonwayne/workspace/python-docs-samples/pubsub/cloud-client/env/lib/python3.6/site-packages/google/cloud/pubsub_v1/subscriber/_consumer.py", line 234, in _blocking_consume
    for response in response_generator:
  File "/Users/jonwayne/workspace/python-docs-samples/pubsub/cloud-client/env/lib/python3.6/site-packages/grpc/_channel.py", line 363, in __next__
    return self._next()
  File "/Users/jonwayne/workspace/python-docs-samples/pubsub/cloud-client/env/lib/python3.6/site-packages/grpc/_channel.py", line 357, in _next
    raise self
grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with (StatusCode.NOT_FOUND, Resource not found (resource=my-subs).)>
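
The behaviour is not specific to Pub/Sub. As a rough illustration using only the standard library, the sketch below starts a thread whose target raises; `worker` and `run_worker` are hypothetical stand-ins, not part of the Pub/Sub client. The traceback is printed to stderr, but nothing is raised on the main thread, so the caller cannot react to it.

```python
import threading


def worker():
    # Hypothetical stand-in for the consumer helper thread hitting an error.
    raise RuntimeError("Resource not found")


def run_worker():
    """Start the worker and return any exception seen on the calling thread."""
    thread = threading.Thread(target=worker, name="consumer-standin")
    seen_on_main = None
    try:
        thread.start()
        # join() returns normally even though the thread died with an error.
        thread.join()
    except RuntimeError as exc:
        seen_on_main = exc
    return seen_on_main


# The threading machinery prints the traceback to stderr, but the
# exception never propagates to the thread that called start()/join().
main_thread_error = run_worker()
```

In this sketch the main thread simply carries on; in the sample from this issue it sits in a sleep loop instead, which is why the process appears to hang.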

It seems that consumer.open should return a future. This future could be used both to block the main thread while messages are consumed and to pass exceptions through to it.

So the current sample:

import time

from google.cloud import pubsub_v1


def receive_messages(project, subscription_name):
    """Receives messages from a pull subscription."""
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project, subscription_name)

    def callback(message):
        print('Received message: {}'.format(message))
        message.ack()

    subscriber.subscribe(subscription_path, callback=callback)

    # The subscriber is non-blocking, so we must keep the main thread from
    # exiting to allow it to process messages in the background.
    print('Listening for messages on {}'.format(subscription_path))
    while True:
        time.sleep(60)

Could instead be:

from google.cloud import pubsub_v1


def receive_messages(project, subscription_name):
    """Receives messages from a pull subscription."""
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project, subscription_name)

    def callback(message):
        print('Received message: {}'.format(message))
        message.ack()

    future = subscriber.subscribe(subscription_path, callback=callback)

    print('Listening for messages on {}'.format(subscription_path))
    # The subscriber is non-blocking but returns a future that allows us
    # to block the main thread and allow messages to process in the
    # background. This effectively blocks forever unless an error
    # occurs in the subscriber thread in which case it'll be
    # re-raised here.
    future.result()
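
For what it's worth, here is a minimal sketch of how such a future could carry an error across threads, using only `threading` and `concurrent.futures`. `open_consumer` and `failing_consume` are hypothetical names for illustration and say nothing about how the library would actually implement this. Constructing a `Future` directly is also a shortcut for this example; normally an executor creates them.

```python
import threading
from concurrent.futures import Future


def open_consumer(consume):
    """Run `consume` in a background thread and return a future for it.

    Hypothetical helper for illustration only, not the library's API.
    """
    future = Future()

    def run():
        try:
            consume()
        except Exception as exc:
            # Hand the error to whoever is waiting on the future.
            future.set_exception(exc)
        else:
            future.set_result(None)

    thread = threading.Thread(target=run, daemon=True)
    thread.start()
    return future


def failing_consume():
    # Stand-in for the stream failing, as in the traceback above.
    raise LookupError("Resource not found (resource=my-subs).")


future = open_consumer(failing_consume)
try:
    # Blocks until the background thread finishes, then re-raises its error.
    future.result(timeout=5)
    caught = None
except LookupError as exc:
    caught = exc
```

With this shape, `future.result()` gives the main thread a single place to block and a single place to handle failures from the subscriber thread.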

Labels: api: pubsub (Issues related to the Pub/Sub API.)
