|
18 | 18 | // [START pubsub_quickstart_publisher]
|
19 | 19 |
|
20 | 20 | import com.google.api.core.ApiFuture;
|
| 21 | +import com.google.api.core.ApiFutureCallback; |
21 | 22 | import com.google.api.core.ApiFutures;
|
| 23 | +import com.google.api.gax.rpc.ApiException; |
22 | 24 | import com.google.cloud.ServiceOptions;
|
23 | 25 | import com.google.cloud.pubsub.v1.Publisher;
|
24 | 26 | import com.google.protobuf.ByteString;
|
25 | 27 | import com.google.pubsub.v1.PubsubMessage;
|
26 | 28 | import com.google.pubsub.v1.TopicName;
|
27 |
| -import java.util.ArrayList; |
28 |
| -import java.util.List; |
29 | 29 |
|
30 | 30 | public class PublisherExample {
|
31 | 31 |
|
32 |
| - static final int MESSAGE_COUNT = 5; |
33 |
| - |
34 | 32 | // use the default project id
|
35 | 33 | private static final String PROJECT_ID = ServiceOptions.getDefaultProjectId();
|
36 | 34 |
|
37 |
| - //schedule a message to be published, messages are automatically batched |
38 |
| - private static ApiFuture<String> publishMessage(Publisher publisher, String message) |
39 |
| - throws Exception { |
40 |
| - // convert message to bytes |
41 |
| - ByteString data = ByteString.copyFromUtf8(message); |
42 |
| - PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build(); |
43 |
| - return publisher.publish(pubsubMessage); |
44 |
| - } |
45 |
| - |
46 |
| - /** Publish messages to a topic. */ |
| 35 | + /** Publish messages to a topic. |
| 36 | + * @param args topic name, number of messages |
| 37 | + */ |
47 | 38 | public static void main(String... args) throws Exception {
|
48 | 39 | // topic id, eg. "my-topic"
|
49 | 40 | String topicId = args[0];
|
50 |
| - TopicName topicName = TopicName.create(PROJECT_ID, topicId); |
| 41 | + int messageCount = Integer.parseInt(args[1]); |
| 42 | + TopicName topicName = TopicName.of(PROJECT_ID, topicId); |
51 | 43 | Publisher publisher = null;
|
52 |
| - List<ApiFuture<String>> apiFutures = new ArrayList<>(); |
53 | 44 | try {
|
54 | 45 | // Create a publisher instance with default settings bound to the topic
|
55 |
| - publisher = Publisher.defaultBuilder(topicName).build(); |
56 |
| - for (int i = 0; i < MESSAGE_COUNT; i++) { |
| 46 | + publisher = Publisher.newBuilder(topicName).build(); |
| 47 | + |
| 48 | + for (int i = 0; i < messageCount; i++) { |
57 | 49 | String message = "message-" + i;
|
58 |
| - ApiFuture<String> messageId = publishMessage(publisher, message); |
59 |
| - apiFutures.add(messageId); |
| 50 | + |
| 51 | + // convert message to bytes |
| 52 | + ByteString data = ByteString.copyFromUtf8(message); |
| 53 | + PubsubMessage pubsubMessage = PubsubMessage.newBuilder() |
| 54 | + .setData(data) |
| 55 | + .build(); |
| 56 | + |
| 57 | + //schedule a message to be published, messages are automatically batched |
| 58 | + ApiFuture<String> future = publisher.publish(pubsubMessage); |
| 59 | + |
| 60 | + // add an asynchronous callback to handle success / failure |
| 61 | + ApiFutures.addCallback(future, new ApiFutureCallback<String>() { |
| 62 | + |
| 63 | + @Override |
| 64 | + public void onFailure(Throwable throwable) { |
| 65 | + if (throwable instanceof ApiException) { |
| 66 | + ApiException apiException = ((ApiException) throwable); |
| 67 | + // details on the API exception |
| 68 | + System.out.println(apiException.getStatusCode().getCode()); |
| 69 | + System.out.println(apiException.isRetryable()); |
| 70 | + } |
| 71 | + System.out.println("Error publishing message : " + message); |
| 72 | + } |
| 73 | + |
| 74 | + @Override |
| 75 | + public void onSuccess(String messageId) { |
| 76 | + // Once published, returns server-assigned message ids (unique within the topic) |
| 77 | + System.out.println(messageId); |
| 78 | + } |
| 79 | + }); |
60 | 80 | }
|
61 | 81 | } finally {
|
62 |
| - // Once published, returns server-assigned message ids (unique within the topic) |
63 |
| - List<String> messageIds = ApiFutures.allAsList(apiFutures).get(); |
64 |
| - for (String messageId : messageIds) { |
65 |
| - System.out.println(messageId); |
66 |
| - } |
67 | 82 | if (publisher != null) {
|
68 | 83 | // When finished with the publisher, shutdown to free up resources.
|
69 | 84 | publisher.shutdown();
|
|
0 commit comments