diff --git a/pubsub/cloud-client/requirements.txt b/pubsub/cloud-client/requirements.txt index 936a9f0ed2f2..81a62427cc0b 100644 --- a/pubsub/cloud-client/requirements.txt +++ b/pubsub/cloud-client/requirements.txt @@ -1 +1 @@ -google-cloud-pubsub==0.37.2 +google-cloud-pubsub==0.38.0 diff --git a/pubsub/cloud-client/subscriber.py b/pubsub/cloud-client/subscriber.py index e31560722077..3915a82ef9c8 100644 --- a/pubsub/cloud-client/subscriber.py +++ b/pubsub/cloud-client/subscriber.py @@ -23,6 +23,9 @@ import argparse import time +import logging +import random +import multiprocessing from google.cloud import pubsub_v1 @@ -215,7 +218,7 @@ def callback(message): # [END pubsub_subscriber_flow_settings] -def receive_messages_synchronously(project, subscription_name): +def synchronous_pull(project, subscription_name): """Pulling messages synchronously.""" # [START pubsub_subscriber_sync_pull] # project = "Your Google Cloud Project ID" @@ -224,13 +227,10 @@ def receive_messages_synchronously(project, subscription_name): subscription_path = subscriber.subscription_path( project, subscription_name) - # Builds a pull request with a specific number of messages to return. - # `return_immediately` is set to False so that the system waits (for a - # bounded amount of time) until at lease one message is available. - response = subscriber.pull( - subscription_path, - max_messages=3, - return_immediately=False) + NUM_MESSAGES=3 + + # The subscriber pulls a specific number of messages. + response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES) ack_ids = [] for received_message in response.received_messages: @@ -239,9 +239,72 @@ def receive_messages_synchronously(project, subscription_name): # Acknowledges the received messages so they will not be sent again. subscriber.acknowledge(subscription_path, ack_ids) + + print("Received and acknowledged {} messages. Done.".format(NUM_MESSAGES)) # [END pubsub_subscriber_sync_pull] +def synchronous_pull_with_lease_management(project, subscription_name): + """Pulling messages synchronously with lease management""" + # [START pubsub_subscriber_sync_pull_with_lease] + # project = "Your Google Cloud Project ID" + # subscription_name = "Your Pubsub subscription name" + subscriber = pubsub_v1.SubscriberClient() + subscription_path = subscriber.subscription_path( + project, subscription_name) + + NUM_MESSAGES=2 + ACK_DEADLINE=30 + SLEEP_TIME=10 + + # The subscriber pulls a specific number of messages. + response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES) + + multiprocessing.log_to_stderr() + logger = multiprocessing.get_logger() + logger.setLevel(logging.INFO) + + def worker(msg): + """Simulates a long-running process.""" + RUN_TIME = random.randint(1,60) + logger.info('{}: Running {} for {}s'.format( + time.strftime("%X", time.gmtime()), msg.message.data, RUN_TIME)) + time.sleep(RUN_TIME) + + # `processes` stores process as key and ack id and message as values. + processes = dict() + for message in response.received_messages: + process = multiprocessing.Process(target=worker, args=(message,)) + processes[process] = (message.ack_id, message.message.data) + process.start() + + while processes: + for process, (ack_id, msg_data) in processes.items(): + # If the process is still running, reset the ack deadline as + # specified by ACK_DEADLINE once every while as specified + # by SLEEP_TIME. + if process.is_alive(): + # `ack_deadline_seconds` must be between 10 to 600. + subscriber.modify_ack_deadline(subscription_path, + [ack_id], ack_deadline_seconds=ACK_DEADLINE) + logger.info('{}: Reset ack deadline for {} for {}s'.format( + time.strftime("%X", time.gmtime()), msg_data, ACK_DEADLINE)) + + # If the processs is finished, acknowledges using `ack_id`. + else: + subscriber.acknowledge(subscription_path, [ack_id]) + logger.info("{}: Acknowledged {}".format( + time.strftime("%X", time.gmtime()), msg_data)) + processes.pop(process) + + # If there are still processes running, sleeps the thread. + if processes: + time.sleep(SLEEP_TIME) + + print("Received and acknowledged {} messages. Done.".format(NUM_MESSAGES)) + # [END pubsub_subscriber_sync_pull_with_lease] + + def listen_for_errors(project, subscription_name): """Receives messages and catches errors from a pull subscription.""" # [START pubsub_subscriber_error_listener] @@ -318,10 +381,15 @@ def callback(message): help=receive_messages_with_flow_control.__doc__) receive_with_flow_control_parser.add_argument('subscription_name') - receive_messages_synchronously_parser = subparsers.add_parser( + synchronous_pull_parser = subparsers.add_parser( 'receive-synchronously', - help=receive_messages_synchronously.__doc__) - receive_messages_synchronously_parser.add_argument('subscription_name') + help=synchronous_pull.__doc__) + synchronous_pull_parser.add_argument('subscription_name') + + synchronous_pull_with_lease_management_parser = subparsers.add_parser( + 'receive-synchronously-with-lease', + help=synchronous_pull_with_lease_management.__doc__) + synchronous_pull_with_lease_management_parser.add_argument('subscription_name') listen_for_errors_parser = subparsers.add_parser( 'listen_for_errors', help=listen_for_errors.__doc__) @@ -357,7 +425,10 @@ def callback(message): receive_messages_with_flow_control( args.project, args.subscription_name) elif args.command == 'receive-synchronously': - receive_messages_synchronously( + synchronous_pull( + args.project, args.subscription_name) + elif args.command == 'receive-synchronously-with-lease': + synchronous_pull_with_lease_management( args.project, args.subscription_name) elif args.command == 'listen_for_errors': listen_for_errors(args.project, args.subscription_name) diff --git a/pubsub/cloud-client/subscriber_test.py b/pubsub/cloud-client/subscriber_test.py index 728f971f2e71..9f554398ef45 100644 --- a/pubsub/cloud-client/subscriber_test.py +++ b/pubsub/cloud-client/subscriber_test.py @@ -25,6 +25,8 @@ PROJECT = os.environ['GCLOUD_PROJECT'] TOPIC = 'subscription-test-topic' SUBSCRIPTION = 'subscription-test-subscription' +SUBSCRIPTION_SYNC1 = 'subscription-test-subscription-sync1' +SUBSCRIPTION_SYNC2 = 'subscription-test-subscription-sync2' ENDPOINT = 'https://{}.appspot.com/push'.format(PROJECT) @@ -67,6 +69,36 @@ def subscription(subscriber_client, topic): yield subscription_path +@pytest.fixture +def subscription_sync1(subscriber_client, topic): + subscription_sync_path = subscriber_client.subscription_path( + PROJECT, SUBSCRIPTION_SYNC1) + + try: + subscriber_client.delete_subscription(subscription_sync_path) + except Exception: + pass + + subscriber_client.create_subscription(subscription_sync_path, topic=topic) + + yield subscription_sync_path + + +@pytest.fixture +def subscription_sync2(subscriber_client, topic): + subscription_sync_path = subscriber_client.subscription_path( + PROJECT, SUBSCRIPTION_SYNC2) + + try: + subscriber_client.delete_subscription(subscription_sync_path) + except Exception: + pass + + subscriber_client.create_subscription(subscription_sync_path, topic=topic) + + yield subscription_sync_path + + def test_list_in_topic(subscription, capsys): @eventually_consistent.call def _(): @@ -160,6 +192,27 @@ def test_receive(publisher_client, topic, subscription, capsys): assert 'Message 1' in out +def test_receive_synchronously( + publisher_client, topic, subscription_sync1, capsys): + _publish_messages(publisher_client, topic) + + subscriber.synchronous_pull(PROJECT, SUBSCRIPTION_SYNC1) + + out, _ = capsys.readouterr() + assert 'Done.' in out + + +def test_receive_synchronously_with_lease( + publisher_client, topic, subscription_sync2, capsys): + _publish_messages(publisher_client, topic) + + subscriber.synchronous_pull_with_lease_management( + PROJECT, SUBSCRIPTION_SYNC2) + + out, _ = capsys.readouterr() + assert 'Done.' in out + + def test_receive_with_custom_attributes( publisher_client, topic, subscription, capsys): _publish_messages_with_custom_attributes(publisher_client, topic) @@ -188,18 +241,3 @@ def test_receive_with_flow_control( assert 'Listening' in out assert subscription in out assert 'Message 1' in out - - -def test_receive_synchronously( - publisher_client, topic, subscription, capsys): - _publish_messages(publisher_client, topic) - - with _make_sleep_patch(): - with pytest.raises(RuntimeError, match='sigil'): - subscriber.receive_messages_with_flow_control( - PROJECT, SUBSCRIPTION) - - out, _ = capsys.readouterr() - assert 'Message 1' in out - assert 'Message 2' in out - assert 'Message 3' in out