diff --git a/pubsub/cloud-client/publisher.py b/pubsub/cloud-client/publisher.py index a577abc63721..b7e574e04978 100644 --- a/pubsub/cloud-client/publisher.py +++ b/pubsub/cloud-client/publisher.py @@ -22,27 +22,33 @@ """ import argparse -import time -from google.cloud import pubsub_v1 - -def list_topics(project): +def list_topics(project_id): """Lists all Pub/Sub topics in the given project.""" # [START pubsub_list_topics] + from google.cloud import pubsub_v1 + + # TODO project_id = "Your Google Cloud Project ID" + publisher = pubsub_v1.PublisherClient() - project_path = publisher.project_path(project) + project_path = publisher.project_path(project_id) for topic in publisher.list_topics(project_path): print(topic) # [END pubsub_list_topics] -def create_topic(project, topic_name): +def create_topic(project_id, topic_name): """Create a new Pub/Sub topic.""" # [START pubsub_create_topic] + from google.cloud import pubsub_v1 + + # TODO project_id = "Your Google Cloud Project ID" + # TODO topic_name = "Your Pub/Sub topic name" + publisher = pubsub_v1.PublisherClient() - topic_path = publisher.topic_path(project, topic_name) + topic_path = publisher.topic_path(project_id, topic_name) topic = publisher.create_topic(topic_path) @@ -50,11 +56,16 @@ def create_topic(project, topic_name): # [END pubsub_create_topic] -def delete_topic(project, topic_name): +def delete_topic(project_id, topic_name): """Deletes an existing Pub/Sub topic.""" # [START pubsub_delete_topic] + from google.cloud import pubsub_v1 + + # TODO project_id = "Your Google Cloud Project ID" + # TODO topic_name = "Your Pub/Sub topic name" + publisher = pubsub_v1.PublisherClient() - topic_path = publisher.topic_path(project, topic_name) + topic_path = publisher.topic_path(project_id, topic_name) publisher.delete_topic(topic_path) @@ -62,30 +73,44 @@ def delete_topic(project, topic_name): # [END pubsub_delete_topic] -def publish_messages(project, topic_name): +def publish_messages(project_id, topic_name): """Publishes multiple messages to a Pub/Sub topic.""" # [START pubsub_quickstart_publisher] # [START pubsub_publish] + from google.cloud import pubsub_v1 + + # TODO project_id = "Your Google Cloud Project ID" + # TODO topic_name = "Your Pub/Sub topic name" + publisher = pubsub_v1.PublisherClient() - topic_path = publisher.topic_path(project, topic_name) + # The `topic_path` method creates a fully qualified identifier + # in the form `projects/{project_id}/topics/{topic_name}` + topic_path = publisher.topic_path(project_id, topic_name) for n in range(1, 10): data = u'Message number {}'.format(n) # Data must be a bytestring data = data.encode('utf-8') - publisher.publish(topic_path, data=data) + # When you publish a message, the client returns a future. + future = publisher.publish(topic_path, data=data) + print('Published {} of message ID {}.'.format(data, future.result())) print('Published messages.') # [END pubsub_quickstart_publisher] # [END pubsub_publish] -def publish_messages_with_custom_attributes(project, topic_name): +def publish_messages_with_custom_attributes(project_id, topic_name): """Publishes multiple messages with custom attributes to a Pub/Sub topic.""" # [START pubsub_publish_custom_attributes] + from google.cloud import pubsub_v1 + + # TODO project_id = "Your Google Cloud Project ID" + # TODO topic_name = "Your Pub/Sub topic name" + publisher = pubsub_v1.PublisherClient() - topic_path = publisher.topic_path(project, topic_name) + topic_path = publisher.topic_path(project_id, topic_name) for n in range(1, 10): data = u'Message number {}'.format(n) @@ -99,12 +124,17 @@ def publish_messages_with_custom_attributes(project, topic_name): # [END pubsub_publish_custom_attributes] -def publish_messages_with_futures(project, topic_name): +def publish_messages_with_futures(project_id, topic_name): """Publishes multiple messages to a Pub/Sub topic and prints their message IDs.""" # [START pubsub_publisher_concurrency_control] + from google.cloud import pubsub_v1 + + # TODO project_id = "Your Google Cloud Project ID" + # TODO topic_name = "Your Pub/Sub topic name" + publisher = pubsub_v1.PublisherClient() - topic_path = publisher.topic_path(project, topic_name) + topic_path = publisher.topic_path(project_id, topic_name) # When you publish a message, the client returns a Future. This Future # can be used to track when the message is published. @@ -124,11 +154,18 @@ def publish_messages_with_futures(project, topic_name): # [END pubsub_publisher_concurrency_control] -def publish_messages_with_error_handler(project, topic_name): +def publish_messages_with_error_handler(project_id, topic_name): """Publishes multiple messages to a Pub/Sub topic with an error handler.""" # [START pubsub_publish_messages_error_handler] + import time + + from google.cloud import pubsub_v1 + + # TODO project_id = "Your Google Cloud Project ID" + # TODO topic_name = "Your Pub/Sub topic name" + publisher = pubsub_v1.PublisherClient() - topic_path = publisher.topic_path(project, topic_name) + topic_path = publisher.topic_path(project_id, topic_name) def callback(message_future): # When timeout is unspecified, the exception method waits indefinitely. @@ -155,17 +192,22 @@ def callback(message_future): # [END pubsub_publish_messages_error_handler] -def publish_messages_with_batch_settings(project, topic_name): +def publish_messages_with_batch_settings(project_id, topic_name): """Publishes multiple messages to a Pub/Sub topic with batch settings.""" # [START pubsub_publisher_batch_settings] - # Configure the batch to publish once there is one kilobyte of data or - # 1 second has passed. + from google.cloud import pubsub_v1 + + # TODO project_id = "Your Google Cloud Project ID" + # TODO topic_name = "Your Pub/Sub topic name" + + # Configure the batch to publish as soon as there is one kilobyte + # of data or one second has passed. batch_settings = pubsub_v1.types.BatchSettings( max_bytes=1024, # One kilobyte max_latency=1, # One second ) publisher = pubsub_v1.PublisherClient(batch_settings) - topic_path = publisher.topic_path(project, topic_name) + topic_path = publisher.topic_path(project_id, topic_name) for n in range(1, 10): data = u'Message number {}'.format(n) @@ -182,7 +224,7 @@ def publish_messages_with_batch_settings(project, topic_name): description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter ) - parser.add_argument('project', help='Your Google Cloud project ID') + parser.add_argument('project_id', help='Your Google Cloud project ID') subparsers = parser.add_subparsers(dest='command') subparsers.add_parser('list', help=list_topics.__doc__) @@ -220,18 +262,19 @@ def publish_messages_with_batch_settings(project, topic_name): args = parser.parse_args() if args.command == 'list': - list_topics(args.project) + list_topics(args.project_id) elif args.command == 'create': - create_topic(args.project, args.topic_name) + create_topic(args.project_id, args.topic_name) elif args.command == 'delete': - delete_topic(args.project, args.topic_name) + delete_topic(args.project_id, args.topic_name) elif args.command == 'publish': - publish_messages(args.project, args.topic_name) + publish_messages(args.project_id, args.topic_name) elif args.command == 'publish-with-custom-attributes': - publish_messages_with_custom_attributes(args.project, args.topic_name) + publish_messages_with_custom_attributes( + args.project_id, args.topic_name) elif args.command == 'publish-with-futures': - publish_messages_with_futures(args.project, args.topic_name) + publish_messages_with_futures(args.project_id, args.topic_name) elif args.command == 'publish-with-error-handler': - publish_messages_with_error_handler(args.project, args.topic_name) + publish_messages_with_error_handler(args.project_id, args.topic_name) elif args.command == 'publish-with-batch-settings': - publish_messages_with_batch_settings(args.project, args.topic_name) + publish_messages_with_batch_settings(args.project_id, args.topic_name) diff --git a/pubsub/cloud-client/subscriber.py b/pubsub/cloud-client/subscriber.py index 3915a82ef9c8..5802218b4998 100644 --- a/pubsub/cloud-client/subscriber.py +++ b/pubsub/cloud-client/subscriber.py @@ -22,43 +22,52 @@ """ import argparse -import time -import logging -import random -import multiprocessing -from google.cloud import pubsub_v1 - -def list_subscriptions_in_topic(project, topic_name): +def list_subscriptions_in_topic(project_id, topic_name): """Lists all subscriptions for a given topic.""" # [START pubsub_list_topic_subscriptions] + from google.cloud import pubsub_v1 + + # TODO project_id = "Your Google Cloud Project ID" + # TODO topic_name = "Your Pub/Sub topic name" + publisher = pubsub_v1.PublisherClient() - topic_path = publisher.topic_path(project, topic_name) + topic_path = publisher.topic_path(project_id, topic_name) for subscription in publisher.list_topic_subscriptions(topic_path): print(subscription) # [END pubsub_list_topic_subscriptions] -def list_subscriptions_in_project(project): +def list_subscriptions_in_project(project_id): """Lists all subscriptions in the current project.""" # [START pubsub_list_subscriptions] + from google.cloud import pubsub_v1 + + # TODO project_id = "Your Google Cloud Project ID" + subscriber = pubsub_v1.SubscriberClient() - project_path = subscriber.project_path(project) + project_path = subscriber.project_path(project_id) for subscription in subscriber.list_subscriptions(project_path): print(subscription.name) # [END pubsub_list_subscriptions] -def create_subscription(project, topic_name, subscription_name): +def create_subscription(project_id, topic_name, subscription_name): """Create a new pull subscription on the given topic.""" # [START pubsub_create_pull_subscription] + from google.cloud import pubsub_v1 + + # TODO project_id = "Your Google Cloud Project ID" + # TODO topic_name = "Your Pub/Sub topic name" + # TODO subscription_name = "Your Pub/Sub subscription name" + subscriber = pubsub_v1.SubscriberClient() - topic_path = subscriber.topic_path(project, topic_name) + topic_path = subscriber.topic_path(project_id, topic_name) subscription_path = subscriber.subscription_path( - project, subscription_name) + project_id, subscription_name) subscription = subscriber.create_subscription( subscription_path, topic_path) @@ -67,17 +76,23 @@ def create_subscription(project, topic_name, subscription_name): # [END pubsub_create_pull_subscription] -def create_push_subscription(project, +def create_push_subscription(project_id, topic_name, subscription_name, endpoint): """Create a new push subscription on the given topic.""" # [START pubsub_create_push_subscription] - # endpoint = "https://my-test-project.appspot.com/push" + from google.cloud import pubsub_v1 + + # TODO project_id = "Your Google Cloud Project ID" + # TODO topic_name = "Your Pub/Sub topic name" + # TODO subscription_name = "Your Pub/Sub subscription name" + # TODO endpoint = "https://my-test-project.appspot.com/push" + subscriber = pubsub_v1.SubscriberClient() - topic_path = subscriber.topic_path(project, topic_name) + topic_path = subscriber.topic_path(project_id, topic_name) subscription_path = subscriber.subscription_path( - project, subscription_name) + project_id, subscription_name) push_config = pubsub_v1.types.PushConfig( push_endpoint=endpoint) @@ -90,14 +105,17 @@ def create_push_subscription(project, # [END pubsub_create_push_subscription] -def delete_subscription(project, subscription_name): +def delete_subscription(project_id, subscription_name): """Deletes an existing Pub/Sub topic.""" # [START pubsub_delete_subscription] - # project = "Your Google Cloud Project ID" - # subscription_name = "Your Pubsub subscription name" + from google.cloud import pubsub_v1 + + # TODO project_id = "Your Google Cloud Project ID" + # TODO subscription_name = "Your Pub/Sub subscription name" + subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path( - project, subscription_name) + project_id, subscription_name) subscriber.delete_subscription(subscription_path) @@ -105,17 +123,23 @@ def delete_subscription(project, subscription_name): # [END pubsub_delete_subscription] -def update_subscription(project, subscription_name, endpoint): +def update_subscription(project_id, subscription_name, endpoint): """ Updates an existing Pub/Sub subscription's push endpoint URL. Note that certain properties of a subscription, such as its topic, are not modifiable. """ # [START pubsub_update_push_configuration] - # endpoint = "https://my-test-project.appspot.com/push" + from google.cloud import pubsub_v1 + + # TODO project_id = "Your Google Cloud Project ID" + # TODO topic_name = "Your Pub/Sub topic name" + # TODO subscription_name = "Your Pub/Sub subscription name" + # TODO endpoint = "https://my-test-project.appspot.com/push" + subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path( - project, subscription_name) + project_id, subscription_name) push_config = pubsub_v1.types.PushConfig( push_endpoint=endpoint) @@ -139,15 +163,22 @@ def update_subscription(project, subscription_name, endpoint): # [END pubsub_update_push_configuration] -def receive_messages(project, subscription_name): +def receive_messages(project_id, subscription_name): """Receives messages from a pull subscription.""" # [START pubsub_subscriber_async_pull] # [START pubsub_quickstart_subscriber] - # project = "Your Google Cloud Project ID" - # subscription_name = "Your Pubsub subscription name" + import time + + from google.cloud import pubsub_v1 + + # TODO project_id = "Your Google Cloud Project ID" + # TODO subscription_name = "Your Pub/Sub subscription name" + subscriber = pubsub_v1.SubscriberClient() + # The `subscription_path` method creates a fully qualified identifier + # in the form `projects/{project_id}/subscriptions/{subscription_name}` subscription_path = subscriber.subscription_path( - project, subscription_name) + project_id, subscription_name) def callback(message): print('Received message: {}'.format(message)) @@ -155,8 +186,8 @@ def callback(message): subscriber.subscribe(subscription_path, callback=callback) - # The subscriber is non-blocking, so we must keep the main thread from - # exiting to allow it to process messages in the background. + # The subscriber is non-blocking. We must keep the main thread from + # exiting to allow it to process messages asynchronously in the background. print('Listening for messages on {}'.format(subscription_path)) while True: time.sleep(60) @@ -164,14 +195,19 @@ def callback(message): # [END pubsub_quickstart_subscriber] -def receive_messages_with_custom_attributes(project, subscription_name): +def receive_messages_with_custom_attributes(project_id, subscription_name): """Receives messages from a pull subscription.""" # [START pubsub_subscriber_sync_pull_custom_attributes] - # project = "Your Google Cloud Project ID" - # subscription_name = "Your Pubsub subscription name" + import time + + from google.cloud import pubsub_v1 + + # TODO project_id = "Your Google Cloud Project ID" + # TODO subscription_name = "Your Pub/Sub subscription name" + subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path( - project, subscription_name) + project_id, subscription_name) def callback(message): print('Received message: {}'.format(message.data)) @@ -192,14 +228,19 @@ def callback(message): # [END pubsub_subscriber_sync_pull_custom_attributes] -def receive_messages_with_flow_control(project, subscription_name): +def receive_messages_with_flow_control(project_id, subscription_name): """Receives messages from a pull subscription with flow control.""" # [START pubsub_subscriber_flow_settings] - # project = "Your Google Cloud Project ID" - # subscription_name = "Your Pubsub subscription name" + import time + + from google.cloud import pubsub_v1 + + # TODO project_id = "Your Google Cloud Project ID" + # TODO subscription_name = "Your Pub/Sub subscription name" + subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path( - project, subscription_name) + project_id, subscription_name) def callback(message): print('Received message: {}'.format(message.data)) @@ -218,16 +259,19 @@ def callback(message): # [END pubsub_subscriber_flow_settings] -def synchronous_pull(project, subscription_name): +def synchronous_pull(project_id, subscription_name): """Pulling messages synchronously.""" # [START pubsub_subscriber_sync_pull] - # project = "Your Google Cloud Project ID" - # subscription_name = "Your Pubsub subscription name" + from google.cloud import pubsub_v1 + + # TODO project_id = "Your Google Cloud Project ID" + # TODO subscription_name = "Your Pub/Sub subscription name" + subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path( - project, subscription_name) + project_id, subscription_name) - NUM_MESSAGES=3 + NUM_MESSAGES = 3 # The subscriber pulls a specific number of messages. response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES) @@ -244,18 +288,26 @@ def synchronous_pull(project, subscription_name): # [END pubsub_subscriber_sync_pull] -def synchronous_pull_with_lease_management(project, subscription_name): +def synchronous_pull_with_lease_management(project_id, subscription_name): """Pulling messages synchronously with lease management""" # [START pubsub_subscriber_sync_pull_with_lease] - # project = "Your Google Cloud Project ID" - # subscription_name = "Your Pubsub subscription name" + import logging + import multiprocessing + import random + import time + + from google.cloud import pubsub_v1 + + # TODO project_id = "Your Google Cloud Project ID" + # TODO subscription_name = "Your Pub/Sub subscription name" + subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path( - project, subscription_name) + project_id, subscription_name) - NUM_MESSAGES=2 - ACK_DEADLINE=30 - SLEEP_TIME=10 + NUM_MESSAGES = 2 + ACK_DEADLINE = 30 + SLEEP_TIME = 10 # The subscriber pulls a specific number of messages. response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES) @@ -266,7 +318,7 @@ def synchronous_pull_with_lease_management(project, subscription_name): def worker(msg): """Simulates a long-running process.""" - RUN_TIME = random.randint(1,60) + RUN_TIME = random.randint(1, 60) logger.info('{}: Running {} for {}s'.format( time.strftime("%X", time.gmtime()), msg.message.data, RUN_TIME)) time.sleep(RUN_TIME) @@ -279,16 +331,20 @@ def worker(msg): process.start() while processes: - for process, (ack_id, msg_data) in processes.items(): + for process in list(processes): + ack_id, msg_data = processes[process] # If the process is still running, reset the ack deadline as # specified by ACK_DEADLINE once every while as specified # by SLEEP_TIME. if process.is_alive(): # `ack_deadline_seconds` must be between 10 to 600. - subscriber.modify_ack_deadline(subscription_path, - [ack_id], ack_deadline_seconds=ACK_DEADLINE) + subscriber.modify_ack_deadline( + subscription_path, + [ack_id], + ack_deadline_seconds=ACK_DEADLINE) logger.info('{}: Reset ack deadline for {} for {}s'.format( - time.strftime("%X", time.gmtime()), msg_data, ACK_DEADLINE)) + time.strftime("%X", time.gmtime()), + msg_data, ACK_DEADLINE)) # If the processs is finished, acknowledges using `ack_id`. else: @@ -305,14 +361,17 @@ def worker(msg): # [END pubsub_subscriber_sync_pull_with_lease] -def listen_for_errors(project, subscription_name): +def listen_for_errors(project_id, subscription_name): """Receives messages and catches errors from a pull subscription.""" # [START pubsub_subscriber_error_listener] - # project = "Your Google Cloud Project ID" - # subscription_name = "Your Pubsub subscription name" + from google.cloud import pubsub_v1 + + # TODO project = "Your Google Cloud Project ID" + # TODO subscription_name = "Your Pubsub subscription name" + subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path( - project, subscription_name) + project_id, subscription_name) def callback(message): print('Received message: {}'.format(message)) @@ -337,7 +396,7 @@ def callback(message): description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter ) - parser.add_argument('project', help='Your Google Cloud project ID') + parser.add_argument('project_id', help='Your Google Cloud project ID') subparsers = parser.add_subparsers(dest='command') list_in_topic_parser = subparsers.add_parser( @@ -389,7 +448,8 @@ def callback(message): synchronous_pull_with_lease_management_parser = subparsers.add_parser( 'receive-synchronously-with-lease', help=synchronous_pull_with_lease_management.__doc__) - synchronous_pull_with_lease_management_parser.add_argument('subscription_name') + synchronous_pull_with_lease_management_parser.add_argument( + 'subscription_name') listen_for_errors_parser = subparsers.add_parser( 'listen_for_errors', help=listen_for_errors.__doc__) @@ -398,37 +458,37 @@ def callback(message): args = parser.parse_args() if args.command == 'list_in_topic': - list_subscriptions_in_topic(args.project, args.topic_name) + list_subscriptions_in_topic(args.project_id, args.topic_name) elif args.command == 'list_in_project': - list_subscriptions_in_project(args.project) + list_subscriptions_in_project(args.project_id) elif args.command == 'create': create_subscription( - args.project, args.topic_name, args.subscription_name) + args.project_id, args.topic_name, args.subscription_name) elif args.command == 'create-push': create_push_subscription( - args.project, + args.project_id, args.topic_name, args.subscription_name, args.endpoint) elif args.command == 'delete': delete_subscription( - args.project, args.subscription_name) + args.project_id, args.subscription_name) elif args.command == 'update': update_subscription( - args.project, args.subscription_name, args.endpoint) + args.project_id, args.subscription_name, args.endpoint) elif args.command == 'receive': - receive_messages(args.project, args.subscription_name) + receive_messages(args.project_id, args.subscription_name) elif args.command == 'receive-custom-attributes': receive_messages_with_custom_attributes( - args.project, args.subscription_name) + args.project_id, args.subscription_name) elif args.command == 'receive-flow-control': receive_messages_with_flow_control( - args.project, args.subscription_name) + args.project_id, args.subscription_name) elif args.command == 'receive-synchronously': synchronous_pull( - args.project, args.subscription_name) + args.project_id, args.subscription_name) elif args.command == 'receive-synchronously-with-lease': synchronous_pull_with_lease_management( - args.project, args.subscription_name) + args.project_id, args.subscription_name) elif args.command == 'listen_for_errors': - listen_for_errors(args.project, args.subscription_name) + listen_for_errors(args.project_id, args.subscription_name)