Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pub/Sub: moved import statements inside region tags #1753

Merged
merged 8 commits into from
Oct 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 74 additions & 31 deletions pubsub/cloud-client/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,70 +22,95 @@
"""

import argparse
import time

from google.cloud import pubsub_v1


def list_topics(project):
def list_topics(project_id):
"""Lists all Pub/Sub topics in the given project."""
# [START pubsub_list_topics]
from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"

publisher = pubsub_v1.PublisherClient()
project_path = publisher.project_path(project)
project_path = publisher.project_path(project_id)

for topic in publisher.list_topics(project_path):
print(topic)
# [END pubsub_list_topics]


def create_topic(project, topic_name):
def create_topic(project_id, topic_name):
"""Create a new Pub/Sub topic."""
# [START pubsub_create_topic]
from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO topic_name = "Your Pub/Sub topic name"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project, topic_name)
topic_path = publisher.topic_path(project_id, topic_name)

topic = publisher.create_topic(topic_path)

print('Topic created: {}'.format(topic))
# [END pubsub_create_topic]


def delete_topic(project, topic_name):
def delete_topic(project_id, topic_name):
"""Deletes an existing Pub/Sub topic."""
# [START pubsub_delete_topic]
from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO topic_name = "Your Pub/Sub topic name"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project, topic_name)
topic_path = publisher.topic_path(project_id, topic_name)

publisher.delete_topic(topic_path)

print('Topic deleted: {}'.format(topic_path))
# [END pubsub_delete_topic]


def publish_messages(project, topic_name):
def publish_messages(project_id, topic_name):
"""Publishes multiple messages to a Pub/Sub topic."""
# [START pubsub_quickstart_publisher]
# [START pubsub_publish]
from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO topic_name = "Your Pub/Sub topic name"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project, topic_name)
# The `topic_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/topics/{topic_name}`
topic_path = publisher.topic_path(project_id, topic_name)

for n in range(1, 10):
data = u'Message number {}'.format(n)
# Data must be a bytestring
data = data.encode('utf-8')
publisher.publish(topic_path, data=data)
# When you publish a message, the client returns a future.
future = publisher.publish(topic_path, data=data)
print('Published {} of message ID {}.'.format(data, future.result()))

print('Published messages.')
# [END pubsub_quickstart_publisher]
# [END pubsub_publish]


def publish_messages_with_custom_attributes(project, topic_name):
def publish_messages_with_custom_attributes(project_id, topic_name):
"""Publishes multiple messages with custom attributes
to a Pub/Sub topic."""
# [START pubsub_publish_custom_attributes]
from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO topic_name = "Your Pub/Sub topic name"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project, topic_name)
topic_path = publisher.topic_path(project_id, topic_name)

for n in range(1, 10):
data = u'Message number {}'.format(n)
Expand All @@ -99,12 +124,17 @@ def publish_messages_with_custom_attributes(project, topic_name):
# [END pubsub_publish_custom_attributes]


def publish_messages_with_futures(project, topic_name):
def publish_messages_with_futures(project_id, topic_name):
"""Publishes multiple messages to a Pub/Sub topic and prints their
message IDs."""
# [START pubsub_publisher_concurrency_control]
from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO topic_name = "Your Pub/Sub topic name"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project, topic_name)
topic_path = publisher.topic_path(project_id, topic_name)

# When you publish a message, the client returns a Future. This Future
# can be used to track when the message is published.
Expand All @@ -124,11 +154,18 @@ def publish_messages_with_futures(project, topic_name):
# [END pubsub_publisher_concurrency_control]


def publish_messages_with_error_handler(project, topic_name):
def publish_messages_with_error_handler(project_id, topic_name):
"""Publishes multiple messages to a Pub/Sub topic with an error handler."""
# [START pubsub_publish_messages_error_handler]
import time

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO topic_name = "Your Pub/Sub topic name"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project, topic_name)
topic_path = publisher.topic_path(project_id, topic_name)

def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
Expand All @@ -155,17 +192,22 @@ def callback(message_future):
# [END pubsub_publish_messages_error_handler]


def publish_messages_with_batch_settings(project, topic_name):
def publish_messages_with_batch_settings(project_id, topic_name):
"""Publishes multiple messages to a Pub/Sub topic with batch settings."""
# [START pubsub_publisher_batch_settings]
# Configure the batch to publish once there is one kilobyte of data or
# 1 second has passed.
from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO topic_name = "Your Pub/Sub topic name"

# Configure the batch to publish as soon as there is one kilobyte
# of data or one second has passed.
batch_settings = pubsub_v1.types.BatchSettings(
max_bytes=1024, # One kilobyte
max_latency=1, # One second
)
publisher = pubsub_v1.PublisherClient(batch_settings)
topic_path = publisher.topic_path(project, topic_name)
topic_path = publisher.topic_path(project_id, topic_name)

for n in range(1, 10):
data = u'Message number {}'.format(n)
Expand All @@ -182,7 +224,7 @@ def publish_messages_with_batch_settings(project, topic_name):
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument('project', help='Your Google Cloud project ID')
parser.add_argument('project_id', help='Your Google Cloud project ID')

subparsers = parser.add_subparsers(dest='command')
subparsers.add_parser('list', help=list_topics.__doc__)
Expand Down Expand Up @@ -220,18 +262,19 @@ def publish_messages_with_batch_settings(project, topic_name):
args = parser.parse_args()

if args.command == 'list':
list_topics(args.project)
list_topics(args.project_id)
elif args.command == 'create':
create_topic(args.project, args.topic_name)
create_topic(args.project_id, args.topic_name)
elif args.command == 'delete':
delete_topic(args.project, args.topic_name)
delete_topic(args.project_id, args.topic_name)
elif args.command == 'publish':
publish_messages(args.project, args.topic_name)
publish_messages(args.project_id, args.topic_name)
elif args.command == 'publish-with-custom-attributes':
publish_messages_with_custom_attributes(args.project, args.topic_name)
publish_messages_with_custom_attributes(
args.project_id, args.topic_name)
elif args.command == 'publish-with-futures':
publish_messages_with_futures(args.project, args.topic_name)
publish_messages_with_futures(args.project_id, args.topic_name)
elif args.command == 'publish-with-error-handler':
publish_messages_with_error_handler(args.project, args.topic_name)
publish_messages_with_error_handler(args.project_id, args.topic_name)
elif args.command == 'publish-with-batch-settings':
publish_messages_with_batch_settings(args.project, args.topic_name)
publish_messages_with_batch_settings(args.project_id, args.topic_name)
Loading