Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pub/Sub: fix all publisher examples #2231

Merged
merged 1 commit into from
Jun 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 37 additions & 35 deletions pubsub/cloud-client/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def publish_messages(project_id, topic_name):
data = data.encode('utf-8')
# When you publish a message, the client returns a future.
future = publisher.publish(topic_path, data=data)
print('Published {} of message ID {}.'.format(data, future.result()))
print(future.result())

print('Published messages.')
# [END pubsub_quickstart_publisher]
Expand All @@ -119,8 +119,9 @@ def publish_messages_with_custom_attributes(project_id, topic_name):
# Data must be a bytestring
data = data.encode('utf-8')
# Add two attributes, origin and username, to the message
publisher.publish(
future = publisher.publish(
topic_path, data, origin='python-sample', username='gcp')
print(future.result())

print('Published messages with custom attributes.')
# [END pubsub_publish_custom_attributes]
Expand All @@ -138,21 +139,15 @@ def publish_messages_with_futures(project_id, topic_name):
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)

# When you publish a message, the client returns a Future. This Future
# can be used to track when the message is published.
futures = []

for n in range(1, 10):
data = u'Message number {}'.format(n)
# Data must be a bytestring
data = data.encode('utf-8')
message_future = publisher.publish(topic_path, data=data)
futures.append(message_future)

print('Published message IDs:')
for future in futures:
# result() blocks until the message is published.
# When you publish a message, the client returns a future.
future = publisher.publish(topic_path, data=data)
print(future.result())

print("Published messages with futures.")
# [END pubsub_publisher_concurrency_control]


Expand All @@ -169,28 +164,34 @@ def publish_messages_with_error_handler(project_id, topic_name):
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)

def callback(message_future):
if message_future.exception():
print('{} needs handling.'.format(message_future.exception()))
else:
print(message_future.result())
futures = dict()

for n in range(1, 10):
data = u'Message number {}'.format(n)
# Data must be a bytestring
data = data.encode('utf-8')
# When you publish a message, the client returns a Future.
message_future = publisher.publish(topic_path, data=data)
# If you wish to handle publish failures, do it in the callback.
# Otherwise, it's okay to call `message_future.result()` directly.
message_future.add_done_callback(callback)

print('Published message IDs:')

# We keep the main thread from exiting so message futures can be
# resolved in the background.
while True:
time.sleep(60)
def get_callback(f, data):
def callback(f):
try:
print(f.result())
futures.pop(data)
except: # noqa
print("Please handle {} for {}.".format(f.exception(), data))
return callback

for i in range(10):
data = str(i)
futures.update({data: None})
# When you publish a message, the client returns a future.
future = publisher.publish(
topic_path,
data=data.encode("utf-8"), # data must be a bytestring.
)
futures[data] = future
# Publish failures shall be handled in the callback function.
future.add_done_callback(get_callback(future, data))

# Wait for all the publish futures to resolve before exiting.
while futures:
time.sleep(5)

print("Published message with error handler.")
# [END pubsub_publish_messages_error_handler]


Expand All @@ -215,9 +216,10 @@ def publish_messages_with_batch_settings(project_id, topic_name):
data = u'Message number {}'.format(n)
# Data must be a bytestring
data = data.encode('utf-8')
publisher.publish(topic_path, data=data)
future = publisher.publish(topic_path, data=data)
print(future.result())

print('Published messages.')
print('Published messages with batch settings.')
# [END pubsub_publisher_batch_settings]


Expand Down
6 changes: 1 addition & 5 deletions pubsub/cloud-client/publisher_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,7 @@ def test_publish_with_batch_settings(topic, capsys):


def test_publish_with_error_handler(topic, capsys):

with _make_sleep_patch():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this being removed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We no longer wait indefinitely. We just wait for the dictionary futures to run dry now.

with pytest.raises(RuntimeError, match='sigil'):
publisher.publish_messages_with_error_handler(
PROJECT, TOPIC)
publisher.publish_messages_with_error_handler(PROJECT, TOPIC)

out, _ = capsys.readouterr()
assert 'Published' in out
Expand Down
6 changes: 4 additions & 2 deletions pubsub/cloud-client/subscriber_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,13 +169,15 @@ def test_update(subscriber_client, subscription, capsys):
def _publish_messages(publisher_client, topic):
for n in range(5):
data = u'Message {}'.format(n).encode('utf-8')
publisher_client.publish(
future = publisher_client.publish(
topic, data=data)
future.result()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we care about printing the result of this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above.



def _publish_messages_with_custom_attributes(publisher_client, topic):
data = u'Test message'.encode('utf-8')
publisher_client.publish(topic, data=data, origin='python-sample')
future = publisher_client.publish(topic, data=data, origin='python-sample')
future.result()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above: do we want to print this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No we don't.



def _make_sleep_patch():
Expand Down