22
22
# [END pubsub_quickstart_pub_deps]
23
23
24
24
25
- def get_callback (api_future , data ):
25
+ def get_callback (api_future , data , ref ):
26
26
"""Wrap message data in the context of the callback function."""
27
-
28
27
def callback (api_future ):
29
28
try :
30
29
print ("Published message {} now has message ID {}" .format (
31
30
data , api_future .result ()))
31
+ ref ["num_messages" ] += 1
32
32
except Exception :
33
33
print ("A problem occurred when publishing {}: {}\n " .format (
34
34
data , api_future .exception ()))
@@ -39,24 +39,28 @@ def callback(api_future):
39
39
def pub (project_id , topic_name ):
40
40
"""Publishes a message to a Pub/Sub topic."""
41
41
# [START pubsub_quickstart_pub_client]
42
- # Initialize a Publisher client
42
+ # Initialize a Publisher client.
43
43
client = pubsub_v1 .PublisherClient ()
44
44
# [END pubsub_quickstart_pub_client]
45
45
# Create a fully qualified identifier in the form of
46
46
# `projects/{project_id}/topics/{topic_name}`
47
47
topic_path = client .topic_path (project_id , topic_name )
48
48
49
- # Data sent to Cloud Pub/Sub must be a bytestring
49
+ # Data sent to Cloud Pub/Sub must be a bytestring.
50
50
data = b"Hello, World!"
51
51
52
+ # Keep track of the number of published messages.
53
+ ref = dict ({"num_messages" : 0 })
54
+
52
55
# When you publish a message, the client returns a future.
53
56
api_future = client .publish (topic_path , data = data )
54
- api_future .add_done_callback (get_callback (api_future , data ))
57
+ api_future .add_done_callback (get_callback (api_future , data , ref ))
55
58
56
- # Keep the main thread from exiting until background message
57
- # is processed .
59
+ # Keep the main thread from exiting while the message future
60
+ # gets resolved in the background .
58
61
while api_future .running ():
59
- time .sleep (0.1 )
62
+ time .sleep (0.5 )
63
+ print ("Published {} message(s)." .format (ref ["num_messages" ]))
60
64
61
65
62
66
if __name__ == '__main__' :
0 commit comments