Pub/Sub: synchronous pull with lease management #1701

Merged · 5 commits · Sep 14, 2018 · Changes from 1 commit
78 changes: 78 additions & 0 deletions pubsub/cloud-client/subscriber.py
@@ -23,6 +23,10 @@

import argparse
import time
import logging
import random
import multiprocessing
from collections import defaultdict

from google.cloud import pubsub_v1

@@ -239,9 +243,75 @@ def receive_messages_synchronously(project, subscription_name):

# Acknowledges the received messages so they will not be sent again.
subscriber.acknowledge(subscription_path, ack_ids)

print("Received and acknowledged all messages. Done.")
# [END pubsub_subscriber_sync_pull]


def synchronous_pull_with_lease_management(project, subscription_name):
"""Pulling messages synchronously with lease management"""
# [START pubsub_subscriber_sync_pull_with_lease]
# project = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
project, subscription_name)

NUM_MESSAGES = 2
# Builds a pull request with a specific number of messages to return.
Contributor:
Maybe reword "Builds a pull request with a specific number of messages" to "The subscriber pulls for a specific number of messages"? A pull request makes me think of github :)

Can you explain a little further how return_immediately and the timeout relate to each other and how each affects the behavior? I.e. what happens if I don't specify either of them, and what are the default values?

Member Author (@anguillanneuf, Sep 14, 2018):
Both are optional. I think the default values are based on the server, but may not be fixed.

I just tested without specifying either of them; the subscriber waits without timing out. I'm going to remove those specifications from the code sample because we should stick to the simplest case.
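For reference, the simplest call the author describes (dropping return_immediately and any timeout and relying on server-side defaults) would look roughly like this; it is a sketch of that plan, not the code committed in this PR:

    # Minimal synchronous pull; server-side defaults decide how long to wait.
    response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES)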

# `return_immediately` is set to False so that the system waits (for a
# bounded amount of time) until at least one message is available.
Contributor:
Can you be more specific on the "bounded amount of time"? How long will it wait for messages to arrive? Is there a default value and/or can we change that?

Member Author (@anguillanneuf, Sep 12, 2018):
All the documentation I could find says the same thing word for word. If set to True, we can specify a timeout though. I'm also asking your question here.
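If a caller does want to bound the wait on the client side, a hedged option is the generated client's standard per-call timeout argument. This is a sketch under the assumption that the GAPIC-level timeout applies to the pull RPC; it is not part of this sample:

    # If no response arrives within 30 seconds, the call may raise
    # google.api_core.exceptions.DeadlineExceeded instead of blocking.
    response = subscriber.pull(
        subscription_path,
        max_messages=NUM_MESSAGES,
        return_immediately=False,
        timeout=30)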

response = subscriber.pull(
subscription_path,
max_messages=NUM_MESSAGES,
return_immediately=False)

multiprocessing.log_to_stderr()
logger = multiprocessing.get_logger()
logger.setLevel(logging.INFO)

def worker(msg):
"""Simulates a long-running process."""
RUN_TIME = random.randint(1,60)
logger.info('{}: Running {} for {}s'.format(
time.strftime("%X", time.gmtime()), msg.message.data, RUN_TIME))
time.sleep(RUN_TIME)

# `d` stores process as key and ack id and message as values.
d = defaultdict(lambda: (str, str))
Contributor:
Does this really have to be a defaultdict? Since it's iterating over its own elements sequentially, wouldn't it always succeed on popping the processes?

How about renaming d to processes?

Also, received_message can be renamed to message since there is no clashing name to disambiguate from (to reduce verbosity).

Member Author (@anguillanneuf):
received_message has a message property. But I like the idea of reducing verbosity.
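A sketch of what the reviewer's renaming suggestion could look like (a plain dict named processes and a loop variable message); illustrative only, not the committed code:

    processes = dict()
    for message in response.received_messages:
        process = multiprocessing.Process(target=worker, args=(message,))
        processes[process] = (message.ack_id, message.message.data)
        process.start()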

for received_message in response.received_messages:
process = multiprocessing.Process(target=worker,
args=(received_message,))
d[process] = (received_message.ack_id, received_message.message.data)
process.start()

ACK_DEADLINE=60
Contributor:
Can we make this smaller, to let's say 30? That way it will be obvious that without resetting the deadline some messages won't be processed fully, since the max running time is 60.

while d:
for process, (ack_id, msg_data) in d.items():
# If the process is still running, reset the ack deadline.
if process.is_alive():
# `ack_deadline_seconds` must be between 10s to 600s.
Contributor:
Nit: can omit the s after the numbers since ack_deadline_seconds is already being explicit with it.

subscriber.modify_ack_deadline(subscription_path,
Contributor:
Please add a more detailed explanation of how this affects the deadline, i.e. that it resets the deadline timer and changes it to ACK_DEADLINE every 10 seconds.

[ack_id], ack_deadline_seconds=ACK_DEADLINE)
logger.info('{}: Reset ack deadline for {} for {}s'.format(
time.strftime("%X", time.gmtime()), msg_data, ACK_DEADLINE))

# Otherwise, acknowledges using `ack_id`.
Contributor:
Otherwise -> If the process finished

When reading from top to bottom, it's hard to remember where the else is. It's easier to read the "else" condition explicitly.

else:
subscriber.acknowledge(subscription_path, [ack_id])
logger.info("{}: Acknowledged {}".format(
time.strftime("%X", time.gmtime()), msg_data))
d.pop(process)

# Sleeps the thread for 10s to save resources.
if d:
time.sleep(10)

print("Received and acknowledged all messages. Done.")
# [END pubsub_subscriber_sync_pull_with_lease]
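Taken together, the review comments above suggest a version of the lease-management loop along these lines. This is a hedged sketch, not the committed sample: it assumes the suggested 30-second deadline, the processes rename, and more explicit comments about what modify_ack_deadline does.

    ACK_DEADLINE = 30  # Shorter than the longest simulated run (60s), so some
                       # messages will need their deadline extended.

    while processes:
        # Iterate over a copy so entries can be removed inside the loop;
        # mutating a dict while iterating over it raises a RuntimeError in
        # Python 3.
        for process, (ack_id, msg_data) in list(processes.items()):
            if process.is_alive():
                # Still running: extend the lease. modify_ack_deadline
                # restarts the countdown, giving this message another
                # ACK_DEADLINE seconds before Pub/Sub redelivers it. Since
                # the outer loop sleeps 10 seconds per pass, the lease is
                # renewed roughly every 10 seconds.
                subscriber.modify_ack_deadline(
                    subscription_path,
                    [ack_id],
                    ack_deadline_seconds=ACK_DEADLINE)
            else:
                # The process finished: acknowledge the message so it is not
                # redelivered, and stop tracking it.
                subscriber.acknowledge(subscription_path, [ack_id])
                processes.pop(process)

        # Sleep between passes to avoid hammering the API.
        if processes:
            time.sleep(10)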


def listen_for_errors(project, subscription_name):
"""Receives messages and catches errors from a pull subscription."""
# [START pubsub_subscriber_error_listener]
@@ -323,6 +393,11 @@ def callback(message):
help=receive_messages_synchronously.__doc__)
receive_messages_synchronously_parser.add_argument('subscription_name')

receive_messages_synchronously_with_lease_parser = subparsers.add_parser(
'receive-synchronously-with-lease',
help=synchronous_pull_with_lease_management.__doc__)
receive_messages_synchronously_with_lease_parser.add_argument('subscription_name')

listen_for_errors_parser = subparsers.add_parser(
'listen_for_errors', help=listen_for_errors.__doc__)
listen_for_errors_parser.add_argument('subscription_name')
@@ -359,5 +434,8 @@ def callback(message):
elif args.command == 'receive-synchronously':
receive_messages_synchronously(
args.project, args.subscription_name)
elif args.command == 'receive-synchronously-with-lease':
synchronous_pull_with_lease_management(
args.project, args.subscription_name)
elif args.command == 'listen_for_errors':
listen_for_errors(args.project, args.subscription_name)
26 changes: 11 additions & 15 deletions pubsub/cloud-client/subscriber_test.py
@@ -160,6 +160,17 @@ def test_receive(publisher_client, topic, subscription, capsys):
assert 'Message 1' in out


def test_receive_synchronously(
publisher_client, topic, subscription, capsys):
_publish_messages(publisher_client, topic)

subscriber.receive_messages_synchronously(PROJECT, SUBSCRIPTION)
Contributor:
Should we test these two functions independently?

subscriber.synchronous_pull_with_lease_management(PROJECT, SUBSCRIPTION)

out, _ = capsys.readouterr()
assert 'Received and acknowledged all messages. Done.' in out
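
If the two samples are tested independently, as the reviewer suggests, the test could be split along these lines; a sketch only, reusing the file's existing fixtures and helpers, with the second test name (test_receive_synchronously_with_lease) being hypothetical:

    def test_receive_synchronously(
            publisher_client, topic, subscription, capsys):
        _publish_messages(publisher_client, topic)

        subscriber.receive_messages_synchronously(PROJECT, SUBSCRIPTION)

        out, _ = capsys.readouterr()
        assert 'Received and acknowledged all messages. Done.' in out


    def test_receive_synchronously_with_lease(
            publisher_client, topic, subscription, capsys):
        _publish_messages(publisher_client, topic)

        subscriber.synchronous_pull_with_lease_management(PROJECT, SUBSCRIPTION)

        out, _ = capsys.readouterr()
        assert 'Received and acknowledged all messages. Done.' in out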


def test_receive_with_custom_attributes(
publisher_client, topic, subscription, capsys):
_publish_messages_with_custom_attributes(publisher_client, topic)
@@ -188,18 +199,3 @@ def test_receive_with_flow_control(
assert 'Listening' in out
assert subscription in out
assert 'Message 1' in out


def test_receive_synchronously(
publisher_client, topic, subscription, capsys):
_publish_messages(publisher_client, topic)

with _make_sleep_patch():
with pytest.raises(RuntimeError, match='sigil'):
subscriber.receive_messages_with_flow_control(
PROJECT, SUBSCRIPTION)

out, _ = capsys.readouterr()
assert 'Message 1' in out
assert 'Message 2' in out
assert 'Message 3' in out