Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pub/Sub: synchronous pull with lease management #1701

Merged
merged 5 commits into from
Sep 14, 2018
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pubsub/cloud-client/requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
google-cloud-pubsub==0.37.2
google-cloud-pubsub==0.38.0
90 changes: 86 additions & 4 deletions pubsub/cloud-client/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@

import argparse
import time
import logging
import random
import multiprocessing

from google.cloud import pubsub_v1

Expand Down Expand Up @@ -224,13 +227,15 @@ def receive_messages_synchronously(project, subscription_name):
subscription_path = subscriber.subscription_path(
project, subscription_name)

NUM_MESSAGES=3
# Builds a pull request with a specific number of messages to return.
# `return_immediately` is set to False so that the system waits (for a
# bounded amount of time) until at lease one message is available.
# `return_immediately` is set to False so that the system waits until
# at lease one message is available before it times out.
response = subscriber.pull(
subscription_path,
max_messages=3,
return_immediately=False)
max_messages=NUM_MESSAGES,
return_immediately=False,
timeout=900)

ack_ids = []
for received_message in response.received_messages:
Expand All @@ -239,9 +244,78 @@ def receive_messages_synchronously(project, subscription_name):

# Acknowledges the received messages so they will not be sent again.
subscriber.acknowledge(subscription_path, ack_ids)

print("Received and acknowledged {} messages. Done.".format(NUM_MESSAGES))
# [END pubsub_subscriber_sync_pull]


def synchronous_pull_with_lease_management(project, subscription_name):
"""Pulling messages synchronously with lease management"""
# [START pubsub_subscriber_sync_pull_with_lease]
# project = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
project, subscription_name)

NUM_MESSAGES=2
ACK_DEADLINE=30
SLEEP_TIME=10

# Builds a pull request with a specific number of messages to return.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe reword "Builds a pull request with a specific number of messages" to "The subscriber pulls for a specific number of messages"? A pull request makes me think of github :)

Can you explain a little further how the return_immediately and the timeout relate to each other and how each affects the behavior? I.e. what happens if I don't specify any of them and what the default values are?

Copy link
Member Author

@anguillanneuf anguillanneuf Sep 14, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both are optional. I think the default values are based on the server, but may not be fixed.

I just tested without specifying either of them, subscriber waits without timing out. I'm going to remove such specifications for the code sample because we should stick to the most simple case.

# `return_immediately` is set to False so that the system waits until
# at lease one message is available before it times out.
response = subscriber.pull(
subscription_path,
max_messages=NUM_MESSAGES,
return_immediately=False,
timeout=900)

multiprocessing.log_to_stderr()
logger = multiprocessing.get_logger()
logger.setLevel(logging.INFO)

def worker(msg):
"""Simulates a long-running process."""
RUN_TIME = random.randint(1,60)
logger.info('{}: Running {} for {}s'.format(
time.strftime("%X", time.gmtime()), msg.message.data, RUN_TIME))
time.sleep(RUN_TIME)

# `processes` stores process as key and ack id and message as values.
processes = dict()
for message in response.received_messages:
process = multiprocessing.Process(target=worker, args=(message,))
processes[process] = (message.ack_id, message.message.data)
process.start()

while processes:
for process, (ack_id, msg_data) in processes.items():
# If the process is still running, reset the ack deadline as
# specified by ACK_DEADLINE, and once every while as specified
# by SLEEP_TIME.
if process.is_alive():
# `ack_deadline_seconds` must be between 10s to 600s.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: can omit the s after the numbers since ack_deadline_seconds is already being explicit with it.

subscriber.modify_ack_deadline(subscription_path,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a more detailed explanation on how this affects the deadline, i.e. that it resets the deadline timer and change it to ACK_DEADLINE every 10 seconds.

[ack_id], ack_deadline_seconds=ACK_DEADLINE)
logger.info('{}: Reset ack deadline for {} for {}s'.format(
anguillanneuf marked this conversation as resolved.
Show resolved Hide resolved
time.strftime("%X", time.gmtime()), msg_data, ACK_DEADLINE))

# Otherwise, acknowledges using `ack_id`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Otherwise -> If the process finished

When reading from top to bottom, it's hard to remember where the else is. It's easier to read the "else" condition explicitly.

else:
subscriber.acknowledge(subscription_path, [ack_id])
logger.info("{}: Acknowledged {}".format(
time.strftime("%X", time.gmtime()), msg_data))
anguillanneuf marked this conversation as resolved.
Show resolved Hide resolved
processes.pop(process)

# Sleeps the thread to save resources.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there are still processes running, sleeps the ...

