forked from googleapis/python-pubsub
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Cloud Pub/Sub Quickstart V2 [(#2004)](GoogleCloudPlatform/python-docs…
…-samples#2004) * Quickstart V2 * Adopts Kir's suggestions * Adopted Tim's suggestions * proper resource deletion during teardown
- Loading branch information
1 parent
1a49ed8
commit 0ca9ca0
Showing
4 changed files
with
311 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
#!/usr/bin/env python | ||
|
||
# Copyright 2019 Google LLC | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
# [START pubsub_quickstart_pub_all] | ||
import argparse | ||
import time | ||
# [START pubsub_quickstart_pub_deps] | ||
from google.cloud import pubsub_v1 | ||
# [END pubsub_quickstart_pub_deps] | ||
|
||
|
||
def get_callback(api_future, data): | ||
"""Wrap message data in the context of the callback function.""" | ||
|
||
def callback(api_future): | ||
try: | ||
print("Published message {} now has message ID {}".format( | ||
data, api_future.result())) | ||
except Exception: | ||
print("A problem occurred when publishing {}: {}\n".format( | ||
data, api_future.exception())) | ||
raise | ||
return callback | ||
|
||
|
||
def pub(project_id, topic_name): | ||
"""Publishes a message to a Pub/Sub topic.""" | ||
# [START pubsub_quickstart_pub_client] | ||
# Initialize a Publisher client | ||
client = pubsub_v1.PublisherClient() | ||
# [END pubsub_quickstart_pub_client] | ||
# Create a fully qualified identifier in the form of | ||
# `projects/{project_id}/topics/{topic_name}` | ||
topic_path = client.topic_path(project_id, topic_name) | ||
|
||
# Data sent to Cloud Pub/Sub must be a bytestring | ||
data = b"Hello, World!" | ||
|
||
# When you publish a message, the client returns a future. | ||
api_future = client.publish(topic_path, data=data) | ||
api_future.add_done_callback(get_callback(api_future, data)) | ||
|
||
# Keep the main thread from exiting until background message | ||
# is processed. | ||
while api_future.running(): | ||
time.sleep(0.1) | ||
|
||
|
||
if __name__ == '__main__': | ||
parser = argparse.ArgumentParser( | ||
description=__doc__, | ||
formatter_class=argparse.RawDescriptionHelpFormatter | ||
) | ||
parser.add_argument('project_id', help='Google Cloud project ID') | ||
parser.add_argument('topic_name', help='Pub/Sub topic name') | ||
|
||
args = parser.parse_args() | ||
|
||
pub(args.project_id, args.topic_name) | ||
# [END pubsub_quickstart_pub_all] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
#!/usr/bin/env python | ||
|
||
# Copyright 2019 Google LLC | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
import os | ||
import pytest | ||
|
||
from google.api_core.exceptions import AlreadyExists | ||
from google.cloud import pubsub_v1 | ||
|
||
import pub | ||
|
||
PROJECT = os.environ['GCLOUD_PROJECT'] | ||
TOPIC = 'quickstart-pub-test-topic' | ||
|
||
|
||
@pytest.fixture(scope='module') | ||
def publisher_client(): | ||
yield pubsub_v1.PublisherClient() | ||
|
||
|
||
@pytest.fixture(scope='module') | ||
def topic(publisher_client): | ||
topic_path = publisher_client.topic_path(PROJECT, TOPIC) | ||
|
||
try: | ||
publisher_client.create_topic(topic_path) | ||
except AlreadyExists: | ||
pass | ||
|
||
yield TOPIC | ||
|
||
|
||
@pytest.fixture | ||
def to_delete(publisher_client): | ||
doomed = [] | ||
yield doomed | ||
for item in doomed: | ||
publisher_client.delete_topic(item) | ||
|
||
|
||
def test_pub(publisher_client, topic, to_delete, capsys): | ||
pub.pub(PROJECT, topic) | ||
|
||
to_delete.append('projects/{}/topics/{}'.format(PROJECT, TOPIC)) | ||
|
||
out, _ = capsys.readouterr() | ||
|
||
assert "Published message b'Hello, World!'" in out |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
#!/usr/bin/env python | ||
|
||
# Copyright 2019 Google LLC | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
# [START pubsub_quickstart_sub_all] | ||
import argparse | ||
import time | ||
# [START pubsub_quickstart_sub_deps] | ||
from google.cloud import pubsub_v1 | ||
# [END pubsub_quickstart_sub_deps] | ||
|
||
|
||
def sub(project_id, subscription_name): | ||
"""Receives messages from a Pub/Sub subscription.""" | ||
# [START pubsub_quickstart_sub_client] | ||
# Initialize a Subscriber client | ||
client = pubsub_v1.SubscriberClient() | ||
# [END pubsub_quickstart_sub_client] | ||
# Create a fully qualified identifier in the form of | ||
# `projects/{project_id}/subscriptions/{subscription_name}` | ||
subscription_path = client.subscription_path( | ||
project_id, subscription_name) | ||
|
||
def callback(message): | ||
print('Received message {} of message ID {}'.format( | ||
message, message.message_id)) | ||
# Acknowledge the message. Unack'ed messages will be redelivered. | ||
message.ack() | ||
print('Acknowledged message of message ID {}\n'.format( | ||
message.message_id)) | ||
|
||
client.subscribe(subscription_path, callback=callback) | ||
print('Listening for messages on {}..\n'.format(subscription_path)) | ||
|
||
# Keep the main thread from exiting so the subscriber can | ||
# process messages in the background. | ||
while True: | ||
time.sleep(60) | ||
|
||
|
||
if __name__ == '__main__': | ||
parser = argparse.ArgumentParser( | ||
description=__doc__, | ||
formatter_class=argparse.RawDescriptionHelpFormatter | ||
) | ||
parser.add_argument('project_id', help='Google Cloud project ID') | ||
parser.add_argument('subscription_name', help='Pub/Sub subscription name') | ||
|
||
args = parser.parse_args() | ||
|
||
sub(args.project_id, args.subscription_name) | ||
# [END pubsub_quickstart_sub_all] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
#!/usr/bin/env python | ||
|
||
# Copyright 2019 Google LLC | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
import mock | ||
import os | ||
import pytest | ||
import time | ||
|
||
from google.api_core.exceptions import AlreadyExists | ||
from google.cloud import pubsub_v1 | ||
|
||
import sub | ||
|
||
|
||
PROJECT = os.environ['GCLOUD_PROJECT'] | ||
TOPIC = 'quickstart-sub-test-topic' | ||
SUBSCRIPTION = 'quickstart-sub-test-topic-sub' | ||
|
||
|
||
@pytest.fixture(scope='module') | ||
def publisher_client(): | ||
yield pubsub_v1.PublisherClient() | ||
|
||
|
||
@pytest.fixture(scope='module') | ||
def topic_path(publisher_client): | ||
topic_path = publisher_client.topic_path(PROJECT, TOPIC) | ||
|
||
try: | ||
publisher_client.create_topic(topic_path) | ||
except AlreadyExists: | ||
pass | ||
|
||
yield topic_path | ||
|
||
|
||
@pytest.fixture(scope='module') | ||
def subscriber_client(): | ||
yield pubsub_v1.SubscriberClient() | ||
|
||
|
||
@pytest.fixture(scope='module') | ||
def subscription(subscriber_client, topic_path): | ||
subscription_path = subscriber_client.subscription_path( | ||
PROJECT, SUBSCRIPTION) | ||
|
||
try: | ||
subscriber_client.create_subscription(subscription_path, topic_path) | ||
except AlreadyExists: | ||
pass | ||
|
||
yield SUBSCRIPTION | ||
|
||
|
||
@pytest.fixture | ||
def to_delete(publisher_client, subscriber_client): | ||
doomed = [] | ||
yield doomed | ||
for client, item in doomed: | ||
if 'topics' in item: | ||
publisher_client.delete_topic(item) | ||
if 'subscriptions' in item: | ||
subscriber_client.delete_subscription(item) | ||
|
||
|
||
def _make_sleep_patch(): | ||
real_sleep = time.sleep | ||
|
||
def new_sleep(period): | ||
if period == 60: | ||
real_sleep(10) | ||
raise RuntimeError('sigil') | ||
else: | ||
real_sleep(period) | ||
|
||
return mock.patch('time.sleep', new=new_sleep) | ||
|
||
|
||
def test_sub(publisher_client, | ||
topic_path, | ||
subscriber_client, | ||
subscription, | ||
to_delete, | ||
capsys): | ||
|
||
publisher_client.publish(topic_path, data=b'Hello, World!') | ||
|
||
to_delete.append((publisher_client, topic_path)) | ||
|
||
with _make_sleep_patch(): | ||
with pytest.raises(RuntimeError, match='sigil'): | ||
sub.sub(PROJECT, subscription) | ||
|
||
to_delete.append((subscriber_client, | ||
'projects/{}/subscriptions/{}'.format(PROJECT, | ||
SUBSCRIPTION))) | ||
|
||
out, _ = capsys.readouterr() | ||
assert "Received message" in out | ||
assert "Acknowledged message" in out |