Skip to content

Commit

Permalink
Resolve all futures (#2231)
Browse files Browse the repository at this point in the history
  • Loading branch information
anguillanneuf authored Jun 20, 2019
1 parent b32f7df commit 2d2820c
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 42 deletions.
72 changes: 37 additions & 35 deletions pubsub/cloud-client/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def publish_messages(project_id, topic_name):
data = data.encode('utf-8')
# When you publish a message, the client returns a future.
future = publisher.publish(topic_path, data=data)
print('Published {} of message ID {}.'.format(data, future.result()))
print(future.result())

print('Published messages.')
# [END pubsub_quickstart_publisher]
Expand All @@ -119,8 +119,9 @@ def publish_messages_with_custom_attributes(project_id, topic_name):
# Data must be a bytestring
data = data.encode('utf-8')
# Add two attributes, origin and username, to the message
publisher.publish(
future = publisher.publish(
topic_path, data, origin='python-sample', username='gcp')
print(future.result())

print('Published messages with custom attributes.')
# [END pubsub_publish_custom_attributes]
Expand All @@ -138,21 +139,15 @@ def publish_messages_with_futures(project_id, topic_name):
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)

# When you publish a message, the client returns a Future. This Future
# can be used to track when the message is published.
futures = []

for n in range(1, 10):
data = u'Message number {}'.format(n)
# Data must be a bytestring
data = data.encode('utf-8')
message_future = publisher.publish(topic_path, data=data)
futures.append(message_future)

print('Published message IDs:')
for future in futures:
# result() blocks until the message is published.
# When you publish a message, the client returns a future.
future = publisher.publish(topic_path, data=data)
print(future.result())

print("Published messages with futures.")
# [END pubsub_publisher_concurrency_control]


Expand All @@ -169,28 +164,34 @@ def publish_messages_with_error_handler(project_id, topic_name):
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)

def callback(message_future):
if message_future.exception():
print('{} needs handling.'.format(message_future.exception()))
else:
print(message_future.result())
futures = dict()

for n in range(1, 10):
data = u'Message number {}'.format(n)
# Data must be a bytestring
data = data.encode('utf-8')
# When you publish a message, the client returns a Future.
message_future = publisher.publish(topic_path, data=data)
# If you wish to handle publish failures, do it in the callback.
# Otherwise, it's okay to call `message_future.result()` directly.
message_future.add_done_callback(callback)

print('Published message IDs:')

# We keep the main thread from exiting so message futures can be
# resolved in the background.
while True:
time.sleep(60)
def get_callback(f, data):
def callback(f):
try:
print(f.result())
futures.pop(data)
except: # noqa
print("Please handle {} for {}.".format(f.exception(), data))
return callback

for i in range(10):
data = str(i)
futures.update({data: None})
# When you publish a message, the client returns a future.
future = publisher.publish(
topic_path,
data=data.encode("utf-8"), # data must be a bytestring.
)
futures[data] = future
# Publish failures shall be handled in the callback function.
future.add_done_callback(get_callback(future, data))

# Wait for all the publish futures to resolve before exiting.
while futures:
time.sleep(5)

print("Published message with error handler.")
# [END pubsub_publish_messages_error_handler]


Expand All @@ -215,9 +216,10 @@ def publish_messages_with_batch_settings(project_id, topic_name):
data = u'Message number {}'.format(n)
# Data must be a bytestring
data = data.encode('utf-8')
publisher.publish(topic_path, data=data)
future = publisher.publish(topic_path, data=data)
print(future.result())

print('Published messages.')
print('Published messages with batch settings.')
# [END pubsub_publisher_batch_settings]


Expand Down
6 changes: 1 addition & 5 deletions pubsub/cloud-client/publisher_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,7 @@ def test_publish_with_batch_settings(topic, capsys):


def test_publish_with_error_handler(topic, capsys):

with _make_sleep_patch():
with pytest.raises(RuntimeError, match='sigil'):
publisher.publish_messages_with_error_handler(
PROJECT, TOPIC)
publisher.publish_messages_with_error_handler(PROJECT, TOPIC)

out, _ = capsys.readouterr()
assert 'Published' in out
Expand Down
6 changes: 4 additions & 2 deletions pubsub/cloud-client/subscriber_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,13 +169,15 @@ def test_update(subscriber_client, subscription, capsys):
def _publish_messages(publisher_client, topic):
for n in range(5):
data = u'Message {}'.format(n).encode('utf-8')
publisher_client.publish(
future = publisher_client.publish(
topic, data=data)
future.result()


def _publish_messages_with_custom_attributes(publisher_client, topic):
data = u'Test message'.encode('utf-8')
publisher_client.publish(topic, data=data, origin='python-sample')
future = publisher_client.publish(topic, data=data, origin='python-sample')
future.result()


def _make_sleep_patch():
Expand Down

0 comments on commit 2d2820c

Please sign in to comment.