Skip to content

Commit

Permalink
Pub/Sub: update how subscriber client listens to StreamingPullFuture (#…
Browse files Browse the repository at this point in the history
…2475)

* update sub.py & requirements.txt
* fix flaky subscriber test with separate subscriptions
  • Loading branch information
anguillanneuf authored Oct 21, 2019
1 parent 14995eb commit fbf6f4a
Show file tree
Hide file tree
Showing 9 changed files with 148 additions and 164 deletions.
2 changes: 1 addition & 1 deletion pubsub/cloud-client/iam_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2019 Google Inc. All Rights Reserved.
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion pubsub/cloud-client/publisher.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python

# Copyright 2019 Google LLC. All Rights Reserved.
# Copyright 2016 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down
12 changes: 5 additions & 7 deletions pubsub/cloud-client/publisher_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2019 Google Inc. All Rights Reserved.
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -36,13 +36,11 @@ def topic(client):
topic_path = client.topic_path(PROJECT, TOPIC)

try:
client.delete_topic(topic_path)
except Exception:
pass

client.create_topic(topic_path)
response = client.get_topic(topic_path)
except: # noqa
response = client.create_topic(topic_path)

yield topic_path
yield response.name


def _make_sleep_patch():
Expand Down
2 changes: 1 addition & 1 deletion pubsub/cloud-client/quickstart.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python

# Copyright 2019 Google Inc. All Rights Reserved.
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down
19 changes: 10 additions & 9 deletions pubsub/cloud-client/quickstart/sub.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

# [START pubsub_quickstart_sub_all]
import argparse
import time
# [START pubsub_quickstart_sub_deps]
from google.cloud import pubsub_v1
# [END pubsub_quickstart_sub_deps]
Expand All @@ -34,20 +33,22 @@ def sub(project_id, subscription_name):
project_id, subscription_name)

def callback(message):
print('Received message {} of message ID {}'.format(
print('Received message {} of message ID {}\n'.format(
message, message.message_id))
# Acknowledge the message. Unack'ed messages will be redelivered.
message.ack()
print('Acknowledged message of message ID {}\n'.format(
message.message_id))
print('Acknowledged message {}\n'.format(message.message_id))

client.subscribe(subscription_path, callback=callback)
streaming_pull_future = client.subscribe(
subscription_path, callback=callback)
print('Listening for messages on {}..\n'.format(subscription_path))

# Keep the main thread from exiting so the subscriber can
# process messages in the background.
while True:
time.sleep(60)
# Calling result() on StreamingPullFuture keeps the main thread from
# exiting while messages get processed in the callbacks.
try:
streaming_pull_future.result()
except: # noqa
streaming_pull_future.cancel()


if __name__ == '__main__':
Expand Down
91 changes: 42 additions & 49 deletions pubsub/cloud-client/quickstart/sub_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import mock
import os
import pytest
import time

from google.api_core.exceptions import AlreadyExists
from google.cloud import pubsub_v1
Expand All @@ -29,84 +27,79 @@
TOPIC = 'quickstart-sub-test-topic'
SUBSCRIPTION = 'quickstart-sub-test-topic-sub'


@pytest.fixture(scope='module')
def publisher_client():
yield pubsub_v1.PublisherClient()
publisher_client = pubsub_v1.PublisherClient()
subscriber_client = pubsub_v1.SubscriberClient()


@pytest.fixture(scope='module')
def topic_path(publisher_client):
def topic_path():
topic_path = publisher_client.topic_path(PROJECT, TOPIC)

try:
publisher_client.create_topic(topic_path)
topic = publisher_client.create_topic(topic_path)
return topic.name
except AlreadyExists:
pass

yield topic_path


@pytest.fixture(scope='module')
def subscriber_client():
yield pubsub_v1.SubscriberClient()
return topic_path


@pytest.fixture(scope='module')
def subscription(subscriber_client, topic_path):
def subscription_path(topic_path):
subscription_path = subscriber_client.subscription_path(
PROJECT, SUBSCRIPTION)

try:
subscriber_client.create_subscription(subscription_path, topic_path)
subscription = subscriber_client.create_subscription(
subscription_path, topic_path)
return subscription.name
except AlreadyExists:
pass

yield SUBSCRIPTION
return subscription_path


@pytest.fixture
def to_delete(publisher_client, subscriber_client):
doomed = []
yield doomed
for client, item in doomed:
def _to_delete(resource_paths):
for item in resource_paths:
if 'topics' in item:
publisher_client.delete_topic(item)
if 'subscriptions' in item:
subscriber_client.delete_subscription(item)


def _make_sleep_patch():
real_sleep = time.sleep
def _publish_messages(topic_path):
publish_future = publisher_client.publish(topic_path, data=b'Hello World!')
publish_future.result()


def new_sleep(period):
if period == 60:
real_sleep(10)
raise RuntimeError('sigil')
else:
real_sleep(period)
def _sub_timeout(project_id, subscription_name):
# This is an exactly copy of `sub.py` except
# StreamingPullFuture.result() will time out after 10s.
client = pubsub_v1.SubscriberClient()
subscription_path = client.subscription_path(
project_id, subscription_name)

return mock.patch('time.sleep', new=new_sleep)
def callback(message):
print('Received message {} of message ID {}\n'.format(
message, message.message_id))
message.ack()
print('Acknowledged message {}\n'.format(message.message_id))

streaming_pull_future = client.subscribe(
subscription_path, callback=callback)
print('Listening for messages on {}..\n'.format(subscription_path))

try:
streaming_pull_future.result(timeout=10)
except: # noqa
streaming_pull_future.cancel()

def test_sub(publisher_client,
topic_path,
subscriber_client,
subscription,
to_delete,
capsys):

publisher_client.publish(topic_path, data=b'Hello, World!')
def test_sub(monkeypatch, topic_path, subscription_path, capsys):
monkeypatch.setattr(sub, 'sub', _sub_timeout)

to_delete.append((publisher_client, topic_path))
_publish_messages(topic_path)

with _make_sleep_patch():
with pytest.raises(RuntimeError, match='sigil'):
sub.sub(PROJECT, subscription)
sub.sub(PROJECT, SUBSCRIPTION)

to_delete.append((subscriber_client,
'projects/{}/subscriptions/{}'.format(PROJECT,
SUBSCRIPTION)))
# Clean up resources.
_to_delete([topic_path, subscription_path])

out, _ = capsys.readouterr()
assert "Received message" in out
Expand Down
2 changes: 1 addition & 1 deletion pubsub/cloud-client/quickstart_test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python

# Copyright 2019 Google Inc. All Rights Reserved.
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion pubsub/cloud-client/subscriber.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python

# Copyright 2019 Google Inc. All Rights Reserved.
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down
Loading

0 comments on commit fbf6f4a

Please sign in to comment.