-
Notifications
You must be signed in to change notification settings - Fork 6.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Pub/Sub: synchronous pull with lease management #1701
Changes from 4 commits
a97a4a3
4402e8b
b9dbaf1
dba44a7
05dd406
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1 @@ | ||
google-cloud-pubsub==0.37.2 | ||
google-cloud-pubsub==0.38.0 |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,9 @@ | |
|
||
import argparse | ||
import time | ||
import logging | ||
import random | ||
import multiprocessing | ||
|
||
from google.cloud import pubsub_v1 | ||
|
||
|
@@ -224,13 +227,15 @@ def receive_messages_synchronously(project, subscription_name): | |
subscription_path = subscriber.subscription_path( | ||
project, subscription_name) | ||
|
||
NUM_MESSAGES=3 | ||
# Builds a pull request with a specific number of messages to return. | ||
# `return_immediately` is set to False so that the system waits (for a | ||
# bounded amount of time) until at lease one message is available. | ||
# `return_immediately` is set to False so that the system waits until | ||
# at least one message is available before it times out. | ||
response = subscriber.pull( | ||
subscription_path, | ||
max_messages=3, | ||
return_immediately=False) | ||
max_messages=NUM_MESSAGES, | ||
return_immediately=False, | ||
timeout=900) | ||
|
||
ack_ids = [] | ||
for received_message in response.received_messages: | ||
|
@@ -239,9 +244,78 @@ def receive_messages_synchronously(project, subscription_name): | |
|
||
# Acknowledges the received messages so they will not be sent again. | ||
subscriber.acknowledge(subscription_path, ack_ids) | ||
|
||
print("Received and acknowledged {} messages. Done.".format(NUM_MESSAGES)) | ||
# [END pubsub_subscriber_sync_pull] | ||
|
||
|
||
def synchronous_pull_with_lease_management(project, subscription_name):
    """Pulling messages synchronously with lease management"""
    # [START pubsub_subscriber_sync_pull_with_lease]
    # project = "Your Google Cloud Project ID"
    # subscription_name = "Your Pubsub subscription name"
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project, subscription_name)

    NUM_MESSAGES = 2
    ACK_DEADLINE = 30
    SLEEP_TIME = 10

    # The subscriber pulls up to NUM_MESSAGES messages.
    # `return_immediately` is set to False so that the system waits until
    # at least one message is available before it times out.
    response = subscriber.pull(
        subscription_path,
        max_messages=NUM_MESSAGES,
        return_immediately=False,
        timeout=900)

    multiprocessing.log_to_stderr()
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)

    def worker(msg):
        """Simulates a long-running process."""
        RUN_TIME = random.randint(1, 60)
        logger.info('{}: Running {} for {}s'.format(
            time.strftime("%X", time.gmtime()), msg.message.data, RUN_TIME))
        time.sleep(RUN_TIME)

    # `processes` stores process as key and ack id and message as values.
    processes = dict()
    for message in response.received_messages:
        process = multiprocessing.Process(target=worker, args=(message,))
        processes[process] = (message.ack_id, message.message.data)
        process.start()

    while processes:
        # Iterate over a snapshot of the keys: finished processes are
        # removed from `processes` inside the loop, and removing entries
        # from a dict while iterating over it directly raises RuntimeError.
        for process in list(processes):
            ack_id, msg_data = processes[process]

            if process.is_alive():
                # The worker is still busy, so extend the lease on its
                # message. `modify_ack_deadline` restarts the deadline
                # timer: the message now has ACK_DEADLINE seconds, counted
                # from this request, before it becomes eligible for
                # redelivery. The service caps `ack_deadline_seconds` at
                # 600s.
                subscriber.modify_ack_deadline(
                    subscription_path,
                    [ack_id],
                    ack_deadline_seconds=ACK_DEADLINE)
                logger.info('{}: Reset ack deadline for {} for {}s'.format(
                    time.strftime("%X", time.gmtime()), msg_data,
                    ACK_DEADLINE))
            else:
                # The worker has finished: acknowledge the message so it
                # will not be sent again, and stop tracking the process.
                subscriber.acknowledge(subscription_path, [ack_id])
                logger.info("{}: Acknowledged {}".format(
                    time.strftime("%X", time.gmtime()), msg_data))
                processes.pop(process)

        # If any worker is still running, sleep before the next round of
        # lease renewals to save resources. SLEEP_TIME is kept shorter than
        # ACK_DEADLINE so that leases are renewed before they lapse.
        if processes:
            time.sleep(SLEEP_TIME)

    # Report how many messages were actually pulled, which may be fewer
    # than the NUM_MESSAGES requested.
    print("Received and acknowledged {} messages. Done.".format(
        len(response.received_messages)))
    # [END pubsub_subscriber_sync_pull_with_lease]
