Skip to content

Commit d42f54c

Browse files
anguillanneufbusunkim96
authored andcommitted
Pub/Sub: remove infinite while loops in subscriber examples (#2604)
* use result() on streaming pull futures instead of infinite while * remove unused imports
1 parent 2329466 commit d42f54c

File tree

3 files changed

+179
-174
lines changed

3 files changed

+179
-174
lines changed

pubsub/cloud-client/publisher_test.py

Lines changed: 41 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@
2525

2626
UUID = uuid.uuid4().hex
2727
PROJECT = os.environ["GCLOUD_PROJECT"]
28-
TOPIC = "publisher-test-topic-" + UUID
28+
TOPIC_ADMIN = "publisher-test-topic-admin-" + UUID
29+
TOPIC_PUBLISH = "publisher-test-topic-publish-" + UUID
2930

3031

3132
@pytest.fixture
@@ -34,15 +35,30 @@ def client():
3435

3536

3637
@pytest.fixture
37-
def topic(client):
38-
topic_path = client.topic_path(PROJECT, TOPIC)
38+
def topic_admin(client):
39+
topic_path = client.topic_path(PROJECT, TOPIC_ADMIN)
3940

4041
try:
41-
response = client.get_topic(topic_path)
42+
topic = client.get_topic(topic_path)
4243
except: # noqa
43-
response = client.create_topic(topic_path)
44+
topic = client.create_topic(topic_path)
4445

45-
yield response.name
46+
yield topic.name
47+
# Teardown of `topic_admin` is handled in `test_delete()`.
48+
49+
50+
@pytest.fixture
51+
def topic_publish(client):
52+
topic_path = client.topic_path(PROJECT, TOPIC_PUBLISH)
53+
54+
try:
55+
topic = client.get_topic(topic_path)
56+
except: # noqa
57+
topic = client.create_topic(topic_path)
58+
59+
yield topic.name
60+
61+
client.delete_topic(topic.name)
4662

4763

4864
def _make_sleep_patch():
@@ -58,83 +74,74 @@ def new_sleep(period):
5874
return mock.patch("time.sleep", new=new_sleep)
5975

6076

61-
def _to_delete():
62-
publisher_client = pubsub_v1.PublisherClient()
63-
publisher_client.delete_topic(
64-
"projects/{}/topics/{}".format(PROJECT, TOPIC)
65-
)
66-
67-
68-
def test_list(client, topic, capsys):
77+
def test_list(client, topic_admin, capsys):
6978
@eventually_consistent.call
7079
def _():
7180
publisher.list_topics(PROJECT)
7281
out, _ = capsys.readouterr()
73-
assert topic in out
82+
assert topic_admin in out
7483

7584

7685
def test_create(client):
77-
topic_path = client.topic_path(PROJECT, TOPIC)
86+
topic_path = client.topic_path(PROJECT, TOPIC_ADMIN)
7887
try:
7988
client.delete_topic(topic_path)
8089
except Exception:
8190
pass
8291

83-
publisher.create_topic(PROJECT, TOPIC)
92+
publisher.create_topic(PROJECT, TOPIC_ADMIN)
8493

8594
@eventually_consistent.call
8695
def _():
8796
assert client.get_topic(topic_path)
8897

8998

90-
def test_delete(client, topic):
91-
publisher.delete_topic(PROJECT, TOPIC)
99+
def test_delete(client, topic_admin):
100+
publisher.delete_topic(PROJECT, TOPIC_ADMIN)
92101

93102
@eventually_consistent.call
94103
def _():
95104
with pytest.raises(Exception):
96-
client.get_topic(client.topic_path(PROJECT, TOPIC))
105+
client.get_topic(client.topic_path(PROJECT, TOPIC_ADMIN))
97106

98107

99-
def test_publish(topic, capsys):
100-
publisher.publish_messages(PROJECT, TOPIC)
108+
def test_publish(topic_publish, capsys):
109+
publisher.publish_messages(PROJECT, TOPIC_PUBLISH)
101110

102111
out, _ = capsys.readouterr()
103112
assert "Published" in out
104113

105114

106-
def test_publish_with_custom_attributes(topic, capsys):
107-
publisher.publish_messages_with_custom_attributes(PROJECT, TOPIC)
115+
def test_publish_with_custom_attributes(topic_publish, capsys):
116+
publisher.publish_messages_with_custom_attributes(PROJECT, TOPIC_PUBLISH)
108117

109118
out, _ = capsys.readouterr()
110119
assert "Published" in out
111120

112121

113-
def test_publish_with_batch_settings(topic, capsys):
114-
publisher.publish_messages_with_batch_settings(PROJECT, TOPIC)
122+
def test_publish_with_batch_settings(topic_publish, capsys):
123+
publisher.publish_messages_with_batch_settings(PROJECT, TOPIC_PUBLISH)
115124

116125
out, _ = capsys.readouterr()
117126
assert "Published" in out
118127

119128

120-
def test_publish_with_retry_settings(topic, capsys):
121-
publisher.publish_messages_with_retry_settings(PROJECT, TOPIC)
129+
def test_publish_with_retry_settings(topic_publish, capsys):
130+
publisher.publish_messages_with_retry_settings(PROJECT, TOPIC_PUBLISH)
122131

123132
out, _ = capsys.readouterr()
124133
assert "Published" in out
125134

126135

127-
def test_publish_with_error_handler(topic, capsys):
128-
publisher.publish_messages_with_error_handler(PROJECT, TOPIC)
136+
def test_publish_with_error_handler(topic_publish, capsys):
137+
publisher.publish_messages_with_error_handler(PROJECT, TOPIC_PUBLISH)
129138

130139
out, _ = capsys.readouterr()
131140
assert "Published" in out
132141

133142

134-
def test_publish_with_futures(topic, capsys):
135-
publisher.publish_messages_with_futures(PROJECT, TOPIC)
143+
def test_publish_with_futures(topic_publish, capsys):
144+
publisher.publish_messages_with_futures(PROJECT, TOPIC_PUBLISH)
136145

137146
out, _ = capsys.readouterr()
138147
assert "Published" in out
139-
140-
_to_delete()

pubsub/cloud-client/subscriber.py

Lines changed: 60 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -161,16 +161,16 @@ def update_subscription(project_id, subscription_name, endpoint):
161161
# [END pubsub_update_push_configuration]
162162

163163

164-
def receive_messages(project_id, subscription_name):
164+
def receive_messages(project_id, subscription_name, timeout=None):
165165
"""Receives messages from a pull subscription."""
166166
# [START pubsub_subscriber_async_pull]
167167
# [START pubsub_quickstart_subscriber]
168-
import time
169-
170168
from google.cloud import pubsub_v1
171169

172170
# TODO project_id = "Your Google Cloud Project ID"
173171
# TODO subscription_name = "Your Pub/Sub subscription name"
172+
# TODO timeout = 5.0 # "How long the subscriber should listen for
173+
# messages in seconds"
174174

175175
subscriber = pubsub_v1.SubscriberClient()
176176
# The `subscription_path` method creates a fully qualified identifier
@@ -183,27 +183,33 @@ def callback(message):
183183
print("Received message: {}".format(message))
184184
message.ack()
185185

186-
subscriber.subscribe(subscription_path, callback=callback)
186+
streaming_pull_future = subscriber.subscribe(
187+
subscription_path, callback=callback
188+
)
189+
print("Listening for messages on {}..\n".format(subscription_path))
187190

188-
# The subscriber is non-blocking. We must keep the main thread from
189-
# exiting to allow it to process messages asynchronously in the background.
190-
print("Listening for messages on {}".format(subscription_path))
191-
while True:
192-
time.sleep(60)
191+
# result() in a future will block indefinitely if `timeout` is not set,
192+
# unless an exception is encountered first.
193+
try:
194+
streaming_pull_future.result(timeout=timeout)
195+
except: # noqa
196+
streaming_pull_future.cancel()
193197
# [END pubsub_subscriber_async_pull]
194198
# [END pubsub_quickstart_subscriber]
195199

196200

197-
def receive_messages_with_custom_attributes(project_id, subscription_name):
201+
def receive_messages_with_custom_attributes(
202+
project_id, subscription_name, timeout=None
203+
):
198204
"""Receives messages from a pull subscription."""
199205
# [START pubsub_subscriber_sync_pull_custom_attributes]
200206
# [START pubsub_subscriber_async_pull_custom_attributes]
201-
import time
202-
203207
from google.cloud import pubsub_v1
204208

205209
# TODO project_id = "Your Google Cloud Project ID"
206210
# TODO subscription_name = "Your Pub/Sub subscription name"
211+
# TODO timeout = 5.0 # "How long the subscriber should listen for
212+
# messages in seconds"
207213

208214
subscriber = pubsub_v1.SubscriberClient()
209215
subscription_path = subscriber.subscription_path(
@@ -219,26 +225,32 @@ def callback(message):
219225
print("{}: {}".format(key, value))
220226
message.ack()
221227

222-
subscriber.subscribe(subscription_path, callback=callback)
228+
streaming_pull_future = subscriber.subscribe(
229+
subscription_path, callback=callback
230+
)
231+
print("Listening for messages on {}..\n".format(subscription_path))
223232

224-
# The subscriber is non-blocking, so we must keep the main thread from
225-
# exiting to allow it to process messages in the background.
226-
print("Listening for messages on {}".format(subscription_path))
227-
while True:
228-
time.sleep(60)
233+
# result() in a future will block indefinitely if `timeout` is not set,
234+
# unless an exception is encountered first.
235+
try:
236+
streaming_pull_future.result(timeout=timeout)
237+
except: # noqa
238+
streaming_pull_future.cancel()
229239
# [END pubsub_subscriber_async_pull_custom_attributes]
230240
# [END pubsub_subscriber_sync_pull_custom_attributes]
231241

232242

233-
def receive_messages_with_flow_control(project_id, subscription_name):
243+
def receive_messages_with_flow_control(
244+
project_id, subscription_name, timeout=None
245+
):
234246
"""Receives messages from a pull subscription with flow control."""
235247
# [START pubsub_subscriber_flow_settings]
236-
import time
237-
238248
from google.cloud import pubsub_v1
239249

240250
# TODO project_id = "Your Google Cloud Project ID"
241251
# TODO subscription_name = "Your Pub/Sub subscription name"
252+
# TODO timeout = 5.0 # "How long the subscriber should listen for
253+
# messages in seconds"
242254

243255
subscriber = pubsub_v1.SubscriberClient()
244256
subscription_path = subscriber.subscription_path(
@@ -251,15 +263,18 @@ def callback(message):
251263

252264
# Limit the subscriber to only have ten outstanding messages at a time.
253265
flow_control = pubsub_v1.types.FlowControl(max_messages=10)
254-
subscriber.subscribe(
266+
267+
streaming_pull_future = subscriber.subscribe(
255268
subscription_path, callback=callback, flow_control=flow_control
256269
)
270+
print("Listening for messages on {}..\n".format(subscription_path))
257271

258-
# The subscriber is non-blocking, so we must keep the main thread from
259-
# exiting to allow it to process messages in the background.
260-
print("Listening for messages on {}".format(subscription_path))
261-
while True:
262-
time.sleep(60)
272+
# result() in a future will block indefinitely if `timeout` is not set,
273+
# unless an exception is encountered first.
274+
try:
275+
streaming_pull_future.result(timeout=timeout)
276+
except: # noqa
277+
streaming_pull_future.cancel()
263278
# [END pubsub_subscriber_flow_settings]
264279

265280

@@ -386,13 +401,15 @@ def worker(msg):
386401
# [END pubsub_subscriber_sync_pull_with_lease]
387402

388403

389-
def listen_for_errors(project_id, subscription_name):
404+
def listen_for_errors(project_id, subscription_name, timeout=None):
390405
"""Receives messages and catches errors from a pull subscription."""
391406
# [START pubsub_subscriber_error_listener]
392407
from google.cloud import pubsub_v1
393408

394409
# TODO project_id = "Your Google Cloud Project ID"
395410
# TODO subscription_name = "Your Pubsub subscription name"
411+
# TODO timeout = 5.0 # "How long the subscriber should listen for
412+
# messages in seconds"
396413

397414
subscriber = pubsub_v1.SubscriberClient()
398415
subscription_path = subscriber.subscription_path(
@@ -403,16 +420,19 @@ def callback(message):
403420
print("Received message: {}".format(message))
404421
message.ack()
405422

406-
future = subscriber.subscribe(subscription_path, callback=callback)
423+
streaming_pull_future = subscriber.subscribe(
424+
subscription_path, callback=callback
425+
)
426+
print("Listening for messages on {}..\n".format(subscription_path))
407427

408-
# Blocks the thread while messages are coming in through the stream. Any
409-
# exceptions that crop up on the thread will be set on the future.
428+
# result() in a future will block indefinitely if `timeout` is not set,
429+
# unless an exception is encountered first.
410430
try:
411-
# When timeout is unspecified, the result method waits indefinitely.
412-
future.result(timeout=30)
431+
streaming_pull_future.result(timeout=timeout)
413432
except Exception as e:
433+
streaming_pull_future.cancel()
414434
print(
415-
"Listening for messages on {} threw an Exception: {}.".format(
435+
"Listening for messages on {} threw an exception: {}.".format(
416436
subscription_name, e
417437
)
418438
)
@@ -518,14 +538,14 @@ def callback(message):
518538
args.project_id, args.subscription_name, args.endpoint
519539
)
520540
elif args.command == "receive":
521-
receive_messages(args.project_id, args.subscription_name)
541+
receive_messages(args.project_id, args.subscription_name, args.timeout)
522542
elif args.command == "receive-custom-attributes":
523543
receive_messages_with_custom_attributes(
524-
args.project_id, args.subscription_name
544+
args.project_id, args.subscription_name, args.timeout
525545
)
526546
elif args.command == "receive-flow-control":
527547
receive_messages_with_flow_control(
528-
args.project_id, args.subscription_name
548+
args.project_id, args.subscription_name, args.timeout
529549
)
530550
elif args.command == "receive-synchronously":
531551
synchronous_pull(args.project_id, args.subscription_name)
@@ -534,4 +554,6 @@ def callback(message):
534554
args.project_id, args.subscription_name
535555
)
536556
elif args.command == "listen_for_errors":
537-
listen_for_errors(args.project_id, args.subscription_name)
557+
listen_for_errors(
558+
args.project_id, args.subscription_name, args.timeout
559+
)

0 commit comments

Comments
 (0)