Skip to content

Commit

Permalink
Pub/Sub: synchronous pull with lease management (#1701)
Browse files Browse the repository at this point in the history
* Synchronous pull with lease management

* Updated library version
  • Loading branch information
anguillanneuf authored Sep 14, 2018
1 parent 9fee707 commit 68a4622
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 28 deletions.
2 changes: 1 addition & 1 deletion pubsub/cloud-client/requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
google-cloud-pubsub==0.37.2
google-cloud-pubsub==0.38.0
95 changes: 83 additions & 12 deletions pubsub/cloud-client/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@

import argparse
import time
import logging
import random
import multiprocessing

from google.cloud import pubsub_v1

Expand Down Expand Up @@ -215,7 +218,7 @@ def callback(message):
# [END pubsub_subscriber_flow_settings]


def receive_messages_synchronously(project, subscription_name):
def synchronous_pull(project, subscription_name):
"""Pulling messages synchronously."""
# [START pubsub_subscriber_sync_pull]
# project = "Your Google Cloud Project ID"
Expand All @@ -224,13 +227,10 @@ def receive_messages_synchronously(project, subscription_name):
subscription_path = subscriber.subscription_path(
project, subscription_name)

# Builds a pull request with a specific number of messages to return.
# `return_immediately` is set to False so that the system waits (for a
# bounded amount of time) until at lease one message is available.
response = subscriber.pull(
subscription_path,
max_messages=3,
return_immediately=False)
NUM_MESSAGES=3

# The subscriber pulls a specific number of messages.
response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES)

ack_ids = []
for received_message in response.received_messages:
Expand All @@ -239,9 +239,72 @@ def receive_messages_synchronously(project, subscription_name):

# Acknowledges the received messages so they will not be sent again.
subscriber.acknowledge(subscription_path, ack_ids)

print("Received and acknowledged {} messages. Done.".format(NUM_MESSAGES))
# [END pubsub_subscriber_sync_pull]


def synchronous_pull_with_lease_management(project, subscription_name):
"""Pulling messages synchronously with lease management"""
# [START pubsub_subscriber_sync_pull_with_lease]
# project = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
project, subscription_name)

NUM_MESSAGES=2
ACK_DEADLINE=30
SLEEP_TIME=10

# The subscriber pulls a specific number of messages.
response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES)

multiprocessing.log_to_stderr()
logger = multiprocessing.get_logger()
logger.setLevel(logging.INFO)

def worker(msg):
"""Simulates a long-running process."""
RUN_TIME = random.randint(1,60)
logger.info('{}: Running {} for {}s'.format(
time.strftime("%X", time.gmtime()), msg.message.data, RUN_TIME))
time.sleep(RUN_TIME)

# `processes` stores process as key and ack id and message as values.
processes = dict()
for message in response.received_messages:
process = multiprocessing.Process(target=worker, args=(message,))
processes[process] = (message.ack_id, message.message.data)
process.start()

while processes:
for process, (ack_id, msg_data) in processes.items():
# If the process is still running, reset the ack deadline as
# specified by ACK_DEADLINE once every while as specified
# by SLEEP_TIME.
if process.is_alive():
# `ack_deadline_seconds` must be between 10 to 600.
subscriber.modify_ack_deadline(subscription_path,
[ack_id], ack_deadline_seconds=ACK_DEADLINE)
logger.info('{}: Reset ack deadline for {} for {}s'.format(
time.strftime("%X", time.gmtime()), msg_data, ACK_DEADLINE))

# If the processs is finished, acknowledges using `ack_id`.
else:
subscriber.acknowledge(subscription_path, [ack_id])
logger.info("{}: Acknowledged {}".format(
time.strftime("%X", time.gmtime()), msg_data))
processes.pop(process)

# If there are still processes running, sleeps the thread.
if processes:
time.sleep(SLEEP_TIME)

print("Received and acknowledged {} messages. Done.".format(NUM_MESSAGES))
# [END pubsub_subscriber_sync_pull_with_lease]


def listen_for_errors(project, subscription_name):
"""Receives messages and catches errors from a pull subscription."""
# [START pubsub_subscriber_error_listener]
Expand Down Expand Up @@ -318,10 +381,15 @@ def callback(message):
help=receive_messages_with_flow_control.__doc__)
receive_with_flow_control_parser.add_argument('subscription_name')