|
||
|
||
def listen_for_errors(project, subscription_name): | ||
"""Receives messages and catches errors from a pull subscription.""" | ||
# [START pubsub_subscriber_error_listener] | ||
|
@@ -323,6 +397,11 @@ def callback(message): | |
help=receive_messages_synchronously.__doc__) | ||
receive_messages_synchronously_parser.add_argument('subscription_name') | ||
|
||
receive_messages_synchronously_with_lease_parser = subparsers.add_parser( | ||
'receive-synchronously-with-lease', | ||
help=synchronous_pull_with_lease_management.__doc__) | ||
receive_messages_synchronously_with_lease_parser.add_argument('subscription_name') | ||
|
||
listen_for_errors_parser = subparsers.add_parser( | ||
'listen_for_errors', help=listen_for_errors.__doc__) | ||
listen_for_errors_parser.add_argument('subscription_name') | ||
|
@@ -359,5 +438,8 @@ def callback(message): | |
elif args.command == 'receive-synchronously': | ||
receive_messages_synchronously( | ||
args.project, args.subscription_name) | ||
elif args.command == 'receive-synchronously-with-lease': | ||
synchronous_pull_with_lease_management( | ||
args.project, args.subscription_name) | ||
elif args.command == 'listen_for_errors': | ||
listen_for_errors(args.project, args.subscription_name) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,6 +25,8 @@ | |
PROJECT = os.environ['GCLOUD_PROJECT'] | ||
TOPIC = 'subscription-test-topic' | ||
SUBSCRIPTION = 'subscription-test-subscription' | ||
SUBSCRIPTION_SYNC1 = 'subscription-test-subscription-sync1' | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since these are synchronous and shouldn't step over each other, can we use the same subscription for both synchronous tests? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. They sometimes did step on each other because the tests are run in parallel. |
||
SUBSCRIPTION_SYNC2 = 'subscription-test-subscription-sync2' | ||
ENDPOINT = 'https://{}.appspot.com/push'.format(PROJECT) | ||
|
||
|
||
|
@@ -67,6 +69,36 @@ def subscription(subscriber_client, topic): | |
yield subscription_path | ||
|
||
|
||
@pytest.fixture
def subscription_sync1(subscriber_client, topic):
    """Yield the path of a newly created SUBSCRIPTION_SYNC1 subscription."""
    path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_SYNC1)

    # Best-effort removal of any subscription with the same path before
    # re-creating it; every failure of the delete call is deliberately
    # ignored (presumably the usual one is "not found").
    try:
        subscriber_client.delete_subscription(path)
    except Exception:
        pass

    subscriber_client.create_subscription(path, topic=topic)

    yield path
|
||
|
||
@pytest.fixture
def subscription_sync2(subscriber_client, topic):
    """Yield the path of a newly created SUBSCRIPTION_SYNC2 subscription."""
    path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_SYNC2)

    # Best-effort removal of any subscription with the same path before
    # re-creating it; every failure of the delete call is deliberately
    # ignored (presumably the usual one is "not found").
    try:
        subscriber_client.delete_subscription(path)
    except Exception:
        pass

    subscriber_client.create_subscription(path, topic=topic)

    yield path
|
||
|
||
def test_list_in_topic(subscription, capsys): | ||
@eventually_consistent.call | ||
def _(): | ||
|
@@ -160,6 +192,27 @@ def test_receive(publisher_client, topic, subscription, capsys): | |
assert 'Message 1' in out | ||
|
||
|
||
def test_receive_synchronously(
        publisher_client, topic, subscription_sync1, capsys):
    """Run the synchronous pull sample and check that it reports completion."""
    _publish_messages(publisher_client, topic)

    subscriber.receive_messages_synchronously(PROJECT, SUBSCRIPTION_SYNC1)

    # Only stdout matters here; the sample prints 'Done.' on its last line.
    captured_stdout = capsys.readouterr()[0]
    assert 'Done.' in captured_stdout
|
||
|
||
def test_receive_synchronously_with_lease(
        publisher_client, topic, subscription_sync2, capsys):
    """Run the lease-management sample and check that it reports completion."""
    _publish_messages(publisher_client, topic)

    subscriber.synchronous_pull_with_lease_management(
        PROJECT, SUBSCRIPTION_SYNC2)

    # Only stdout matters here; the sample prints 'Done.' on its last line.
    captured_stdout = capsys.readouterr()[0]
    assert 'Done.' in captured_stdout
|
||
|
||
def test_receive_with_custom_attributes( | ||
publisher_client, topic, subscription, capsys): | ||
_publish_messages_with_custom_attributes(publisher_client, topic) | ||
|
@@ -188,18 +241,3 @@ def test_receive_with_flow_control( | |
assert 'Listening' in out | ||
assert subscription in out | ||
assert 'Message 1' in out | ||
|
||
|
||
def test_receive_synchronously( | ||
publisher_client, topic, subscription, capsys): | ||
_publish_messages(publisher_client, topic) | ||
|
||
with _make_sleep_patch(): | ||
with pytest.raises(RuntimeError, match='sigil'): | ||
subscriber.receive_messages_with_flow_control( | ||
PROJECT, SUBSCRIPTION) | ||
|
||
out, _ = capsys.readouterr() | ||
assert 'Message 1' in out | ||
assert 'Message 2' in out | ||
assert 'Message 3' in out |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe reword "Builds a pull request with a specific number of messages" to "The subscriber pulls for a specific number of messages"? A pull request makes me think of github :)
Can you explain a little further how `return_immediately` and the `timeout` relate to each other and how each affects the behavior? I.e. what happens if I don't specify any of them and what the default values are?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Both are optional. I think the default values are based on the server, but may not be fixed.
I just tested without specifying either of them: the subscriber waits without timing out. I'm going to remove these arguments from the code sample because we should stick to the simplest case.