-
Notifications
You must be signed in to change notification settings - Fork 6.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Pub/Sub: synchronous pull with lease management #1701
Changes from 1 commit
a97a4a3
4402e8b
b9dbaf1
dba44a7
05dd406
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,10 @@ | |
|
||
import argparse | ||
import time | ||
import logging | ||
import random | ||
import multiprocessing | ||
from collections import defaultdict | ||
|
||
from google.cloud import pubsub_v1 | ||
|
||
|
@@ -239,9 +243,75 @@ def receive_messages_synchronously(project, subscription_name): | |
|
||
# Acknowledges the received messages so they will not be sent again. | ||
subscriber.acknowledge(subscription_path, ack_ids) | ||
|
||
print("Received and acknowledged all messages. Done.") | ||
# [END pubsub_subscriber_sync_pull] | ||
|
||
|
||
def synchronous_pull_with_lease_management(project, subscription_name): | ||
"""Pulling messages synchronously with lease management""" | ||
# [START pubsub_subscriber_sync_pull_with_lease] | ||
# project = "Your Google Cloud Project ID" | ||
# subscription_name = "Your Pubsub subscription name" | ||
subscriber = pubsub_v1.SubscriberClient() | ||
subscription_path = subscriber.subscription_path( | ||
project, subscription_name) | ||
|
||
NUM_MESSAGES = 2 | ||
# Builds a pull request with a specific number of messages to return. | ||
# `return_immediately` is set to False so that the system waits (for a | ||
# bounded amount of time) until at lease one message is available. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you be more specific on the "bounded amount of time"? How long will it wait for messages to arrive? Is there a default value and/or can we change that? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. All the documentation I could find says the same thing word for word. If set to True, we can specify a |
||
response = subscriber.pull( | ||
subscription_path, | ||
max_messages=NUM_MESSAGES, | ||
return_immediately=False) | ||
|
||
multiprocessing.log_to_stderr() | ||
logger = multiprocessing.get_logger() | ||
logger.setLevel(logging.INFO) | ||
|
||
def worker(msg): | ||
"""Simulates a long-running process.""" | ||
RUN_TIME = random.randint(1,60) | ||
logger.info('{}: Running {} for {}s'.format( | ||
time.strftime("%X", time.gmtime()), msg.message.data, RUN_TIME)) | ||
time.sleep(RUN_TIME) | ||
|
||
# `d` stores process as key and ack id and message as values. | ||
d = defaultdict(lambda: (str, str)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this really have to be a How about renaming Also, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
for received_message in response.received_messages: | ||
process = multiprocessing.Process(target=worker, | ||
args=(received_message,)) | ||
d[process] = (received_message.ack_id, received_message.message.data) | ||
process.start() | ||
|
||
ACK_DEADLINE=60 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we make this smaller, to let's say 30? That way it will be obvious that without resetting the deadline some messages won't be processed fully since the max running time is 60. |
||
|
||
while d: | ||
for process, (ack_id, msg_data) in d.items(): | ||
# If the process is still running, reset the ack deadline. | ||
if process.is_alive(): | ||
# `ack_deadline_seconds` must be between 10s to 600s. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: can omit the |
||
subscriber.modify_ack_deadline(subscription_path, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add a more detailed explanation on how this affects the deadline, i.e. that it resets the deadline timer and change it to |
||
[ack_id], ack_deadline_seconds=ACK_DEADLINE) | ||
logger.info('{}: Reset ack deadline for {} for {}s'.format( | ||
anguillanneuf marked this conversation as resolved.
Show resolved
Hide resolved
|
||
time.strftime("%X", time.gmtime()), msg_data, ACK_DEADLINE)) | ||
|
||
# Otherwise, acknowledges using `ack_id`. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When reading from top to bottom, it's hard to remember where the else is. It's easier to read the "else" condition explicitly. |
||
else: | ||
subscriber.acknowledge(subscription_path, [ack_id]) | ||
logger.info("{}: Acknowledged {}".format( | ||
time.strftime("%X", time.gmtime()), msg_data)) | ||
anguillanneuf marked this conversation as resolved.
Show resolved
Hide resolved
|
||
d.pop(process) | ||
|
||
# Sleeps the thread for 10s to save resources. | ||
if d: | ||
time.sleep(10) | ||
|
||
print("Received and acknowledged all messages. Done.") | ||
# [END pubsub_subscriber_sync_pull_with_lease] | ||
|
||
|
||
def listen_for_errors(project, subscription_name): | ||
"""Receives messages and catches errors from a pull subscription.""" | ||
# [START pubsub_subscriber_error_listener] | ||
|
@@ -323,6 +393,11 @@ def callback(message): | |
help=receive_messages_synchronously.__doc__) | ||
receive_messages_synchronously_parser.add_argument('subscription_name') | ||
|
||
receive_messages_synchronously_with_lease_parser = subparsers.add_parser( | ||
'receive-synchronously-with-lease', | ||
help=synchronous_pull_with_lease_management.__doc__) | ||
receive_messages_synchronously_with_lease_parser.add_argument('subscription_name') | ||
|
||
listen_for_errors_parser = subparsers.add_parser( | ||
'listen_for_errors', help=listen_for_errors.__doc__) | ||
listen_for_errors_parser.add_argument('subscription_name') | ||
|
@@ -359,5 +434,8 @@ def callback(message): | |
elif args.command == 'receive-synchronously': | ||
receive_messages_synchronously( | ||
args.project, args.subscription_name) | ||
elif args.command == 'receive-synchronously-with-lease': | ||
synchronous_pull_with_lease_management( | ||
args.project, args.subscription_name) | ||
elif args.command == 'listen_for_errors': | ||
listen_for_errors(args.project, args.subscription_name) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -160,6 +160,17 @@ def test_receive(publisher_client, topic, subscription, capsys): | |
assert 'Message 1' in out | ||
|
||
|
||
def test_receive_synchronously( | ||
publisher_client, topic, subscription, capsys): | ||
_publish_messages(publisher_client, topic) | ||
|
||
subscriber.receive_messages_synchronously(PROJECT, SUBSCRIPTION) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we test these two functions independently? |
||
subscriber.synchronous_pull_with_lease_management(PROJECT, SUBSCRIPTION) | ||
|
||
out, _ = capsys.readouterr() | ||
assert 'Received and acknowledged all messages. Done.' in out | ||
|
||
|
||
def test_receive_with_custom_attributes( | ||
publisher_client, topic, subscription, capsys): | ||
_publish_messages_with_custom_attributes(publisher_client, topic) | ||
|
@@ -188,18 +199,3 @@ def test_receive_with_flow_control( | |
assert 'Listening' in out | ||
assert subscription in out | ||
assert 'Message 1' in out | ||
|
||
|
||
def test_receive_synchronously( | ||
publisher_client, topic, subscription, capsys): | ||
_publish_messages(publisher_client, topic) | ||
|
||
with _make_sleep_patch(): | ||
with pytest.raises(RuntimeError, match='sigil'): | ||
subscriber.receive_messages_with_flow_control( | ||
PROJECT, SUBSCRIPTION) | ||
|
||
out, _ = capsys.readouterr() | ||
assert 'Message 1' in out | ||
assert 'Message 2' in out | ||
assert 'Message 3' in out |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe reword "Builds a pull request with a specific number of messages" to "The subscriber pulls for a specific number of messages"? A pull request makes me think of github :)
Can you explain a little further how the
return_immediately
and thetimeout
relate to each other and how each affects the behavior? I.e. what happens if I don't specify any of them and what the default values are?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Both are optional. I think the default values are based on the server, but may not be fixed.
I just tested without specifying either of them, subscriber waits without timing out. I'm going to remove such specifications for the code sample because we should stick to the most simple case.