From 953a811c9bd1217a427aba94b126a02cbc1c4d5b Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Tue, 10 Dec 2019 16:31:20 -0800 Subject: [PATCH] Pub/Sub: update how to test with mock [(#2555)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/2555) * Update test with mock * Clean up resources after tests * Use unique resource names avoid test failures * Delete subscriptions in cleanup phase * Ensure unique topic name * Update assert to remove bytestring notation * Rewrite PubSubToGCS test using dataflow testing module --- samples/snippets/iam.py | 115 +++++----- samples/snippets/iam_test.py | 33 +-- samples/snippets/publisher.py | 167 ++++++++------- samples/snippets/publisher_test.py | 31 ++- samples/snippets/quickstart.py | 39 ++-- samples/snippets/quickstart/pub.py | 26 ++- samples/snippets/quickstart/pub_test.py | 24 +-- samples/snippets/quickstart/sub.py | 27 ++- samples/snippets/quickstart/sub_test.py | 82 ++++---- samples/snippets/quickstart_test.py | 23 +- samples/snippets/subscriber.py | 269 ++++++++++++++---------- samples/snippets/subscriber_test.py | 131 +++++++----- 12 files changed, 543 insertions(+), 424 deletions(-) diff --git a/samples/snippets/iam.py b/samples/snippets/iam.py index f9865ed39..f014ce749 100644 --- a/samples/snippets/iam.py +++ b/samples/snippets/iam.py @@ -34,9 +34,9 @@ def get_topic_policy(project, topic_name): policy = client.get_iam_policy(topic_path) - print('Policy for topic {}:'.format(topic_path)) + print("Policy for topic {}:".format(topic_path)) for binding in policy.bindings: - print('Role: {}, Members: {}'.format(binding.role, binding.members)) + print("Role: {}, Members: {}".format(binding.role, binding.members)) # [END pubsub_get_topic_policy] @@ -48,9 +48,9 @@ def get_subscription_policy(project, subscription_name): policy = client.get_iam_policy(subscription_path) - print('Policy for subscription {}:'.format(subscription_path)) + print("Policy for subscription {}:".format(subscription_path)) for binding in policy.bindings: - print('Role: {}, Members: {}'.format(binding.role, binding.members)) + print("Role: {}, Members: {}".format(binding.role, binding.members)) # [END pubsub_get_subscription_policy] @@ -63,20 +63,17 @@ def set_topic_policy(project, topic_name): policy = client.get_iam_policy(topic_path) # Add all users as viewers. - policy.bindings.add( - role='roles/pubsub.viewer', - members=['allUsers']) + policy.bindings.add(role="roles/pubsub.viewer", members=["allUsers"]) # Add a group as a publisher. policy.bindings.add( - role='roles/pubsub.publisher', - members=['group:cloud-logs@google.com']) + role="roles/pubsub.publisher", members=["group:cloud-logs@google.com"] + ) # Set the policy policy = client.set_iam_policy(topic_path, policy) - print('IAM policy for topic {} set: {}'.format( - topic_name, policy)) + print("IAM policy for topic {} set: {}".format(topic_name, policy)) # [END pubsub_set_topic_policy] @@ -89,20 +86,21 @@ def set_subscription_policy(project, subscription_name): policy = client.get_iam_policy(subscription_path) # Add all users as viewers. - policy.bindings.add( - role='roles/pubsub.viewer', - members=['allUsers']) + policy.bindings.add(role="roles/pubsub.viewer", members=["allUsers"]) # Add a group as an editor. policy.bindings.add( - role='roles/editor', - members=['group:cloud-logs@google.com']) + role="roles/editor", members=["group:cloud-logs@google.com"] + ) # Set the policy policy = client.set_iam_policy(subscription_path, policy) - print('IAM policy for subscription {} set: {}'.format( - subscription_name, policy)) + print( + "IAM policy for subscription {} set: {}".format( + subscription_name, policy + ) + ) # [END pubsub_set_subscription_policy] @@ -112,16 +110,17 @@ def check_topic_permissions(project, topic_name): client = pubsub_v1.PublisherClient() topic_path = client.topic_path(project, topic_name) - permissions_to_check = [ - 'pubsub.topics.publish', - 'pubsub.topics.update' - ] + permissions_to_check = ["pubsub.topics.publish", "pubsub.topics.update"] allowed_permissions = client.test_iam_permissions( - topic_path, permissions_to_check) + topic_path, permissions_to_check + ) - print('Allowed permissions for topic {}: {}'.format( - topic_path, allowed_permissions)) + print( + "Allowed permissions for topic {}: {}".format( + topic_path, allowed_permissions + ) + ) # [END pubsub_test_topic_permissions] @@ -132,63 +131,73 @@ def check_subscription_permissions(project, subscription_name): subscription_path = client.subscription_path(project, subscription_name) permissions_to_check = [ - 'pubsub.subscriptions.consume', - 'pubsub.subscriptions.update' + "pubsub.subscriptions.consume", + "pubsub.subscriptions.update", ] allowed_permissions = client.test_iam_permissions( - subscription_path, permissions_to_check) + subscription_path, permissions_to_check + ) - print('Allowed permissions for subscription {}: {}'.format( - subscription_path, allowed_permissions)) + print( + "Allowed permissions for subscription {}: {}".format( + subscription_path, allowed_permissions + ) + ) # [END pubsub_test_subscription_permissions] -if __name__ == '__main__': +if __name__ == "__main__": parser = argparse.ArgumentParser( description=__doc__, - formatter_class=argparse.RawDescriptionHelpFormatter + formatter_class=argparse.RawDescriptionHelpFormatter, ) - parser.add_argument('project', help='Your Google Cloud project ID') + parser.add_argument("project", help="Your Google Cloud project ID") - subparsers = parser.add_subparsers(dest='command') + subparsers = parser.add_subparsers(dest="command") get_topic_policy_parser = subparsers.add_parser( - 'get-topic-policy', help=get_topic_policy.__doc__) - get_topic_policy_parser.add_argument('topic_name') + "get-topic-policy", help=get_topic_policy.__doc__ + ) + get_topic_policy_parser.add_argument("topic_name") get_subscription_policy_parser = subparsers.add_parser( - 'get-subscription-policy', help=get_subscription_policy.__doc__) - get_subscription_policy_parser.add_argument('subscription_name') + "get-subscription-policy", help=get_subscription_policy.__doc__ + ) + get_subscription_policy_parser.add_argument("subscription_name") set_topic_policy_parser = subparsers.add_parser( - 'set-topic-policy', help=set_topic_policy.__doc__) - set_topic_policy_parser.add_argument('topic_name') + "set-topic-policy", help=set_topic_policy.__doc__ + ) + set_topic_policy_parser.add_argument("topic_name") set_subscription_policy_parser = subparsers.add_parser( - 'set-subscription-policy', help=set_subscription_policy.__doc__) - set_subscription_policy_parser.add_argument('subscription_name') + "set-subscription-policy", help=set_subscription_policy.__doc__ + ) + set_subscription_policy_parser.add_argument("subscription_name") check_topic_permissions_parser = subparsers.add_parser( - 'check-topic-permissions', help=check_topic_permissions.__doc__) - check_topic_permissions_parser.add_argument('topic_name') + "check-topic-permissions", help=check_topic_permissions.__doc__ + ) + check_topic_permissions_parser.add_argument("topic_name") check_subscription_permissions_parser = subparsers.add_parser( - 'check-subscription-permissions', - help=check_subscription_permissions.__doc__) - check_subscription_permissions_parser.add_argument('subscription_name') + "check-subscription-permissions", + help=check_subscription_permissions.__doc__, + ) + check_subscription_permissions_parser.add_argument("subscription_name") args = parser.parse_args() - if args.command == 'get-topic-policy': + if args.command == "get-topic-policy": get_topic_policy(args.project, args.topic_name) - elif args.command == 'get-subscription-policy': + elif args.command == "get-subscription-policy": get_subscription_policy(args.project, args.subscription_name) - elif args.command == 'set-topic-policy': + elif args.command == "set-topic-policy": set_topic_policy(args.project, args.topic_name) - elif args.command == 'set-subscription-policy': + elif args.command == "set-subscription-policy": set_subscription_policy(args.project, args.subscription_name) - elif args.command == 'check-topic-permissions': + elif args.command == "check-topic-permissions": check_topic_permissions(args.project, args.topic_name) - elif args.command == 'check-subscription-permissions': + elif args.command == "check-subscription-permissions": check_subscription_permissions(args.project, args.subscription_name) diff --git a/samples/snippets/iam_test.py b/samples/snippets/iam_test.py index 8a524c35a..2b019f9ea 100644 --- a/samples/snippets/iam_test.py +++ b/samples/snippets/iam_test.py @@ -13,23 +13,25 @@ # limitations under the License. import os +import uuid from google.cloud import pubsub_v1 import pytest import iam -PROJECT = os.environ['GCLOUD_PROJECT'] -TOPIC = 'iam-test-topic' -SUBSCRIPTION = 'iam-test-subscription' +UUID = uuid.uuid4().hex +PROJECT = os.environ["GCLOUD_PROJECT"] +TOPIC = "iam-test-topic-" + UUID +SUBSCRIPTION = "iam-test-subscription-" + UUID -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def publisher_client(): yield pubsub_v1.PublisherClient() -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def topic(publisher_client): topic_path = publisher_client.topic_path(PROJECT, TOPIC) @@ -42,8 +44,10 @@ def topic(publisher_client): yield topic_path + publisher_client.delete_topic(topic_path) -@pytest.fixture(scope='module') + +@pytest.fixture(scope="module") def subscriber_client(): yield pubsub_v1.SubscriberClient() @@ -51,7 +55,8 @@ def subscriber_client(): @pytest.fixture def subscription(subscriber_client, topic): subscription_path = subscriber_client.subscription_path( - PROJECT, SUBSCRIPTION) + PROJECT, SUBSCRIPTION + ) try: subscriber_client.delete_subscription(subscription_path) @@ -62,6 +67,8 @@ def subscription(subscriber_client, topic): yield subscription_path + subscriber_client.delete_subscription(subscription_path) + def test_get_topic_policy(topic, capsys): iam.get_topic_policy(PROJECT, TOPIC) @@ -81,16 +88,16 @@ def test_set_topic_policy(publisher_client, topic): iam.set_topic_policy(PROJECT, TOPIC) policy = publisher_client.get_iam_policy(topic) - assert 'roles/pubsub.publisher' in str(policy) - assert 'allUsers' in str(policy) + assert "roles/pubsub.publisher" in str(policy) + assert "allUsers" in str(policy) def test_set_subscription_policy(subscriber_client, subscription): iam.set_subscription_policy(PROJECT, SUBSCRIPTION) policy = subscriber_client.get_iam_policy(subscription) - assert 'roles/pubsub.viewer' in str(policy) - assert 'allUsers' in str(policy) + assert "roles/pubsub.viewer" in str(policy) + assert "allUsers" in str(policy) def test_check_topic_permissions(topic, capsys): @@ -99,7 +106,7 @@ def test_check_topic_permissions(topic, capsys): out, _ = capsys.readouterr() assert topic in out - assert 'pubsub.topics.publish' in out + assert "pubsub.topics.publish" in out def test_check_subscription_permissions(subscription, capsys): @@ -108,4 +115,4 @@ def test_check_subscription_permissions(subscription, capsys): out, _ = capsys.readouterr() assert subscription in out - assert 'pubsub.subscriptions.consume' in out + assert "pubsub.subscriptions.consume" in out diff --git a/samples/snippets/publisher.py b/samples/snippets/publisher.py index 490c903b2..d227baab9 100644 --- a/samples/snippets/publisher.py +++ b/samples/snippets/publisher.py @@ -53,7 +53,7 @@ def create_topic(project_id, topic_name): topic = publisher.create_topic(topic_path) - print('Topic created: {}'.format(topic)) + print("Topic created: {}".format(topic)) # [END pubsub_quickstart_create_topic] # [END pubsub_create_topic] @@ -71,7 +71,7 @@ def delete_topic(project_id, topic_name): publisher.delete_topic(topic_path) - print('Topic deleted: {}'.format(topic_path)) + print("Topic deleted: {}".format(topic_path)) # [END pubsub_delete_topic] @@ -90,14 +90,14 @@ def publish_messages(project_id, topic_name): topic_path = publisher.topic_path(project_id, topic_name) for n in range(1, 10): - data = u'Message number {}'.format(n) + data = u"Message number {}".format(n) # Data must be a bytestring - data = data.encode('utf-8') + data = data.encode("utf-8") # When you publish a message, the client returns a future. future = publisher.publish(topic_path, data=data) print(future.result()) - print('Published messages.') + print("Published messages.") # [END pubsub_quickstart_publisher] # [END pubsub_publish] @@ -115,16 +115,16 @@ def publish_messages_with_custom_attributes(project_id, topic_name): topic_path = publisher.topic_path(project_id, topic_name) for n in range(1, 10): - data = u'Message number {}'.format(n) + data = u"Message number {}".format(n) # Data must be a bytestring - data = data.encode('utf-8') + data = data.encode("utf-8") # Add two attributes, origin and username, to the message future = publisher.publish( - topic_path, data, origin='python-sample', username='gcp' + topic_path, data, origin="python-sample", username="gcp" ) print(future.result()) - print('Published messages with custom attributes.') + print("Published messages with custom attributes.") # [END pubsub_publish_custom_attributes] @@ -141,14 +141,14 @@ def publish_messages_with_futures(project_id, topic_name): topic_path = publisher.topic_path(project_id, topic_name) for n in range(1, 10): - data = u'Message number {}'.format(n) + data = u"Message number {}".format(n) # Data must be a bytestring - data = data.encode('utf-8') + data = data.encode("utf-8") # When you publish a message, the client returns a future. future = publisher.publish(topic_path, data=data) print(future.result()) - print('Published messages with futures.') + print("Published messages with futures.") # [END pubsub_publisher_concurrency_control] @@ -173,7 +173,7 @@ def callback(f): print(f.result()) futures.pop(data) except: # noqa - print('Please handle {} for {}.'.format(f.exception(), data)) + print("Please handle {} for {}.".format(f.exception(), data)) return callback @@ -182,7 +182,7 @@ def callback(f): futures.update({data: None}) # When you publish a message, the client returns a future. future = publisher.publish( - topic_path, data=data.encode('utf-8') # data must be a bytestring. + topic_path, data=data.encode("utf-8") # data must be a bytestring. ) futures[data] = future # Publish failures shall be handled in the callback function. @@ -192,7 +192,7 @@ def callback(f): while futures: time.sleep(5) - print('Published message with error handler.') + print("Published message with error handler.") # [END pubsub_publish_messages_error_handler] @@ -207,20 +207,19 @@ def publish_messages_with_batch_settings(project_id, topic_name): # Configure the batch to publish as soon as there is one kilobyte # of data or one second has passed. batch_settings = pubsub_v1.types.BatchSettings( - max_bytes=1024, # One kilobyte - max_latency=1, # One second + max_bytes=1024, max_latency=1 # One kilobyte # One second ) publisher = pubsub_v1.PublisherClient(batch_settings) topic_path = publisher.topic_path(project_id, topic_name) for n in range(1, 10): - data = u'Message number {}'.format(n) + data = u"Message number {}".format(n) # Data must be a bytestring - data = data.encode('utf-8') + data = data.encode("utf-8") future = publisher.publish(topic_path, data=data) print(future.result()) - print('Published messages with batch settings.') + print("Published messages with batch settings.") # [END pubsub_publisher_batch_settings] @@ -234,34 +233,34 @@ def publish_messages_with_retry_settings(project_id, topic_name): # Configure the retry settings. Defaults will be overwritten. retry_settings = { - 'interfaces': { - 'google.pubsub.v1.Publisher': { - 'retry_codes': { - 'publish': [ - 'ABORTED', - 'CANCELLED', - 'DEADLINE_EXCEEDED', - 'INTERNAL', - 'RESOURCE_EXHAUSTED', - 'UNAVAILABLE', - 'UNKNOWN', + "interfaces": { + "google.pubsub.v1.Publisher": { + "retry_codes": { + "publish": [ + "ABORTED", + "CANCELLED", + "DEADLINE_EXCEEDED", + "INTERNAL", + "RESOURCE_EXHAUSTED", + "UNAVAILABLE", + "UNKNOWN", ] }, - 'retry_params': { - 'messaging': { - 'initial_retry_delay_millis': 100, # default: 100 - 'retry_delay_multiplier': 1.3, # default: 1.3 - 'max_retry_delay_millis': 60000, # default: 60000 - 'initial_rpc_timeout_millis': 5000, # default: 25000 - 'rpc_timeout_multiplier': 1.0, # default: 1.0 - 'max_rpc_timeout_millis': 600000, # default: 30000 - 'total_timeout_millis': 600000, # default: 600000 + "retry_params": { + "messaging": { + "initial_retry_delay_millis": 100, # default: 100 + "retry_delay_multiplier": 1.3, # default: 1.3 + "max_retry_delay_millis": 60000, # default: 60000 + "initial_rpc_timeout_millis": 5000, # default: 25000 + "rpc_timeout_multiplier": 1.0, # default: 1.0 + "max_rpc_timeout_millis": 600000, # default: 30000 + "total_timeout_millis": 600000, # default: 600000 } }, - 'methods': { - 'Publish': { - 'retry_codes_name': 'publish', - 'retry_params_name': 'messaging', + "methods": { + "Publish": { + "retry_codes_name": "publish", + "retry_params_name": "messaging", } }, } @@ -272,85 +271,85 @@ def publish_messages_with_retry_settings(project_id, topic_name): topic_path = publisher.topic_path(project_id, topic_name) for n in range(1, 10): - data = u'Message number {}'.format(n) + data = u"Message number {}".format(n) # Data must be a bytestring - data = data.encode('utf-8') + data = data.encode("utf-8") future = publisher.publish(topic_path, data=data) print(future.result()) - print('Published messages with retry settings.') + print("Published messages with retry settings.") # [END pubsub_publisher_retry_settings] if __name__ == "__main__": parser = argparse.ArgumentParser( description=__doc__, - formatter_class=argparse.RawDescriptionHelpFormatter + formatter_class=argparse.RawDescriptionHelpFormatter, ) - parser.add_argument('project_id', help='Your Google Cloud project ID') + parser.add_argument("project_id", help="Your Google Cloud project ID") - subparsers = parser.add_subparsers(dest='command') - subparsers.add_parser('list', help=list_topics.__doc__) + subparsers = parser.add_subparsers(dest="command") + subparsers.add_parser("list", help=list_topics.__doc__) - create_parser = subparsers.add_parser('create', - help=create_topic.__doc__) - create_parser.add_argument('topic_name') + create_parser = subparsers.add_parser("create", help=create_topic.__doc__) + create_parser.add_argument("topic_name") - delete_parser = subparsers.add_parser('delete', - help=delete_topic.__doc__) - delete_parser.add_argument('topic_name') + delete_parser = subparsers.add_parser("delete", help=delete_topic.__doc__) + delete_parser.add_argument("topic_name") - publish_parser = subparsers.add_parser('publish', - help=publish_messages.__doc__) - publish_parser.add_argument('topic_name') + publish_parser = subparsers.add_parser( + "publish", help=publish_messages.__doc__ + ) + publish_parser.add_argument("topic_name") publish_with_custom_attributes_parser = subparsers.add_parser( - 'publish-with-custom-attributes', + "publish-with-custom-attributes", help=publish_messages_with_custom_attributes.__doc__, ) - publish_with_custom_attributes_parser.add_argument('topic_name') + publish_with_custom_attributes_parser.add_argument("topic_name") publish_with_futures_parser = subparsers.add_parser( - 'publish-with-futures', help=publish_messages_with_futures.__doc__ + "publish-with-futures", help=publish_messages_with_futures.__doc__ ) - publish_with_futures_parser.add_argument('topic_name') + publish_with_futures_parser.add_argument("topic_name") publish_with_error_handler_parser = subparsers.add_parser( - 'publish-with-error-handler', - help=publish_messages_with_error_handler.__doc__ + "publish-with-error-handler", + help=publish_messages_with_error_handler.__doc__, ) - publish_with_error_handler_parser.add_argument('topic_name') + publish_with_error_handler_parser.add_argument("topic_name") publish_with_batch_settings_parser = subparsers.add_parser( - 'publish-with-batch-settings', - help=publish_messages_with_batch_settings.__doc__ + "publish-with-batch-settings", + help=publish_messages_with_batch_settings.__doc__, ) - publish_with_batch_settings_parser.add_argument('topic_name') + publish_with_batch_settings_parser.add_argument("topic_name") publish_with_retry_settings_parser = subparsers.add_parser( - 'publish-with-retry-settings', - help=publish_messages_with_retry_settings.__doc__ + "publish-with-retry-settings", + help=publish_messages_with_retry_settings.__doc__, ) - publish_with_retry_settings_parser.add_argument('topic_name') + publish_with_retry_settings_parser.add_argument("topic_name") args = parser.parse_args() - if args.command == 'list': + if args.command == "list": list_topics(args.project_id) - elif args.command == 'create': + elif args.command == "create": create_topic(args.project_id, args.topic_name) - elif args.command == 'delete': + elif args.command == "delete": delete_topic(args.project_id, args.topic_name) - elif args.command == 'publish': + elif args.command == "publish": publish_messages(args.project_id, args.topic_name) - elif args.command == 'publish-with-custom-attributes': - publish_messages_with_custom_attributes(args.project_id, - args.topic_name) - elif args.command == 'publish-with-futures': + elif args.command == "publish-with-custom-attributes": + publish_messages_with_custom_attributes( + args.project_id, args.topic_name + ) + elif args.command == "publish-with-futures": publish_messages_with_futures(args.project_id, args.topic_name) - elif args.command == 'publish-with-error-handler': + elif args.command == "publish-with-error-handler": publish_messages_with_error_handler(args.project_id, args.topic_name) - elif args.command == 'publish-with-batch-settings': + elif args.command == "publish-with-batch-settings": publish_messages_with_batch_settings(args.project_id, args.topic_name) - elif args.command == 'publish-with-retry-settings': + elif args.command == "publish-with-retry-settings": publish_messages_with_retry_settings(args.project_id, args.topic_name) diff --git a/samples/snippets/publisher_test.py b/samples/snippets/publisher_test.py index 5e550abd6..125fae3c0 100644 --- a/samples/snippets/publisher_test.py +++ b/samples/snippets/publisher_test.py @@ -14,6 +14,7 @@ import os import time +import uuid from gcp_devrel.testing import eventually_consistent from google.cloud import pubsub_v1 @@ -22,8 +23,9 @@ import publisher -PROJECT = os.environ['GCLOUD_PROJECT'] -TOPIC = 'publisher-test-topic' +UUID = uuid.uuid4().hex +PROJECT = os.environ["GCLOUD_PROJECT"] +TOPIC = "publisher-test-topic-" + UUID @pytest.fixture @@ -49,11 +51,18 @@ def _make_sleep_patch(): def new_sleep(period): if period == 60: real_sleep(5) - raise RuntimeError('sigil') + raise RuntimeError("sigil") else: real_sleep(period) - return mock.patch('time.sleep', new=new_sleep) + return mock.patch("time.sleep", new=new_sleep) + + +def _to_delete(): + publisher_client = pubsub_v1.PublisherClient() + publisher_client.delete_topic( + "projects/{}/topics/{}".format(PROJECT, TOPIC) + ) def test_list(client, topic, capsys): @@ -91,39 +100,41 @@ def test_publish(topic, capsys): publisher.publish_messages(PROJECT, TOPIC) out, _ = capsys.readouterr() - assert 'Published' in out + assert "Published" in out def test_publish_with_custom_attributes(topic, capsys): publisher.publish_messages_with_custom_attributes(PROJECT, TOPIC) out, _ = capsys.readouterr() - assert 'Published' in out + assert "Published" in out def test_publish_with_batch_settings(topic, capsys): publisher.publish_messages_with_batch_settings(PROJECT, TOPIC) out, _ = capsys.readouterr() - assert 'Published' in out + assert "Published" in out def test_publish_with_retry_settings(topic, capsys): publisher.publish_messages_with_retry_settings(PROJECT, TOPIC) out, _ = capsys.readouterr() - assert 'Published' in out + assert "Published" in out def test_publish_with_error_handler(topic, capsys): publisher.publish_messages_with_error_handler(PROJECT, TOPIC) out, _ = capsys.readouterr() - assert 'Published' in out + assert "Published" in out def test_publish_with_futures(topic, capsys): publisher.publish_messages_with_futures(PROJECT, TOPIC) out, _ = capsys.readouterr() - assert 'Published' in out + assert "Published" in out + + _to_delete() diff --git a/samples/snippets/quickstart.py b/samples/snippets/quickstart.py index f48d085e0..d01105885 100644 --- a/samples/snippets/quickstart.py +++ b/samples/snippets/quickstart.py @@ -39,34 +39,36 @@ def end_to_end(project_id, topic_name, subscription_name, num_messages): # The `subscription_path` method creates a fully qualified identifier # in the form `projects/{project_id}/subscriptions/{subscription_name}` subscription_path = subscriber.subscription_path( - project_id, subscription_name) + project_id, subscription_name + ) # Create the topic. topic = publisher.create_topic(topic_path) - print('\nTopic created: {}'.format(topic.name)) + print("\nTopic created: {}".format(topic.name)) # Create a subscription. subscription = subscriber.create_subscription( - subscription_path, topic_path) - print('\nSubscription created: {}\n'.format(subscription.name)) + subscription_path, topic_path + ) + print("\nSubscription created: {}\n".format(subscription.name)) publish_begin = time.time() # Publish messages. for n in range(num_messages): - data = u'Message number {}'.format(n) + data = u"Message number {}".format(n) # Data must be a bytestring - data = data.encode('utf-8') + data = data.encode("utf-8") # When you publish a message, the client returns a future. future = publisher.publish(topic_path, data=data) - print('Published {} of message ID {}.'.format(data, future.result())) + print("Published {} of message ID {}.".format(data, future.result())) publish_time = time.time() - publish_begin messages = set() def callback(message): - print('Received message: {}'.format(message)) + print("Received message: {}".format(message)) # Unacknowledged messages will be sent again. message.ack() messages.add(message) @@ -76,7 +78,7 @@ def callback(message): # Receive messages. The subscriber is nonblocking. subscriber.subscribe(subscription_path, callback=callback) - print('\nListening for messages on {}...\n'.format(subscription_path)) + print("\nListening for messages on {}...\n".format(subscription_path)) while True: if len(messages) == num_messages: @@ -87,22 +89,23 @@ def callback(message): break else: # Sleeps the thread at 50Hz to save on resources. - time.sleep(1. / 50) + time.sleep(1.0 / 50) # [END pubsub_end_to_end] -if __name__ == '__main__': +if __name__ == "__main__": parser = argparse.ArgumentParser( description=__doc__, - formatter_class=argparse.RawDescriptionHelpFormatter + formatter_class=argparse.RawDescriptionHelpFormatter, ) - parser.add_argument('project_id', help='Your Google Cloud project ID') - parser.add_argument('topic_name', help='Your topic name') - parser.add_argument('subscription_name', help='Your subscription name') - parser.add_argument('num_msgs', type=int, help='Number of test messages') + parser.add_argument("project_id", help="Your Google Cloud project ID") + parser.add_argument("topic_name", help="Your topic name") + parser.add_argument("subscription_name", help="Your subscription name") + parser.add_argument("num_msgs", type=int, help="Number of test messages") args = parser.parse_args() - end_to_end(args.project_id, args.topic_name, args.subscription_name, - args.num_msgs) + end_to_end( + args.project_id, args.topic_name, args.subscription_name, args.num_msgs + ) diff --git a/samples/snippets/quickstart/pub.py b/samples/snippets/quickstart/pub.py index e340eb4f3..a3f8087ec 100644 --- a/samples/snippets/quickstart/pub.py +++ b/samples/snippets/quickstart/pub.py @@ -17,22 +17,32 @@ # [START pubsub_quickstart_pub_all] import argparse import time + # [START pubsub_quickstart_pub_deps] from google.cloud import pubsub_v1 + # [END pubsub_quickstart_pub_deps] def get_callback(api_future, data, ref): """Wrap message data in the context of the callback function.""" + def callback(api_future): try: - print("Published message {} now has message ID {}".format( - data, api_future.result())) + print( + "Published message {} now has message ID {}".format( + data, api_future.result() + ) + ) ref["num_messages"] += 1 except Exception: - print("A problem occurred when publishing {}: {}\n".format( - data, api_future.exception())) + print( + "A problem occurred when publishing {}: {}\n".format( + data, api_future.exception() + ) + ) raise + return callback @@ -63,13 +73,13 @@ def pub(project_id, topic_name): print("Published {} message(s).".format(ref["num_messages"])) -if __name__ == '__main__': +if __name__ == "__main__": parser = argparse.ArgumentParser( description=__doc__, - formatter_class=argparse.RawDescriptionHelpFormatter + formatter_class=argparse.RawDescriptionHelpFormatter, ) - parser.add_argument('project_id', help='Google Cloud project ID') - parser.add_argument('topic_name', help='Pub/Sub topic name') + parser.add_argument("project_id", help="Google Cloud project ID") + parser.add_argument("topic_name", help="Pub/Sub topic name") args = parser.parse_args() diff --git a/samples/snippets/quickstart/pub_test.py b/samples/snippets/quickstart/pub_test.py index 09443364a..b9a6f807f 100644 --- a/samples/snippets/quickstart/pub_test.py +++ b/samples/snippets/quickstart/pub_test.py @@ -16,22 +16,24 @@ import os import pytest +import uuid from google.api_core.exceptions import AlreadyExists from google.cloud import pubsub_v1 import pub -PROJECT = os.environ['GCLOUD_PROJECT'] -TOPIC = 'quickstart-pub-test-topic' +UUID = uuid.uuid4().hex +PROJECT = os.environ["GCLOUD_PROJECT"] +TOPIC = "quickstart-pub-test-topic-" + UUID -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def publisher_client(): yield pubsub_v1.PublisherClient() -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def topic(publisher_client): topic_path = publisher_client.topic_path(PROJECT, TOPIC) @@ -42,20 +44,12 @@ def topic(publisher_client): yield TOPIC + publisher_client.delete_topic(topic_path) -@pytest.fixture -def to_delete(publisher_client): - doomed = [] - yield doomed - for item in doomed: - publisher_client.delete_topic(item) - -def test_pub(publisher_client, topic, to_delete, capsys): +def test_pub(publisher_client, topic, capsys): pub.pub(PROJECT, topic) - to_delete.append('projects/{}/topics/{}'.format(PROJECT, TOPIC)) - out, _ = capsys.readouterr() - assert "Published message b'Hello, World!'" in out + assert "Hello, World!" in out diff --git a/samples/snippets/quickstart/sub.py b/samples/snippets/quickstart/sub.py index e39f14105..5791af14d 100644 --- a/samples/snippets/quickstart/sub.py +++ b/samples/snippets/quickstart/sub.py @@ -16,8 +16,10 @@ # [START pubsub_quickstart_sub_all] import argparse + # [START pubsub_quickstart_sub_deps] from google.cloud import pubsub_v1 + # [END pubsub_quickstart_sub_deps] @@ -29,19 +31,22 @@ def sub(project_id, subscription_name): # [END pubsub_quickstart_sub_client] # Create a fully qualified identifier in the form of # `projects/{project_id}/subscriptions/{subscription_name}` - subscription_path = client.subscription_path( - project_id, subscription_name) + subscription_path = client.subscription_path(project_id, subscription_name) def callback(message): - print('Received message {} of message ID {}\n'.format( - message, message.message_id)) + print( + "Received message {} of message ID {}\n".format( + message, message.message_id + ) + ) # Acknowledge the message. Unack'ed messages will be redelivered. message.ack() - print('Acknowledged message {}\n'.format(message.message_id)) + print("Acknowledged message {}\n".format(message.message_id)) streaming_pull_future = client.subscribe( - subscription_path, callback=callback) - print('Listening for messages on {}..\n'.format(subscription_path)) + subscription_path, callback=callback + ) + print("Listening for messages on {}..\n".format(subscription_path)) # Calling result() on StreamingPullFuture keeps the main thread from # exiting while messages get processed in the callbacks. @@ -51,13 +56,13 @@ def callback(message): streaming_pull_future.cancel() -if __name__ == '__main__': +if __name__ == "__main__": parser = argparse.ArgumentParser( description=__doc__, - formatter_class=argparse.RawDescriptionHelpFormatter + formatter_class=argparse.RawDescriptionHelpFormatter, ) - parser.add_argument('project_id', help='Google Cloud project ID') - parser.add_argument('subscription_name', help='Pub/Sub subscription name') + parser.add_argument("project_id", help="Google Cloud project ID") + parser.add_argument("subscription_name", help="Pub/Sub subscription name") args = parser.parse_args() diff --git a/samples/snippets/quickstart/sub_test.py b/samples/snippets/quickstart/sub_test.py index 476139a02..07edfad7c 100644 --- a/samples/snippets/quickstart/sub_test.py +++ b/samples/snippets/quickstart/sub_test.py @@ -14,8 +14,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +import mock import os import pytest +import uuid from google.api_core.exceptions import AlreadyExists from google.cloud import pubsub_v1 @@ -23,84 +25,80 @@ import sub -PROJECT = os.environ['GCLOUD_PROJECT'] -TOPIC = 'quickstart-sub-test-topic' -SUBSCRIPTION = 'quickstart-sub-test-topic-sub' +UUID = uuid.uuid4().hex +PROJECT = os.environ["GCLOUD_PROJECT"] +TOPIC = "quickstart-sub-test-topic-" + UUID +SUBSCRIPTION = "quickstart-sub-test-topic-sub-" + UUID publisher_client = pubsub_v1.PublisherClient() subscriber_client = pubsub_v1.SubscriberClient() -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def topic_path(): topic_path = publisher_client.topic_path(PROJECT, TOPIC) try: topic = publisher_client.create_topic(topic_path) - return topic.name + yield topic.name except AlreadyExists: - return topic_path + yield topic_path + publisher_client.delete_topic(topic_path) -@pytest.fixture(scope='module') + +@pytest.fixture(scope="module") def subscription_path(topic_path): subscription_path = subscriber_client.subscription_path( - PROJECT, SUBSCRIPTION) + PROJECT, SUBSCRIPTION + ) try: subscription = subscriber_client.create_subscription( - subscription_path, topic_path) - return subscription.name + subscription_path, topic_path + ) + yield subscription.name except AlreadyExists: - return subscription_path - + yield subscription_path -def _to_delete(resource_paths): - for item in resource_paths: - if 'topics' in item: - publisher_client.delete_topic(item) - if 'subscriptions' in item: - subscriber_client.delete_subscription(item) + subscriber_client.delete_subscription(subscription_path) def _publish_messages(topic_path): - publish_future = publisher_client.publish(topic_path, data=b'Hello World!') + publish_future = publisher_client.publish(topic_path, data=b"Hello World!") publish_future.result() -def _sub_timeout(project_id, subscription_name): - # This is an exactly copy of `sub.py` except - # StreamingPullFuture.result() will time out after 10s. - client = pubsub_v1.SubscriberClient() - subscription_path = client.subscription_path( - project_id, subscription_name) +def test_sub(monkeypatch, topic_path, subscription_path, capsys): - def callback(message): - print('Received message {} of message ID {}\n'.format( - message, message.message_id)) - message.ack() - print('Acknowledged message {}\n'.format(message.message_id)) + real_client = pubsub_v1.SubscriberClient() + mock_client = mock.Mock(spec=pubsub_v1.SubscriberClient, wraps=real_client) - streaming_pull_future = client.subscribe( - subscription_path, callback=callback) - print('Listening for messages on {}..\n'.format(subscription_path)) + # Attributes on mock_client_constructor uses the corresponding + # attributes on pubsub_v1.SubscriberClient. + mock_client_constructor = mock.create_autospec(pubsub_v1.SubscriberClient) + mock_client_constructor.return_value = mock_client - try: - streaming_pull_future.result(timeout=10) - except: # noqa - streaming_pull_future.cancel() + monkeypatch.setattr(pubsub_v1, "SubscriberClient", mock_client_constructor) + def mock_subscribe(subscription_path, callback=None): + real_future = real_client.subscribe( + subscription_path, callback=callback + ) + mock_future = mock.Mock(spec=real_future, wraps=real_future) -def test_sub(monkeypatch, topic_path, subscription_path, capsys): - monkeypatch.setattr(sub, 'sub', _sub_timeout) + def mock_result(): + return real_future.result(timeout=10) + + mock_future.result.side_effect = mock_result + return mock_future + + mock_client.subscribe.side_effect = mock_subscribe _publish_messages(topic_path) sub.sub(PROJECT, SUBSCRIPTION) - # Clean up resources. - _to_delete([topic_path, subscription_path]) - out, _ = capsys.readouterr() assert "Received message" in out assert "Acknowledged message" in out diff --git a/samples/snippets/quickstart_test.py b/samples/snippets/quickstart_test.py index d318b260c..6a1d4aae1 100644 --- a/samples/snippets/quickstart_test.py +++ b/samples/snippets/quickstart_test.py @@ -15,23 +15,25 @@ # limitations under the License. import os +import uuid from google.cloud import pubsub_v1 import pytest import quickstart -PROJECT = os.environ['GCLOUD_PROJECT'] -TOPIC = 'end-to-end-test-topic' -SUBSCRIPTION = 'end-to-end-test-topic-sub' +UUID = uuid.uuid4().hex +PROJECT = os.environ["GCLOUD_PROJECT"] +TOPIC = "end-to-end-test-topic-" + UUID +SUBSCRIPTION = "end-to-end-test-topic-sub-" + UUID N = 10 -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def publisher_client(): yield pubsub_v1.PublisherClient() -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def topic(publisher_client): topic_path = publisher_client.topic_path(PROJECT, TOPIC) @@ -42,16 +44,19 @@ def topic(publisher_client): yield TOPIC + publisher_client.delete_topic(topic_path) -@pytest.fixture(scope='module') + +@pytest.fixture(scope="module") def subscriber_client(): yield pubsub_v1.SubscriberClient() -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def subscription(subscriber_client, topic): subscription_path = subscriber_client.subscription_path( - PROJECT, SUBSCRIPTION) + PROJECT, SUBSCRIPTION + ) try: subscriber_client.delete_subscription(subscription_path) @@ -60,6 +65,8 @@ def subscription(subscriber_client, topic): yield SUBSCRIPTION + subscriber_client.delete_subscription(subscription_path) + def test_end_to_end(topic, subscription, capsys): diff --git a/samples/snippets/subscriber.py b/samples/snippets/subscriber.py index 3bbad0ead..ea1cc9ff9 100644 --- a/samples/snippets/subscriber.py +++ b/samples/snippets/subscriber.py @@ -67,19 +67,20 @@ def create_subscription(project_id, topic_name, subscription_name): subscriber = pubsub_v1.SubscriberClient() topic_path = subscriber.topic_path(project_id, topic_name) subscription_path = subscriber.subscription_path( - project_id, subscription_name) + project_id, subscription_name + ) subscription = subscriber.create_subscription( - subscription_path, topic_path) + subscription_path, topic_path + ) - print('Subscription created: {}'.format(subscription)) + print("Subscription created: {}".format(subscription)) # [END pubsub_create_pull_subscription] -def create_push_subscription(project_id, - topic_name, - subscription_name, - endpoint): +def create_push_subscription( + project_id, topic_name, subscription_name, endpoint +): """Create a new push subscription on the given topic.""" # [START pubsub_create_push_subscription] from google.cloud import pubsub_v1 @@ -92,16 +93,17 @@ def create_push_subscription(project_id, subscriber = pubsub_v1.SubscriberClient() topic_path = subscriber.topic_path(project_id, topic_name) subscription_path = subscriber.subscription_path( - project_id, subscription_name) + project_id, subscription_name + ) - push_config = pubsub_v1.types.PushConfig( - push_endpoint=endpoint) + push_config = pubsub_v1.types.PushConfig(push_endpoint=endpoint) subscription = subscriber.create_subscription( - subscription_path, topic_path, push_config) + subscription_path, topic_path, push_config + ) - print('Push subscription created: {}'.format(subscription)) - print('Endpoint for subscription is: {}'.format(endpoint)) + print("Push subscription created: {}".format(subscription)) + print("Endpoint for subscription is: {}".format(endpoint)) # [END pubsub_create_push_subscription] @@ -115,11 +117,12 @@ def delete_subscription(project_id, subscription_name): subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path( - project_id, subscription_name) + project_id, subscription_name + ) subscriber.delete_subscription(subscription_path) - print('Subscription deleted: {}'.format(subscription_path)) + print("Subscription deleted: {}".format(subscription_path)) # [END pubsub_delete_subscription] @@ -139,27 +142,22 @@ def update_subscription(project_id, subscription_name, endpoint): subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path( - project_id, subscription_name) + project_id, subscription_name + ) - push_config = pubsub_v1.types.PushConfig( - push_endpoint=endpoint) + push_config = pubsub_v1.types.PushConfig(push_endpoint=endpoint) subscription = pubsub_v1.types.Subscription( - name=subscription_path, - push_config=push_config) + name=subscription_path, push_config=push_config + ) - update_mask = { - 'paths': { - 'push_config', - } - } + update_mask = {"paths": {"push_config"}} subscriber.update_subscription(subscription, update_mask) result = subscriber.get_subscription(subscription_path) - print('Subscription updated: {}'.format(subscription_path)) - print('New endpoint for subscription is: {}'.format( - result.push_config)) + print("Subscription updated: {}".format(subscription_path)) + print("New endpoint for subscription is: {}".format(result.push_config)) # [END pubsub_update_push_configuration] @@ -178,17 +176,18 @@ def receive_messages(project_id, subscription_name): # The `subscription_path` method creates a fully qualified identifier # in the form `projects/{project_id}/subscriptions/{subscription_name}` subscription_path = subscriber.subscription_path( - project_id, subscription_name) + project_id, subscription_name + ) def callback(message): - print('Received message: {}'.format(message)) + print("Received message: {}".format(message)) message.ack() subscriber.subscribe(subscription_path, callback=callback) # The subscriber is non-blocking. We must keep the main thread from # exiting to allow it to process messages asynchronously in the background. - print('Listening for messages on {}'.format(subscription_path)) + print("Listening for messages on {}".format(subscription_path)) while True: time.sleep(60) # [END pubsub_subscriber_async_pull] @@ -208,22 +207,23 @@ def receive_messages_with_custom_attributes(project_id, subscription_name): subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path( - project_id, subscription_name) + project_id, subscription_name + ) def callback(message): - print('Received message: {}'.format(message.data)) + print("Received message: {}".format(message.data)) if message.attributes: - print('Attributes:') + print("Attributes:") for key in message.attributes: value = message.attributes.get(key) - print('{}: {}'.format(key, value)) + print("{}: {}".format(key, value)) message.ack() subscriber.subscribe(subscription_path, callback=callback) # The subscriber is non-blocking, so we must keep the main thread from # exiting to allow it to process messages in the background. - print('Listening for messages on {}'.format(subscription_path)) + print("Listening for messages on {}".format(subscription_path)) while True: time.sleep(60) # [END pubsub_subscriber_async_pull_custom_attributes] @@ -242,20 +242,22 @@ def receive_messages_with_flow_control(project_id, subscription_name): subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path( - project_id, subscription_name) + project_id, subscription_name + ) def callback(message): - print('Received message: {}'.format(message.data)) + print("Received message: {}".format(message.data)) message.ack() # Limit the subscriber to only have ten outstanding messages at a time. flow_control = pubsub_v1.types.FlowControl(max_messages=10) subscriber.subscribe( - subscription_path, callback=callback, flow_control=flow_control) + subscription_path, callback=callback, flow_control=flow_control + ) # The subscriber is non-blocking, so we must keep the main thread from # exiting to allow it to process messages in the background. - print('Listening for messages on {}'.format(subscription_path)) + print("Listening for messages on {}".format(subscription_path)) while True: time.sleep(60) # [END pubsub_subscriber_flow_settings] @@ -271,7 +273,8 @@ def synchronous_pull(project_id, subscription_name): subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path( - project_id, subscription_name) + project_id, subscription_name + ) NUM_MESSAGES = 3 @@ -286,8 +289,11 @@ def synchronous_pull(project_id, subscription_name): # Acknowledges the received messages so they will not be sent again. subscriber.acknowledge(subscription_path, ack_ids) - print('Received and acknowledged {} messages. Done.'.format( - len(response.received_messages))) + print( + "Received and acknowledged {} messages. Done.".format( + len(response.received_messages) + ) + ) # [END pubsub_subscriber_sync_pull] @@ -306,7 +312,8 @@ def synchronous_pull_with_lease_management(project_id, subscription_name): subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path( - project_id, subscription_name) + project_id, subscription_name + ) NUM_MESSAGES = 2 ACK_DEADLINE = 30 @@ -322,8 +329,11 @@ def synchronous_pull_with_lease_management(project_id, subscription_name): def worker(msg): """Simulates a long-running process.""" RUN_TIME = random.randint(1, 60) - logger.info('{}: Running {} for {}s'.format( - time.strftime("%X", time.gmtime()), msg.message.data, RUN_TIME)) + logger.info( + "{}: Running {} for {}s".format( + time.strftime("%X", time.gmtime()), msg.message.data, RUN_TIME + ) + ) time.sleep(RUN_TIME) # `processes` stores process as key and ack id and message as values. @@ -344,24 +354,35 @@ def worker(msg): subscriber.modify_ack_deadline( subscription_path, [ack_id], - ack_deadline_seconds=ACK_DEADLINE) - logger.info('{}: Reset ack deadline for {} for {}s'.format( - time.strftime("%X", time.gmtime()), - msg_data, ACK_DEADLINE)) + ack_deadline_seconds=ACK_DEADLINE, + ) + logger.info( + "{}: Reset ack deadline for {} for {}s".format( + time.strftime("%X", time.gmtime()), + msg_data, + ACK_DEADLINE, + ) + ) # If the processs is finished, acknowledges using `ack_id`. else: subscriber.acknowledge(subscription_path, [ack_id]) - logger.info("{}: Acknowledged {}".format( - time.strftime("%X", time.gmtime()), msg_data)) + logger.info( + "{}: Acknowledged {}".format( + time.strftime("%X", time.gmtime()), msg_data + ) + ) processes.pop(process) # If there are still processes running, sleeps the thread. if processes: time.sleep(SLEEP_TIME) - print('Received and acknowledged {} messages. Done.'.format( - len(response.received_messages))) + print( + "Received and acknowledged {} messages. Done.".format( + len(response.received_messages) + ) + ) # [END pubsub_subscriber_sync_pull_with_lease] @@ -375,10 +396,11 @@ def listen_for_errors(project_id, subscription_name): subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path( - project_id, subscription_name) + project_id, subscription_name + ) def callback(message): - print('Received message: {}'.format(message)) + print("Received message: {}".format(message)) message.ack() future = subscriber.subscribe(subscription_path, callback=callback) @@ -390,109 +412,126 @@ def callback(message): future.result(timeout=30) except Exception as e: print( - 'Listening for messages on {} threw an Exception: {}.'.format( - subscription_name, e)) + "Listening for messages on {} threw an Exception: {}.".format( + subscription_name, e + ) + ) # [END pubsub_subscriber_error_listener] -if __name__ == '__main__': +if __name__ == "__main__": parser = argparse.ArgumentParser( description=__doc__, - formatter_class=argparse.RawDescriptionHelpFormatter + formatter_class=argparse.RawDescriptionHelpFormatter, ) - parser.add_argument('project_id', help='Your Google Cloud project ID') + parser.add_argument("project_id", help="Your Google Cloud project ID") - subparsers = parser.add_subparsers(dest='command') + subparsers = parser.add_subparsers(dest="command") list_in_topic_parser = subparsers.add_parser( - 'list_in_topic', help=list_subscriptions_in_topic.__doc__) - list_in_topic_parser.add_argument('topic_name') + "list_in_topic", help=list_subscriptions_in_topic.__doc__ + ) + list_in_topic_parser.add_argument("topic_name") list_in_project_parser = subparsers.add_parser( - 'list_in_project', help=list_subscriptions_in_project.__doc__) + "list_in_project", help=list_subscriptions_in_project.__doc__ + ) create_parser = subparsers.add_parser( - 'create', help=create_subscription.__doc__) - create_parser.add_argument('topic_name') - create_parser.add_argument('subscription_name') + "create", help=create_subscription.__doc__ + ) + create_parser.add_argument("topic_name") + create_parser.add_argument("subscription_name") create_push_parser = subparsers.add_parser( - 'create-push', help=create_push_subscription.__doc__) - create_push_parser.add_argument('topic_name') - create_push_parser.add_argument('subscription_name') - create_push_parser.add_argument('endpoint') + "create-push", help=create_push_subscription.__doc__ + ) + create_push_parser.add_argument("topic_name") + create_push_parser.add_argument("subscription_name") + create_push_parser.add_argument("endpoint") delete_parser = subparsers.add_parser( - 'delete', help=delete_subscription.__doc__) - delete_parser.add_argument('subscription_name') + "delete", help=delete_subscription.__doc__ + ) + delete_parser.add_argument("subscription_name") update_parser = subparsers.add_parser( - 'update', help=update_subscription.__doc__) - update_parser.add_argument('subscription_name') - update_parser.add_argument('endpoint') + "update", help=update_subscription.__doc__ + ) + update_parser.add_argument("subscription_name") + update_parser.add_argument("endpoint") receive_parser = subparsers.add_parser( - 'receive', help=receive_messages.__doc__) - receive_parser.add_argument('subscription_name') + "receive", help=receive_messages.__doc__ + ) + receive_parser.add_argument("subscription_name") receive_with_custom_attributes_parser = subparsers.add_parser( - 'receive-custom-attributes', - help=receive_messages_with_custom_attributes.__doc__) - receive_with_custom_attributes_parser.add_argument('subscription_name') + "receive-custom-attributes", + help=receive_messages_with_custom_attributes.__doc__, + ) + receive_with_custom_attributes_parser.add_argument("subscription_name") receive_with_flow_control_parser = subparsers.add_parser( - 'receive-flow-control', - help=receive_messages_with_flow_control.__doc__) - receive_with_flow_control_parser.add_argument('subscription_name') + "receive-flow-control", help=receive_messages_with_flow_control.__doc__ + ) + receive_with_flow_control_parser.add_argument("subscription_name") synchronous_pull_parser = subparsers.add_parser( - 'receive-synchronously', - help=synchronous_pull.__doc__) - synchronous_pull_parser.add_argument('subscription_name') + "receive-synchronously", help=synchronous_pull.__doc__ + ) + synchronous_pull_parser.add_argument("subscription_name") synchronous_pull_with_lease_management_parser = subparsers.add_parser( - 'receive-synchronously-with-lease', - help=synchronous_pull_with_lease_management.__doc__) + "receive-synchronously-with-lease", + help=synchronous_pull_with_lease_management.__doc__, + ) synchronous_pull_with_lease_management_parser.add_argument( - 'subscription_name') + "subscription_name" + ) listen_for_errors_parser = subparsers.add_parser( - 'listen_for_errors', help=listen_for_errors.__doc__) - listen_for_errors_parser.add_argument('subscription_name') + "listen_for_errors", help=listen_for_errors.__doc__ + ) + listen_for_errors_parser.add_argument("subscription_name") args = parser.parse_args() - if args.command == 'list_in_topic': + if args.command == "list_in_topic": list_subscriptions_in_topic(args.project_id, args.topic_name) - elif args.command == 'list_in_project': + elif args.command == "list_in_project": list_subscriptions_in_project(args.project_id) - elif args.command == 'create': + elif args.command == "create": create_subscription( - args.project_id, args.topic_name, args.subscription_name) - elif args.command == 'create-push': + args.project_id, args.topic_name, args.subscription_name + ) + elif args.command == "create-push": create_push_subscription( args.project_id, args.topic_name, args.subscription_name, - args.endpoint) - elif args.command == 'delete': - delete_subscription( - args.project_id, args.subscription_name) - elif args.command == 'update': + args.endpoint, + ) + elif args.command == "delete": + delete_subscription(args.project_id, args.subscription_name) + elif args.command == "update": update_subscription( - args.project_id, args.subscription_name, args.endpoint) - elif args.command == 'receive': + args.project_id, args.subscription_name, args.endpoint + ) + elif args.command == "receive": receive_messages(args.project_id, args.subscription_name) - elif args.command == 'receive-custom-attributes': + elif args.command == "receive-custom-attributes": receive_messages_with_custom_attributes( - args.project_id, args.subscription_name) - elif args.command == 'receive-flow-control': + args.project_id, args.subscription_name + ) + elif args.command == "receive-flow-control": receive_messages_with_flow_control( - args.project_id, args.subscription_name) - elif args.command == 'receive-synchronously': - synchronous_pull( - args.project_id, args.subscription_name) - elif args.command == 'receive-synchronously-with-lease': + args.project_id, args.subscription_name + ) + elif args.command == "receive-synchronously": + synchronous_pull(args.project_id, args.subscription_name) + elif args.command == "receive-synchronously-with-lease": synchronous_pull_with_lease_management( - args.project_id, args.subscription_name) - elif args.command == 'listen_for_errors': + args.project_id, args.subscription_name + ) + elif args.command == "listen_for_errors": listen_for_errors(args.project_id, args.subscription_name) diff --git a/samples/snippets/subscriber_test.py b/samples/snippets/subscriber_test.py index 4c5fd6122..0645c0738 100644 --- a/samples/snippets/subscriber_test.py +++ b/samples/snippets/subscriber_test.py @@ -14,6 +14,7 @@ import os import time +import uuid from gcp_devrel.testing import eventually_consistent from google.cloud import pubsub_v1 @@ -22,21 +23,22 @@ import subscriber -PROJECT = os.environ['GCLOUD_PROJECT'] -TOPIC = 'subscription-test-topic' -SUBSCRIPTION_ONE = 'subscription-test-subscription-one' -SUBSCRIPTION_TWO = 'subscription-test-subscription-two' -SUBSCRIPTION_THREE = 'subscription-test-subscription-three' -ENDPOINT = 'https://{}.appspot.com/push'.format(PROJECT) -NEW_ENDPOINT = 'https://{}.appspot.com/push2'.format(PROJECT) +UUID = uuid.uuid4().hex +PROJECT = os.environ["GCLOUD_PROJECT"] +TOPIC = "subscription-test-topic-" + UUID +SUBSCRIPTION_ONE = "subscription-test-subscription-one-" + UUID +SUBSCRIPTION_TWO = "subscription-test-subscription-two-" + UUID +SUBSCRIPTION_THREE = "subscription-test-subscription-three-" + UUID +ENDPOINT = "https://{}.appspot.com/push".format(PROJECT) +NEW_ENDPOINT = "https://{}.appspot.com/push2".format(PROJECT) -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def publisher_client(): yield pubsub_v1.PublisherClient() -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def topic(publisher_client): topic_path = publisher_client.topic_path(PROJECT, TOPIC) @@ -48,49 +50,55 @@ def topic(publisher_client): yield response.name -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def subscriber_client(): yield pubsub_v1.SubscriberClient() -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def subscription_one(subscriber_client, topic): subscription_path = subscriber_client.subscription_path( - PROJECT, SUBSCRIPTION_ONE) + PROJECT, SUBSCRIPTION_ONE + ) try: response = subscriber_client.get_subscription(subscription_path) except: # noqa response = subscriber_client.create_subscription( - subscription_path, topic=topic) + subscription_path, topic=topic + ) yield response.name -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def subscription_two(subscriber_client, topic): subscription_path = subscriber_client.subscription_path( - PROJECT, SUBSCRIPTION_TWO) + PROJECT, SUBSCRIPTION_TWO + ) try: response = subscriber_client.get_subscription(subscription_path) except: # noqa response = subscriber_client.create_subscription( - subscription_path, topic=topic) + subscription_path, topic=topic + ) yield response.name -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def subscription_three(subscriber_client, topic): subscription_path = subscriber_client.subscription_path( - PROJECT, SUBSCRIPTION_THREE) + PROJECT, SUBSCRIPTION_THREE + ) try: response = subscriber_client.get_subscription(subscription_path) except: # noqa response = subscriber_client.create_subscription( - subscription_path, topic=topic) + subscription_path, topic=topic + ) yield response.name @@ -113,7 +121,8 @@ def _(): def test_create(subscriber_client): subscription_path = subscriber_client.subscription_path( - PROJECT, SUBSCRIPTION_ONE) + PROJECT, SUBSCRIPTION_ONE + ) try: subscriber_client.delete_subscription(subscription_path) @@ -129,14 +138,16 @@ def _(): def test_create_push(subscriber_client): subscription_path = subscriber_client.subscription_path( - PROJECT, SUBSCRIPTION_ONE) + PROJECT, SUBSCRIPTION_ONE + ) try: subscriber_client.delete_subscription(subscription_path) except Exception: pass subscriber.create_push_subscription( - PROJECT, TOPIC, SUBSCRIPTION_ONE, ENDPOINT) + PROJECT, TOPIC, SUBSCRIPTION_ONE, ENDPOINT + ) @eventually_consistent.call def _(): @@ -147,7 +158,7 @@ def test_update(subscriber_client, subscription_one, capsys): subscriber.update_subscription(PROJECT, SUBSCRIPTION_ONE, NEW_ENDPOINT) out, _ = capsys.readouterr() - assert 'Subscription updated' in out + assert "Subscription updated" in out def test_delete(subscriber_client, subscription_one): @@ -161,14 +172,14 @@ def _(): def _publish_messages(publisher_client, topic): for n in range(5): - data = u'Message {}'.format(n).encode('utf-8') + data = u"Message {}".format(n).encode("utf-8") future = publisher_client.publish(topic, data=data) future.result() def _publish_messages_with_custom_attributes(publisher_client, topic): - data = u'Test message'.encode('utf-8') - future = publisher_client.publish(topic, data=data, origin='python-sample') + data = u"Test message".encode("utf-8") + future = publisher_client.publish(topic, data=data, origin="python-sample") future.result() @@ -178,74 +189,100 @@ def _make_sleep_patch(): def new_sleep(period): if period == 60: real_sleep(5) - raise RuntimeError('sigil') + raise RuntimeError("sigil") else: real_sleep(period) - return mock.patch('time.sleep', new=new_sleep) + return mock.patch("time.sleep", new=new_sleep) + + +def _to_delete(): + publisher_client = pubsub_v1.PublisherClient() + subscriber_client = pubsub_v1.SubscriberClient() + resources = [TOPIC, SUBSCRIPTION_TWO, SUBSCRIPTION_THREE] + + for item in resources: + if "subscription-test-topic" in item: + publisher_client.delete_topic( + "projects/{}/topics/{}".format(PROJECT, item) + ) + if "subscription-test-subscription" in item: + subscriber_client.delete_subscription( + "projects/{}/subscriptions/{}".format(PROJECT, item) + ) def test_receive(publisher_client, topic, subscription_two, capsys): _publish_messages(publisher_client, topic) with _make_sleep_patch(): - with pytest.raises(RuntimeError, match='sigil'): + with pytest.raises(RuntimeError, match="sigil"): subscriber.receive_messages(PROJECT, SUBSCRIPTION_TWO) out, _ = capsys.readouterr() - assert 'Listening' in out + assert "Listening" in out assert subscription_two in out - assert 'Message 1' in out + assert "Message" in out def test_receive_with_custom_attributes( - publisher_client, topic, subscription_two, capsys): + publisher_client, topic, subscription_two, capsys +): _publish_messages_with_custom_attributes(publisher_client, topic) with _make_sleep_patch(): - with pytest.raises(RuntimeError, match='sigil'): + with pytest.raises(RuntimeError, match="sigil"): subscriber.receive_messages_with_custom_attributes( - PROJECT, SUBSCRIPTION_TWO) + PROJECT, SUBSCRIPTION_TWO + ) out, _ = capsys.readouterr() - assert 'Test message' in out - assert 'origin' in out - assert 'python-sample' in out + assert "Test message" in out + assert "origin" in out + assert "python-sample" in out def test_receive_with_flow_control( - publisher_client, topic, subscription_two, capsys): + publisher_client, topic, subscription_two, capsys +): _publish_messages(publisher_client, topic) with _make_sleep_patch(): - with pytest.raises(RuntimeError, match='sigil'): + with pytest.raises(RuntimeError, match="sigil"): subscriber.receive_messages_with_flow_control( - PROJECT, SUBSCRIPTION_TWO) + PROJECT, SUBSCRIPTION_TWO + ) out, _ = capsys.readouterr() - assert 'Listening' in out + assert "Listening" in out assert subscription_two in out - assert 'Message 1' in out + assert "Message" in out def test_receive_synchronously( - publisher_client, topic, subscription_three, capsys): + publisher_client, topic, subscription_three, capsys +): _publish_messages(publisher_client, topic) subscriber.synchronous_pull(PROJECT, SUBSCRIPTION_THREE) out, _ = capsys.readouterr() - assert 'Done.' in out + assert "Done." in out def test_receive_synchronously_with_lease( - publisher_client, topic, subscription_three, capsys): + publisher_client, topic, subscription_three, capsys +): _publish_messages(publisher_client, topic) subscriber.synchronous_pull_with_lease_management( - PROJECT, SUBSCRIPTION_THREE) + PROJECT, SUBSCRIPTION_THREE + ) out, _ = capsys.readouterr() - assert 'Done.' in out + assert "Done." in out + + # Clean up resources after all the tests. + _to_delete()