if processes:
time.sleep(SLEEP_TIME)

print("Received and acknowledged {} messages. Done.".format(NUM_MESSAGES))
# [END pubsub_subscriber_sync_pull_with_lease]


def listen_for_errors(project, subscription_name):
"""Receives messages and catches errors from a pull subscription."""
# [START pubsub_subscriber_error_listener]
Expand Down Expand Up @@ -323,6 +397,11 @@ def callback(message):
help=receive_messages_synchronously.__doc__)
receive_messages_synchronously_parser.add_argument('subscription_name')

receive_messages_synchronously_with_lease_parser = subparsers.add_parser(
'receive-synchronously-with-lease',
help=synchronous_pull_with_lease_management.__doc__)
receive_messages_synchronously_with_lease_parser.add_argument('subscription_name')

listen_for_errors_parser = subparsers.add_parser(
'listen_for_errors', help=listen_for_errors.__doc__)
listen_for_errors_parser.add_argument('subscription_name')
Expand Down Expand Up @@ -359,5 +438,8 @@ def callback(message):
elif args.command == 'receive-synchronously':
receive_messages_synchronously(
args.project, args.subscription_name)
elif args.command == 'receive-synchronously-with-lease':
synchronous_pull_with_lease_management(
args.project, args.subscription_name)
elif args.command == 'listen_for_errors':
listen_for_errors(args.project, args.subscription_name)
68 changes: 53 additions & 15 deletions pubsub/cloud-client/subscriber_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
PROJECT = os.environ['GCLOUD_PROJECT']
TOPIC = 'subscription-test-topic'
SUBSCRIPTION = 'subscription-test-subscription'
SUBSCRIPTION_SYNC1 = 'subscription-test-subscription-sync1'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since these are synchronous and shouldn't step over each other, can we use the same subscription for both synchronous tests?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They sometimes did step on each other because the tests are run in parallel.

SUBSCRIPTION_SYNC2 = 'subscription-test-subscription-sync2'
ENDPOINT = 'https://{}.appspot.com/push'.format(PROJECT)


Expand Down Expand Up @@ -67,6 +69,36 @@ def subscription(subscriber_client, topic):
yield subscription_path


@pytest.fixture
def subscription_sync1(subscriber_client, topic):
subscription_sync_path = subscriber_client.subscription_path(
PROJECT, SUBSCRIPTION_SYNC1)

try:
subscriber_client.delete_subscription(subscription_sync_path)
except Exception:
pass

subscriber_client.create_subscription(subscription_sync_path, topic=topic)

yield subscription_sync_path


@pytest.fixture
def subscription_sync2(subscriber_client, topic):
subscription_sync_path = subscriber_client.subscription_path(
PROJECT, SUBSCRIPTION_SYNC2)

try:
subscriber_client.delete_subscription(subscription_sync_path)
except Exception:
pass

subscriber_client.create_subscription(subscription_sync_path, topic=topic)

yield subscription_sync_path


def test_list_in_topic(subscription, capsys):
@eventually_consistent.call
def _():
Expand Down Expand Up @@ -160,6 +192,27 @@ def test_receive(publisher_client, topic, subscription, capsys):
assert 'Message 1' in out


def test_receive_synchronously(
publisher_client, topic, subscription_sync1, capsys):
_publish_messages(publisher_client, topic)

subscriber.receive_messages_synchronously(PROJECT, SUBSCRIPTION_SYNC1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For consistency with the synchronous_pull_with_lease_management, can we rename this function to synchronous_pull?


out, _ = capsys.readouterr()
assert 'Done.' in out


def test_receive_synchronously_with_lease(
publisher_client, topic, subscription_sync2, capsys):
_publish_messages(publisher_client, topic)

subscriber.synchronous_pull_with_lease_management(
PROJECT, SUBSCRIPTION_SYNC2)

out, _ = capsys.readouterr()
assert 'Done.' in out


def test_receive_with_custom_attributes(
publisher_client, topic, subscription, capsys):
_publish_messages_with_custom_attributes(publisher_client, topic)
Expand Down Expand Up @@ -188,18 +241,3 @@ def test_receive_with_flow_control(
assert 'Listening' in out
assert subscription in out
assert 'Message 1' in out


def test_receive_synchronously(
publisher_client, topic, subscription, capsys):
_publish_messages(publisher_client, topic)

with _make_sleep_patch():
with pytest.raises(RuntimeError, match='sigil'):
subscriber.receive_messages_with_flow_control(
PROJECT, SUBSCRIPTION)

out, _ = capsys.readouterr()
assert 'Message 1' in out
assert 'Message 2' in out
assert 'Message 3' in out