receive_messages_synchronously_parser = subparsers.add_parser(
synchronous_pull_parser = subparsers.add_parser(
'receive-synchronously',
help=receive_messages_synchronously.__doc__)
receive_messages_synchronously_parser.add_argument('subscription_name')
help=synchronous_pull.__doc__)
synchronous_pull_parser.add_argument('subscription_name')

synchronous_pull_with_lease_management_parser = subparsers.add_parser(
'receive-synchronously-with-lease',
help=synchronous_pull_with_lease_management.__doc__)
synchronous_pull_with_lease_management_parser.add_argument('subscription_name')

listen_for_errors_parser = subparsers.add_parser(
'listen_for_errors', help=listen_for_errors.__doc__)
Expand Down Expand Up @@ -357,7 +425,10 @@ def callback(message):
receive_messages_with_flow_control(
args.project, args.subscription_name)
elif args.command == 'receive-synchronously':
receive_messages_synchronously(
synchronous_pull(
args.project, args.subscription_name)
elif args.command == 'receive-synchronously-with-lease':
synchronous_pull_with_lease_management(
args.project, args.subscription_name)
elif args.command == 'listen_for_errors':
listen_for_errors(args.project, args.subscription_name)
68 changes: 53 additions & 15 deletions pubsub/cloud-client/subscriber_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
PROJECT = os.environ['GCLOUD_PROJECT']
TOPIC = 'subscription-test-topic'
SUBSCRIPTION = 'subscription-test-subscription'
SUBSCRIPTION_SYNC1 = 'subscription-test-subscription-sync1'
SUBSCRIPTION_SYNC2 = 'subscription-test-subscription-sync2'
ENDPOINT = 'https://{}.appspot.com/push'.format(PROJECT)


Expand Down Expand Up @@ -67,6 +69,36 @@ def subscription(subscriber_client, topic):
yield subscription_path


@pytest.fixture
def subscription_sync1(subscriber_client, topic):
subscription_sync_path = subscriber_client.subscription_path(
PROJECT, SUBSCRIPTION_SYNC1)

try:
subscriber_client.delete_subscription(subscription_sync_path)
except Exception:
pass

subscriber_client.create_subscription(subscription_sync_path, topic=topic)

yield subscription_sync_path


@pytest.fixture
def subscription_sync2(subscriber_client, topic):
subscription_sync_path = subscriber_client.subscription_path(
PROJECT, SUBSCRIPTION_SYNC2)

try:
subscriber_client.delete_subscription(subscription_sync_path)
except Exception:
pass

subscriber_client.create_subscription(subscription_sync_path, topic=topic)

yield subscription_sync_path


def test_list_in_topic(subscription, capsys):
@eventually_consistent.call
def _():
Expand Down Expand Up @@ -160,6 +192,27 @@ def test_receive(publisher_client, topic, subscription, capsys):
assert 'Message 1' in out


def test_receive_synchronously(
publisher_client, topic, subscription_sync1, capsys):
_publish_messages(publisher_client, topic)

subscriber.synchronous_pull(PROJECT, SUBSCRIPTION_SYNC1)

out, _ = capsys.readouterr()
assert 'Done.' in out


def test_receive_synchronously_with_lease(
publisher_client, topic, subscription_sync2, capsys):
_publish_messages(publisher_client, topic)

subscriber.synchronous_pull_with_lease_management(
PROJECT, SUBSCRIPTION_SYNC2)

out, _ = capsys.readouterr()
assert 'Done.' in out


def test_receive_with_custom_attributes(
publisher_client, topic, subscription, capsys):
_publish_messages_with_custom_attributes(publisher_client, topic)
Expand Down Expand Up @@ -188,18 +241,3 @@ def test_receive_with_flow_control(
assert 'Listening' in out
assert subscription in out
assert 'Message 1' in out


def test_receive_synchronously(
publisher_client, topic, subscription, capsys):
_publish_messages(publisher_client, topic)

with _make_sleep_patch():
with pytest.raises(RuntimeError, match='sigil'):
subscriber.receive_messages_with_flow_control(
PROJECT, SUBSCRIPTION)

out, _ = capsys.readouterr()
assert 'Message 1' in out
assert 'Message 2' in out
assert 'Message 3' in out

0 comments on commit 68a4622

Please sign in to comment.