Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Pub/Sub management methods, add javadoc and tests #1015

Merged
merged 2 commits into from
May 22, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions gcloud-java-pubsub/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<version>3.4</version>
<scope>test</scope>
</dependency>
</dependencies>
<profiles>
<profile>
Expand Down
217 changes: 209 additions & 8 deletions gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ private PullOption(Option.OptionType option, Object value) {
}

/**
* Returns an option to specify the maximum number of messages that can be executed
* Returns an option to specify the maximum number of messages that can be processed
* concurrently at any time.
*/
public static PullOption maxConcurrentCallbacks(int maxConcurrency) {
Expand All @@ -110,74 +110,275 @@ public static PullOption maxConcurrentCallbacks(int maxConcurrency) {
}

/**
* A callback to process pulled messages.
* The message will be ack'ed upon successful return or nack'ed if exception is thrown.
* A callback to process pulled messages. The received message will be ack'ed upon successful
* return or nack'ed if exception is thrown.
*/
interface MessageProcessor {
/**
* Processes the received {@code message}. If this method returns correctly the message is
* ack'ed. If this method throws an exception the message is nack'ed.
*/
void process(Message message) throws Exception;
}

/**
* An interface to control message consumer settings.
* An interface to control a message consumer.
*/
interface MessageConsumer extends AutoCloseable {

/**
* Stops pulling messages from the subscription associated with this {@code MessageConsumer} and
* frees all resources. Messages that have already been pulled are processed before closing.
*/
@Override
void close() throws Exception;
}

/**
* Creates a new topic.
*
* @return the created topic
* @throws PubSubException upon failure
*/
Topic create(TopicInfo topic);

/**
* Sends a request for creating a topic. This method returns a {@code Future} object to consume
* the result. {@link Future#get()} returns the created topic or {@code null} if not found.
*/
Future<Topic> createAsync(TopicInfo topic);

// null if not found
/**
* Returns the requested topic or {@code null} if not found.
*
* @throws PubSubException upon failure
*/
Topic getTopic(String topic);

/**
* Sends a request for getting a topic. This method returns a {@code Future} object to consume the
* result. {@link Future#get()} returns the requested topic or {@code null} if not found.
*
* @throws PubSubException upon failure
*/
Future<Topic> getTopicAsync(String topic);

// false if not found
/**
* Deletes the requested topic.
*
* @return {@code true} if the topic was deleted, {@code false} if it was not found
*/
boolean deleteTopic(String topic);

/**
* Sends a request for deleting a topic. This method returns a {@code Future} object to consume
* the result. {@link Future#get()} returns {@code true} if the topic was deleted, {@code false}
* if it was not found.
*/
Future<Boolean> deleteTopicAsync(String topic);

/**
* Lists the topics. This method returns a {@link Page} object that can be used to consume
* paginated results. Use {@link ListOption} to specify the page size or the page token from which
* to start listing topics.
*
* @throws PubSubException upon failure
*/
Page<Topic> listTopics(ListOption... options);

/**
* Sends a request for listing topics. This method returns a {@code Future} object to consume
* the result. {@link Future#get()} returns an {@link AsyncPage} object that can be used to
* asynchronously handle paginated results. Use {@link ListOption} to specify the page size or the
* page token from which to start listing topics.
*/
Future<AsyncPage<Topic>> listTopicsAsync(ListOption... options);

/**
* Publishes a message to the provided topic. This method returns a service-generated id for the
* published message. Service-generated ids are guaranteed to be unique within the topic.
*
* @param topic the topic where the message is published
* @param message the message to publish
* @return a unique service-generated id for the message
* @throws PubSubException upon failure, if the topic does not exist or if the message has empty
* payload and no attributes
*/
String publish(String topic, Message message);

/**
* Sends a request for publishing a message to the provided topic. This method returns a
* {@code Future} object to consume the result. {@link Future#get()} returns a service-generated
* id for the published message. Service-generated ids are guaranteed to be unique within the
* topic.
*
* @param topic the topic where the message is published
* @param message the message to publish
* @return a {@code Future} for the unique service-generated id for the message
*/
Future<String> publishAsync(String topic, Message message);

/**
* Publishes a number of messages to the provided topic. This method returns a list of
* service-generated ids for the published messages. Service-generated ids are guaranteed to be
* unique within the topic.
*
* @param topic the topic where the message is published
* @param message the first message to publish
* @param messages other messages to publish
* @return a list of unique, service-generated, ids. Ids are in the same order as the messages.
* @throws PubSubException upon failure, if the topic does not exist or if one of the messages has
* empty payload and no attributes
*/
List<String> publish(String topic, Message message, Message... messages);

/**
* Sends a request to publish a number of messages to the provided topic. This method returns a
* {@code Future} object to consume the result. {@link Future#get()} returns a list of
* service-generated ids for the published messages. Service-generated ids are guaranteed to be
* unique within the topic.
*
* @param topic the topic where the message is published
* @param message the first message to publish
* @param messages other messages to publish
* @return a {@code Future} for the unique, service-generated ids. Ids are in the same order as
* the messages.
*/
Future<List<String>> publishAsync(String topic, Message message, Message... messages);

/**
* Publishes a number of messages to the provided topic. This method returns a list of
* service-generated ids for the published messages. Service-generated ids are guaranteed to be
* unique within the topic.
*
* @param topic the topic where the message is published
* @param messages the messages to publish
* @return a list of unique, service-generated, ids. Ids are in the same order as the messages.
* @throws PubSubException upon failure, if the topic does not exist or if one of the messages has
* empty payload and no attributes
*/
List<String> publish(String topic, Iterable<Message> messages);

/**
* Sends a request to publish a number of messages to the provided topic. This method returns a
* {@code Future} object to consume the result. {@link Future#get()} returns a list of
* service-generated ids for the published messages. Service-generated ids are guaranteed to be
* unique within the topic.
*
* @param topic the topic where the message is published
* @param messages the messages to publish
* @return a {@code Future} for the unique, service-generated ids. Ids are in the same order as
* the messages
*/
Future<List<String>> publishAsync(String topic, Iterable<Message> messages);

/**
* Creates a new subscription.
*
* @return the created subscription
* @throws PubSubException upon failure
*/
Subscription create(SubscriptionInfo subscription);

/**
* Sends a request for creating a subscription. This method returns a {@code Future} object to
* consume the result. {@link Future#get()} returns the created subscription or {@code null} if
* not found.
*/
Future<Subscription> createAsync(SubscriptionInfo subscription);

// null if not found
/**
* Returns the requested subscription or {@code null} if not found.
*/
Subscription getSubscription(String subscription);

/**
* Sends a request for getting a subscription. This method returns a {@code Future} object to
* consume the result. {@link Future#get()} returns the requested subscription or {@code null} if
* not found.
*/
Future<Subscription> getSubscriptionAsync(String subscription);

/**
* Sets the push configuration for a specified subscription. This may be used to change a push
* subscription to a pull one (passing a {@code null} {@code pushConfig} parameter) or vice versa.
* This methods can also be used to change the endpoint URL and other attributes of a push
* subscription. Messages will accumulate for delivery regardless of changes to the push
* configuration.
*
* @param subscription the subscription for which to replace push configuration
* @param pushConfig the new push configuration. Use {@code null} to unset it
* @throws PubSubException upon failure, or if the subscription does not exist
*/
void replacePushConfig(String subscription, PushConfig pushConfig);

/**
* Sends a request for updating the push configuration for a specified subscription. This may be
* used to change a push subscription to a pull one (passing a {@code null} {@code pushConfig}
* parameter) or vice versa. This methods can also be used to change the endpoint URL and other
* attributes of a push subscription. Messages will accumulate for delivery regardless of changes
* to the push configuration. The method returns a {@code Future} object that can be used to wait
* for the replace operation to be completed.
*
* @param subscription the subscription for which to replace push configuration
* @param pushConfig the new push configuration. Use {@code null} to unset it
* @return a {@code Future} to wait for the replace operation to be completed.
*/
Future<Void> replacePushConfigAsync(String subscription, PushConfig pushConfig);

// false if not found
/**
* Deletes the requested subscription.
*
* @return {@code true} if the subscription was deleted, {@code false} if it was not found
* @throws PubSubException upon failure
*/
boolean deleteSubscription(String subscription);

/**
* Sends a request for deleting a subscription. This method returns a {@code Future} object to
* consume the result. {@link Future#get()} returns {@code true} if the subscription was deleted,
* {@code false} if it was not found.
*/
Future<Boolean> deleteSubscriptionAsync(String subscription);

/**
* Lists the subscriptions. This method returns a {@link Page} object that can be used to consume
* paginated results. Use {@link ListOption} to specify the page size or the page token from which
* to start listing subscriptions.
*
* @throws PubSubException upon failure
*/
Page<Subscription> listSubscriptions(ListOption... options);

/**
* Sends a request for listing subscriptions. This method returns a {@code Future} object to
* consume the result. {@link Future#get()} returns an {@link AsyncPage} object that can be used
* to asynchronously handle paginated results. Use {@link ListOption} to specify the page size or
* the page token from which to start listing subscriptions.
*
* @throws PubSubException upon failure
*/
Future<AsyncPage<Subscription>> listSubscriptionsAsync(ListOption... options);

/**
* Lists the identities of the subscriptions for the provided topic. This method returns a
* {@link Page} object that can be used to consume paginated results. Use {@link ListOption} to
* specify the page size or the page token from which to start listing subscriptions.
*
* @param topic the topic for which to list subscriptions
* @throws PubSubException upon failure
*/
Page<SubscriptionId> listSubscriptions(String topic, ListOption... options);

/**
* Sends a request for listing the identities of subscriptions for the provided topic. This method
* returns a {@code Future} object to consume the result. {@link Future#get()} returns an
* {@link AsyncPage} object that can be used to asynchronously handle paginated results. Use
* {@link ListOption} to specify the page size or the page token from which to start listing
* subscriptions.
*
* @param topic the topic for which to list subscriptions
*/
Future<AsyncPage<SubscriptionId>> listSubscriptionsAsync(String topic, ListOption... options);

Iterator<ReceivedMessage> pull(String subscription, int maxMessages);
Expand Down
Loading