diff --git a/samples/snippets/publisher.py b/samples/snippets/publisher.py index c24dddca9..96caba9f4 100644 --- a/samples/snippets/publisher.py +++ b/samples/snippets/publisher.py @@ -22,7 +22,7 @@ """ import argparse -import concurrent.futures +import time from google.cloud import pubsub_v1 @@ -130,29 +130,27 @@ def publish_messages_with_error_handler(project, topic_name): publisher = pubsub_v1.PublisherClient() topic_path = publisher.topic_path(project, topic_name) - # When you publish a message, the client returns a Future. This Future - # can be used to track if an error has occurred. - futures = [] - - def callback(f): - exc = f.exception() - if exc: + def callback(message_future): + if message_future.exception(): print('Publishing message on {} threw an Exception {}.'.format( - topic_name, exc)) + topic_name, message_future.exception())) + else: + print(message_future.result()) for n in range(1, 10): data = u'Message number {}'.format(n) # Data must be a bytestring data = data.encode('utf-8') + # When you publish a message, the client returns a Future. message_future = publisher.publish(topic_path, data=data) message_future.add_done_callback(callback) - futures.append(message_future) + + print('Published message IDs:') # We must keep the main thread from exiting to allow it to process # messages in the background. - concurrent.futures.wait(futures) - - print('Published messages.') + while True: + time.sleep(60) # [END pubsub_publish_messages_error_handler] @@ -208,6 +206,11 @@ def publish_messages_with_batch_settings(project, topic_name): help=publish_messages_with_futures.__doc__) publish_with_futures_parser.add_argument('topic_name') + publish_with_error_handler_parser = subparsers.add_parser( + 'publish-with-error-handler', + help=publish_messages_with_error_handler.__doc__) + publish_with_error_handler_parser.add_argument('topic_name') + publish_with_batch_settings_parser = subparsers.add_parser( 'publish-with-batch-settings', help=publish_messages_with_batch_settings.__doc__) @@ -227,5 +230,7 @@ def publish_messages_with_batch_settings(project, topic_name): publish_messages_with_custom_attributes(args.project, args.topic_name) elif args.command == 'publish-with-futures': publish_messages_with_futures(args.project, args.topic_name) + elif args.command == 'publish-with-error-handler': + publish_messages_with_error_handler(args.project, args.topic_name) elif args.command == 'publish-with-batch-settings': publish_messages_with_batch_settings(args.project, args.topic_name) diff --git a/samples/snippets/publisher_test.py b/samples/snippets/publisher_test.py index a008a0610..2eda09c6c 100644 --- a/samples/snippets/publisher_test.py +++ b/samples/snippets/publisher_test.py @@ -95,6 +95,13 @@ def test_publish_with_batch_settings(topic, capsys): assert 'Published' in out +def test_publish_with_error_handler(topic, capsys): + publisher.publish_messages_with_error_handler(PROJECT, TOPIC) + + out, _ = capsys.readouterr() + assert 'Published' in out + + def test_publish_with_futures(topic, capsys): publisher.publish_messages_with_futures(PROJECT, TOPIC) diff --git a/samples/snippets/requirements.txt b/samples/snippets/requirements.txt index 81f06995b..c74eb4e3c 100644 --- a/samples/snippets/requirements.txt +++ b/samples/snippets/requirements.txt @@ -1,2 +1 @@ -google-cloud-pubsub==0.32.1 -futures==3.1.1; python_version < '3' +google-cloud-pubsub==0.33.0