From 8179bda53ec4b08260097b11dca24f45e80bf5eb Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Thu, 19 Jul 2018 13:31:04 -0700 Subject: [PATCH] Modified publisher with error handling (#1568) --- pubsub/cloud-client/publisher.py | 31 ++++++++++++++++----------- pubsub/cloud-client/publisher_test.py | 7 ++++++ pubsub/cloud-client/requirements.txt | 5 ----- 3 files changed, 25 insertions(+), 18 deletions(-) diff --git a/pubsub/cloud-client/publisher.py b/pubsub/cloud-client/publisher.py index c24dddca9972..96caba9f4735 100644 --- a/pubsub/cloud-client/publisher.py +++ b/pubsub/cloud-client/publisher.py @@ -22,7 +22,7 @@ """ import argparse -import concurrent.futures +import time from google.cloud import pubsub_v1 @@ -130,29 +130,27 @@ def publish_messages_with_error_handler(project, topic_name): publisher = pubsub_v1.PublisherClient() topic_path = publisher.topic_path(project, topic_name) - # When you publish a message, the client returns a Future. This Future - # can be used to track if an error has occurred. - futures = [] - - def callback(f): - exc = f.exception() - if exc: + def callback(message_future): + if message_future.exception(): print('Publishing message on {} threw an Exception {}.'.format( - topic_name, exc)) + topic_name, message_future.exception())) + else: + print(message_future.result()) for n in range(1, 10): data = u'Message number {}'.format(n) # Data must be a bytestring data = data.encode('utf-8') + # When you publish a message, the client returns a Future. message_future = publisher.publish(topic_path, data=data) message_future.add_done_callback(callback) - futures.append(message_future) + + print('Published message IDs:') # We must keep the main thread from exiting to allow it to process # messages in the background. - concurrent.futures.wait(futures) - - print('Published messages.') + while True: + time.sleep(60) # [END pubsub_publish_messages_error_handler] @@ -208,6 +206,11 @@ def publish_messages_with_batch_settings(project, topic_name): help=publish_messages_with_futures.__doc__) publish_with_futures_parser.add_argument('topic_name') + publish_with_error_handler_parser = subparsers.add_parser( + 'publish-with-error-handler', + help=publish_messages_with_error_handler.__doc__) + publish_with_error_handler_parser.add_argument('topic_name') + publish_with_batch_settings_parser = subparsers.add_parser( 'publish-with-batch-settings', help=publish_messages_with_batch_settings.__doc__) @@ -227,5 +230,7 @@ def publish_messages_with_batch_settings(project, topic_name): publish_messages_with_custom_attributes(args.project, args.topic_name) elif args.command == 'publish-with-futures': publish_messages_with_futures(args.project, args.topic_name) + elif args.command == 'publish-with-error-handler': + publish_messages_with_error_handler(args.project, args.topic_name) elif args.command == 'publish-with-batch-settings': publish_messages_with_batch_settings(args.project, args.topic_name) diff --git a/pubsub/cloud-client/publisher_test.py b/pubsub/cloud-client/publisher_test.py index a008a06100ec..2eda09c6c185 100644 --- a/pubsub/cloud-client/publisher_test.py +++ b/pubsub/cloud-client/publisher_test.py @@ -95,6 +95,13 @@ def test_publish_with_batch_settings(topic, capsys): assert 'Published' in out +def test_publish_with_error_handler(topic, capsys): + publisher.publish_messages_with_error_handler(PROJECT, TOPIC) + + out, _ = capsys.readouterr() + assert 'Published' in out + + def test_publish_with_futures(topic, capsys): publisher.publish_messages_with_futures(PROJECT, TOPIC) diff --git a/pubsub/cloud-client/requirements.txt b/pubsub/cloud-client/requirements.txt index dcffe157c203..c74eb4e3ccc1 100644 --- a/pubsub/cloud-client/requirements.txt +++ b/pubsub/cloud-client/requirements.txt @@ -1,6 +1 @@ -<<<<<<< HEAD google-cloud-pubsub==0.33.0 -======= -google-cloud-pubsub==0.32.1 -futures==3.1.1; python_version < '3' ->>>>>>> master