From 5aa21cad05c4af48e990fdc9c1692301a2184023 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Tue, 30 Jun 2020 18:58:03 +0200 Subject: [PATCH] chore: remove PubSub cloud-client samples (#4175) * chore: remove PubSub cloud-client samples * chore: add link to new samples location in README Co-authored-by: Tianzi Cai --- pubsub/cloud-client/README.rst | 241 +------ pubsub/cloud-client/README.rst.in | 30 - pubsub/cloud-client/iam.py | 231 ------ pubsub/cloud-client/iam_test.py | 118 ---- pubsub/cloud-client/publisher.py | 334 --------- pubsub/cloud-client/publisher_test.py | 146 ---- pubsub/cloud-client/quickstart/pub.py | 86 --- pubsub/cloud-client/quickstart/pub_test.py | 56 -- pubsub/cloud-client/quickstart/sub.py | 69 -- pubsub/cloud-client/quickstart/sub_test.py | 102 --- pubsub/cloud-client/requirements-test.txt | 3 - pubsub/cloud-client/requirements.txt | 1 - pubsub/cloud-client/subscriber.py | 783 --------------------- pubsub/cloud-client/subscriber_test.py | 341 --------- 14 files changed, 2 insertions(+), 2539 deletions(-) delete mode 100644 pubsub/cloud-client/README.rst.in delete mode 100644 pubsub/cloud-client/iam.py delete mode 100644 pubsub/cloud-client/iam_test.py delete mode 100644 pubsub/cloud-client/publisher.py delete mode 100644 pubsub/cloud-client/publisher_test.py delete mode 100644 pubsub/cloud-client/quickstart/pub.py delete mode 100644 pubsub/cloud-client/quickstart/pub_test.py delete mode 100644 pubsub/cloud-client/quickstart/sub.py delete mode 100644 pubsub/cloud-client/quickstart/sub_test.py delete mode 100644 pubsub/cloud-client/requirements-test.txt delete mode 100644 pubsub/cloud-client/requirements.txt delete mode 100644 pubsub/cloud-client/subscriber.py delete mode 100644 pubsub/cloud-client/subscriber_test.py diff --git a/pubsub/cloud-client/README.rst b/pubsub/cloud-client/README.rst index f27f9438ea96..3dd2bc886aa0 100644 --- a/pubsub/cloud-client/README.rst +++ b/pubsub/cloud-client/README.rst @@ -1,240 +1,3 @@ -.. This file is automatically generated. Do not edit this file directly. +These samples have been moved. -Google Cloud Pub/Sub Python Samples -=============================================================================== - -.. image:: https://gstatic.com/cloudssh/images/open-btn.png - :target: https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/GoogleCloudPlatform/python-docs-samples&page=editor&open_in_editor=pubsub/cloud-client/README.rst - - -This directory contains samples for Google Cloud Pub/Sub. `Google Cloud Pub/Sub`_ is a fully-managed real-time messaging service that allows you to send and receive messages between independent applications. - - - - -.. _Google Cloud Pub/Sub: https://cloud.google.com/pubsub/docs - -Setup -------------------------------------------------------------------------------- - - -Authentication -++++++++++++++ - -This sample requires you to have authentication setup. Refer to the -`Authentication Getting Started Guide`_ for instructions on setting up -credentials for applications. - -.. _Authentication Getting Started Guide: - https://cloud.google.com/docs/authentication/getting-started - -Install Dependencies -++++++++++++++++++++ - -#. Clone python-docs-samples and change directory to the sample directory you want to use. - - .. code-block:: bash - - $ git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git - -#. Install `pip`_ and `virtualenv`_ if you do not already have them. You may want to refer to the `Python Development Environment Setup Guide`_ for Google Cloud Platform for instructions. - - .. _Python Development Environment Setup Guide: - https://cloud.google.com/python/setup - -#. Create a virtualenv. Samples are compatible with Python 2.7 and 3.4+. - - .. code-block:: bash - - $ virtualenv env - $ source env/bin/activate - -#. Install the dependencies needed to run the samples. - - .. code-block:: bash - - $ pip install -r requirements.txt - -.. _pip: https://pip.pypa.io/ -.. _virtualenv: https://virtualenv.pypa.io/ - -Samples -------------------------------------------------------------------------------- - -Publisher -+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ - -.. image:: https://gstatic.com/cloudssh/images/open-btn.png - :target: https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/GoogleCloudPlatform/python-docs-samples&page=editor&open_in_editor=pubsub/cloud-client/publisher.py,pubsub/cloud-client/README.rst - - - - -To run this sample: - -.. code-block:: bash - - $ python publisher.py --help - - usage: publisher.py [-h] - project_id - {list,create,delete,publish,publish-with-custom-attributes,publish-with-error-handler,publish-with-batch-settings,publish-with-retry-settings} - ... - - This application demonstrates how to perform basic operations on topics - with the Cloud Pub/Sub API. - - For more information, see the README.md under /pubsub and the documentation - at https://cloud.google.com/pubsub/docs. - - positional arguments: - project_id Your Google Cloud project ID - {list,create,delete,publish,publish-with-custom-attributes,publish-with-error-handler,publish-with-batch-settings,publish-with-retry-settings} - list Lists all Pub/Sub topics in the given project. - create Create a new Pub/Sub topic. - delete Deletes an existing Pub/Sub topic. - publish Publishes multiple messages to a Pub/Sub topic. - publish-with-custom-attributes - Publishes multiple messages with custom attributes to - a Pub/Sub topic. - publish-with-error-handler - Publishes multiple messages to a Pub/Sub topic with an - error handler. - publish-with-batch-settings - Publishes multiple messages to a Pub/Sub topic with - batch settings. - publish-with-retry-settings - Publishes messages with custom retry settings. - - optional arguments: - -h, --help show this help message and exit - - - -Subscribers -+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ - -.. image:: https://gstatic.com/cloudssh/images/open-btn.png - :target: https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/GoogleCloudPlatform/python-docs-samples&page=editor&open_in_editor=pubsub/cloud-client/subscriber.py,pubsub/cloud-client/README.rst - - - - -To run this sample: - -.. code-block:: bash - - $ python subscriber.py --help - - usage: subscriber.py [-h] - project_id - {list-in-topic,list-in-project,create,create-with-dead-letter-policy,create-push,delete,update-push,update-dead-letter-policy,remove-dead-letter-policy,receive,receive-custom-attributes,receive-flow-control,receive-synchronously,receive-synchronously-with-lease,listen-for-errors,receive-messages-with-delivery-attempts} - ... - - This application demonstrates how to perform basic operations on - subscriptions with the Cloud Pub/Sub API. - - For more information, see the README.md under /pubsub and the documentation - at https://cloud.google.com/pubsub/docs. - - positional arguments: - project_id Your Google Cloud project ID - {list-in-topic,list-in-project,create,create-with-dead-letter-policy,create-push,delete,update-push,update-dead-letter-policy,remove-dead-letter-policy,receive,receive-custom-attributes,receive-flow-control,receive-synchronously,receive-synchronously-with-lease,listen-for-errors,receive-messages-with-delivery-attempts} - list-in-topic Lists all subscriptions for a given topic. - list-in-project Lists all subscriptions in the current project. - create Create a new pull subscription on the given topic. - create-with-dead-letter-policy - Create a subscription with dead letter policy. - create-push Create a new push subscription on the given topic. - delete Deletes an existing Pub/Sub topic. - update-push Updates an existing Pub/Sub subscription's push - endpoint URL. Note that certain properties of a - subscription, such as its topic, are not modifiable. - update-dead-letter-policy - Update a subscription's dead letter policy. - remove-dead-letter-policy - Remove dead letter policy from a subscription. - receive Receives messages from a pull subscription. - receive-custom-attributes - Receives messages from a pull subscription. - receive-flow-control - Receives messages from a pull subscription with flow - control. - receive-synchronously - Pulling messages synchronously. - receive-synchronously-with-lease - Pulling messages synchronously with lease management - listen-for-errors Receives messages and catches errors from a pull - subscription. - receive-messages-with-delivery-attempts - - optional arguments: - -h, --help show this help message and exit - - - -Identity and Access Management -+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ - -.. image:: https://gstatic.com/cloudssh/images/open-btn.png - :target: https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/GoogleCloudPlatform/python-docs-samples&page=editor&open_in_editor=pubsub/cloud-client/iam.py,pubsub/cloud-client/README.rst - - - - -To run this sample: - -.. code-block:: bash - - $ python iam.py - - usage: iam.py [-h] - project - {get-topic-policy,get-subscription-policy,set-topic-policy,set-subscription-policy,check-topic-permissions,check-subscription-permissions} - ... - - This application demonstrates how to perform basic operations on IAM - policies with the Cloud Pub/Sub API. - - For more information, see the README.md under /pubsub and the documentation - at https://cloud.google.com/pubsub/docs. - - positional arguments: - project Your Google Cloud project ID - {get-topic-policy,get-subscription-policy,set-topic-policy,set-subscription-policy,check-topic-permissions,check-subscription-permissions} - get-topic-policy Prints the IAM policy for the given topic. - get-subscription-policy - Prints the IAM policy for the given subscription. - set-topic-policy Sets the IAM policy for a topic. - set-subscription-policy - Sets the IAM policy for a topic. - check-topic-permissions - Checks to which permissions are available on the given - topic. - check-subscription-permissions - Checks to which permissions are available on the given - subscription. - - optional arguments: - -h, --help show this help message and exit - - - - - -The client library -------------------------------------------------------------------------------- - -This sample uses the `Google Cloud Client Library for Python`_. -You can read the documentation for more details on API usage and use GitHub -to `browse the source`_ and `report issues`_. - -.. _Google Cloud Client Library for Python: - https://googlecloudplatform.github.io/google-cloud-python/ -.. _browse the source: - https://github.com/GoogleCloudPlatform/google-cloud-python -.. _report issues: - https://github.com/GoogleCloudPlatform/google-cloud-python/issues - - -.. _Google Cloud SDK: https://cloud.google.com/sdk/ +https://github.com/googleapis/python-pubsub/tree/master/samples diff --git a/pubsub/cloud-client/README.rst.in b/pubsub/cloud-client/README.rst.in deleted file mode 100644 index ddbc647121b2..000000000000 --- a/pubsub/cloud-client/README.rst.in +++ /dev/null @@ -1,30 +0,0 @@ -# This file is used to generate README.rst - -product: - name: Google Cloud Pub/Sub - short_name: Cloud Pub/Sub - url: https://cloud.google.com/pubsub/docs - description: > - `Google Cloud Pub/Sub`_ is a fully-managed real-time messaging service that - allows you to send and receive messages between independent applications. - -setup: -- auth -- install_deps - -samples: -- name: Quickstart - file: quickstart.py -- name: Publisher - file: publisher.py - show_help: true -- name: Subscribers - file: subscriber.py - show_help: true -- name: Identity and Access Management - file: iam.py - show_help: true - -cloud_client_library: true - -folder: pubsub/cloud-client \ No newline at end of file diff --git a/pubsub/cloud-client/iam.py b/pubsub/cloud-client/iam.py deleted file mode 100644 index 71c55d764c0c..000000000000 --- a/pubsub/cloud-client/iam.py +++ /dev/null @@ -1,231 +0,0 @@ -#!/usr/bin/env python - -# Copyright 2019 Google Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""This application demonstrates how to perform basic operations on IAM -policies with the Cloud Pub/Sub API. - -For more information, see the README.md under /pubsub and the documentation -at https://cloud.google.com/pubsub/docs. -""" - -import argparse - - -def get_topic_policy(project, topic_id): - """Prints the IAM policy for the given topic.""" - # [START pubsub_get_topic_policy] - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # topic_id = "your-topic-id" - - client = pubsub_v1.PublisherClient() - topic_path = client.topic_path(project, topic_id) - - policy = client.get_iam_policy(topic_path) - - print("Policy for topic {}:".format(topic_path)) - for binding in policy.bindings: - print("Role: {}, Members: {}".format(binding.role, binding.members)) - # [END pubsub_get_topic_policy] - - -def get_subscription_policy(project, subscription_id): - """Prints the IAM policy for the given subscription.""" - # [START pubsub_get_subscription_policy] - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # subscription_id = "your-subscription-id" - - client = pubsub_v1.SubscriberClient() - subscription_path = client.subscription_path(project, subscription_id) - - policy = client.get_iam_policy(subscription_path) - - print("Policy for subscription {}:".format(subscription_path)) - for binding in policy.bindings: - print("Role: {}, Members: {}".format(binding.role, binding.members)) - - client.close() - # [END pubsub_get_subscription_policy] - - -def set_topic_policy(project, topic_id): - """Sets the IAM policy for a topic.""" - # [START pubsub_set_topic_policy] - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # topic_id = "your-topic-id" - - client = pubsub_v1.PublisherClient() - topic_path = client.topic_path(project, topic_id) - - policy = client.get_iam_policy(topic_path) - - # Add all users as viewers. - policy.bindings.add(role="roles/pubsub.viewer", members=["allUsers"]) - - # Add a group as a publisher. - policy.bindings.add( - role="roles/pubsub.publisher", members=["group:cloud-logs@google.com"] - ) - - # Set the policy - policy = client.set_iam_policy(topic_path, policy) - - print("IAM policy for topic {} set: {}".format(topic_id, policy)) - # [END pubsub_set_topic_policy] - - -def set_subscription_policy(project, subscription_id): - """Sets the IAM policy for a topic.""" - # [START pubsub_set_subscription_policy] - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # subscription_id = "your-subscription-id" - - client = pubsub_v1.SubscriberClient() - subscription_path = client.subscription_path(project, subscription_id) - - policy = client.get_iam_policy(subscription_path) - - # Add all users as viewers. - policy.bindings.add(role="roles/pubsub.viewer", members=["allUsers"]) - - # Add a group as an editor. - policy.bindings.add(role="roles/editor", members=["group:cloud-logs@google.com"]) - - # Set the policy - policy = client.set_iam_policy(subscription_path, policy) - - print("IAM policy for subscription {} set: {}".format(subscription_id, policy)) - - client.close() - # [END pubsub_set_subscription_policy] - - -def check_topic_permissions(project, topic_id): - """Checks to which permissions are available on the given topic.""" - # [START pubsub_test_topic_permissions] - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # topic_id = "your-topic-id" - - client = pubsub_v1.PublisherClient() - topic_path = client.topic_path(project, topic_id) - - permissions_to_check = ["pubsub.topics.publish", "pubsub.topics.update"] - - allowed_permissions = client.test_iam_permissions(topic_path, permissions_to_check) - - print( - "Allowed permissions for topic {}: {}".format(topic_path, allowed_permissions) - ) - # [END pubsub_test_topic_permissions] - - -def check_subscription_permissions(project, subscription_id): - """Checks to which permissions are available on the given subscription.""" - # [START pubsub_test_subscription_permissions] - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # subscription_id = "your-subscription-id" - - client = pubsub_v1.SubscriberClient() - subscription_path = client.subscription_path(project, subscription_id) - - permissions_to_check = [ - "pubsub.subscriptions.consume", - "pubsub.subscriptions.update", - ] - - allowed_permissions = client.test_iam_permissions( - subscription_path, permissions_to_check - ) - - print( - "Allowed permissions for subscription {}: {}".format( - subscription_path, allowed_permissions - ) - ) - - client.close() - # [END pubsub_test_subscription_permissions] - - -if __name__ == "__main__": - parser = argparse.ArgumentParser( - description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, - ) - parser.add_argument("project", help="Your Google Cloud project ID") - - subparsers = parser.add_subparsers(dest="command") - - get_topic_policy_parser = subparsers.add_parser( - "get-topic-policy", help=get_topic_policy.__doc__ - ) - get_topic_policy_parser.add_argument("topic_id") - - get_subscription_policy_parser = subparsers.add_parser( - "get-subscription-policy", help=get_subscription_policy.__doc__ - ) - get_subscription_policy_parser.add_argument("subscription_id") - - set_topic_policy_parser = subparsers.add_parser( - "set-topic-policy", help=set_topic_policy.__doc__ - ) - set_topic_policy_parser.add_argument("topic_id") - - set_subscription_policy_parser = subparsers.add_parser( - "set-subscription-policy", help=set_subscription_policy.__doc__ - ) - set_subscription_policy_parser.add_argument("subscription_id") - - check_topic_permissions_parser = subparsers.add_parser( - "check-topic-permissions", help=check_topic_permissions.__doc__ - ) - check_topic_permissions_parser.add_argument("topic_id") - - check_subscription_permissions_parser = subparsers.add_parser( - "check-subscription-permissions", help=check_subscription_permissions.__doc__, - ) - check_subscription_permissions_parser.add_argument("subscription_id") - - args = parser.parse_args() - - if args.command == "get-topic-policy": - get_topic_policy(args.project, args.topic_id) - elif args.command == "get-subscription-policy": - get_subscription_policy(args.project, args.subscription_id) - elif args.command == "set-topic-policy": - set_topic_policy(args.project, args.topic_id) - elif args.command == "set-subscription-policy": - set_subscription_policy(args.project, args.subscription_id) - elif args.command == "check-topic-permissions": - check_topic_permissions(args.project, args.topic_id) - elif args.command == "check-subscription-permissions": - check_subscription_permissions(args.project, args.subscription_id) diff --git a/pubsub/cloud-client/iam_test.py b/pubsub/cloud-client/iam_test.py deleted file mode 100644 index d196953f6207..000000000000 --- a/pubsub/cloud-client/iam_test.py +++ /dev/null @@ -1,118 +0,0 @@ -# Copyright 2016 Google Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import uuid - -from google.cloud import pubsub_v1 -import pytest - -import iam - -UUID = uuid.uuid4().hex -PROJECT = os.environ["GOOGLE_CLOUD_PROJECT"] -TOPIC = "iam-test-topic-" + UUID -SUBSCRIPTION = "iam-test-subscription-" + UUID - - -@pytest.fixture(scope="module") -def publisher_client(): - yield pubsub_v1.PublisherClient() - - -@pytest.fixture(scope="module") -def topic(publisher_client): - topic_path = publisher_client.topic_path(PROJECT, TOPIC) - - try: - publisher_client.delete_topic(topic_path) - except Exception: - pass - - publisher_client.create_topic(topic_path) - - yield topic_path - - publisher_client.delete_topic(topic_path) - - -@pytest.fixture(scope="module") -def subscriber_client(): - subscriber_client = pubsub_v1.SubscriberClient() - yield subscriber_client - subscriber_client.close() - - -@pytest.fixture -def subscription(subscriber_client, topic): - subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION) - - try: - subscriber_client.delete_subscription(subscription_path) - except Exception: - pass - - subscriber_client.create_subscription(subscription_path, topic=topic) - - yield subscription_path - - subscriber_client.delete_subscription(subscription_path) - - -def test_get_topic_policy(topic, capsys): - iam.get_topic_policy(PROJECT, TOPIC) - - out, _ = capsys.readouterr() - assert topic in out - - -def test_get_subscription_policy(subscription, capsys): - iam.get_subscription_policy(PROJECT, SUBSCRIPTION) - - out, _ = capsys.readouterr() - assert subscription in out - - -def test_set_topic_policy(publisher_client, topic): - iam.set_topic_policy(PROJECT, TOPIC) - - policy = publisher_client.get_iam_policy(topic) - assert "roles/pubsub.publisher" in str(policy) - assert "allUsers" in str(policy) - - -def test_set_subscription_policy(subscriber_client, subscription): - iam.set_subscription_policy(PROJECT, SUBSCRIPTION) - - policy = subscriber_client.get_iam_policy(subscription) - assert "roles/pubsub.viewer" in str(policy) - assert "allUsers" in str(policy) - - -def test_check_topic_permissions(topic, capsys): - iam.check_topic_permissions(PROJECT, TOPIC) - - out, _ = capsys.readouterr() - - assert topic in out - assert "pubsub.topics.publish" in out - - -def test_check_subscription_permissions(subscription, capsys): - iam.check_subscription_permissions(PROJECT, SUBSCRIPTION) - - out, _ = capsys.readouterr() - - assert subscription in out - assert "pubsub.subscriptions.consume" in out diff --git a/pubsub/cloud-client/publisher.py b/pubsub/cloud-client/publisher.py deleted file mode 100644 index 477b31b9cf71..000000000000 --- a/pubsub/cloud-client/publisher.py +++ /dev/null @@ -1,334 +0,0 @@ -#!/usr/bin/env python - -# Copyright 2016 Google LLC. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""This application demonstrates how to perform basic operations on topics -with the Cloud Pub/Sub API. - -For more information, see the README.md under /pubsub and the documentation -at https://cloud.google.com/pubsub/docs. -""" - -import argparse - - -def list_topics(project_id): - """Lists all Pub/Sub topics in the given project.""" - # [START pubsub_list_topics] - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - - publisher = pubsub_v1.PublisherClient() - project_path = publisher.project_path(project_id) - - for topic in publisher.list_topics(project_path): - print(topic) - # [END pubsub_list_topics] - - -def create_topic(project_id, topic_id): - """Create a new Pub/Sub topic.""" - # [START pubsub_quickstart_create_topic] - # [START pubsub_create_topic] - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # topic_id = "your-topic-id" - - publisher = pubsub_v1.PublisherClient() - topic_path = publisher.topic_path(project_id, topic_id) - - topic = publisher.create_topic(topic_path) - - print("Topic created: {}".format(topic)) - # [END pubsub_quickstart_create_topic] - # [END pubsub_create_topic] - - -def delete_topic(project_id, topic_id): - """Deletes an existing Pub/Sub topic.""" - # [START pubsub_delete_topic] - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # topic_id = "your-topic-id" - - publisher = pubsub_v1.PublisherClient() - topic_path = publisher.topic_path(project_id, topic_id) - - publisher.delete_topic(topic_path) - - print("Topic deleted: {}".format(topic_path)) - # [END pubsub_delete_topic] - - -def publish_messages(project_id, topic_id): - """Publishes multiple messages to a Pub/Sub topic.""" - # [START pubsub_quickstart_publisher] - # [START pubsub_publish] - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # topic_id = "your-topic-id" - - publisher = pubsub_v1.PublisherClient() - # The `topic_path` method creates a fully qualified identifier - # in the form `projects/{project_id}/topics/{topic_id}` - topic_path = publisher.topic_path(project_id, topic_id) - - for n in range(1, 10): - data = u"Message number {}".format(n) - # Data must be a bytestring - data = data.encode("utf-8") - # When you publish a message, the client returns a future. - future = publisher.publish(topic_path, data=data) - print(future.result()) - - print("Published messages.") - # [END pubsub_quickstart_publisher] - # [END pubsub_publish] - - -def publish_messages_with_custom_attributes(project_id, topic_id): - """Publishes multiple messages with custom attributes - to a Pub/Sub topic.""" - # [START pubsub_publish_custom_attributes] - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # topic_id = "your-topic-id" - - publisher = pubsub_v1.PublisherClient() - topic_path = publisher.topic_path(project_id, topic_id) - - for n in range(1, 10): - data = u"Message number {}".format(n) - # Data must be a bytestring - data = data.encode("utf-8") - # Add two attributes, origin and username, to the message - future = publisher.publish( - topic_path, data, origin="python-sample", username="gcp" - ) - print(future.result()) - - print("Published messages with custom attributes.") - # [END pubsub_publish_custom_attributes] - - -def publish_messages_with_error_handler(project_id, topic_id): - # [START pubsub_publish_messages_error_handler] - """Publishes multiple messages to a Pub/Sub topic with an error handler.""" - import time - - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # topic_id = "your-topic-id" - - publisher = pubsub_v1.PublisherClient() - topic_path = publisher.topic_path(project_id, topic_id) - - futures = dict() - - def get_callback(f, data): - def callback(f): - try: - print(f.result()) - futures.pop(data) - except: # noqa - print("Please handle {} for {}.".format(f.exception(), data)) - - return callback - - for i in range(10): - data = str(i) - futures.update({data: None}) - # When you publish a message, the client returns a future. - future = publisher.publish( - topic_path, data=data.encode("utf-8") # data must be a bytestring. - ) - futures[data] = future - # Publish failures shall be handled in the callback function. - future.add_done_callback(get_callback(future, data)) - - # Wait for all the publish futures to resolve before exiting. - while futures: - time.sleep(5) - - print("Published message with error handler.") - # [END pubsub_publish_messages_error_handler] - - -def publish_messages_with_batch_settings(project_id, topic_id): - """Publishes multiple messages to a Pub/Sub topic with batch settings.""" - # [START pubsub_publisher_batch_settings] - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # topic_id = "your-topic-id" - - # Configure the batch to publish as soon as there is ten messages, - # one kilobyte of data, or one second has passed. - batch_settings = pubsub_v1.types.BatchSettings( - max_messages=10, # default 100 - max_bytes=1024, # default 1 MB - max_latency=1, # default 10 ms - ) - publisher = pubsub_v1.PublisherClient(batch_settings) - topic_path = publisher.topic_path(project_id, topic_id) - - # Resolve the publish future in a separate thread. - def callback(future): - message_id = future.result() - print(message_id) - - for n in range(1, 10): - data = u"Message number {}".format(n) - # Data must be a bytestring - data = data.encode("utf-8") - future = publisher.publish(topic_path, data=data) - # Non-blocking. Allow the publisher client to batch multiple messages. - future.add_done_callback(callback) - - print("Published messages with batch settings.") - # [END pubsub_publisher_batch_settings] - - -def publish_messages_with_retry_settings(project_id, topic_id): - """Publishes messages with custom retry settings.""" - # [START pubsub_publisher_retry_settings] - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # topic_id = "your-topic-id" - - # Configure the retry settings. Defaults will be overwritten. - retry_settings = { - "interfaces": { - "google.pubsub.v1.Publisher": { - "retry_codes": { - "publish": [ - "ABORTED", - "CANCELLED", - "DEADLINE_EXCEEDED", - "INTERNAL", - "RESOURCE_EXHAUSTED", - "UNAVAILABLE", - "UNKNOWN", - ] - }, - "retry_params": { - "messaging": { - "initial_retry_delay_millis": 100, # default: 100 - "retry_delay_multiplier": 1.3, # default: 1.3 - "max_retry_delay_millis": 60000, # default: 60000 - "initial_rpc_timeout_millis": 5000, # default: 25000 - "rpc_timeout_multiplier": 1.0, # default: 1.0 - "max_rpc_timeout_millis": 600000, # default: 30000 - "total_timeout_millis": 600000, # default: 600000 - } - }, - "methods": { - "Publish": { - "retry_codes_name": "publish", - "retry_params_name": "messaging", - } - }, - } - } - } - - publisher = pubsub_v1.PublisherClient(client_config=retry_settings) - topic_path = publisher.topic_path(project_id, topic_id) - - for n in range(1, 10): - data = u"Message number {}".format(n) - # Data must be a bytestring - data = data.encode("utf-8") - future = publisher.publish(topic_path, data=data) - print(future.result()) - - print("Published messages with retry settings.") - # [END pubsub_publisher_retry_settings] - - -if __name__ == "__main__": - parser = argparse.ArgumentParser( - description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, - ) - parser.add_argument("project_id", help="Your Google Cloud project ID") - - subparsers = parser.add_subparsers(dest="command") - subparsers.add_parser("list", help=list_topics.__doc__) - - create_parser = subparsers.add_parser("create", help=create_topic.__doc__) - create_parser.add_argument("topic_id") - - delete_parser = subparsers.add_parser("delete", help=delete_topic.__doc__) - delete_parser.add_argument("topic_id") - - publish_parser = subparsers.add_parser("publish", help=publish_messages.__doc__) - publish_parser.add_argument("topic_id") - - publish_with_custom_attributes_parser = subparsers.add_parser( - "publish-with-custom-attributes", - help=publish_messages_with_custom_attributes.__doc__, - ) - publish_with_custom_attributes_parser.add_argument("topic_id") - - publish_with_error_handler_parser = subparsers.add_parser( - "publish-with-error-handler", help=publish_messages_with_error_handler.__doc__, - ) - publish_with_error_handler_parser.add_argument("topic_id") - - publish_with_batch_settings_parser = subparsers.add_parser( - "publish-with-batch-settings", - help=publish_messages_with_batch_settings.__doc__, - ) - publish_with_batch_settings_parser.add_argument("topic_id") - - publish_with_retry_settings_parser = subparsers.add_parser( - "publish-with-retry-settings", - help=publish_messages_with_retry_settings.__doc__, - ) - publish_with_retry_settings_parser.add_argument("topic_id") - - args = parser.parse_args() - - if args.command == "list": - list_topics(args.project_id) - elif args.command == "create": - create_topic(args.project_id, args.topic_id) - elif args.command == "delete": - delete_topic(args.project_id, args.topic_id) - elif args.command == "publish": - publish_messages(args.project_id, args.topic_id) - elif args.command == "publish-with-custom-attributes": - publish_messages_with_custom_attributes(args.project_id, args.topic_id) - elif args.command == "publish-with-error-handler": - publish_messages_with_error_handler(args.project_id, args.topic_id) - elif args.command == "publish-with-batch-settings": - publish_messages_with_batch_settings(args.project_id, args.topic_id) - elif args.command == "publish-with-retry-settings": - publish_messages_with_retry_settings(args.project_id, args.topic_id) diff --git a/pubsub/cloud-client/publisher_test.py b/pubsub/cloud-client/publisher_test.py deleted file mode 100644 index b5c2ea1ea4b5..000000000000 --- a/pubsub/cloud-client/publisher_test.py +++ /dev/null @@ -1,146 +0,0 @@ -# Copyright 2016 Google Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import time -import uuid - -import backoff -from google.cloud import pubsub_v1 -import mock -import pytest - -import publisher - -UUID = uuid.uuid4().hex -PROJECT = os.environ["GOOGLE_CLOUD_PROJECT"] -TOPIC_ADMIN = "publisher-test-topic-admin-" + UUID -TOPIC_PUBLISH = "publisher-test-topic-publish-" + UUID - - -@pytest.fixture -def client(): - yield pubsub_v1.PublisherClient() - - -@pytest.fixture -def topic_admin(client): - topic_path = client.topic_path(PROJECT, TOPIC_ADMIN) - - try: - topic = client.get_topic(topic_path) - except: # noqa - topic = client.create_topic(topic_path) - - yield topic.name - # Teardown of `topic_admin` is handled in `test_delete()`. - - -@pytest.fixture -def topic_publish(client): - topic_path = client.topic_path(PROJECT, TOPIC_PUBLISH) - - try: - topic = client.get_topic(topic_path) - except: # noqa - topic = client.create_topic(topic_path) - - yield topic.name - - client.delete_topic(topic.name) - - -def _make_sleep_patch(): - real_sleep = time.sleep - - def new_sleep(period): - if period == 60: - real_sleep(5) - raise RuntimeError("sigil") - else: - real_sleep(period) - - return mock.patch("time.sleep", new=new_sleep) - - -def test_list(client, topic_admin, capsys): - @backoff.on_exception(backoff.expo, AssertionError, max_time=60) - def eventually_consistent_test(): - publisher.list_topics(PROJECT) - out, _ = capsys.readouterr() - assert topic_admin in out - - eventually_consistent_test() - - -def test_create(client): - topic_path = client.topic_path(PROJECT, TOPIC_ADMIN) - try: - client.delete_topic(topic_path) - except Exception: - pass - - publisher.create_topic(PROJECT, TOPIC_ADMIN) - - @backoff.on_exception(backoff.expo, AssertionError, max_time=60) - def eventually_consistent_test(): - assert client.get_topic(topic_path) - - eventually_consistent_test() - - -def test_delete(client, topic_admin): - publisher.delete_topic(PROJECT, TOPIC_ADMIN) - - @backoff.on_exception(backoff.expo, AssertionError, max_time=60) - def eventually_consistent_test(): - with pytest.raises(Exception): - client.get_topic(client.topic_path(PROJECT, TOPIC_ADMIN)) - - eventually_consistent_test() - - -def test_publish(topic_publish, capsys): - publisher.publish_messages(PROJECT, TOPIC_PUBLISH) - - out, _ = capsys.readouterr() - assert "Published" in out - - -def test_publish_with_custom_attributes(topic_publish, capsys): - publisher.publish_messages_with_custom_attributes(PROJECT, TOPIC_PUBLISH) - - out, _ = capsys.readouterr() - assert "Published" in out - - -def test_publish_with_batch_settings(topic_publish, capsys): - publisher.publish_messages_with_batch_settings(PROJECT, TOPIC_PUBLISH) - - out, _ = capsys.readouterr() - assert "Published" in out - - -def test_publish_with_retry_settings(topic_publish, capsys): - publisher.publish_messages_with_retry_settings(PROJECT, TOPIC_PUBLISH) - - out, _ = capsys.readouterr() - assert "Published" in out - - -def test_publish_with_error_handler(topic_publish, capsys): - publisher.publish_messages_with_error_handler(PROJECT, TOPIC_PUBLISH) - - out, _ = capsys.readouterr() - assert "Published" in out diff --git a/pubsub/cloud-client/quickstart/pub.py b/pubsub/cloud-client/quickstart/pub.py deleted file mode 100644 index 16432c0c3627..000000000000 --- a/pubsub/cloud-client/quickstart/pub.py +++ /dev/null @@ -1,86 +0,0 @@ -#!/usr/bin/env python - -# Copyright 2019 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# [START pubsub_quickstart_pub_all] -import argparse -import time - -# [START pubsub_quickstart_pub_deps] -from google.cloud import pubsub_v1 - -# [END pubsub_quickstart_pub_deps] - - -def get_callback(api_future, data, ref): - """Wrap message data in the context of the callback function.""" - - def callback(api_future): - try: - print( - "Published message {} now has message ID {}".format( - data, api_future.result() - ) - ) - ref["num_messages"] += 1 - except Exception: - print( - "A problem occurred when publishing {}: {}\n".format( - data, api_future.exception() - ) - ) - raise - - return callback - - -def pub(project_id, topic_id): - """Publishes a message to a Pub/Sub topic.""" - # [START pubsub_quickstart_pub_client] - # Initialize a Publisher client. - client = pubsub_v1.PublisherClient() - # [END pubsub_quickstart_pub_client] - # Create a fully qualified identifier in the form of - # `projects/{project_id}/topics/{topic_id}` - topic_path = client.topic_path(project_id, topic_id) - - # Data sent to Cloud Pub/Sub must be a bytestring. - data = b"Hello, World!" - - # Keep track of the number of published messages. - ref = dict({"num_messages": 0}) - - # When you publish a message, the client returns a future. - api_future = client.publish(topic_path, data=data) - api_future.add_done_callback(get_callback(api_future, data, ref)) - - # Keep the main thread from exiting while the message future - # gets resolved in the background. - while api_future.running(): - time.sleep(0.5) - print("Published {} message(s).".format(ref["num_messages"])) - - -if __name__ == "__main__": - parser = argparse.ArgumentParser( - description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, - ) - parser.add_argument("project_id", help="Google Cloud project ID") - parser.add_argument("topic_id", help="Pub/Sub topic ID") - - args = parser.parse_args() - - pub(args.project_id, args.topic_id) -# [END pubsub_quickstart_pub_all] diff --git a/pubsub/cloud-client/quickstart/pub_test.py b/pubsub/cloud-client/quickstart/pub_test.py deleted file mode 100644 index 6f5cc06c4456..000000000000 --- a/pubsub/cloud-client/quickstart/pub_test.py +++ /dev/null @@ -1,56 +0,0 @@ -#!/usr/bin/env python - -# Copyright 2019 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import uuid - -from google.api_core.exceptions import AlreadyExists -from google.cloud import pubsub_v1 -import pytest - -import pub # noqa - - -UUID = uuid.uuid4().hex -PROJECT = os.environ["GOOGLE_CLOUD_PROJECT"] -TOPIC = "quickstart-pub-test-topic-" + UUID - - -@pytest.fixture(scope="module") -def publisher_client(): - yield pubsub_v1.PublisherClient() - - -@pytest.fixture(scope="module") -def topic(publisher_client): - topic_path = publisher_client.topic_path(PROJECT, TOPIC) - - try: - publisher_client.create_topic(topic_path) - except AlreadyExists: - pass - - yield TOPIC - - publisher_client.delete_topic(topic_path) - - -def test_pub(publisher_client, topic, capsys): - pub.pub(PROJECT, topic) - - out, _ = capsys.readouterr() - - assert "Hello, World!" in out diff --git a/pubsub/cloud-client/quickstart/sub.py b/pubsub/cloud-client/quickstart/sub.py deleted file mode 100644 index efe00891593e..000000000000 --- a/pubsub/cloud-client/quickstart/sub.py +++ /dev/null @@ -1,69 +0,0 @@ -#!/usr/bin/env python - -# Copyright 2019 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# [START pubsub_quickstart_sub_all] -import argparse - -# [START pubsub_quickstart_sub_deps] -from google.cloud import pubsub_v1 - -# [END pubsub_quickstart_sub_deps] - - -def sub(project_id, subscription_id): - """Receives messages from a Pub/Sub subscription.""" - # [START pubsub_quickstart_sub_client] - # Initialize a Subscriber client - subscriber_client = pubsub_v1.SubscriberClient() - # [END pubsub_quickstart_sub_client] - # Create a fully qualified identifier in the form of - # `projects/{project_id}/subscriptions/{subscription_id}` - subscription_path = subscriber_client.subscription_path(project_id, subscription_id) - - def callback(message): - print( - "Received message {} of message ID {}\n".format(message, message.message_id) - ) - # Acknowledge the message. Unack'ed messages will be redelivered. - message.ack() - print("Acknowledged message {}\n".format(message.message_id)) - - streaming_pull_future = subscriber_client.subscribe( - subscription_path, callback=callback - ) - print("Listening for messages on {}..\n".format(subscription_path)) - - try: - # Calling result() on StreamingPullFuture keeps the main thread from - # exiting while messages get processed in the callbacks. - streaming_pull_future.result() - except: # noqa - streaming_pull_future.cancel() - - subscriber_client.close() - - -if __name__ == "__main__": - parser = argparse.ArgumentParser( - description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, - ) - parser.add_argument("project_id", help="Google Cloud project ID") - parser.add_argument("subscription_id", help="Pub/Sub subscription ID") - - args = parser.parse_args() - - sub(args.project_id, args.subscription_id) -# [END pubsub_quickstart_sub_all] diff --git a/pubsub/cloud-client/quickstart/sub_test.py b/pubsub/cloud-client/quickstart/sub_test.py deleted file mode 100644 index 38047422a935..000000000000 --- a/pubsub/cloud-client/quickstart/sub_test.py +++ /dev/null @@ -1,102 +0,0 @@ -#!/usr/bin/env python - -# Copyright 2019 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import os -import uuid - -from google.api_core.exceptions import AlreadyExists -from google.cloud import pubsub_v1 -import mock -import pytest - -import sub # noqa - - -UUID = uuid.uuid4().hex -PROJECT = os.environ["GOOGLE_CLOUD_PROJECT"] -TOPIC = "quickstart-sub-test-topic-" + UUID -SUBSCRIPTION = "quickstart-sub-test-topic-sub-" + UUID - -publisher_client = pubsub_v1.PublisherClient() -subscriber_client = pubsub_v1.SubscriberClient() - - -@pytest.fixture(scope="module") -def topic_path(): - topic_path = publisher_client.topic_path(PROJECT, TOPIC) - - try: - topic = publisher_client.create_topic(topic_path) - yield topic.name - except AlreadyExists: - yield topic_path - - publisher_client.delete_topic(topic_path) - - -@pytest.fixture(scope="module") -def subscription_path(topic_path): - subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION) - - try: - subscription = subscriber_client.create_subscription( - subscription_path, topic_path - ) - yield subscription.name - except AlreadyExists: - yield subscription_path - - subscriber_client.delete_subscription(subscription_path) - subscriber_client.close() - - -def _publish_messages(topic_path): - publish_future = publisher_client.publish(topic_path, data=b"Hello World!") - publish_future.result() - - -def test_sub(monkeypatch, topic_path, subscription_path, capsys): - - real_client = pubsub_v1.SubscriberClient() - mock_client = mock.Mock(spec=pubsub_v1.SubscriberClient, wraps=real_client) - - # Attributes on mock_client_constructor uses the corresponding - # attributes on pubsub_v1.SubscriberClient. - mock_client_constructor = mock.create_autospec(pubsub_v1.SubscriberClient) - mock_client_constructor.return_value = mock_client - - monkeypatch.setattr(pubsub_v1, "SubscriberClient", mock_client_constructor) - - def mock_subscribe(subscription_path, callback=None): - real_future = real_client.subscribe(subscription_path, callback=callback) - mock_future = mock.Mock(spec=real_future, wraps=real_future) - - def mock_result(): - return real_future.result(timeout=10) - - mock_future.result.side_effect = mock_result - return mock_future - - mock_client.subscribe.side_effect = mock_subscribe - - _publish_messages(topic_path) - - sub.sub(PROJECT, SUBSCRIPTION) - - out, _ = capsys.readouterr() - assert "Received message" in out - assert "Acknowledged message" in out - - real_client.close() diff --git a/pubsub/cloud-client/requirements-test.txt b/pubsub/cloud-client/requirements-test.txt deleted file mode 100644 index adf26b9f98bb..000000000000 --- a/pubsub/cloud-client/requirements-test.txt +++ /dev/null @@ -1,3 +0,0 @@ -backoff==1.10.0 -pytest==5.3.2 -mock==3.0.5 diff --git a/pubsub/cloud-client/requirements.txt b/pubsub/cloud-client/requirements.txt deleted file mode 100644 index 9b496510abb5..000000000000 --- a/pubsub/cloud-client/requirements.txt +++ /dev/null @@ -1 +0,0 @@ -google-cloud-pubsub==1.6.0 diff --git a/pubsub/cloud-client/subscriber.py b/pubsub/cloud-client/subscriber.py deleted file mode 100644 index f079e7d423f8..000000000000 --- a/pubsub/cloud-client/subscriber.py +++ /dev/null @@ -1,783 +0,0 @@ -#!/usr/bin/env python - -# Copyright 2016 Google Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""This application demonstrates how to perform basic operations on -subscriptions with the Cloud Pub/Sub API. - -For more information, see the README.md under /pubsub and the documentation -at https://cloud.google.com/pubsub/docs. -""" - -import argparse - - -def list_subscriptions_in_topic(project_id, topic_id): - """Lists all subscriptions for a given topic.""" - # [START pubsub_list_topic_subscriptions] - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # topic_id = "your-topic-id" - - publisher = pubsub_v1.PublisherClient() - topic_path = publisher.topic_path(project_id, topic_id) - - for subscription in publisher.list_topic_subscriptions(topic_path): - print(subscription) - # [END pubsub_list_topic_subscriptions] - - -def list_subscriptions_in_project(project_id): - """Lists all subscriptions in the current project.""" - # [START pubsub_list_subscriptions] - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - - subscriber = pubsub_v1.SubscriberClient() - project_path = subscriber.project_path(project_id) - - # Wrap the subscriber in a 'with' block to automatically call close() to - # close the underlying gRPC channel when done. - with subscriber: - for subscription in subscriber.list_subscriptions(project_path): - print(subscription.name) - # [END pubsub_list_subscriptions] - - -def create_subscription(project_id, topic_id, subscription_id): - """Create a new pull subscription on the given topic.""" - # [START pubsub_create_pull_subscription] - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # topic_id = "your-topic-id" - # subscription_id = "your-subscription-id" - - subscriber = pubsub_v1.SubscriberClient() - topic_path = subscriber.topic_path(project_id, topic_id) - subscription_path = subscriber.subscription_path(project_id, subscription_id) - - # Wrap the subscriber in a 'with' block to automatically call close() to - # close the underlying gRPC channel when done. - with subscriber: - subscription = subscriber.create_subscription(subscription_path, topic_path) - - print("Subscription created: {}".format(subscription)) - # [END pubsub_create_pull_subscription] - - -def create_subscription_with_dead_letter_topic( - project_id, topic_id, subscription_id, dead_letter_topic_id -): - """Create a subscription with dead letter policy.""" - # [START pubsub_dead_letter_create_subscription] - from google.cloud import pubsub_v1 - from google.cloud.pubsub_v1.types import DeadLetterPolicy - - # TODO(developer) - # project_id = "your-project-id" - # endpoint = "https://my-test-project.appspot.com/push" - # TODO(developer): This is an existing topic that the subscription - # with dead letter policy is attached to. - # topic_id = "your-topic-id" - # TODO(developer): This is an existing subscription with a dead letter policy. - # subscription_id = "your-subscription-id" - # TODO(developer): This is an existing dead letter topic that the subscription - # with dead letter policy will forward dead letter messages to. - # dead_letter_topic_id = "your-dead-letter-topic-id" - - subscriber = pubsub_v1.SubscriberClient() - topic_path = subscriber.topic_path(project_id, topic_id) - subscription_path = subscriber.subscription_path(project_id, subscription_id) - dead_letter_topic_path = subscriber.topic_path(project_id, dead_letter_topic_id) - - dead_letter_policy = DeadLetterPolicy( - dead_letter_topic=dead_letter_topic_path, max_delivery_attempts=10 - ) - - with subscriber: - subscription = subscriber.create_subscription( - subscription_path, topic_path, dead_letter_policy=dead_letter_policy - ) - - print("Subscription created: {}".format(subscription.name)) - print( - "It will forward dead letter messages to: {}".format( - subscription.dead_letter_policy.dead_letter_topic - ) - ) - print( - "After {} delivery attempts.".format( - subscription.dead_letter_policy.max_delivery_attempts - ) - ) - # [END pubsub_dead_letter_create_subscription] - - -def create_push_subscription(project_id, topic_id, subscription_id, endpoint): - """Create a new push subscription on the given topic.""" - # [START pubsub_create_push_subscription] - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # topic_id = "your-topic-id" - # subscription_id = "your-subscription-id" - # endpoint = "https://my-test-project.appspot.com/push" - - subscriber = pubsub_v1.SubscriberClient() - topic_path = subscriber.topic_path(project_id, topic_id) - subscription_path = subscriber.subscription_path(project_id, subscription_id) - - push_config = pubsub_v1.types.PushConfig(push_endpoint=endpoint) - - # Wrap the subscriber in a 'with' block to automatically call close() to - # close the underlying gRPC channel when done. - with subscriber: - subscription = subscriber.create_subscription( - subscription_path, topic_path, push_config - ) - - print("Push subscription created: {}".format(subscription)) - print("Endpoint for subscription is: {}".format(endpoint)) - # [END pubsub_create_push_subscription] - - -def delete_subscription(project_id, subscription_id): - """Deletes an existing Pub/Sub topic.""" - # [START pubsub_delete_subscription] - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # subscription_id = "your-subscription-id" - - subscriber = pubsub_v1.SubscriberClient() - subscription_path = subscriber.subscription_path(project_id, subscription_id) - - # Wrap the subscriber in a 'with' block to automatically call close() to - # close the underlying gRPC channel when done. - with subscriber: - subscriber.delete_subscription(subscription_path) - - print("Subscription deleted: {}".format(subscription_path)) - # [END pubsub_delete_subscription] - - -def update_push_subscription(project_id, topic_id, subscription_id, endpoint): - """ - Updates an existing Pub/Sub subscription's push endpoint URL. - Note that certain properties of a subscription, such as - its topic, are not modifiable. - """ - # [START pubsub_update_push_configuration] - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # topic_id = "your-topic-id" - # subscription_id = "your-subscription-id" - # endpoint = "https://my-test-project.appspot.com/push" - - subscriber = pubsub_v1.SubscriberClient() - subscription_path = subscriber.subscription_path(project_id, subscription_id) - - push_config = pubsub_v1.types.PushConfig(push_endpoint=endpoint) - - subscription = pubsub_v1.types.Subscription( - name=subscription_path, topic=topic_id, push_config=push_config - ) - - update_mask = {"paths": {"push_config"}} - - # Wrap the subscriber in a 'with' block to automatically call close() to - # close the underlying gRPC channel when done. - with subscriber: - result = subscriber.update_subscription(subscription, update_mask) - - print("Subscription updated: {}".format(subscription_path)) - print("New endpoint for subscription is: {}".format(result.push_config)) - # [END pubsub_update_push_configuration] - - -def update_subscription_with_dead_letter_policy( - project_id, topic_id, subscription_id, dead_letter_topic_id -): - """Update a subscription's dead letter policy.""" - # [START pubsub_dead_letter_update_subscription] - from google.cloud import pubsub_v1 - from google.cloud.pubsub_v1.types import DeadLetterPolicy, FieldMask - - # TODO(developer) - # project_id = "your-project-id" - # TODO(developer): This is an existing topic that the subscription - # with dead letter policy is attached to. - # topic_id = "your-topic-id" - # TODO(developer): This is an existing subscription with a dead letter policy. - # subscription_id = "your-subscription-id" - # TODO(developer): This is an existing dead letter topic that the subscription - # with dead letter policy will forward dead letter messages to. - # dead_letter_topic_id = "your-dead-letter-topic-id" - - subscriber = pubsub_v1.SubscriberClient() - topic_path = subscriber.topic_path(project_id, topic_id) - subscription_path = subscriber.subscription_path(project_id, subscription_id) - dead_letter_topic_path = subscriber.topic_path(project_id, dead_letter_topic_id) - - subscription_before_update = subscriber.get_subscription(subscription_path) - print("Before the update: {}".format(subscription_before_update)) - - # Indicates which fields in the provided subscription to update. - update_mask = FieldMask(paths=["dead_letter_policy.max_delivery_attempts"]) - - # Construct a dead letter policy you expect to have after the update. - dead_letter_policy = DeadLetterPolicy( - dead_letter_topic=dead_letter_topic_path, max_delivery_attempts=20 - ) - - # Construct the subscription with the dead letter policy you expect to have - # after the update. Here, values in the required fields (name, topic) help - # identify the subscription. - subscription = pubsub_v1.types.Subscription( - name=subscription_path, topic=topic_path, dead_letter_policy=dead_letter_policy, - ) - - with subscriber: - subscription_after_update = subscriber.update_subscription( - subscription, update_mask - ) - - print("After the update: {}".format(subscription_after_update)) - # [END pubsub_dead_letter_update_subscription] - return subscription_after_update - - -def remove_dead_letter_policy(project_id, topic_id, subscription_id): - """Remove dead letter policy from a subscription.""" - # [START pubsub_dead_letter_remove] - from google.cloud import pubsub_v1 - from google.cloud.pubsub_v1.types import FieldMask - - # TODO(developer) - # project_id = "your-project-id" - # TODO(developer): This is an existing topic that the subscription - # with dead letter policy is attached to. - # topic_id = "your-topic-id" - # TODO(developer): This is an existing subscription with a dead letter policy. - # subscription_id = "your-subscription-id" - - subscriber = pubsub_v1.SubscriberClient() - topic_path = subscriber.topic_path(project_id, topic_id) - subscription_path = subscriber.subscription_path(project_id, subscription_id) - - subscription_before_update = subscriber.get_subscription(subscription_path) - print("Before removing the policy: {}".format(subscription_before_update)) - - # Indicates which fields in the provided subscription to update. - update_mask = FieldMask( - paths=[ - "dead_letter_policy.dead_letter_topic", - "dead_letter_policy.max_delivery_attempts", - ] - ) - - # Construct the subscription (without any dead letter policy) that you - # expect to have after the update. - subscription = pubsub_v1.types.Subscription( - name=subscription_path, topic=topic_path - ) - - with subscriber: - subscription_after_update = subscriber.update_subscription( - subscription, update_mask - ) - - print("After removing the policy: {}".format(subscription_after_update)) - # [END pubsub_dead_letter_remove] - return subscription_after_update - - -def receive_messages(project_id, subscription_id, timeout=None): - """Receives messages from a pull subscription.""" - # [START pubsub_subscriber_async_pull] - # [START pubsub_quickstart_subscriber] - from concurrent.futures import TimeoutError - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # subscription_id = "your-subscription-id" - # Number of seconds the subscriber should listen for messages - # timeout = 5.0 - - subscriber = pubsub_v1.SubscriberClient() - # The `subscription_path` method creates a fully qualified identifier - # in the form `projects/{project_id}/subscriptions/{subscription_id}` - subscription_path = subscriber.subscription_path(project_id, subscription_id) - - def callback(message): - print("Received message: {}".format(message)) - message.ack() - - streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback) - print("Listening for messages on {}..\n".format(subscription_path)) - - # Wrap subscriber in a 'with' block to automatically call close() when done. - with subscriber: - try: - # When `timeout` is not set, result() will block indefinitely, - # unless an exception is encountered first. - streaming_pull_future.result(timeout=timeout) - except TimeoutError: - streaming_pull_future.cancel() - # [END pubsub_subscriber_async_pull] - # [END pubsub_quickstart_subscriber] - - -def receive_messages_with_custom_attributes(project_id, subscription_id, timeout=None): - """Receives messages from a pull subscription.""" - # [START pubsub_subscriber_async_pull_custom_attributes] - from concurrent.futures import TimeoutError - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # subscription_id = "your-subscription-id" - # Number of seconds the subscriber should listen for messages - # timeout = 5.0 - - subscriber = pubsub_v1.SubscriberClient() - subscription_path = subscriber.subscription_path(project_id, subscription_id) - - def callback(message): - print("Received message: {}".format(message.data)) - if message.attributes: - print("Attributes:") - for key in message.attributes: - value = message.attributes.get(key) - print("{}: {}".format(key, value)) - message.ack() - - streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback) - print("Listening for messages on {}..\n".format(subscription_path)) - - # Wrap subscriber in a 'with' block to automatically call close() when done. - with subscriber: - try: - # When `timeout` is not set, result() will block indefinitely, - # unless an exception is encountered first. - streaming_pull_future.result(timeout=timeout) - except TimeoutError: - streaming_pull_future.cancel() - # [END pubsub_subscriber_async_pull_custom_attributes] - - -def receive_messages_with_flow_control(project_id, subscription_id, timeout=None): - """Receives messages from a pull subscription with flow control.""" - # [START pubsub_subscriber_flow_settings] - from concurrent.futures import TimeoutError - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # subscription_id = "your-subscription-id" - # Number of seconds the subscriber should listen for messages - # timeout = 5.0 - - subscriber = pubsub_v1.SubscriberClient() - subscription_path = subscriber.subscription_path(project_id, subscription_id) - - def callback(message): - print("Received message: {}".format(message.data)) - message.ack() - - # Limit the subscriber to only have ten outstanding messages at a time. - flow_control = pubsub_v1.types.FlowControl(max_messages=10) - - streaming_pull_future = subscriber.subscribe( - subscription_path, callback=callback, flow_control=flow_control - ) - print("Listening for messages on {}..\n".format(subscription_path)) - - # Wrap subscriber in a 'with' block to automatically call close() when done. - with subscriber: - try: - # When `timeout` is not set, result() will block indefinitely, - # unless an exception is encountered first. - streaming_pull_future.result(timeout=timeout) - except TimeoutError: - streaming_pull_future.cancel() - # [END pubsub_subscriber_flow_settings] - - -def synchronous_pull(project_id, subscription_id): - """Pulling messages synchronously.""" - # [START pubsub_subscriber_sync_pull] - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # subscription_id = "your-subscription-id" - - subscriber = pubsub_v1.SubscriberClient() - subscription_path = subscriber.subscription_path(project_id, subscription_id) - - NUM_MESSAGES = 3 - - # Wrap the subscriber in a 'with' block to automatically call close() to - # close the underlying gRPC channel when done. - with subscriber: - # The subscriber pulls a specific number of messages. - response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES) - - ack_ids = [] - for received_message in response.received_messages: - print("Received: {}".format(received_message.message.data)) - ack_ids.append(received_message.ack_id) - - # Acknowledges the received messages so they will not be sent again. - subscriber.acknowledge(subscription_path, ack_ids) - - print( - "Received and acknowledged {} messages. Done.".format( - len(response.received_messages) - ) - ) - # [END pubsub_subscriber_sync_pull] - - -def synchronous_pull_with_lease_management(project_id, subscription_id): - """Pulling messages synchronously with lease management""" - # [START pubsub_subscriber_sync_pull_with_lease] - import logging - import multiprocessing - import random - import time - - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # subscription_id = "your-subscription-id" - - subscriber = pubsub_v1.SubscriberClient() - subscription_path = subscriber.subscription_path(project_id, subscription_id) - - NUM_MESSAGES = 2 - ACK_DEADLINE = 30 - SLEEP_TIME = 10 - - # The subscriber pulls a specific number of messages. - response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES) - - multiprocessing.log_to_stderr() - logger = multiprocessing.get_logger() - logger.setLevel(logging.INFO) - - def worker(msg): - """Simulates a long-running process.""" - RUN_TIME = random.randint(1, 60) - logger.info( - "{}: Running {} for {}s".format( - time.strftime("%X", time.gmtime()), msg.message.data, RUN_TIME - ) - ) - time.sleep(RUN_TIME) - - # `processes` stores process as key and ack id and message as values. - processes = dict() - for message in response.received_messages: - process = multiprocessing.Process(target=worker, args=(message,)) - processes[process] = (message.ack_id, message.message.data) - process.start() - - while processes: - for process in list(processes): - ack_id, msg_data = processes[process] - # If the process is still running, reset the ack deadline as - # specified by ACK_DEADLINE once every while as specified - # by SLEEP_TIME. - if process.is_alive(): - # `ack_deadline_seconds` must be between 10 to 600. - subscriber.modify_ack_deadline( - subscription_path, [ack_id], ack_deadline_seconds=ACK_DEADLINE, - ) - logger.info( - "{}: Reset ack deadline for {} for {}s".format( - time.strftime("%X", time.gmtime()), msg_data, ACK_DEADLINE, - ) - ) - - # If the processs is finished, acknowledges using `ack_id`. - else: - subscriber.acknowledge(subscription_path, [ack_id]) - logger.info( - "{}: Acknowledged {}".format( - time.strftime("%X", time.gmtime()), msg_data - ) - ) - processes.pop(process) - - # If there are still processes running, sleeps the thread. - if processes: - time.sleep(SLEEP_TIME) - - print( - "Received and acknowledged {} messages. Done.".format( - len(response.received_messages) - ) - ) - - # Close the underlying gPRC channel. Alternatively, wrap subscriber in - # a 'with' block to automatically call close() when done. - subscriber.close() - # [END pubsub_subscriber_sync_pull_with_lease] - - -def listen_for_errors(project_id, subscription_id, timeout=None): - """Receives messages and catches errors from a pull subscription.""" - # [START pubsub_subscriber_error_listener] - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # subscription_id = "your-subscription-id" - # Number of seconds the subscriber should listen for messages - # timeout = 5.0 - - subscriber = pubsub_v1.SubscriberClient() - subscription_path = subscriber.subscription_path(project_id, subscription_id) - - def callback(message): - print("Received message: {}".format(message)) - message.ack() - - streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback) - print("Listening for messages on {}..\n".format(subscription_path)) - - # Wrap subscriber in a 'with' block to automatically call close() when done. - with subscriber: - # When `timeout` is not set, result() will block indefinitely, - # unless an exception is encountered first. - try: - streaming_pull_future.result(timeout=timeout) - except Exception as e: - streaming_pull_future.cancel() - print( - "Listening for messages on {} threw an exception: {}.".format( - subscription_id, e - ) - ) - # [END pubsub_subscriber_error_listener] - - -def receive_messages_with_delivery_attempts(project_id, subscription_id, timeout=None): - # [START pubsub_dead_letter_delivery_attempt] - from concurrent.futures import TimeoutError - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # subscription_id = "your-subscription-id" - - subscriber = pubsub_v1.SubscriberClient() - subscription_path = subscriber.subscription_path(project_id, subscription_id) - - def callback(message): - print("Received message: {}".format(message)) - print("With delivery attempts: {}".format(message.delivery_attempt)) - message.ack() - - streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback) - print("Listening for messages on {}..\n".format(subscription_path)) - - # Wrap subscriber in a 'with' block to automatically call close() when done. - with subscriber: - # When `timeout` is not set, result() will block indefinitely, - # unless an exception is encountered first. - try: - streaming_pull_future.result(timeout=timeout) - except TimeoutError: - streaming_pull_future.cancel() - # [END pubsub_dead_letter_delivery_attempt] - - -if __name__ == "__main__": - parser = argparse.ArgumentParser( - description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, - ) - parser.add_argument("project_id", help="Your Google Cloud project ID") - - subparsers = parser.add_subparsers(dest="command") - list_in_topic_parser = subparsers.add_parser( - "list-in-topic", help=list_subscriptions_in_topic.__doc__ - ) - list_in_topic_parser.add_argument("topic_id") - - list_in_project_parser = subparsers.add_parser( - "list-in-project", help=list_subscriptions_in_project.__doc__ - ) - - create_parser = subparsers.add_parser("create", help=create_subscription.__doc__) - create_parser.add_argument("topic_id") - create_parser.add_argument("subscription_id") - - create_with_dead_letter_policy_parser = subparsers.add_parser( - "create-with-dead-letter-policy", - help=create_subscription_with_dead_letter_topic.__doc__, - ) - create_with_dead_letter_policy_parser.add_argument("topic_id") - create_with_dead_letter_policy_parser.add_argument("subscription_id") - create_with_dead_letter_policy_parser.add_argument("dead_letter_topic_id") - - create_push_parser = subparsers.add_parser( - "create-push", help=create_push_subscription.__doc__ - ) - create_push_parser.add_argument("topic_id") - create_push_parser.add_argument("subscription_id") - create_push_parser.add_argument("endpoint") - - delete_parser = subparsers.add_parser("delete", help=delete_subscription.__doc__) - delete_parser.add_argument("subscription_id") - - update_push_parser = subparsers.add_parser( - "update-push", help=update_push_subscription.__doc__ - ) - update_push_parser.add_argument("topic_id") - update_push_parser.add_argument("subscription_id") - update_push_parser.add_argument("endpoint") - - update_dead_letter_policy_parser = subparsers.add_parser( - "update-dead-letter-policy", - help=update_subscription_with_dead_letter_policy.__doc__, - ) - update_dead_letter_policy_parser.add_argument("topic_id") - update_dead_letter_policy_parser.add_argument("subscription_id") - update_dead_letter_policy_parser.add_argument("dead_letter_topic_id") - - remove_dead_letter_policy_parser = subparsers.add_parser( - "remove-dead-letter-policy", help=remove_dead_letter_policy.__doc__ - ) - remove_dead_letter_policy_parser.add_argument("topic_id") - remove_dead_letter_policy_parser.add_argument("subscription_id") - - receive_parser = subparsers.add_parser("receive", help=receive_messages.__doc__) - receive_parser.add_argument("subscription_id") - receive_parser.add_argument("timeout", default=None, type=float, nargs="?") - - receive_with_custom_attributes_parser = subparsers.add_parser( - "receive-custom-attributes", - help=receive_messages_with_custom_attributes.__doc__, - ) - receive_with_custom_attributes_parser.add_argument("subscription_id") - receive_with_custom_attributes_parser.add_argument( - "timeout", default=None, type=float, nargs="?" - ) - - receive_with_flow_control_parser = subparsers.add_parser( - "receive-flow-control", help=receive_messages_with_flow_control.__doc__ - ) - receive_with_flow_control_parser.add_argument("subscription_id") - receive_with_flow_control_parser.add_argument( - "timeout", default=None, type=float, nargs="?" - ) - - synchronous_pull_parser = subparsers.add_parser( - "receive-synchronously", help=synchronous_pull.__doc__ - ) - synchronous_pull_parser.add_argument("subscription_id") - - synchronous_pull_with_lease_management_parser = subparsers.add_parser( - "receive-synchronously-with-lease", - help=synchronous_pull_with_lease_management.__doc__, - ) - synchronous_pull_with_lease_management_parser.add_argument("subscription_id") - - listen_for_errors_parser = subparsers.add_parser( - "listen-for-errors", help=listen_for_errors.__doc__ - ) - listen_for_errors_parser.add_argument("subscription_id") - listen_for_errors_parser.add_argument( - "timeout", default=None, type=float, nargs="?" - ) - - receive_messages_with_delivery_attempts_parser = subparsers.add_parser( - "receive-messages-with-delivery-attempts", - help=receive_messages_with_delivery_attempts.__doc__, - ) - receive_messages_with_delivery_attempts_parser.add_argument("subscription_id") - receive_messages_with_delivery_attempts_parser.add_argument( - "timeout", default=None, type=float, nargs="?" - ) - - args = parser.parse_args() - - if args.command == "list-in-topic": - list_subscriptions_in_topic(args.project_id, args.topic_id) - elif args.command == "list-in-project": - list_subscriptions_in_project(args.project_id) - elif args.command == "create": - create_subscription(args.project_id, args.topic_id, args.subscription_id) - elif args.command == "create-with-dead-letter-policy": - create_subscription_with_dead_letter_topic( - args.project_id, - args.topic_id, - args.subscription_id, - args.dead_letter_topic_id, - ) - elif args.command == "create-push": - create_push_subscription( - args.project_id, args.topic_id, args.subscription_id, args.endpoint, - ) - elif args.command == "delete": - delete_subscription(args.project_id, args.subscription_id) - elif args.command == "update-push": - update_push_subscription( - args.project_id, args.topic_id, args.subscription_id, args.endpoint, - ) - elif args.command == "update-dead-letter-policy": - update_subscription_with_dead_letter_policy( - args.project_id, - args.topic_id, - args.subscription_id, - args.dead_letter_topic_id, - ) - elif args.command == "remove-dead-letter-policy": - remove_dead_letter_policy(args.project_id, args.topic_id, args.subscription_id) - elif args.command == "receive": - receive_messages(args.project_id, args.subscription_id, args.timeout) - elif args.command == "receive-custom-attributes": - receive_messages_with_custom_attributes( - args.project_id, args.subscription_id, args.timeout - ) - elif args.command == "receive-flow-control": - receive_messages_with_flow_control( - args.project_id, args.subscription_id, args.timeout - ) - elif args.command == "receive-synchronously": - synchronous_pull(args.project_id, args.subscription_id) - elif args.command == "receive-synchronously-with-lease": - synchronous_pull_with_lease_management(args.project_id, args.subscription_id) - elif args.command == "listen-for-errors": - listen_for_errors(args.project_id, args.subscription_id, args.timeout) - elif args.command == "receive-messages-with-delivery-attempts": - receive_messages_with_delivery_attempts( - args.project_id, args.subscription_id, args.timeout - ) diff --git a/pubsub/cloud-client/subscriber_test.py b/pubsub/cloud-client/subscriber_test.py deleted file mode 100644 index a7f7c139c258..000000000000 --- a/pubsub/cloud-client/subscriber_test.py +++ /dev/null @@ -1,341 +0,0 @@ -# Copyright 2016 Google Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import uuid - -import backoff -from google.cloud import pubsub_v1 -import pytest - -import subscriber - -UUID = uuid.uuid4().hex -PROJECT = os.environ["GOOGLE_CLOUD_PROJECT"] -TOPIC = "subscription-test-topic-" + UUID -DEAD_LETTER_TOPIC = "subscription-test-dead-letter-topic-" + UUID -SUBSCRIPTION_ADMIN = "subscription-test-subscription-admin-" + UUID -SUBSCRIPTION_ASYNC = "subscription-test-subscription-async-" + UUID -SUBSCRIPTION_SYNC = "subscription-test-subscription-sync-" + UUID -SUBSCRIPTION_DLQ = "subscription-test-subscription-dlq-" + UUID -ENDPOINT = "https://{}.appspot.com/push".format(PROJECT) -NEW_ENDPOINT = "https://{}.appspot.com/push2".format(PROJECT) - - -@pytest.fixture(scope="module") -def publisher_client(): - yield pubsub_v1.PublisherClient() - - -@pytest.fixture(scope="module") -def topic(publisher_client): - topic_path = publisher_client.topic_path(PROJECT, TOPIC) - - try: - topic = publisher_client.get_topic(topic_path) - except: # noqa - topic = publisher_client.create_topic(topic_path) - - yield topic.name - - publisher_client.delete_topic(topic.name) - - -@pytest.fixture(scope="module") -def dead_letter_topic(publisher_client): - topic_path = publisher_client.topic_path(PROJECT, DEAD_LETTER_TOPIC) - - try: - dead_letter_topic = publisher_client.get_topic(topic_path) - except: # noqa - dead_letter_topic = publisher_client.create_topic(topic_path) - - yield dead_letter_topic.name - - publisher_client.delete_topic(dead_letter_topic.name) - - -@pytest.fixture(scope="module") -def subscriber_client(): - subscriber_client = pubsub_v1.SubscriberClient() - yield subscriber_client - subscriber_client.close() - - -@pytest.fixture(scope="module") -def subscription_admin(subscriber_client, topic): - subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_ADMIN) - - try: - subscription = subscriber_client.get_subscription(subscription_path) - except: # noqa - subscription = subscriber_client.create_subscription( - subscription_path, topic=topic - ) - - yield subscription.name - - -@pytest.fixture(scope="module") -def subscription_sync(subscriber_client, topic): - subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_SYNC) - - try: - subscription = subscriber_client.get_subscription(subscription_path) - except: # noqa - subscription = subscriber_client.create_subscription( - subscription_path, topic=topic - ) - - yield subscription.name - - subscriber_client.delete_subscription(subscription.name) - - -@pytest.fixture(scope="module") -def subscription_async(subscriber_client, topic): - subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_ASYNC) - - try: - subscription = subscriber_client.get_subscription(subscription_path) - except: # noqa - subscription = subscriber_client.create_subscription( - subscription_path, topic=topic - ) - - yield subscription.name - - subscriber_client.delete_subscription(subscription.name) - - -@pytest.fixture(scope="module") -def subscription_dlq(subscriber_client, topic): - subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_DLQ) - - try: - subscription = subscriber_client.get_subscription(subscription_path) - except: # noqa - subscription = subscriber_client.create_subscription( - subscription_path, topic=topic - ) - - yield subscription.name - - subscriber_client.delete_subscription(subscription.name) - - -def test_list_in_topic(subscription_admin, capsys): - @backoff.on_exception(backoff.expo, AssertionError, max_time=60) - def eventually_consistent_test(): - subscriber.list_subscriptions_in_topic(PROJECT, TOPIC) - out, _ = capsys.readouterr() - assert subscription_admin in out - - eventually_consistent_test() - - -def test_list_in_project(subscription_admin, capsys): - @backoff.on_exception(backoff.expo, AssertionError, max_time=60) - def eventually_consistent_test(): - subscriber.list_subscriptions_in_project(PROJECT) - out, _ = capsys.readouterr() - assert subscription_admin in out - - eventually_consistent_test() - - -def test_create(subscriber_client): - subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_ADMIN) - - try: - subscriber_client.delete_subscription(subscription_path) - except Exception: - pass - - subscriber.create_subscription(PROJECT, TOPIC, SUBSCRIPTION_ADMIN) - - @backoff.on_exception(backoff.expo, AssertionError, max_time=60) - def eventually_consistent_test(): - assert subscriber_client.get_subscription(subscription_path) - - eventually_consistent_test() - - -def test_create_subscription_with_dead_letter_policy( - subscriber_client, publisher_client, topic, dead_letter_topic, capsys -): - subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_DLQ) - dead_letter_topic_path = publisher_client.topic_path(PROJECT, DEAD_LETTER_TOPIC) - - try: - subscriber_client.delete_subscription(subscription_path) - except Exception: - pass - - subscriber.create_subscription_with_dead_letter_topic( - PROJECT, TOPIC, SUBSCRIPTION_DLQ, DEAD_LETTER_TOPIC - ) - - out, _ = capsys.readouterr() - assert "Subscription created: " + subscription_path in out - assert "It will forward dead letter messages to: " + dead_letter_topic_path in out - assert "After 10 delivery attempts." in out - - -def test_create_push(subscriber_client): - subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_ADMIN) - try: - subscriber_client.delete_subscription(subscription_path) - except Exception: - pass - - subscriber.create_push_subscription(PROJECT, TOPIC, SUBSCRIPTION_ADMIN, ENDPOINT) - - @backoff.on_exception(backoff.expo, AssertionError, max_time=60) - def eventually_consistent_test(): - assert subscriber_client.get_subscription(subscription_path) - - eventually_consistent_test() - - -def test_update(subscriber_client, subscription_admin, capsys): - subscriber.update_push_subscription( - PROJECT, TOPIC, SUBSCRIPTION_ADMIN, NEW_ENDPOINT - ) - - out, _ = capsys.readouterr() - assert "Subscription updated" in out - - -def test_update_dead_letter_policy( - subscriber_client, topic, subscription_dlq, dead_letter_topic, capsys -): - _ = subscriber.update_subscription_with_dead_letter_policy( - PROJECT, TOPIC, SUBSCRIPTION_DLQ, DEAD_LETTER_TOPIC - ) - - out, _ = capsys.readouterr() - assert "max_delivery_attempts: 20" in out - - -def test_delete(subscriber_client, subscription_admin): - subscriber.delete_subscription(PROJECT, SUBSCRIPTION_ADMIN) - - @backoff.on_exception(backoff.expo, AssertionError, max_time=60) - def eventually_consistent_test(): - with pytest.raises(Exception): - subscriber_client.get_subscription(subscription_admin) - - eventually_consistent_test() - - -def _publish_messages(publisher_client, topic): - for n in range(5): - data = u"message {}".format(n).encode("utf-8") - publish_future = publisher_client.publish( - topic, data=data, origin="python-sample" - ) - publish_future.result() - - -def test_receive(publisher_client, topic, subscription_async, capsys): - _publish_messages(publisher_client, topic) - - subscriber.receive_messages(PROJECT, SUBSCRIPTION_ASYNC, 5) - - out, _ = capsys.readouterr() - assert "Listening" in out - assert subscription_async in out - assert "message" in out - - -def test_receive_with_custom_attributes( - publisher_client, topic, subscription_async, capsys -): - - _publish_messages(publisher_client, topic) - - subscriber.receive_messages_with_custom_attributes(PROJECT, SUBSCRIPTION_ASYNC, 5) - - out, _ = capsys.readouterr() - assert "message" in out - assert "origin" in out - assert "python-sample" in out - - -def test_receive_with_flow_control(publisher_client, topic, subscription_async, capsys): - - _publish_messages(publisher_client, topic) - - subscriber.receive_messages_with_flow_control(PROJECT, SUBSCRIPTION_ASYNC, 5) - - out, _ = capsys.readouterr() - assert "Listening" in out - assert subscription_async in out - assert "message" in out - - -def test_receive_synchronously(publisher_client, topic, subscription_sync, capsys): - _publish_messages(publisher_client, topic) - - subscriber.synchronous_pull(PROJECT, SUBSCRIPTION_SYNC) - - out, _ = capsys.readouterr() - assert "Done." in out - - -def test_receive_synchronously_with_lease( - publisher_client, topic, subscription_sync, capsys -): - _publish_messages(publisher_client, topic) - - subscriber.synchronous_pull_with_lease_management(PROJECT, SUBSCRIPTION_SYNC) - - out, _ = capsys.readouterr() - assert "Done." in out - - -def test_listen_for_errors(publisher_client, topic, subscription_async, capsys): - - _publish_messages(publisher_client, topic) - - subscriber.listen_for_errors(PROJECT, SUBSCRIPTION_ASYNC, 5) - - out, _ = capsys.readouterr() - assert "Listening" in out - assert subscription_async in out - assert "threw an exception" in out - - -def test_receive_with_delivery_attempts( - publisher_client, topic, subscription_dlq, dead_letter_topic, capsys -): - _publish_messages(publisher_client, topic) - - subscriber.receive_messages_with_delivery_attempts(PROJECT, SUBSCRIPTION_DLQ, 10) - - out, _ = capsys.readouterr() - assert "Listening" in out - assert subscription_dlq in out - assert "Received message: " in out - assert "message 4" in out - assert "With delivery attempts: " in out - - -def test_remove_dead_letter_policy(subscriber_client, subscription_dlq): - subscription_after_update = subscriber.remove_dead_letter_policy( - PROJECT, TOPIC, SUBSCRIPTION_DLQ - ) - - assert subscription_after_update.dead_letter_policy.dead_letter_topic == ""