Skip to content

Commit cd848c0

Browse files
committed
Add base class for operation options, javadoc and tests (#996)
* Add base class for operation options, javadoc and tests * Refactor PullOption - Make maxMessages a method parameter rather than an optional option - Move MessageConsumer.PullOption to PubSub - Remove MessageConsumer.start/stop methods in favor of close()
1 parent 9879a16 commit cd848c0

File tree

6 files changed

+255
-79
lines changed

6 files changed

+255
-79
lines changed
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright 2016 Google Inc. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.pubsub;
18+
19+
import static com.google.common.base.Preconditions.checkNotNull;
20+
21+
import com.google.common.base.MoreObjects;
22+
23+
import java.io.Serializable;
24+
import java.util.Objects;
25+
26+
/**
27+
* Base class for Pub/Sub operation options.
28+
*/
29+
abstract class Option implements Serializable {
30+
31+
private static final long serialVersionUID = 4956295408130172192L;
32+
33+
private final OptionType optionType;
34+
private final Object value;
35+
36+
interface OptionType {
37+
38+
String name();
39+
}
40+
41+
Option(OptionType optionType, Object value) {
42+
this.optionType = checkNotNull(optionType);
43+
this.value = value;
44+
}
45+
46+
@SuppressWarnings("unchecked")
47+
<T extends OptionType> T optionType() {
48+
return (T) optionType;
49+
}
50+
51+
Object value() {
52+
return value;
53+
}
54+
55+
@Override
56+
public boolean equals(Object obj) {
57+
if (!(obj instanceof Option)) {
58+
return false;
59+
}
60+
Option other = (Option) obj;
61+
return Objects.equals(optionType, other.optionType)
62+
&& Objects.equals(value, other.value);
63+
}
64+
65+
@Override
66+
public int hashCode() {
67+
return Objects.hash(optionType, value);
68+
}
69+
70+
@Override
71+
public String toString() {
72+
return MoreObjects.toStringHelper(this)
73+
.add("name", optionType.name())
74+
.add("value", value)
75+
.toString();
76+
}
77+
}

gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java

Lines changed: 52 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@
2020
import com.google.cloud.Page;
2121
import com.google.cloud.Service;
2222

23-
import java.io.Serializable;
2423
import java.util.Iterator;
2524
import java.util.List;
25+
import java.util.Map;
2626
import java.util.concurrent.Future;
2727
import java.util.concurrent.TimeUnit;
2828

@@ -33,65 +33,79 @@
3333
*/
3434
public interface PubSub extends Service<PubSubOptions> {
3535

36-
final class ListOption implements Serializable {
36+
/**
37+
* Class for specifying options for listing topics and subscriptions.
38+
*/
39+
final class ListOption extends Option {
3740

3841
private static final long serialVersionUID = 6517442127283383124L;
3942

40-
private final Option option;
41-
private final Object value;
43+
enum OptionType implements Option.OptionType {
44+
PAGE_SIZE, PAGE_TOKEN;
4245

43-
enum Option {
44-
PAGE_SIZE, PAGE_TOKEN
45-
}
46+
@SuppressWarnings("unchecked")
47+
<T> T get(Map<Option.OptionType, ?> options) {
48+
return (T) options.get(this);
49+
}
4650

47-
private ListOption(Option option, Object value) {
48-
this.option = option;
49-
this.value = value;
50-
}
51+
String getString(Map<Option.OptionType, ?> options) {
52+
return get(options);
53+
}
5154

52-
Option option() {
53-
return option;
55+
Integer getInteger(Map<Option.OptionType, ?> options) {
56+
return get(options);
57+
}
5458
}
5559

56-
Object value() {
57-
return value;
60+
private ListOption(OptionType option, Object value) {
61+
super(option, value);
5862
}
5963

64+
/**
65+
* Returns an option to specify the maximum number of resources returned per page.
66+
*/
6067
public static ListOption pageSize(int pageSize) {
61-
return new ListOption(Option.PAGE_SIZE, pageSize);
68+
return new ListOption(OptionType.PAGE_SIZE, pageSize);
6269
}
6370

71+
/**
72+
* Returns an option to specify the page token from which to start listing resources.
73+
*/
6474
public static ListOption pageToken(String pageToken) {
65-
return new ListOption(Option.PAGE_TOKEN, pageToken);
75+
return new ListOption(OptionType.PAGE_TOKEN, pageToken);
6676
}
6777
}
6878

69-
final class PullOption implements Serializable {
70-
71-
private static final long serialVersionUID = -5220474819637439937L;
79+
/**
80+
* Class for specifying options for pulling messages.
81+
*/
82+
final class PullOption extends Option {
7283

73-
private final Option option;
74-
private final Object value;
84+
private static final long serialVersionUID = 4792164134340316582L;
7585

76-
enum Option {
77-
MAX_MESSAGES
78-
}
86+
enum OptionType implements Option.OptionType {
87+
MAX_CONCURRENT_CALLBACKS;
7988

80-
private PullOption(Option option, Object value) {
81-
this.option = option;
82-
this.value = value;
83-
}
89+
@SuppressWarnings("unchecked")
90+
<T> T get(Map<Option.OptionType, ?> options) {
91+
return (T) options.get(this);
92+
}
8493

85-
Option option() {
86-
return option;
94+
Integer getInteger(Map<Option.OptionType, ?> options) {
95+
return get(options);
96+
}
8797
}
8898

89-
Object value() {
90-
return value;
99+
private PullOption(Option.OptionType option, Object value) {
100+
super(option, value);
91101
}
92102

93-
public static PullOption maxMessages(int maxMessages) {
94-
return new PullOption(Option.MAX_MESSAGES, maxMessages);
103+
/**
104+
* Returns an option to specify the maximum number of messages that can be executed
105+
* concurrently at any time.
106+
*/
107+
public static PullOption maxConcurrentCallbacks(int maxConcurrency) {
108+
return new PullOption(OptionType.MAX_CONCURRENT_CALLBACKS, maxConcurrency);
95109
}
96110
}
97111

@@ -108,38 +122,6 @@ interface MessageProcessor {
108122
*/
109123
interface MessageConsumer extends AutoCloseable {
110124

111-
final class PullOption implements Serializable {
112-
113-
private static final long serialVersionUID = 4792164134340316582L;
114-
115-
private final Option option;
116-
private final Object value;
117-
118-
enum Option {
119-
MAX_CONCURRENT_CALLBACKS
120-
}
121-
122-
private PullOption(Option option, Object value) {
123-
this.option = option;
124-
this.value = value;
125-
}
126-
127-
Option option() {
128-
return option;
129-
}
130-
131-
Object value() {
132-
return value;
133-
}
134-
135-
public static PullOption maxConcurrentCallbacks(int maxConcurrency) {
136-
return new PullOption(Option.MAX_CONCURRENT_CALLBACKS, maxConcurrency);
137-
}
138-
}
139-
140-
void start(MessageConsumer.PullOption... options);
141-
142-
void stop();
143125
}
144126

145127
Topic create(TopicInfo topic);
@@ -198,11 +180,11 @@ public static PullOption maxConcurrentCallbacks(int maxConcurrency) {
198180

199181
Future<AsyncPage<SubscriptionId>> listSubscriptionsAsync(String topic, ListOption... options);
200182

201-
Iterator<ReceivedMessage> pull(String subscription, PullOption... options);
183+
Iterator<ReceivedMessage> pull(String subscription, int maxMessages);
202184

203-
Future<Iterator<ReceivedMessage>> pullAsync(String subscription, PullOption... options);
185+
Future<Iterator<ReceivedMessage>> pullAsync(String subscription, int maxMessages);
204186

205-
MessageConsumer pullAsync(String subscription, MessageProcessor callback);
187+
MessageConsumer pullAsync(String subscription, MessageProcessor callback, PullOption... options);
206188

207189
void ack(String subscription, String ackId, String... ackIds);
208190

gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -196,13 +196,13 @@ public Future<AsyncPage<SubscriptionId>> listSubscriptionsAsync(String topic,
196196
}
197197

198198
@Override
199-
public Iterator<ReceivedMessage> pull(String subscription, PullOption... options) {
199+
public Iterator<ReceivedMessage> pull(String subscription, int maxMessages) {
200200
// this should set return_immediately to true
201201
return null;
202202
}
203203

204204
@Override
205-
public Future<Iterator<ReceivedMessage>> pullAsync(String subscription, PullOption... options) {
205+
public Future<Iterator<ReceivedMessage>> pullAsync(String subscription, int maxMessages) {
206206
// though this method can set return_immediately to false (as future can be canceled) I
207207
// suggest to keep it false so sync could delegate to asyc and use the same options
208208
// this method also should use the VTKIT thread-pool to renew ack deadline for non consumed
@@ -211,7 +211,8 @@ public Future<Iterator<ReceivedMessage>> pullAsync(String subscription, PullOpti
211211
}
212212

213213
@Override
214-
public MessageConsumer pullAsync(String subscription, MessageProcessor callback) {
214+
public MessageConsumer pullAsync(String subscription, MessageProcessor callback,
215+
PullOption... options) {
215216
// this method should use the VTKIT thread-pool (maybe getting it should be part of the spi)
216217
return null;
217218
}

gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/Subscription.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -151,16 +151,16 @@ public Future<Void> replacePushConfigAsync(PushConfig pushConfig) {
151151
return pubsub.replacePushConfigAsync(name(), pushConfig);
152152
}
153153

154-
public Iterator<ReceivedMessage> pull(PullOption... options) {
155-
return pubsub.pull(name(), options);
154+
public Iterator<ReceivedMessage> pull(int maxMessages) {
155+
return pubsub.pull(name(), maxMessages);
156156
}
157157

158-
public Future<Iterator<ReceivedMessage>> pullAsync(PullOption... options) {
159-
return pubsub.pullAsync(name(), options);
158+
public Future<Iterator<ReceivedMessage>> pullAsync(int maxMessages) {
159+
return pubsub.pullAsync(name(), maxMessages);
160160
}
161161

162-
public MessageConsumer pullAsync(MessageProcessor callback) {
163-
return pubsub.pullAsync(name(), callback);
162+
public MessageConsumer pullAsync(MessageProcessor callback, PullOption... options) {
163+
return pubsub.pullAsync(name(), callback, options);
164164
}
165165

166166
private void readObject(ObjectInputStream input) throws IOException, ClassNotFoundException {
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright 2016 Google Inc. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.pubsub;
18+
19+
import static org.junit.Assert.assertEquals;
20+
import static org.junit.Assert.assertNotEquals;
21+
import static org.junit.Assert.assertNull;
22+
23+
import com.google.cloud.pubsub.Option.OptionType;
24+
import com.google.cloud.pubsub.PubSub.ListOption;
25+
26+
import org.junit.Rule;
27+
import org.junit.Test;
28+
import org.junit.rules.ExpectedException;
29+
30+
public class OptionTest {
31+
32+
private static final OptionType OPTION_TYPE = ListOption.OptionType.PAGE_SIZE;
33+
private static final OptionType ANOTHER_OPTION_TYPE = ListOption.OptionType.PAGE_TOKEN;
34+
private static final String VALUE = "some value";
35+
private static final String OTHER_VALUE = "another value";
36+
private static final Option OPTION = new Option(OPTION_TYPE, VALUE) {};
37+
private static final Option OPTION_EQUALS = new Option(OPTION_TYPE, VALUE) {};
38+
private static final Option OPTION_NOT_EQUALS1 = new Option(ANOTHER_OPTION_TYPE, OTHER_VALUE) {};
39+
private static final Option OPTION_NOT_EQUALS2 = new Option(ANOTHER_OPTION_TYPE, VALUE) {};
40+
41+
@Rule
42+
public ExpectedException thrown = ExpectedException.none();
43+
44+
@Test
45+
public void testEquals() {
46+
assertEquals(OPTION, OPTION_EQUALS);
47+
assertNotEquals(OPTION, OPTION_NOT_EQUALS1);
48+
assertNotEquals(OPTION, OPTION_NOT_EQUALS2);
49+
}
50+
51+
@Test
52+
public void testHashCode() {
53+
assertEquals(OPTION.hashCode(), OPTION_EQUALS.hashCode());
54+
}
55+
56+
@Test
57+
public void testConstructor() {
58+
assertEquals(OPTION_TYPE, OPTION.optionType());
59+
assertEquals(VALUE, OPTION.value());
60+
Option option = new Option(OPTION_TYPE, null) {};
61+
assertEquals(OPTION_TYPE, option.optionType());
62+
assertNull(option.value());
63+
thrown.expect(NullPointerException.class);
64+
new Option(null, VALUE) {};
65+
}
66+
}

0 commit comments

Comments
 (0)