Skip to content

Commit 2f8f3af

Browse files
anguillanneufplamut
authored andcommitted
Cloud Pub/Sub Quickstart V2 [(#2004)](GoogleCloudPlatform/python-docs-samples#2004)
* Quickstart V2 * Adopts Kir's suggestions * Adopted Tim's suggestions * proper resource deletion during teardown
1 parent f044c5b commit 2f8f3af

File tree

4 files changed

+311
-0
lines changed

4 files changed

+311
-0
lines changed

samples/snippets/quickstart/pub.py

+73
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
#!/usr/bin/env python
2+
3+
# Copyright 2019 Google LLC
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
# [START pubsub_quickstart_pub_all]
18+
import argparse
19+
import time
20+
# [START pubsub_quickstart_pub_deps]
21+
from google.cloud import pubsub_v1
22+
# [END pubsub_quickstart_pub_deps]
23+
24+
25+
def get_callback(api_future, data):
26+
"""Wrap message data in the context of the callback function."""
27+
28+
def callback(api_future):
29+
try:
30+
print("Published message {} now has message ID {}".format(
31+
data, api_future.result()))
32+
except Exception:
33+
print("A problem occurred when publishing {}: {}\n".format(
34+
data, api_future.exception()))
35+
raise
36+
return callback
37+
38+
39+
def pub(project_id, topic_name):
40+
"""Publishes a message to a Pub/Sub topic."""
41+
# [START pubsub_quickstart_pub_client]
42+
# Initialize a Publisher client
43+
client = pubsub_v1.PublisherClient()
44+
# [END pubsub_quickstart_pub_client]
45+
# Create a fully qualified identifier in the form of
46+
# `projects/{project_id}/topics/{topic_name}`
47+
topic_path = client.topic_path(project_id, topic_name)
48+
49+
# Data sent to Cloud Pub/Sub must be a bytestring
50+
data = b"Hello, World!"
51+
52+
# When you publish a message, the client returns a future.
53+
api_future = client.publish(topic_path, data=data)
54+
api_future.add_done_callback(get_callback(api_future, data))
55+
56+
# Keep the main thread from exiting until background message
57+
# is processed.
58+
while api_future.running():
59+
time.sleep(0.1)
60+
61+
62+
if __name__ == '__main__':
63+
parser = argparse.ArgumentParser(
64+
description=__doc__,
65+
formatter_class=argparse.RawDescriptionHelpFormatter
66+
)
67+
parser.add_argument('project_id', help='Google Cloud project ID')
68+
parser.add_argument('topic_name', help='Pub/Sub topic name')
69+
70+
args = parser.parse_args()
71+
72+
pub(args.project_id, args.topic_name)
73+
# [END pubsub_quickstart_pub_all]
+61
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
#!/usr/bin/env python
2+
3+
# Copyright 2019 Google LLC
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
import os
18+
import pytest
19+
20+
from google.api_core.exceptions import AlreadyExists
21+
from google.cloud import pubsub_v1
22+
23+
import pub
24+
25+
PROJECT = os.environ['GCLOUD_PROJECT']
26+
TOPIC = 'quickstart-pub-test-topic'
27+
28+
29+
@pytest.fixture(scope='module')
30+
def publisher_client():
31+
yield pubsub_v1.PublisherClient()
32+
33+
34+
@pytest.fixture(scope='module')
35+
def topic(publisher_client):
36+
topic_path = publisher_client.topic_path(PROJECT, TOPIC)
37+
38+
try:
39+
publisher_client.create_topic(topic_path)
40+
except AlreadyExists:
41+
pass
42+
43+
yield TOPIC
44+
45+
46+
@pytest.fixture
47+
def to_delete(publisher_client):
48+
doomed = []
49+
yield doomed
50+
for item in doomed:
51+
publisher_client.delete_topic(item)
52+
53+
54+
def test_pub(publisher_client, topic, to_delete, capsys):
55+
pub.pub(PROJECT, topic)
56+
57+
to_delete.append('projects/{}/topics/{}'.format(PROJECT, TOPIC))
58+
59+
out, _ = capsys.readouterr()
60+
61+
assert "Published message b'Hello, World!'" in out

samples/snippets/quickstart/sub.py

+64
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
#!/usr/bin/env python
2+
3+
# Copyright 2019 Google LLC
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
# [START pubsub_quickstart_sub_all]
18+
import argparse
19+
import time
20+
# [START pubsub_quickstart_sub_deps]
21+
from google.cloud import pubsub_v1
22+
# [END pubsub_quickstart_sub_deps]
23+
24+
25+
def sub(project_id, subscription_name):
26+
"""Receives messages from a Pub/Sub subscription."""
27+
# [START pubsub_quickstart_sub_client]
28+
# Initialize a Subscriber client
29+
client = pubsub_v1.SubscriberClient()
30+
# [END pubsub_quickstart_sub_client]
31+
# Create a fully qualified identifier in the form of
32+
# `projects/{project_id}/subscriptions/{subscription_name}`
33+
subscription_path = client.subscription_path(
34+
project_id, subscription_name)
35+
36+
def callback(message):
37+
print('Received message {} of message ID {}'.format(
38+
message, message.message_id))
39+
# Acknowledge the message. Unack'ed messages will be redelivered.
40+
message.ack()
41+
print('Acknowledged message of message ID {}\n'.format(
42+
message.message_id))
43+
44+
client.subscribe(subscription_path, callback=callback)
45+
print('Listening for messages on {}..\n'.format(subscription_path))
46+
47+
# Keep the main thread from exiting so the subscriber can
48+
# process messages in the background.
49+
while True:
50+
time.sleep(60)
51+
52+
53+
if __name__ == '__main__':
54+
parser = argparse.ArgumentParser(
55+
description=__doc__,
56+
formatter_class=argparse.RawDescriptionHelpFormatter
57+
)
58+
parser.add_argument('project_id', help='Google Cloud project ID')
59+
parser.add_argument('subscription_name', help='Pub/Sub subscription name')
60+
61+
args = parser.parse_args()
62+
63+
sub(args.project_id, args.subscription_name)
64+
# [END pubsub_quickstart_sub_all]
+113
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
#!/usr/bin/env python
2+
3+
# Copyright 2019 Google LLC
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
import mock
18+
import os
19+
import pytest
20+
import time
21+
22+
from google.api_core.exceptions import AlreadyExists
23+
from google.cloud import pubsub_v1
24+
25+
import sub
26+
27+
28+
PROJECT = os.environ['GCLOUD_PROJECT']
29+
TOPIC = 'quickstart-sub-test-topic'
30+
SUBSCRIPTION = 'quickstart-sub-test-topic-sub'
31+
32+
33+
@pytest.fixture(scope='module')
34+
def publisher_client():
35+
yield pubsub_v1.PublisherClient()
36+
37+
38+
@pytest.fixture(scope='module')
39+
def topic_path(publisher_client):
40+
topic_path = publisher_client.topic_path(PROJECT, TOPIC)
41+
42+
try:
43+
publisher_client.create_topic(topic_path)
44+
except AlreadyExists:
45+
pass
46+
47+
yield topic_path
48+
49+
50+
@pytest.fixture(scope='module')
51+
def subscriber_client():
52+
yield pubsub_v1.SubscriberClient()
53+
54+
55+
@pytest.fixture(scope='module')
56+
def subscription(subscriber_client, topic_path):
57+
subscription_path = subscriber_client.subscription_path(
58+
PROJECT, SUBSCRIPTION)
59+
60+
try:
61+
subscriber_client.create_subscription(subscription_path, topic_path)
62+
except AlreadyExists:
63+
pass
64+
65+
yield SUBSCRIPTION
66+
67+
68+
@pytest.fixture
69+
def to_delete(publisher_client, subscriber_client):
70+
doomed = []
71+
yield doomed
72+
for client, item in doomed:
73+
if 'topics' in item:
74+
publisher_client.delete_topic(item)
75+
if 'subscriptions' in item:
76+
subscriber_client.delete_subscription(item)
77+
78+
79+
def _make_sleep_patch():
80+
real_sleep = time.sleep
81+
82+
def new_sleep(period):
83+
if period == 60:
84+
real_sleep(10)
85+
raise RuntimeError('sigil')
86+
else:
87+
real_sleep(period)
88+
89+
return mock.patch('time.sleep', new=new_sleep)
90+
91+
92+
def test_sub(publisher_client,
93+
topic_path,
94+
subscriber_client,
95+
subscription,
96+
to_delete,
97+
capsys):
98+
99+
publisher_client.publish(topic_path, data=b'Hello, World!')
100+
101+
to_delete.append((publisher_client, topic_path))
102+
103+
with _make_sleep_patch():
104+
with pytest.raises(RuntimeError, match='sigil'):
105+
sub.sub(PROJECT, subscription)
106+
107+
to_delete.append((subscriber_client,
108+
'projects/{}/subscriptions/{}'.format(PROJECT,
109+
SUBSCRIPTION)))
110+
111+
out, _ = capsys.readouterr()
112+
assert "Received message" in out
113+
assert "Acknowledged message" in out

0 commit comments

Comments
 (0)