From b1915807726c846f78bd7fa7c87aa2be6600b471 Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Tue, 10 Dec 2019 16:31:20 -0800 Subject: [PATCH] Pub/Sub: update how to test with mock (#2555) * Update test with mock * Clean up resources after tests * Use unique resource names avoid test failures * Delete subscriptions in cleanup phase * Ensure unique topic name * Update assert to remove bytestring notation * Rewrite PubSubToGCS test using dataflow testing module --- pubsub/cloud-client/iam.py | 115 ++++---- pubsub/cloud-client/iam_test.py | 33 ++- pubsub/cloud-client/publisher.py | 167 ++++++----- pubsub/cloud-client/publisher_test.py | 31 +- pubsub/cloud-client/quickstart.py | 39 +-- pubsub/cloud-client/quickstart/pub.py | 26 +- pubsub/cloud-client/quickstart/pub_test.py | 24 +- pubsub/cloud-client/quickstart/sub.py | 27 +- pubsub/cloud-client/quickstart/sub_test.py | 82 +++--- pubsub/cloud-client/quickstart_test.py | 23 +- pubsub/cloud-client/subscriber.py | 269 ++++++++++-------- pubsub/cloud-client/subscriber_test.py | 131 ++++++--- pubsub/streaming-analytics/PubSubToGCS.py | 88 +++--- .../streaming-analytics/PubSubToGCS_test.py | 116 +++----- pubsub/streaming-analytics/requirements.txt | 2 +- 15 files changed, 635 insertions(+), 538 deletions(-) diff --git a/pubsub/cloud-client/iam.py b/pubsub/cloud-client/iam.py index f9865ed3934e..f014ce749022 100644 --- a/pubsub/cloud-client/iam.py +++ b/pubsub/cloud-client/iam.py @@ -34,9 +34,9 @@ def get_topic_policy(project, topic_name): policy = client.get_iam_policy(topic_path) - print('Policy for topic {}:'.format(topic_path)) + print("Policy for topic {}:".format(topic_path)) for binding in policy.bindings: - print('Role: {}, Members: {}'.format(binding.role, binding.members)) + print("Role: {}, Members: {}".format(binding.role, binding.members)) # [END pubsub_get_topic_policy] @@ -48,9 +48,9 @@ def get_subscription_policy(project, subscription_name): policy = client.get_iam_policy(subscription_path) - print('Policy for subscription {}:'.format(subscription_path)) + print("Policy for subscription {}:".format(subscription_path)) for binding in policy.bindings: - print('Role: {}, Members: {}'.format(binding.role, binding.members)) + print("Role: {}, Members: {}".format(binding.role, binding.members)) # [END pubsub_get_subscription_policy] @@ -63,20 +63,17 @@ def set_topic_policy(project, topic_name): policy = client.get_iam_policy(topic_path) # Add all users as viewers. - policy.bindings.add( - role='roles/pubsub.viewer', - members=['allUsers']) + policy.bindings.add(role="roles/pubsub.viewer", members=["allUsers"]) # Add a group as a publisher. policy.bindings.add( - role='roles/pubsub.publisher', - members=['group:cloud-logs@google.com']) + role="roles/pubsub.publisher", members=["group:cloud-logs@google.com"] + ) # Set the policy policy = client.set_iam_policy(topic_path, policy) - print('IAM policy for topic {} set: {}'.format( - topic_name, policy)) + print("IAM policy for topic {} set: {}".format(topic_name, policy)) # [END pubsub_set_topic_policy] @@ -89,20 +86,21 @@ def set_subscription_policy(project, subscription_name): policy = client.get_iam_policy(subscription_path) # Add all users as viewers. - policy.bindings.add( - role='roles/pubsub.viewer', - members=['allUsers']) + policy.bindings.add(role="roles/pubsub.viewer", members=["allUsers"]) # Add a group as an editor. policy.bindings.add( - role='roles/editor', - members=['group:cloud-logs@google.com']) + role="roles/editor", members=["group:cloud-logs@google.com"] + ) # Set the policy policy = client.set_iam_policy(subscription_path, policy) - print('IAM policy for subscription {} set: {}'.format( - subscription_name, policy)) + print( + "IAM policy for subscription {} set: {}".format( + subscription_name, policy + ) + ) # [END pubsub_set_subscription_policy] @@ -112,16 +110,17 @@ def check_topic_permissions(project, topic_name): client = pubsub_v1.PublisherClient() topic_path = client.topic_path(project, topic_name) - permissions_to_check = [ - 'pubsub.topics.publish', - 'pubsub.topics.update' - ] + permissions_to_check = ["pubsub.topics.publish", "pubsub.topics.update"] allowed_permissions = client.test_iam_permissions( - topic_path, permissions_to_check) + topic_path, permissions_to_check + ) - print('Allowed permissions for topic {}: {}'.format( - topic_path, allowed_permissions)) + print( + "Allowed permissions for topic {}: {}".format( + topic_path, allowed_permissions + ) + ) # [END pubsub_test_topic_permissions] @@ -132,63 +131,73 @@ def check_subscription_permissions(project, subscription_name): subscription_path = client.subscription_path(project, subscription_name) permissions_to_check = [ - 'pubsub.subscriptions.consume', - 'pubsub.subscriptions.update' + "pubsub.subscriptions.consume", + "pubsub.subscriptions.update", ] allowed_permissions = client.test_iam_permissions( - subscription_path, permissions_to_check) + subscription_path, permissions_to_check + ) - print('Allowed permissions for subscription {}: {}'.format( - subscription_path, allowed_permissions)) + print( + "Allowed permissions for subscription {}: {}".format( + subscription_path, allowed_permissions + ) + ) # [END pubsub_test_subscription_permissions] -if __name__ == '__main__': +if __name__ == "__main__": parser = argparse.ArgumentParser( description=__doc__, - formatter_class=argparse.RawDescriptionHelpFormatter + formatter_class=argparse.RawDescriptionHelpFormatter, ) - parser.add_argument('project', help='Your Google Cloud project ID') + parser.add_argument("project", help="Your Google Cloud project ID") - subparsers = parser.add_subparsers(dest='command') + subparsers = parser.add_subparsers(dest="command") get_topic_policy_parser = subparsers.add_parser( - 'get-topic-policy', help=get_topic_policy.__doc__) - get_topic_policy_parser.add_argument('topic_name') + "get-topic-policy", help=get_topic_policy.__doc__ + ) + get_topic_policy_parser.add_argument("topic_name") get_subscription_policy_parser = subparsers.add_parser( - 'get-subscription-policy', help=get_subscription_policy.__doc__) - get_subscription_policy_parser.add_argument('subscription_name') + "get-subscription-policy", help=get_subscription_policy.__doc__ + ) + get_subscription_policy_parser.add_argument("subscription_name") set_topic_policy_parser = subparsers.add_parser( - 'set-topic-policy', help=set_topic_policy.__doc__) - set_topic_policy_parser.add_argument('topic_name') + "set-topic-policy", help=set_topic_policy.__doc__ + ) + set_topic_policy_parser.add_argument("topic_name") set_subscription_policy_parser = subparsers.add_parser( - 'set-subscription-policy', help=set_subscription_policy.__doc__) - set_subscription_policy_parser.add_argument('subscription_name') + "set-subscription-policy", help=set_subscription_policy.__doc__ + ) + set_subscription_policy_parser.add_argument("subscription_name") check_topic_permissions_parser = subparsers.add_parser( - 'check-topic-permissions', help=check_topic_permissions.__doc__) - check_topic_permissions_parser.add_argument('topic_name') + "check-topic-permissions", help=check_topic_permissions.__doc__ + ) + check_topic_permissions_parser.add_argument("topic_name") check_subscription_permissions_parser = subparsers.add_parser( - 'check-subscription-permissions', - help=check_subscription_permissions.__doc__) - check_subscription_permissions_parser.add_argument('subscription_name') + "check-subscription-permissions", + help=check_subscription_permissions.__doc__, + ) + check_subscription_permissions_parser.add_argument("subscription_name") args = parser.parse_args() - if args.command == 'get-topic-policy': + if args.command == "get-topic-policy": get_topic_policy(args.project, args.topic_name) - elif args.command == 'get-subscription-policy': + elif args.command == "get-subscription-policy": get_subscription_policy(args.project, args.subscription_name) - elif args.command == 'set-topic-policy': + elif args.command == "set-topic-policy": set_topic_policy(args.project, args.topic_name) - elif args.command == 'set-subscription-policy': + elif args.command == "set-subscription-policy": set_subscription_policy(args.project, args.subscription_name) - elif args.command == 'check-topic-permissions': + elif args.command == "check-topic-permissions": check_topic_permissions(args.project, args.topic_name) - elif args.command == 'check-subscription-permissions': + elif args.command == "check-subscription-permissions": check_subscription_permissions(args.project, args.subscription_name) diff --git a/pubsub/cloud-client/iam_test.py b/pubsub/cloud-client/iam_test.py index 8a524c35a061..2b019f9ea16f 100644 --- a/pubsub/cloud-client/iam_test.py +++ b/pubsub/cloud-client/iam_test.py @@ -13,23 +13,25 @@ # limitations under the License. import os +import uuid from google.cloud import pubsub_v1 import pytest import iam -PROJECT = os.environ['GCLOUD_PROJECT'] -TOPIC = 'iam-test-topic' -SUBSCRIPTION = 'iam-test-subscription' +UUID = uuid.uuid4().hex +PROJECT = os.environ["GCLOUD_PROJECT"] +TOPIC = "iam-test-topic-" + UUID +SUBSCRIPTION = "iam-test-subscription-" + UUID -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def publisher_client(): yield pubsub_v1.PublisherClient() -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def topic(publisher_client): topic_path = publisher_client.topic_path(PROJECT, TOPIC) @@ -42,8 +44,10 @@ def topic(publisher_client): yield topic_path + publisher_client.delete_topic(topic_path) -@pytest.fixture(scope='module') + +@pytest.fixture(scope="module") def subscriber_client(): yield pubsub_v1.SubscriberClient() @@ -51,7 +55,8 @@ def subscriber_client(): @pytest.fixture def subscription(subscriber_client, topic): subscription_path = subscriber_client.subscription_path( - PROJECT, SUBSCRIPTION) + PROJECT, SUBSCRIPTION + ) try: subscriber_client.delete_subscription(subscription_path) @@ -62,6 +67,8 @@ def subscription(subscriber_client, topic): yield subscription_path + subscriber_client.delete_subscription(subscription_path) + def test_get_topic_policy(topic, capsys): iam.get_topic_policy(PROJECT, TOPIC) @@ -81,16 +88,16 @@ def test_set_topic_policy(publisher_client, topic): iam.set_topic_policy(PROJECT, TOPIC) policy = publisher_client.get_iam_policy(topic) - assert 'roles/pubsub.publisher' in str(policy) - assert 'allUsers' in str(policy) + assert "roles/pubsub.publisher" in str(policy) + assert "allUsers" in str(policy) def test_set_subscription_policy(subscriber_client, subscription): iam.set_subscription_policy(PROJECT, SUBSCRIPTION) policy = subscriber_client.get_iam_policy(subscription) - assert 'roles/pubsub.viewer' in str(policy) - assert 'allUsers' in str(policy) + assert "roles/pubsub.viewer" in str(policy) + assert "allUsers" in str(policy) def test_check_topic_permissions(topic, capsys): @@ -99,7 +106,7 @@ def test_check_topic_permissions(topic, capsys): out, _ = capsys.readouterr() assert topic in out - assert 'pubsub.topics.publish' in out + assert "pubsub.topics.publish" in out def test_check_subscription_permissions(subscription, capsys): @@ -108,4 +115,4 @@ def test_check_subscription_permissions(subscription, capsys): out, _ = capsys.readouterr() assert subscription in out - assert 'pubsub.subscriptions.consume' in out + assert "pubsub.subscriptions.consume" in out diff --git a/pubsub/cloud-client/publisher.py b/pubsub/cloud-client/publisher.py index 490c903b2c1b..d227baab9584 100644 --- a/pubsub/cloud-client/publisher.py +++ b/pubsub/cloud-client/publisher.py @@ -53,7 +53,7 @@ def create_topic(project_id, topic_name): topic = publisher.create_topic(topic_path) - print('Topic created: {}'.format(topic)) + print("Topic created: {}".format(topic)) # [END pubsub_quickstart_create_topic] # [END pubsub_create_topic] @@ -71,7 +71,7 @@ def delete_topic(project_id, topic_name): publisher.delete_topic(topic_path) - print('Topic deleted: {}'.format(topic_path)) + print("Topic deleted: {}".format(topic_path)) # [END pubsub_delete_topic] @@ -90,14 +90,14 @@ def publish_messages(project_id, topic_name): topic_path = publisher.topic_path(project_id, topic_name) for n in range(1, 10): - data = u'Message number {}'.format(n) + data = u"Message number {}".format(n) # Data must be a bytestring - data = data.encode('utf-8') + data = data.encode("utf-8") # When you publish a message, the client returns a future. future = publisher.publish(topic_path, data=data) print(future.result()) - print('Published messages.') + print("Published messages.") # [END pubsub_quickstart_publisher] # [END pubsub_publish] @@ -115,16 +115,16 @@ def publish_messages_with_custom_attributes(project_id, topic_name): topic_path = publisher.topic_path(project_id, topic_name) for n in range(1, 10): - data = u'Message number {}'.format(n) + data = u"Message number {}".format(n) # Data must be a bytestring - data = data.encode('utf-8') + data = data.encode("utf-8") # Add two attributes, origin and username, to the message future = publisher.publish( - topic_path, data, origin='python-sample', username='gcp' + topic_path, data, origin="python-sample", username="gcp" ) print(future.result()) - print('Published messages with custom attributes.') + print("Published messages with custom attributes.") # [END pubsub_publish_custom_attributes] @@ -141,14 +141,14 @@ def publish_messages_with_futures(project_id, topic_name): topic_path = publisher.topic_path(project_id, topic_name) for n in range(1, 10): - data = u'Message number {}'.format(n) + data = u"Message number {}".format(n) # Data must be a bytestring - data = data.encode('utf-8') + data = data.encode("utf-8") # When you publish a message, the client returns a future. future = publisher.publish(topic_path, data=data) print(future.result()) - print('Published messages with futures.') + print("Published messages with futures.") # [END pubsub_publisher_concurrency_control] @@ -173,7 +173,7 @@ def callback(f): print(f.result()) futures.pop(data) except: # noqa - print('Please handle {} for {}.'.format(f.exception(), data)) + print("Please handle {} for {}.".format(f.exception(), data)) return callback @@ -182,7 +182,7 @@ def callback(f): futures.update({data: None}) # When you publish a message, the client returns a future. future = publisher.publish( - topic_path, data=data.encode('utf-8') # data must be a bytestring. + topic_path, data=data.encode("utf-8") # data must be a bytestring. ) futures[data] = future # Publish failures shall be handled in the callback function. @@ -192,7 +192,7 @@ def callback(f): while futures: time.sleep(5) - print('Published message with error handler.') + print("Published message with error handler.") # [END pubsub_publish_messages_error_handler] @@ -207,20 +207,19 @@ def publish_messages_with_batch_settings(project_id, topic_name): # Configure the batch to publish as soon as there is one kilobyte # of data or one second has passed. batch_settings = pubsub_v1.types.BatchSettings( - max_bytes=1024, # One kilobyte - max_latency=1, # One second + max_bytes=1024, max_latency=1 # One kilobyte # One second ) publisher = pubsub_v1.PublisherClient(batch_settings) topic_path = publisher.topic_path(project_id, topic_name) for n in range(1, 10): - data = u'Message number {}'.format(n) + data = u"Message number {}".format(n) # Data must be a bytestring - data = data.encode('utf-8') + data = data.encode("utf-8") future = publisher.publish(topic_path, data=data) print(future.result()) - print('Published messages with batch settings.') + print("Published messages with batch settings.") # [END pubsub_publisher_batch_settings] @@ -234,34 +233,34 @@ def publish_messages_with_retry_settings(project_id, topic_name): # Configure the retry settings. Defaults will be overwritten. retry_settings = { - 'interfaces': { - 'google.pubsub.v1.Publisher': { - 'retry_codes': { - 'publish': [ - 'ABORTED', - 'CANCELLED', - 'DEADLINE_EXCEEDED', - 'INTERNAL', - 'RESOURCE_EXHAUSTED', - 'UNAVAILABLE', - 'UNKNOWN', + "interfaces": { + "google.pubsub.v1.Publisher": { + "retry_codes": { + "publish": [ + "ABORTED", + "CANCELLED", + "DEADLINE_EXCEEDED", + "INTERNAL", + "RESOURCE_EXHAUSTED", + "UNAVAILABLE", + "UNKNOWN", ] }, - 'retry_params': { - 'messaging': { - 'initial_retry_delay_millis': 100, # default: 100 - 'retry_delay_multiplier': 1.3, # default: 1.3 - 'max_retry_delay_millis': 60000, # default: 60000 - 'initial_rpc_timeout_millis': 5000, # default: 25000 - 'rpc_timeout_multiplier': 1.0, # default: 1.0 - 'max_rpc_timeout_millis': 600000, # default: 30000 - 'total_timeout_millis': 600000, # default: 600000 + "retry_params": { + "messaging": { + "initial_retry_delay_millis": 100, # default: 100 + "retry_delay_multiplier": 1.3, # default: 1.3 + "max_retry_delay_millis": 60000, # default: 60000 + "initial_rpc_timeout_millis": 5000, # default: 25000 + "rpc_timeout_multiplier": 1.0, # default: 1.0 + "max_rpc_timeout_millis": 600000, # default: 30000 + "total_timeout_millis": 600000, # default: 600000 } }, - 'methods': { - 'Publish': { - 'retry_codes_name': 'publish', - 'retry_params_name': 'messaging', + "methods": { + "Publish": { + "retry_codes_name": "publish", + "retry_params_name": "messaging", } }, } @@ -272,85 +271,85 @@ def publish_messages_with_retry_settings(project_id, topic_name): topic_path = publisher.topic_path(project_id, topic_name) for n in range(1, 10): - data = u'Message number {}'.format(n) + data = u"Message number {}".format(n) # Data must be a bytestring - data = data.encode('utf-8') + data = data.encode("utf-8") future = publisher.publish(topic_path, data=data) print(future.result()) - print('Published messages with retry settings.') + print("Published messages with retry settings.") # [END pubsub_publisher_retry_settings] if __name__ == "__main__": parser = argparse.ArgumentParser( description=__doc__, - formatter_class=argparse.RawDescriptionHelpFormatter + formatter_class=argparse.RawDescriptionHelpFormatter, ) - parser.add_argument('project_id', help='Your Google Cloud project ID') + parser.add_argument("project_id", help="Your Google Cloud project ID") - subparsers = parser.add_subparsers(dest='command') - subparsers.add_parser('list', help=list_topics.__doc__) + subparsers = parser.add_subparsers(dest="command") + subparsers.add_parser("list", help=list_topics.__doc__) - create_parser = subparsers.add_parser('create', - help=create_topic.__doc__) - create_parser.add_argument('topic_name') + create_parser = subparsers.add_parser("create", help=create_topic.__doc__) + create_parser.add_argument("topic_name") - delete_parser = subparsers.add_parser('delete', - help=delete_topic.__doc__) - delete_parser.add_argument('topic_name') + delete_parser = subparsers.add_parser("delete", help=delete_topic.__doc__) + delete_parser.add_argument("topic_name") - publish_parser = subparsers.add_parser('publish', - help=publish_messages.__doc__) - publish_parser.add_argument('topic_name') + publish_parser = subparsers.add_parser( + "publish", help=publish_messages.__doc__ + ) + publish_parser.add_argument("topic_name") publish_with_custom_attributes_parser = subparsers.add_parser( - 'publish-with-custom-attributes', + "publish-with-custom-attributes", help=publish_messages_with_custom_attributes.__doc__, ) - publish_with_custom_attributes_parser.add_argument('topic_name') + publish_with_custom_attributes_parser.add_argument("topic_name") publish_with_futures_parser = subparsers.add_parser( - 'publish-with-futures', help=publish_messages_with_futures.__doc__ + "publish-with-futures", help=publish_messages_with_futures.__doc__ ) - publish_with_futures_parser.add_argument('topic_name') + publish_with_futures_parser.add_argument("topic_name") publish_with_error_handler_parser = subparsers.add_parser( - 'publish-with-error-handler', - help=publish_messages_with_error_handler.__doc__ + "publish-with-error-handler", + help=publish_messages_with_error_handler.__doc__, ) - publish_with_error_handler_parser.add_argument('topic_name') + publish_with_error_handler_parser.add_argument("topic_name") publish_with_batch_settings_parser = subparsers.add_parser( - 'publish-with-batch-settings', - help=publish_messages_with_batch_settings.__doc__ + "publish-with-batch-settings", + help=publish_messages_with_batch_settings.__doc__, ) - publish_with_batch_settings_parser.add_argument('topic_name') + publish_with_batch_settings_parser.add_argument("topic_name") publish_with_retry_settings_parser = subparsers.add_parser( - 'publish-with-retry-settings', - help=publish_messages_with_retry_settings.__doc__ + "publish-with-retry-settings", + help=publish_messages_with_retry_settings.__doc__, ) - publish_with_retry_settings_parser.add_argument('topic_name') + publish_with_retry_settings_parser.add_argument("topic_name") args = parser.parse_args() - if args.command == 'list': + if args.command == "list": list_topics(args.project_id) - elif args.command == 'create': + elif args.command == "create": create_topic(args.project_id, args.topic_name) - elif args.command == 'delete': + elif args.command == "delete": delete_topic(args.project_id, args.topic_name) - elif args.command == 'publish': + elif args.command == "publish": publish_messages(args.project_id, args.topic_name) - elif args.command == 'publish-with-custom-attributes': - publish_messages_with_custom_attributes(args.project_id, - args.topic_name) - elif args.command == 'publish-with-futures': + elif args.command == "publish-with-custom-attributes": + publish_messages_with_custom_attributes( + args.project_id, args.topic_name + ) + elif args.command == "publish-with-futures": publish_messages_with_futures(args.project_id, args.topic_name) - elif args.command == 'publish-with-error-handler': + elif args.command == "publish-with-error-handler": publish_messages_with_error_handler(args.project_id, args.topic_name) - elif args.command == 'publish-with-batch-settings': + elif args.command == "publish-with-batch-settings": publish_messages_with_batch_settings(args.project_id, args.topic_name) - elif args.command == 'publish-with-retry-settings': + elif args.command == "publish-with-retry-settings": publish_messages_with_retry_settings(args.project_id, args.topic_name) diff --git a/pubsub/cloud-client/publisher_test.py b/pubsub/cloud-client/publisher_test.py index 5e550abd641d..125fae3c06b9 100644 --- a/pubsub/cloud-client/publisher_test.py +++ b/pubsub/cloud-client/publisher_test.py @@ -14,6 +14,7 @@ import os import time +import uuid from gcp_devrel.testing import eventually_consistent from google.cloud import pubsub_v1 @@ -22,8 +23,9 @@ import publisher -PROJECT = os.environ['GCLOUD_PROJECT'] -TOPIC = 'publisher-test-topic' +UUID = uuid.uuid4().hex +PROJECT = os.environ["GCLOUD_PROJECT"] +TOPIC = "publisher-test-topic-" + UUID @pytest.fixture @@ -49,11 +51,18 @@ def _make_sleep_patch(): def new_sleep(period): if period == 60: real_sleep(5) - raise RuntimeError('sigil') + raise RuntimeError("sigil") else: real_sleep(period) - return mock.patch('time.sleep', new=new_sleep) + return mock.patch("time.sleep", new=new_sleep) + + +def _to_delete(): + publisher_client = pubsub_v1.PublisherClient() + publisher_client.delete_topic( + "projects/{}/topics/{}".format(PROJECT, TOPIC) + ) def test_list(client, topic, capsys): @@ -91,39 +100,41 @@ def test_publish(topic, capsys): publisher.publish_messages(PROJECT, TOPIC) out, _ = capsys.readouterr() - assert 'Published' in out + assert "Published" in out def test_publish_with_custom_attributes(topic, capsys): publisher.publish_messages_with_custom_attributes(PROJECT, TOPIC) out, _ = capsys.readouterr() - assert 'Published' in out + assert "Published" in out def test_publish_with_batch_settings(topic, capsys): publisher.publish_messages_with_batch_settings(PROJECT, TOPIC) out, _ = capsys.readouterr() - assert 'Published' in out + assert "Published" in out def test_publish_with_retry_settings(topic, capsys): publisher.publish_messages_with_retry_settings(PROJECT, TOPIC) out, _ = capsys.readouterr() - assert 'Published' in out + assert "Published" in out def test_publish_with_error_handler(topic, capsys): publisher.publish_messages_with_error_handler(PROJECT, TOPIC) out, _ = capsys.readouterr() - assert 'Published' in out + assert "Published" in out def test_publish_with_futures(topic, capsys): publisher.publish_messages_with_futures(PROJECT, TOPIC) out, _ = capsys.readouterr() - assert 'Published' in out + assert "Published" in out + + _to_delete() diff --git a/pubsub/cloud-client/quickstart.py b/pubsub/cloud-client/quickstart.py index f48d085e06b5..d01105885cb8 100644 --- a/pubsub/cloud-client/quickstart.py +++ b/pubsub/cloud-client/quickstart.py @@ -39,34 +39,36 @@ def end_to_end(project_id, topic_name, subscription_name, num_messages): # The `subscription_path` method creates a fully qualified identifier # in the form `projects/{project_id}/subscriptions/{subscription_name}` subscription_path = subscriber.subscription_path( - project_id, subscription_name) + project_id, subscription_name + ) # Create the topic. topic = publisher.create_topic(topic_path) - print('\nTopic created: {}'.format(topic.name)) + print("\nTopic created: {}".format(topic.name)) # Create a subscription. subscription = subscriber.create_subscription( - subscription_path, topic_path) - print('\nSubscription created: {}\n'.format(subscription.name)) + subscription_path, topic_path + ) + print("\nSubscription created: {}\n".format(subscription.name)) publish_begin = time.time() # Publish messages. for n in range(num_messages): - data = u'Message number {}'.format(n) + data = u"Message number {}".format(n) # Data must be a bytestring - data = data.encode('utf-8') + data = data.encode("utf-8") # When you publish a message, the client returns a future. future = publisher.publish(topic_path, data=data) - print('Published {} of message ID {}.'.format(data, future.result())) + print("Published {} of message ID {}.".format(data, future.result())) publish_time = time.time() - publish_begin messages = set() def callback(message): - print('Received message: {}'.format(message)) + print("Received message: {}".format(message)) # Unacknowledged messages will be sent again. message.ack() messages.add(message) @@ -76,7 +78,7 @@ def callback(message): # Receive messages. The subscriber is nonblocking. subscriber.subscribe(subscription_path, callback=callback) - print('\nListening for messages on {}...\n'.format(subscription_path)) + print("\nListening for messages on {}...\n".format(subscription_path)) while True: if len(messages) == num_messages: @@ -87,22 +89,23 @@ def callback(message): break else: # Sleeps the thread at 50Hz to save on resources. - time.sleep(1. / 50) + time.sleep(1.0 / 50) # [END pubsub_end_to_end] -if __name__ == '__main__': +if __name__ == "__main__": parser = argparse.ArgumentParser( description=__doc__, - formatter_class=argparse.RawDescriptionHelpFormatter + formatter_class=argparse.RawDescriptionHelpFormatter, ) - parser.add_argument('project_id', help='Your Google Cloud project ID') - parser.add_argument('topic_name', help='Your topic name') - parser.add_argument('subscription_name', help='Your subscription name') - parser.add_argument('num_msgs', type=int, help='Number of test messages') + parser.add_argument("project_id", help="Your Google Cloud project ID") + parser.add_argument("topic_name", help="Your topic name") + parser.add_argument("subscription_name", help="Your subscription name") + parser.add_argument("num_msgs", type=int, help="Number of test messages") args = parser.parse_args() - end_to_end(args.project_id, args.topic_name, args.subscription_name, - args.num_msgs) + end_to_end( + args.project_id, args.topic_name, args.subscription_name, args.num_msgs + ) diff --git a/pubsub/cloud-client/quickstart/pub.py b/pubsub/cloud-client/quickstart/pub.py index e340eb4f36ec..a3f8087ecd15 100644 --- a/pubsub/cloud-client/quickstart/pub.py +++ b/pubsub/cloud-client/quickstart/pub.py @@ -17,22 +17,32 @@ # [START pubsub_quickstart_pub_all] import argparse import time + # [START pubsub_quickstart_pub_deps] from google.cloud import pubsub_v1 + # [END pubsub_quickstart_pub_deps] def get_callback(api_future, data, ref): """Wrap message data in the context of the callback function.""" + def callback(api_future): try: - print("Published message {} now has message ID {}".format( - data, api_future.result())) + print( + "Published message {} now has message ID {}".format( + data, api_future.result() + ) + ) ref["num_messages"] += 1 except Exception: - print("A problem occurred when publishing {}: {}\n".format( - data, api_future.exception())) + print( + "A problem occurred when publishing {}: {}\n".format( + data, api_future.exception() + ) + ) raise + return callback @@ -63,13 +73,13 @@ def pub(project_id, topic_name): print("Published {} message(s).".format(ref["num_messages"])) -if __name__ == '__main__': +if __name__ == "__main__": parser = argparse.ArgumentParser( description=__doc__, - formatter_class=argparse.RawDescriptionHelpFormatter + formatter_class=argparse.RawDescriptionHelpFormatter, ) - parser.add_argument('project_id', help='Google Cloud project ID') - parser.add_argument('topic_name', help='Pub/Sub topic name') + parser.add_argument("project_id", help="Google Cloud project ID") + parser.add_argument("topic_name", help="Pub/Sub topic name") args = parser.parse_args() diff --git a/pubsub/cloud-client/quickstart/pub_test.py b/pubsub/cloud-client/quickstart/pub_test.py index 09443364a3f6..b9a6f807f37d 100644 --- a/pubsub/cloud-client/quickstart/pub_test.py +++ b/pubsub/cloud-client/quickstart/pub_test.py @@ -16,22 +16,24 @@ import os import pytest +import uuid from google.api_core.exceptions import AlreadyExists from google.cloud import pubsub_v1 import pub -PROJECT = os.environ['GCLOUD_PROJECT'] -TOPIC = 'quickstart-pub-test-topic' +UUID = uuid.uuid4().hex +PROJECT = os.environ["GCLOUD_PROJECT"] +TOPIC = "quickstart-pub-test-topic-" + UUID -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def publisher_client(): yield pubsub_v1.PublisherClient() -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def topic(publisher_client): topic_path = publisher_client.topic_path(PROJECT, TOPIC) @@ -42,20 +44,12 @@ def topic(publisher_client): yield TOPIC + publisher_client.delete_topic(topic_path) -@pytest.fixture -def to_delete(publisher_client): - doomed = [] - yield doomed - for item in doomed: - publisher_client.delete_topic(item) - -def test_pub(publisher_client, topic, to_delete, capsys): +def test_pub(publisher_client, topic, capsys): pub.pub(PROJECT, topic) - to_delete.append('projects/{}/topics/{}'.format(PROJECT, TOPIC)) - out, _ = capsys.readouterr() - assert "Published message b'Hello, World!'" in out + assert "Hello, World!" in out diff --git a/pubsub/cloud-client/quickstart/sub.py b/pubsub/cloud-client/quickstart/sub.py index e39f14105b1a..5791af14d799 100644 --- a/pubsub/cloud-client/quickstart/sub.py +++ b/pubsub/cloud-client/quickstart/sub.py @@ -16,8 +16,10 @@ # [START pubsub_quickstart_sub_all] import argparse + # [START pubsub_quickstart_sub_deps] from google.cloud import pubsub_v1 + # [END pubsub_quickstart_sub_deps] @@ -29,19 +31,22 @@ def sub(project_id, subscription_name): # [END pubsub_quickstart_sub_client] # Create a fully qualified identifier in the form of # `projects/{project_id}/subscriptions/{subscription_name}` - subscription_path = client.subscription_path( - project_id, subscription_name) + subscription_path = client.subscription_path(project_id, subscription_name) def callback(message): - print('Received message {} of message ID {}\n'.format( - message, message.message_id)) + print( + "Received message {} of message ID {}\n".format( + message, message.message_id + ) + ) # Acknowledge the message. Unack'ed messages will be redelivered. message.ack() - print('Acknowledged message {}\n'.format(message.message_id)) + print("Acknowledged message {}\n".format(message.message_id)) streaming_pull_future = client.subscribe( - subscription_path, callback=callback) - print('Listening for messages on {}..\n'.format(subscription_path)) + subscription_path, callback=callback + ) + print("Listening for messages on {}..\n".format(subscription_path)) # Calling result() on StreamingPullFuture keeps the main thread from # exiting while messages get processed in the callbacks. @@ -51,13 +56,13 @@ def callback(message): streaming_pull_future.cancel() -if __name__ == '__main__': +if __name__ == "__main__": parser = argparse.ArgumentParser( description=__doc__, - formatter_class=argparse.RawDescriptionHelpFormatter + formatter_class=argparse.RawDescriptionHelpFormatter, ) - parser.add_argument('project_id', help='Google Cloud project ID') - parser.add_argument('subscription_name', help='Pub/Sub subscription name') + parser.add_argument("project_id", help="Google Cloud project ID") + parser.add_argument("subscription_name", help="Pub/Sub subscription name") args = parser.parse_args() diff --git a/pubsub/cloud-client/quickstart/sub_test.py b/pubsub/cloud-client/quickstart/sub_test.py index 476139a02642..07edfad7c4d2 100644 --- a/pubsub/cloud-client/quickstart/sub_test.py +++ b/pubsub/cloud-client/quickstart/sub_test.py @@ -14,8 +14,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +import mock import os import pytest +import uuid from google.api_core.exceptions import AlreadyExists from google.cloud import pubsub_v1 @@ -23,84 +25,80 @@ import sub -PROJECT = os.environ['GCLOUD_PROJECT'] -TOPIC = 'quickstart-sub-test-topic' -SUBSCRIPTION = 'quickstart-sub-test-topic-sub' +UUID = uuid.uuid4().hex +PROJECT = os.environ["GCLOUD_PROJECT"] +TOPIC = "quickstart-sub-test-topic-" + UUID +SUBSCRIPTION = "quickstart-sub-test-topic-sub-" + UUID publisher_client = pubsub_v1.PublisherClient() subscriber_client = pubsub_v1.SubscriberClient() -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def topic_path(): topic_path = publisher_client.topic_path(PROJECT, TOPIC) try: topic = publisher_client.create_topic(topic_path) - return topic.name + yield topic.name except AlreadyExists: - return topic_path + yield topic_path + publisher_client.delete_topic(topic_path) -@pytest.fixture(scope='module') + +@pytest.fixture(scope="module") def subscription_path(topic_path): subscription_path = subscriber_client.subscription_path( - PROJECT, SUBSCRIPTION) + PROJECT, SUBSCRIPTION + ) try: subscription = subscriber_client.create_subscription( - subscription_path, topic_path) - return subscription.name + subscription_path, topic_path + ) + yield subscription.name except AlreadyExists: - return subscription_path - + yield subscription_path -def _to_delete(resource_paths): - for item in resource_paths: - if 'topics' in item: - publisher_client.delete_topic(item) - if 'subscriptions' in item: - subscriber_client.delete_subscription(item) + subscriber_client.delete_subscription(subscription_path) def _publish_messages(topic_path): - publish_future = publisher_client.publish(topic_path, data=b'Hello World!') + publish_future = publisher_client.publish(topic_path, data=b"Hello World!") publish_future.result() -def _sub_timeout(project_id, subscription_name): - # This is an exactly copy of `sub.py` except - # StreamingPullFuture.result() will time out after 10s. - client = pubsub_v1.SubscriberClient() - subscription_path = client.subscription_path( - project_id, subscription_name) +def test_sub(monkeypatch, topic_path, subscription_path, capsys): - def callback(message): - print('Received message {} of message ID {}\n'.format( - message, message.message_id)) - message.ack() - print('Acknowledged message {}\n'.format(message.message_id)) + real_client = pubsub_v1.SubscriberClient() + mock_client = mock.Mock(spec=pubsub_v1.SubscriberClient, wraps=real_client) - streaming_pull_future = client.subscribe( - subscription_path, callback=callback) - print('Listening for messages on {}..\n'.format(subscription_path)) + # Attributes on mock_client_constructor uses the corresponding + # attributes on pubsub_v1.SubscriberClient. + mock_client_constructor = mock.create_autospec(pubsub_v1.SubscriberClient) + mock_client_constructor.return_value = mock_client - try: - streaming_pull_future.result(timeout=10) - except: # noqa - streaming_pull_future.cancel() + monkeypatch.setattr(pubsub_v1, "SubscriberClient", mock_client_constructor) + def mock_subscribe(subscription_path, callback=None): + real_future = real_client.subscribe( + subscription_path, callback=callback + ) + mock_future = mock.Mock(spec=real_future, wraps=real_future) -def test_sub(monkeypatch, topic_path, subscription_path, capsys): - monkeypatch.setattr(sub, 'sub', _sub_timeout) + def mock_result(): + return real_future.result(timeout=10) + + mock_future.result.side_effect = mock_result + return mock_future + + mock_client.subscribe.side_effect = mock_subscribe _publish_messages(topic_path) sub.sub(PROJECT, SUBSCRIPTION) - # Clean up resources. - _to_delete([topic_path, subscription_path]) - out, _ = capsys.readouterr() assert "Received message" in out assert "Acknowledged message" in out diff --git a/pubsub/cloud-client/quickstart_test.py b/pubsub/cloud-client/quickstart_test.py index d318b260c63c..6a1d4aae1b5f 100644 --- a/pubsub/cloud-client/quickstart_test.py +++ b/pubsub/cloud-client/quickstart_test.py @@ -15,23 +15,25 @@ # limitations under the License. import os +import uuid from google.cloud import pubsub_v1 import pytest import quickstart -PROJECT = os.environ['GCLOUD_PROJECT'] -TOPIC = 'end-to-end-test-topic' -SUBSCRIPTION = 'end-to-end-test-topic-sub' +UUID = uuid.uuid4().hex +PROJECT = os.environ["GCLOUD_PROJECT"] +TOPIC = "end-to-end-test-topic-" + UUID +SUBSCRIPTION = "end-to-end-test-topic-sub-" + UUID N = 10 -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def publisher_client(): yield pubsub_v1.PublisherClient() -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def topic(publisher_client): topic_path = publisher_client.topic_path(PROJECT, TOPIC) @@ -42,16 +44,19 @@ def topic(publisher_client): yield TOPIC + publisher_client.delete_topic(topic_path) -@pytest.fixture(scope='module') + +@pytest.fixture(scope="module") def subscriber_client(): yield pubsub_v1.SubscriberClient() -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def subscription(subscriber_client, topic): subscription_path = subscriber_client.subscription_path( - PROJECT, SUBSCRIPTION) + PROJECT, SUBSCRIPTION + ) try: subscriber_client.delete_subscription(subscription_path) @@ -60,6 +65,8 @@ def subscription(subscriber_client, topic): yield SUBSCRIPTION + subscriber_client.delete_subscription(subscription_path) + def test_end_to_end(topic, subscription, capsys): diff --git a/pubsub/cloud-client/subscriber.py b/pubsub/cloud-client/subscriber.py index 3bbad0ead1b0..ea1cc9ff9e72 100644 --- a/pubsub/cloud-client/subscriber.py +++ b/pubsub/cloud-client/subscriber.py @@ -67,19 +67,20 @@ def create_subscription(project_id, topic_name, subscription_name): subscriber = pubsub_v1.SubscriberClient() topic_path = subscriber.topic_path(project_id, topic_name) subscription_path = subscriber.subscription_path( - project_id, subscription_name) + project_id, subscription_name + ) subscription = subscriber.create_subscription( - subscription_path, topic_path) + subscription_path, topic_path + ) - print('Subscription created: {}'.format(subscription)) + print("Subscription created: {}".format(subscription)) # [END pubsub_create_pull_subscription] -def create_push_subscription(project_id, - topic_name, - subscription_name, - endpoint): +def create_push_subscription( + project_id, topic_name, subscription_name, endpoint +): """Create a new push subscription on the given topic.""" # [START pubsub_create_push_subscription] from google.cloud import pubsub_v1 @@ -92,16 +93,17 @@ def create_push_subscription(project_id, subscriber = pubsub_v1.SubscriberClient() topic_path = subscriber.topic_path(project_id, topic_name) subscription_path = subscriber.subscription_path( - project_id, subscription_name) + project_id, subscription_name + ) - push_config = pubsub_v1.types.PushConfig( - push_endpoint=endpoint) + push_config = pubsub_v1.types.PushConfig(push_endpoint=endpoint) subscription = subscriber.create_subscription( - subscription_path, topic_path, push_config) + subscription_path, topic_path, push_config + ) - print('Push subscription created: {}'.format(subscription)) - print('Endpoint for subscription is: {}'.format(endpoint)) + print("Push subscription created: {}".format(subscription)) + print("Endpoint for subscription is: {}".format(endpoint)) # [END pubsub_create_push_subscription] @@ -115,11 +117,12 @@ def delete_subscription(project_id, subscription_name): subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path( - project_id, subscription_name) + project_id, subscription_name + ) subscriber.delete_subscription(subscription_path) - print('Subscription deleted: {}'.format(subscription_path)) + print("Subscription deleted: {}".format(subscription_path)) # [END pubsub_delete_subscription] @@ -139,27 +142,22 @@ def update_subscription(project_id, subscription_name, endpoint): subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path( - project_id, subscription_name) + project_id, subscription_name + ) - push_config = pubsub_v1.types.PushConfig( - push_endpoint=endpoint) + push_config = pubsub_v1.types.PushConfig(push_endpoint=endpoint) subscription = pubsub_v1.types.Subscription( - name=subscription_path, - push_config=push_config) + name=subscription_path, push_config=push_config + ) - update_mask = { - 'paths': { - 'push_config', - } - } + update_mask = {"paths": {"push_config"}} subscriber.update_subscription(subscription, update_mask) result = subscriber.get_subscription(subscription_path) - print('Subscription updated: {}'.format(subscription_path)) - print('New endpoint for subscription is: {}'.format( - result.push_config)) + print("Subscription updated: {}".format(subscription_path)) + print("New endpoint for subscription is: {}".format(result.push_config)) # [END pubsub_update_push_configuration] @@ -178,17 +176,18 @@ def receive_messages(project_id, subscription_name): # The `subscription_path` method creates a fully qualified identifier # in the form `projects/{project_id}/subscriptions/{subscription_name}` subscription_path = subscriber.subscription_path( - project_id, subscription_name) + project_id, subscription_name + ) def callback(message): - print('Received message: {}'.format(message)) + print("Received message: {}".format(message)) message.ack() subscriber.subscribe(subscription_path, callback=callback) # The subscriber is non-blocking. We must keep the main thread from # exiting to allow it to process messages asynchronously in the background. - print('Listening for messages on {}'.format(subscription_path)) + print("Listening for messages on {}".format(subscription_path)) while True: time.sleep(60) # [END pubsub_subscriber_async_pull] @@ -208,22 +207,23 @@ def receive_messages_with_custom_attributes(project_id, subscription_name): subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path( - project_id, subscription_name) + project_id, subscription_name + ) def callback(message): - print('Received message: {}'.format(message.data)) + print("Received message: {}".format(message.data)) if message.attributes: - print('Attributes:') + print("Attributes:") for key in message.attributes: value = message.attributes.get(key) - print('{}: {}'.format(key, value)) + print("{}: {}".format(key, value)) message.ack() subscriber.subscribe(subscription_path, callback=callback) # The subscriber is non-blocking, so we must keep the main thread from # exiting to allow it to process messages in the background. - print('Listening for messages on {}'.format(subscription_path)) + print("Listening for messages on {}".format(subscription_path)) while True: time.sleep(60) # [END pubsub_subscriber_async_pull_custom_attributes] @@ -242,20 +242,22 @@ def receive_messages_with_flow_control(project_id, subscription_name): subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path( - project_id, subscription_name) + project_id, subscription_name + ) def callback(message): - print('Received message: {}'.format(message.data)) + print("Received message: {}".format(message.data)) message.ack() # Limit the subscriber to only have ten outstanding messages at a time. flow_control = pubsub_v1.types.FlowControl(max_messages=10) subscriber.subscribe( - subscription_path, callback=callback, flow_control=flow_control) + subscription_path, callback=callback, flow_control=flow_control + ) # The subscriber is non-blocking, so we must keep the main thread from # exiting to allow it to process messages in the background. - print('Listening for messages on {}'.format(subscription_path)) + print("Listening for messages on {}".format(subscription_path)) while True: time.sleep(60) # [END pubsub_subscriber_flow_settings] @@ -271,7 +273,8 @@ def synchronous_pull(project_id, subscription_name): subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path( - project_id, subscription_name) + project_id, subscription_name + ) NUM_MESSAGES = 3 @@ -286,8 +289,11 @@ def synchronous_pull(project_id, subscription_name): # Acknowledges the received messages so they will not be sent again. subscriber.acknowledge(subscription_path, ack_ids) - print('Received and acknowledged {} messages. Done.'.format( - len(response.received_messages))) + print( + "Received and acknowledged {} messages. Done.".format( + len(response.received_messages) + ) + ) # [END pubsub_subscriber_sync_pull] @@ -306,7 +312,8 @@ def synchronous_pull_with_lease_management(project_id, subscription_name): subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path( - project_id, subscription_name) + project_id, subscription_name + ) NUM_MESSAGES = 2 ACK_DEADLINE = 30 @@ -322,8 +329,11 @@ def synchronous_pull_with_lease_management(project_id, subscription_name): def worker(msg): """Simulates a long-running process.""" RUN_TIME = random.randint(1, 60) - logger.info('{}: Running {} for {}s'.format( - time.strftime("%X", time.gmtime()), msg.message.data, RUN_TIME)) + logger.info( + "{}: Running {} for {}s".format( + time.strftime("%X", time.gmtime()), msg.message.data, RUN_TIME + ) + ) time.sleep(RUN_TIME) # `processes` stores process as key and ack id and message as values. @@ -344,24 +354,35 @@ def worker(msg): subscriber.modify_ack_deadline( subscription_path, [ack_id], - ack_deadline_seconds=ACK_DEADLINE) - logger.info('{}: Reset ack deadline for {} for {}s'.format( - time.strftime("%X", time.gmtime()), - msg_data, ACK_DEADLINE)) + ack_deadline_seconds=ACK_DEADLINE, + ) + logger.info( + "{}: Reset ack deadline for {} for {}s".format( + time.strftime("%X", time.gmtime()), + msg_data, + ACK_DEADLINE, + ) + ) # If the processs is finished, acknowledges using `ack_id`. else: subscriber.acknowledge(subscription_path, [ack_id]) - logger.info("{}: Acknowledged {}".format( - time.strftime("%X", time.gmtime()), msg_data)) + logger.info( + "{}: Acknowledged {}".format( + time.strftime("%X", time.gmtime()), msg_data + ) + ) processes.pop(process) # If there are still processes running, sleeps the thread. if processes: time.sleep(SLEEP_TIME) - print('Received and acknowledged {} messages. Done.'.format( - len(response.received_messages))) + print( + "Received and acknowledged {} messages. Done.".format( + len(response.received_messages) + ) + ) # [END pubsub_subscriber_sync_pull_with_lease] @@ -375,10 +396,11 @@ def listen_for_errors(project_id, subscription_name): subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path( - project_id, subscription_name) + project_id, subscription_name + ) def callback(message): - print('Received message: {}'.format(message)) + print("Received message: {}".format(message)) message.ack() future = subscriber.subscribe(subscription_path, callback=callback) @@ -390,109 +412,126 @@ def callback(message): future.result(timeout=30) except Exception as e: print( - 'Listening for messages on {} threw an Exception: {}.'.format( - subscription_name, e)) + "Listening for messages on {} threw an Exception: {}.".format( + subscription_name, e + ) + ) # [END pubsub_subscriber_error_listener] -if __name__ == '__main__': +if __name__ == "__main__": parser = argparse.ArgumentParser( description=__doc__, - formatter_class=argparse.RawDescriptionHelpFormatter + formatter_class=argparse.RawDescriptionHelpFormatter, ) - parser.add_argument('project_id', help='Your Google Cloud project ID') + parser.add_argument("project_id", help="Your Google Cloud project ID") - subparsers = parser.add_subparsers(dest='command') + subparsers = parser.add_subparsers(dest="command") list_in_topic_parser = subparsers.add_parser( - 'list_in_topic', help=list_subscriptions_in_topic.__doc__) - list_in_topic_parser.add_argument('topic_name') + "list_in_topic", help=list_subscriptions_in_topic.__doc__ + ) + list_in_topic_parser.add_argument("topic_name") list_in_project_parser = subparsers.add_parser( - 'list_in_project', help=list_subscriptions_in_project.__doc__) + "list_in_project", help=list_subscriptions_in_project.__doc__ + ) create_parser = subparsers.add_parser( - 'create', help=create_subscription.__doc__) - create_parser.add_argument('topic_name') - create_parser.add_argument('subscription_name') + "create", help=create_subscription.__doc__ + ) + create_parser.add_argument("topic_name") + create_parser.add_argument("subscription_name") create_push_parser = subparsers.add_parser( - 'create-push', help=create_push_subscription.__doc__) - create_push_parser.add_argument('topic_name') - create_push_parser.add_argument('subscription_name') - create_push_parser.add_argument('endpoint') + "create-push", help=create_push_subscription.__doc__ + ) + create_push_parser.add_argument("topic_name") + create_push_parser.add_argument("subscription_name") + create_push_parser.add_argument("endpoint") delete_parser = subparsers.add_parser( - 'delete', help=delete_subscription.__doc__) - delete_parser.add_argument('subscription_name') + "delete", help=delete_subscription.__doc__ + ) + delete_parser.add_argument("subscription_name") update_parser = subparsers.add_parser( - 'update', help=update_subscription.__doc__) - update_parser.add_argument('subscription_name') - update_parser.add_argument('endpoint') + "update", help=update_subscription.__doc__ + ) + update_parser.add_argument("subscription_name") + update_parser.add_argument("endpoint") receive_parser = subparsers.add_parser( - 'receive', help=receive_messages.__doc__) - receive_parser.add_argument('subscription_name') + "receive", help=receive_messages.__doc__ + ) + receive_parser.add_argument("subscription_name") receive_with_custom_attributes_parser = subparsers.add_parser( - 'receive-custom-attributes', - help=receive_messages_with_custom_attributes.__doc__) - receive_with_custom_attributes_parser.add_argument('subscription_name') + "receive-custom-attributes", + help=receive_messages_with_custom_attributes.__doc__, + ) + receive_with_custom_attributes_parser.add_argument("subscription_name") receive_with_flow_control_parser = subparsers.add_parser( - 'receive-flow-control', - help=receive_messages_with_flow_control.__doc__) - receive_with_flow_control_parser.add_argument('subscription_name') + "receive-flow-control", help=receive_messages_with_flow_control.__doc__ + ) + receive_with_flow_control_parser.add_argument("subscription_name") synchronous_pull_parser = subparsers.add_parser( - 'receive-synchronously', - help=synchronous_pull.__doc__) - synchronous_pull_parser.add_argument('subscription_name') + "receive-synchronously", help=synchronous_pull.__doc__ + ) + synchronous_pull_parser.add_argument("subscription_name") synchronous_pull_with_lease_management_parser = subparsers.add_parser( - 'receive-synchronously-with-lease', - help=synchronous_pull_with_lease_management.__doc__) + "receive-synchronously-with-lease", + help=synchronous_pull_with_lease_management.__doc__, + ) synchronous_pull_with_lease_management_parser.add_argument( - 'subscription_name') + "subscription_name" + ) listen_for_errors_parser = subparsers.add_parser( - 'listen_for_errors', help=listen_for_errors.__doc__) - listen_for_errors_parser.add_argument('subscription_name') + "listen_for_errors", help=listen_for_errors.__doc__ + ) + listen_for_errors_parser.add_argument("subscription_name") args = parser.parse_args() - if args.command == 'list_in_topic': + if args.command == "list_in_topic": list_subscriptions_in_topic(args.project_id, args.topic_name) - elif args.command == 'list_in_project': + elif args.command == "list_in_project": list_subscriptions_in_project(args.project_id) - elif args.command == 'create': + elif args.command == "create": create_subscription( - args.project_id, args.topic_name, args.subscription_name) - elif args.command == 'create-push': + args.project_id, args.topic_name, args.subscription_name + ) + elif args.command == "create-push": create_push_subscription( args.project_id, args.topic_name, args.subscription_name, - args.endpoint) - elif args.command == 'delete': - delete_subscription( - args.project_id, args.subscription_name) - elif args.command == 'update': + args.endpoint, + ) + elif args.command == "delete": + delete_subscription(args.project_id, args.subscription_name) + elif args.command == "update": update_subscription( - args.project_id, args.subscription_name, args.endpoint) - elif args.command == 'receive': + args.project_id, args.subscription_name, args.endpoint + ) + elif args.command == "receive": receive_messages(args.project_id, args.subscription_name) - elif args.command == 'receive-custom-attributes': + elif args.command == "receive-custom-attributes": receive_messages_with_custom_attributes( - args.project_id, args.subscription_name) - elif args.command == 'receive-flow-control': + args.project_id, args.subscription_name + ) + elif args.command == "receive-flow-control": receive_messages_with_flow_control( - args.project_id, args.subscription_name) - elif args.command == 'receive-synchronously': - synchronous_pull( - args.project_id, args.subscription_name) - elif args.command == 'receive-synchronously-with-lease': + args.project_id, args.subscription_name + ) + elif args.command == "receive-synchronously": + synchronous_pull(args.project_id, args.subscription_name) + elif args.command == "receive-synchronously-with-lease": synchronous_pull_with_lease_management( - args.project_id, args.subscription_name) - elif args.command == 'listen_for_errors': + args.project_id, args.subscription_name + ) + elif args.command == "listen_for_errors": listen_for_errors(args.project_id, args.subscription_name) diff --git a/pubsub/cloud-client/subscriber_test.py b/pubsub/cloud-client/subscriber_test.py index 4c5fd61223db..0645c0738e1c 100644 --- a/pubsub/cloud-client/subscriber_test.py +++ b/pubsub/cloud-client/subscriber_test.py @@ -14,6 +14,7 @@ import os import time +import uuid from gcp_devrel.testing import eventually_consistent from google.cloud import pubsub_v1 @@ -22,21 +23,22 @@ import subscriber -PROJECT = os.environ['GCLOUD_PROJECT'] -TOPIC = 'subscription-test-topic' -SUBSCRIPTION_ONE = 'subscription-test-subscription-one' -SUBSCRIPTION_TWO = 'subscription-test-subscription-two' -SUBSCRIPTION_THREE = 'subscription-test-subscription-three' -ENDPOINT = 'https://{}.appspot.com/push'.format(PROJECT) -NEW_ENDPOINT = 'https://{}.appspot.com/push2'.format(PROJECT) +UUID = uuid.uuid4().hex +PROJECT = os.environ["GCLOUD_PROJECT"] +TOPIC = "subscription-test-topic-" + UUID +SUBSCRIPTION_ONE = "subscription-test-subscription-one-" + UUID +SUBSCRIPTION_TWO = "subscription-test-subscription-two-" + UUID +SUBSCRIPTION_THREE = "subscription-test-subscription-three-" + UUID +ENDPOINT = "https://{}.appspot.com/push".format(PROJECT) +NEW_ENDPOINT = "https://{}.appspot.com/push2".format(PROJECT) -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def publisher_client(): yield pubsub_v1.PublisherClient() -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def topic(publisher_client): topic_path = publisher_client.topic_path(PROJECT, TOPIC) @@ -48,49 +50,55 @@ def topic(publisher_client): yield response.name -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def subscriber_client(): yield pubsub_v1.SubscriberClient() -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def subscription_one(subscriber_client, topic): subscription_path = subscriber_client.subscription_path( - PROJECT, SUBSCRIPTION_ONE) + PROJECT, SUBSCRIPTION_ONE + ) try: response = subscriber_client.get_subscription(subscription_path) except: # noqa response = subscriber_client.create_subscription( - subscription_path, topic=topic) + subscription_path, topic=topic + ) yield response.name -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def subscription_two(subscriber_client, topic): subscription_path = subscriber_client.subscription_path( - PROJECT, SUBSCRIPTION_TWO) + PROJECT, SUBSCRIPTION_TWO + ) try: response = subscriber_client.get_subscription(subscription_path) except: # noqa response = subscriber_client.create_subscription( - subscription_path, topic=topic) + subscription_path, topic=topic + ) yield response.name -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def subscription_three(subscriber_client, topic): subscription_path = subscriber_client.subscription_path( - PROJECT, SUBSCRIPTION_THREE) + PROJECT, SUBSCRIPTION_THREE + ) try: response = subscriber_client.get_subscription(subscription_path) except: # noqa response = subscriber_client.create_subscription( - subscription_path, topic=topic) + subscription_path, topic=topic + ) yield response.name @@ -113,7 +121,8 @@ def _(): def test_create(subscriber_client): subscription_path = subscriber_client.subscription_path( - PROJECT, SUBSCRIPTION_ONE) + PROJECT, SUBSCRIPTION_ONE + ) try: subscriber_client.delete_subscription(subscription_path) @@ -129,14 +138,16 @@ def _(): def test_create_push(subscriber_client): subscription_path = subscriber_client.subscription_path( - PROJECT, SUBSCRIPTION_ONE) + PROJECT, SUBSCRIPTION_ONE + ) try: subscriber_client.delete_subscription(subscription_path) except Exception: pass subscriber.create_push_subscription( - PROJECT, TOPIC, SUBSCRIPTION_ONE, ENDPOINT) + PROJECT, TOPIC, SUBSCRIPTION_ONE, ENDPOINT + ) @eventually_consistent.call def _(): @@ -147,7 +158,7 @@ def test_update(subscriber_client, subscription_one, capsys): subscriber.update_subscription(PROJECT, SUBSCRIPTION_ONE, NEW_ENDPOINT) out, _ = capsys.readouterr() - assert 'Subscription updated' in out + assert "Subscription updated" in out def test_delete(subscriber_client, subscription_one): @@ -161,14 +172,14 @@ def _(): def _publish_messages(publisher_client, topic): for n in range(5): - data = u'Message {}'.format(n).encode('utf-8') + data = u"Message {}".format(n).encode("utf-8") future = publisher_client.publish(topic, data=data) future.result() def _publish_messages_with_custom_attributes(publisher_client, topic): - data = u'Test message'.encode('utf-8') - future = publisher_client.publish(topic, data=data, origin='python-sample') + data = u"Test message".encode("utf-8") + future = publisher_client.publish(topic, data=data, origin="python-sample") future.result() @@ -178,74 +189,100 @@ def _make_sleep_patch(): def new_sleep(period): if period == 60: real_sleep(5) - raise RuntimeError('sigil') + raise RuntimeError("sigil") else: real_sleep(period) - return mock.patch('time.sleep', new=new_sleep) + return mock.patch("time.sleep", new=new_sleep) + + +def _to_delete(): + publisher_client = pubsub_v1.PublisherClient() + subscriber_client = pubsub_v1.SubscriberClient() + resources = [TOPIC, SUBSCRIPTION_TWO, SUBSCRIPTION_THREE] + + for item in resources: + if "subscription-test-topic" in item: + publisher_client.delete_topic( + "projects/{}/topics/{}".format(PROJECT, item) + ) + if "subscription-test-subscription" in item: + subscriber_client.delete_subscription( + "projects/{}/subscriptions/{}".format(PROJECT, item) + ) def test_receive(publisher_client, topic, subscription_two, capsys): _publish_messages(publisher_client, topic) with _make_sleep_patch(): - with pytest.raises(RuntimeError, match='sigil'): + with pytest.raises(RuntimeError, match="sigil"): subscriber.receive_messages(PROJECT, SUBSCRIPTION_TWO) out, _ = capsys.readouterr() - assert 'Listening' in out + assert "Listening" in out assert subscription_two in out - assert 'Message 1' in out + assert "Message" in out def test_receive_with_custom_attributes( - publisher_client, topic, subscription_two, capsys): + publisher_client, topic, subscription_two, capsys +): _publish_messages_with_custom_attributes(publisher_client, topic) with _make_sleep_patch(): - with pytest.raises(RuntimeError, match='sigil'): + with pytest.raises(RuntimeError, match="sigil"): subscriber.receive_messages_with_custom_attributes( - PROJECT, SUBSCRIPTION_TWO) + PROJECT, SUBSCRIPTION_TWO + ) out, _ = capsys.readouterr() - assert 'Test message' in out - assert 'origin' in out - assert 'python-sample' in out + assert "Test message" in out + assert "origin" in out + assert "python-sample" in out def test_receive_with_flow_control( - publisher_client, topic, subscription_two, capsys): + publisher_client, topic, subscription_two, capsys +): _publish_messages(publisher_client, topic) with _make_sleep_patch(): - with pytest.raises(RuntimeError, match='sigil'): + with pytest.raises(RuntimeError, match="sigil"): subscriber.receive_messages_with_flow_control( - PROJECT, SUBSCRIPTION_TWO) + PROJECT, SUBSCRIPTION_TWO + ) out, _ = capsys.readouterr() - assert 'Listening' in out + assert "Listening" in out assert subscription_two in out - assert 'Message 1' in out + assert "Message" in out def test_receive_synchronously( - publisher_client, topic, subscription_three, capsys): + publisher_client, topic, subscription_three, capsys +): _publish_messages(publisher_client, topic) subscriber.synchronous_pull(PROJECT, SUBSCRIPTION_THREE) out, _ = capsys.readouterr() - assert 'Done.' in out + assert "Done." in out def test_receive_synchronously_with_lease( - publisher_client, topic, subscription_three, capsys): + publisher_client, topic, subscription_three, capsys +): _publish_messages(publisher_client, topic) subscriber.synchronous_pull_with_lease_management( - PROJECT, SUBSCRIPTION_THREE) + PROJECT, SUBSCRIPTION_THREE + ) out, _ = capsys.readouterr() - assert 'Done.' in out + assert "Done." in out + + # Clean up resources after all the tests. + _to_delete() diff --git a/pubsub/streaming-analytics/PubSubToGCS.py b/pubsub/streaming-analytics/PubSubToGCS.py index f003e3362234..a3c8179b21e9 100644 --- a/pubsub/streaming-analytics/PubSubToGCS.py +++ b/pubsub/streaming-analytics/PubSubToGCS.py @@ -34,24 +34,25 @@ def __init__(self, window_size): self.window_size = int(window_size * 60) def expand(self, pcoll): - return (pcoll - # Assigns window info to each Pub/Sub message based on its - # publish timestamp. - | 'Window into Fixed Intervals' >> beam.WindowInto( - window.FixedWindows(self.window_size)) - | 'Add timestamps to messages' >> (beam.ParDo(AddTimestamps())) - # Use a dummy key to group the elements in the same window. - # Note that all the elements in one window must fit into memory - # for this. If the windowed elements do not fit into memory, - # please consider using `beam.util.BatchElements`. - # https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements - | 'Add Dummy Key' >> beam.Map(lambda elem: (None, elem)) - | 'Groupby' >> beam.GroupByKey() - | 'Abandon Dummy Key' >> beam.MapTuple(lambda _, val: val)) + return ( + pcoll + # Assigns window info to each Pub/Sub message based on its + # publish timestamp. + | "Window into Fixed Intervals" + >> beam.WindowInto(window.FixedWindows(self.window_size)) + | "Add timestamps to messages" >> beam.ParDo(AddTimestamps()) + # Use a dummy key to group the elements in the same window. + # Note that all the elements in one window must fit into memory + # for this. If the windowed elements do not fit into memory, + # please consider using `beam.util.BatchElements`. + # https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements + | "Add Dummy Key" >> beam.Map(lambda elem: (None, elem)) + | "Groupby" >> beam.GroupByKey() + | "Abandon Dummy Key" >> beam.MapTuple(lambda _, val: val) + ) class AddTimestamps(beam.DoFn): - def process(self, element, publish_time=beam.DoFn.TimestampParam): """Processes each incoming windowed element by extracting the Pub/Sub message and its publish timestamp into a dictionary. `publish_time` @@ -60,61 +61,72 @@ def process(self, element, publish_time=beam.DoFn.TimestampParam): """ yield { - 'message_body': element.decode('utf-8'), - 'publish_time': datetime.datetime.utcfromtimestamp( - float(publish_time)).strftime("%Y-%m-%d %H:%M:%S.%f"), + "message_body": element.decode("utf-8"), + "publish_time": datetime.datetime.utcfromtimestamp( + float(publish_time) + ).strftime("%Y-%m-%d %H:%M:%S.%f"), } class WriteBatchesToGCS(beam.DoFn): - def __init__(self, output_path): self.output_path = output_path def process(self, batch, window=beam.DoFn.WindowParam): """Write one batch per file to a Google Cloud Storage bucket. """ - ts_format = '%H:%M' + ts_format = "%H:%M" window_start = window.start.to_utc_datetime().strftime(ts_format) window_end = window.end.to_utc_datetime().strftime(ts_format) - filename = '-'.join([self.output_path, window_start, window_end]) + filename = "-".join([self.output_path, window_start, window_end]) - with beam.io.gcp.gcsio.GcsIO().open(filename=filename, mode='w') as f: + with beam.io.gcp.gcsio.GcsIO().open(filename=filename, mode="w") as f: for element in batch: - f.write('{}\n'.format(json.dumps(element)).encode('utf-8')) + f.write("{}\n".format(json.dumps(element)).encode("utf-8")) def run(input_topic, output_path, window_size=1.0, pipeline_args=None): # `save_main_session` is set to true because some DoFn's rely on # globally imported modules. pipeline_options = PipelineOptions( - pipeline_args, streaming=True, save_main_session=True) + pipeline_args, streaming=True, save_main_session=True + ) with beam.Pipeline(options=pipeline_options) as pipeline: - (pipeline - | 'Read PubSub Messages' >> beam.io.ReadFromPubSub(topic=input_topic) - | 'Window into' >> GroupWindowsIntoBatches(window_size) - | 'Write to GCS' >> beam.ParDo(WriteBatchesToGCS(output_path))) + ( + pipeline + | "Read PubSub Messages" + >> beam.io.ReadFromPubSub(topic=input_topic) + | "Window into" >> GroupWindowsIntoBatches(window_size) + | "Write to GCS" >> beam.ParDo(WriteBatchesToGCS(output_path)) + ) -if __name__ == '__main__': # noqa +if __name__ == "__main__": # noqa logging.getLogger().setLevel(logging.INFO) parser = argparse.ArgumentParser() parser.add_argument( - '--input_topic', - help='The Cloud Pub/Sub topic to read from.\n' - '"projects//topics/".') + "--input_topic", + help="The Cloud Pub/Sub topic to read from.\n" + '"projects//topics/".', + ) parser.add_argument( - '--window_size', + "--window_size", type=float, default=1.0, - help='Output file\'s window size in number of minutes.') + help="Output file's window size in number of minutes.", + ) parser.add_argument( - '--output_path', - help='GCS Path of the output file including filename prefix.') + "--output_path", + help="GCS Path of the output file including filename prefix.", + ) known_args, pipeline_args = parser.parse_known_args() - run(known_args.input_topic, known_args.output_path, known_args.window_size, - pipeline_args) + run( + known_args.input_topic, + known_args.output_path, + known_args.window_size, + pipeline_args, + ) # [END pubsub_to_gcs] diff --git a/pubsub/streaming-analytics/PubSubToGCS_test.py b/pubsub/streaming-analytics/PubSubToGCS_test.py index 644cf0865d06..d39fb568184e 100644 --- a/pubsub/streaming-analytics/PubSubToGCS_test.py +++ b/pubsub/streaming-analytics/PubSubToGCS_test.py @@ -12,89 +12,55 @@ # See the License for the specific language governing permissions and # limitations under the License. -import multiprocessing as mp +import mock import os -import pytest -import subprocess as sp -import tempfile -import time import uuid import apache_beam as beam -from google.cloud import pubsub_v1 - - -PROJECT = os.environ['GCLOUD_PROJECT'] -BUCKET = os.environ['CLOUD_STORAGE_BUCKET'] -TOPIC = 'test-topic' -UUID = uuid.uuid4().hex - - -@pytest.fixture -def publisher_client(): - yield pubsub_v1.PublisherClient() - - -@pytest.fixture -def topic_path(publisher_client): - topic_path = publisher_client.topic_path(PROJECT, TOPIC) - - try: - publisher_client.delete_topic(topic_path) - except Exception: - pass - - response = publisher_client.create_topic(topic_path) - yield response.name - - -def _infinite_publish_job(publisher_client, topic_path): - while True: - future = publisher_client.publish( - topic_path, data='Hello World!'.encode('utf-8')) - future.result() - time.sleep(10) - - -def test_run(publisher_client, topic_path): - """This is an integration test that runs `PubSubToGCS.py` in its entirety. - It checks for output files on GCS. - """ - - # Use one process to publish messages to a topic. - publish_process = mp.Process( - target=lambda: _infinite_publish_job(publisher_client, topic_path)) - - # Use another process to run the streaming pipeline that should write one - # file to GCS every minute (according to the default window size). - pipeline_process = mp.Process( - target=lambda: sp.call([ - 'python', 'PubSubToGCS.py', - '--project', PROJECT, - '--runner', 'DirectRunner', - '--temp_location', tempfile.mkdtemp(), - '--input_topic', topic_path, - '--output_path', 'gs://{}/pubsub/{}/output'.format(BUCKET, UUID), - ]) +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.test_stream import TestStream +from apache_beam.testing.test_utils import TempDir +from apache_beam.transforms.window import TimestampedValue + +import PubSubToGCS + +PROJECT = os.environ["GCLOUD_PROJECT"] +BUCKET = os.environ["CLOUD_STORAGE_BUCKET"] +UUID = uuid.uuid1().hex + + +@mock.patch("apache_beam.Pipeline", TestPipeline) +@mock.patch( + "apache_beam.io.ReadFromPubSub", + lambda topic: ( + TestStream() + .advance_watermark_to(0) + .advance_processing_time(30) + .add_elements([TimestampedValue(b"a", 1575937195)]) + .advance_processing_time(30) + .add_elements([TimestampedValue(b"b", 1575937225)]) + .advance_processing_time(30) + .add_elements([TimestampedValue(b"c", 1575937255)]) + .advance_watermark_to_infinity() + ), +) +def test_pubsub_to_gcs(): + PubSubToGCS.run( + input_topic="unused", # mocked by TestStream + output_path="gs://{}/pubsub/{}/output".format(BUCKET, UUID), + window_size=1, # 1 minute + pipeline_args=[ + "--project", + PROJECT, + "--temp_location", + TempDir().get_path(), + ], ) - publish_process.start() - pipeline_process.start() - - # Times out the streaming pipeline after 90 seconds. - pipeline_process.join(timeout=90) - # Immediately kills the publish process after the pipeline shuts down. - publish_process.join(timeout=0) - - pipeline_process.terminate() - publish_process.terminate() - # Check for output files on GCS. gcs_client = beam.io.gcp.gcsio.GcsIO() - # This returns a dictionary. - files = gcs_client.list_prefix('gs://{}/pubsub/{}'.format(BUCKET, UUID)) + files = gcs_client.list_prefix("gs://{}/pubsub/{}".format(BUCKET, UUID)) assert len(files) > 0 - # Clean up. Delete topic. Delete files. - publisher_client.delete_topic(topic_path) + # Clean up. gcs_client.delete_batch(list(files)) diff --git a/pubsub/streaming-analytics/requirements.txt b/pubsub/streaming-analytics/requirements.txt index d1ce1adeb94a..7ffca51187b5 100644 --- a/pubsub/streaming-analytics/requirements.txt +++ b/pubsub/streaming-analytics/requirements.txt @@ -1 +1 @@ -apache-beam[gcp]==2.15.0 +apache-beam[gcp,test]==2.16.0