Skip to content

Commit d28770f

Browse files
Add rest publisher timeout (#31)
1 parent babfe78 commit d28770f

File tree

11 files changed

+68
-45
lines changed

11 files changed

+68
-45
lines changed

src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSub.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,13 @@ public RESTPubSub(BulletConfig config) throws PubSubException {
3636

3737
@Override
3838
public Publisher getPublisher() {
39+
int connectTimeout = config.getAs(RESTPubSubConfig.PUBLISHER_CONNECT_TIMEOUT, Integer.class);
3940
if (context == Context.QUERY_PROCESSING) {
40-
return new RESTResultPublisher(HttpClients.createDefault());
41+
return new RESTResultPublisher(HttpClients.createDefault(), connectTimeout);
4142
} else {
4243
String queryURL = ((List<String>) config.getAs(RESTPubSubConfig.QUERY_URLS, List.class)).get(0);
4344
String resultURL = config.getAs(RESTPubSubConfig.RESULT_URL, String.class);
44-
return new RESTQueryPublisher(HttpClients.createDefault(), queryURL, resultURL);
45+
return new RESTQueryPublisher(HttpClients.createDefault(), queryURL, resultURL, connectTimeout);
4546
}
4647
}
4748

@@ -53,7 +54,7 @@ public List<Publisher> getPublishers(int n) {
5354
@Override
5455
public Subscriber getSubscriber() {
5556
int maxUncommittedMessages = config.getAs(RESTPubSubConfig.MAX_UNCOMMITTED_MESSAGES, Integer.class);
56-
int connectTimeout = config.getAs(RESTPubSubConfig.CONNECT_TIMEOUT, Integer.class);
57+
int connectTimeout = config.getAs(RESTPubSubConfig.SUBSCRIBER_CONNECT_TIMEOUT, Integer.class);
5758
List<String> urls;
5859
Long minWait;
5960

src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSubConfig.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,17 @@
1616
public class RESTPubSubConfig extends BulletConfig {
1717
// Field names
1818
public static final String PREFIX = "bullet.pubsub.rest.";
19-
public static final String CONNECT_TIMEOUT = PREFIX + "connect.timeout.ms";
19+
public static final String SUBSCRIBER_CONNECT_TIMEOUT = PREFIX + "subscriber.connect.timeout.ms";
20+
public static final String PUBLISHER_CONNECT_TIMEOUT = PREFIX + "publisher.connect.timeout.ms";
2021
public static final String MAX_UNCOMMITTED_MESSAGES = PREFIX + "subscriber.max.uncommitted.messages";
2122
public static final String QUERY_URLS = PREFIX + "query.urls";
2223
public static final String RESULT_URL = PREFIX + "result.url";
2324
public static final String RESULT_SUBSCRIBER_MIN_WAIT = PREFIX + "result.subscriber.min.wait.ms";
2425
public static final String QUERY_SUBSCRIBER_MIN_WAIT = PREFIX + "query.subscriber.min.wait.ms";
2526

2627
// Defaults
27-
public static final int DEFAULT_CONNECT_TIMEOUT = 5000;
28+
public static final int DEFAULT_SUBSCRIBER_CONNECT_TIMEOUT = 5000;
29+
public static final int DEFAULT_PUBLISHER_CONNECT_TIMEOUT = 5000;
2830
public static final int DEFAULT_MAX_UNCOMMITTED_MESSAGES = 100;
2931
public static final List<String> DEFAULT_QUERY_URLS = Arrays.asList("http://localhost:9901/api/bullet/pubsub/query",
3032
"http://localhost:9902/api/bullet/pubsub/query");
@@ -36,10 +38,14 @@ public class RESTPubSubConfig extends BulletConfig {
3638

3739
private static final Validator VALIDATOR = new Validator();
3840
static {
39-
VALIDATOR.define(CONNECT_TIMEOUT)
40-
.defaultTo(DEFAULT_CONNECT_TIMEOUT)
41+
VALIDATOR.define(SUBSCRIBER_CONNECT_TIMEOUT)
42+
.defaultTo(DEFAULT_SUBSCRIBER_CONNECT_TIMEOUT)
4143
.checkIf(Validator::isPositiveInt)
4244
.castTo(Validator::asInt);
45+
VALIDATOR.define(PUBLISHER_CONNECT_TIMEOUT)
46+
.defaultTo(DEFAULT_PUBLISHER_CONNECT_TIMEOUT)
47+
.checkIf(Validator::isPositiveInt)
48+
.castTo(Validator::asInt);
4349
VALIDATOR.define(MAX_UNCOMMITTED_MESSAGES)
4450
.defaultTo(DEFAULT_MAX_UNCOMMITTED_MESSAGES)
4551
.checkIf(Validator::isPositiveInt)

src/main/java/com/yahoo/bullet/pubsub/rest/RESTPublisher.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,19 @@
88
import com.yahoo.bullet.pubsub.PubSubMessage;
99
import com.yahoo.bullet.pubsub.Publisher;
1010
import lombok.extern.slf4j.Slf4j;
11+
import org.apache.http.client.config.RequestConfig;
1112
import org.apache.http.client.methods.HttpPost;
1213
import org.apache.http.HttpResponse;
1314
import org.apache.http.entity.StringEntity;
1415
import org.apache.http.impl.client.CloseableHttpClient;
1516
import java.io.IOException;
17+
import java.io.UnsupportedEncodingException;
1618

1719
@Slf4j
1820
public abstract class RESTPublisher implements Publisher {
1921
public static final String APPLICATION_JSON = "application/json";
2022
public static final String CONTENT_TYPE = "content-type";
23+
private int connectTimeout;
2124

2225
private CloseableHttpClient client;
2326

@@ -26,8 +29,9 @@ public abstract class RESTPublisher implements Publisher {
2629
*
2730
* @param client The client.
2831
*/
29-
public RESTPublisher(CloseableHttpClient client) {
32+
public RESTPublisher(CloseableHttpClient client, int connectTimeout) {
3033
this.client = client;
34+
this.connectTimeout = connectTimeout;
3135
}
3236

3337
@Override
@@ -48,17 +52,26 @@ public void close() {
4852
protected void sendToURL(String url, PubSubMessage message) {
4953
log.debug("Sending message: {} to url: {}", message, url);
5054
try {
51-
HttpPost httpPost = new HttpPost(url);
52-
httpPost.setEntity(new StringEntity(message.asJSON()));
53-
httpPost.setHeader(CONTENT_TYPE, APPLICATION_JSON);
54-
HttpResponse response = client.execute(httpPost);
55+
HttpResponse response = client.execute(makeHttpPost(url, message));
5556
if (response == null || response.getStatusLine().getStatusCode() != RESTPubSub.OK_200) {
5657
log.error("Couldn't reach REST pubsub server. Got response: {}", response);
5758
return;
5859
}
5960
log.debug("Successfully wrote message with status code {}. Response was: {}", response.getStatusLine().getStatusCode(), response);
6061
} catch (Exception e) {
61-
log.error("Error encoding message in preparation for POST: ", e);
62+
log.error("Error when trying to POST. Message was: {}. Error was: ", message.asJSON(), e);
6263
}
6364
}
65+
66+
private HttpPost makeHttpPost(String url, PubSubMessage message) throws UnsupportedEncodingException {
67+
HttpPost httpPost = new HttpPost(url);
68+
httpPost.setEntity(new StringEntity(message.asJSON()));
69+
httpPost.setHeader(CONTENT_TYPE, APPLICATION_JSON);
70+
RequestConfig requestConfig =
71+
RequestConfig.custom().setConnectTimeout(connectTimeout)
72+
.setSocketTimeout(connectTimeout)
73+
.build();
74+
httpPost.setConfig(requestConfig);
75+
return httpPost;
76+
}
6477
}

src/main/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisher.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ public class RESTQueryPublisher extends RESTPublisher {
2626
* @param queryURL The URL to which to POST queries.
2727
* @param resultURL The URL that will be added to the Metadata (results will be sent to this URL from the backend).
2828
*/
29-
public RESTQueryPublisher(CloseableHttpClient client, String queryURL, String resultURL) {
30-
super(client);
29+
public RESTQueryPublisher(CloseableHttpClient client, String queryURL, String resultURL, int connectTimeout) {
30+
super(client, connectTimeout);
3131
this.queryURL = queryURL;
3232
this.resultURL = resultURL;
3333
}

src/main/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisher.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ public class RESTResultPublisher extends RESTPublisher {
1616
*
1717
* @param client The client.
1818
*/
19-
public RESTResultPublisher(CloseableHttpClient client) {
20-
super(client);
19+
public RESTResultPublisher(CloseableHttpClient client, int connectTimeout) {
20+
super(client, connectTimeout);
2121
}
2222

2323
@Override

src/main/java/com/yahoo/bullet/pubsub/rest/RESTSubscriber.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,13 +64,14 @@ public List<PubSubMessage> getMessages() {
6464
int statusCode = response.getStatusLine().getStatusCode();
6565
if (statusCode == RESTPubSub.OK_200) {
6666
String message = EntityUtils.toString(response.getEntity(), RESTPubSub.UTF_8);
67+
log.debug("Received message from url: {}. Message was {}", url, message);
6768
messages.add(PubSubMessage.fromJSON(message));
6869
} else if (statusCode != RESTPubSub.NO_CONTENT_204) {
6970
// NO_CONTENT_204 indicates there are no new messages - anything else indicates a problem
7071
log.error("Http call failed with status code {} and response {}.", statusCode, response);
7172
}
7273
} catch (Exception e) {
73-
log.error("Http call to {} failed with error: {}", url, e);
74+
log.error("Http call to {} failed with error:", url, e);
7475
}
7576
}
7677
return messages;

src/main/resources/rest_pubsub_defaults.yaml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1-
# Http connection timout (used by both the web service and the backend)
2-
bullet.pubsub.rest.connect.timeout.ms: 5000
1+
# Http connection timout for subscribers
2+
bullet.pubsub.rest.subscriber.connect.timeout.ms: 5000
3+
# Http connection timout for publishers
4+
bullet.pubsub.rest.publisher.connect.timeout.ms: 5000
35
# Maxiumum number of uncommitted messages allowed before read requests will wait for commits (used by both the web service and the backend)
46
bullet.pubsub.rest.subscriber.max.uncommitted.messages: 100
57
# Minimum time (ms) between http calls to the result subscriber REST endpoint. This can be used to limit the number of http requests to the REST endpoints

src/test/java/com/yahoo/bullet/pubsub/rest/RESTPubSubConfigTest.java

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,25 +20,25 @@ public class RESTPubSubConfigTest {
2020
@Test
2121
public void testNoFiles() {
2222
RESTPubSubConfig config = new RESTPubSubConfig((String) null);
23-
Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), 5000);
23+
Assert.assertEquals(config.get(RESTPubSubConfig.SUBSCRIBER_CONNECT_TIMEOUT), 5000);
2424

2525
config = new RESTPubSubConfig((Config) null);
26-
Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), 5000);
26+
Assert.assertEquals(config.get(RESTPubSubConfig.SUBSCRIBER_CONNECT_TIMEOUT), 5000);
2727

2828
config = new RESTPubSubConfig("");
29-
Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), 5000);
29+
Assert.assertEquals(config.get(RESTPubSubConfig.SUBSCRIBER_CONNECT_TIMEOUT), 5000);
3030
}
3131

3232
@Test
3333
public void testMissingFile() {
3434
RESTPubSubConfig config = new RESTPubSubConfig("/path/to/non/existant/file");
35-
Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), 5000);
35+
Assert.assertEquals(config.get(RESTPubSubConfig.SUBSCRIBER_CONNECT_TIMEOUT), 5000);
3636
}
3737

3838
@Test
3939
public void testCustomConfig() {
4040
RESTPubSubConfig config = new RESTPubSubConfig("src/test/resources/test_config.yaml");
41-
Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), 88);
41+
Assert.assertEquals(config.get(RESTPubSubConfig.SUBSCRIBER_CONNECT_TIMEOUT), 88);
4242
Assert.assertEquals(config.get(RESTPubSubConfig.PUBSUB_CLASS_NAME), "com.yahoo.bullet.pubsub.MockPubSub");
4343
List<String> queries = ((List<String>) config.getAs(RESTPubSubConfig.QUERY_URLS, List.class));
4444
Assert.assertEquals(queries.size(), 2);
@@ -57,7 +57,7 @@ public void testCustomProperties() {
5757
@Test
5858
public void testGettingWithDefault() {
5959
RESTPubSubConfig config = new RESTPubSubConfig("src/test/resources/test_config.yaml");
60-
Assert.assertEquals(config.getOrDefault(RESTPubSubConfig.CONNECT_TIMEOUT, "51"), 88);
60+
Assert.assertEquals(config.getOrDefault(RESTPubSubConfig.SUBSCRIBER_CONNECT_TIMEOUT, "51"), 88);
6161
Assert.assertEquals(config.getOrDefault("does.not.exist", "foo"), "foo");
6262
Assert.assertEquals(config.getOrDefault("fake.setting", "bar"), "bar");
6363
}
@@ -90,12 +90,12 @@ public void testMerging() {
9090
RESTPubSubConfig config = new RESTPubSubConfig("src/test/resources/test_config.yaml");
9191

9292
int configSize = config.getAll(Optional.empty()).size();
93-
Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), 88);
93+
Assert.assertEquals(config.get(RESTPubSubConfig.SUBSCRIBER_CONNECT_TIMEOUT), 88);
9494
Assert.assertEquals(config.get(RESTPubSubConfig.PUBSUB_CLASS_NAME), "com.yahoo.bullet.pubsub.MockPubSub");
9595

9696
Config another = new RESTPubSubConfig((String) null);
9797
another.clear();
98-
another.set(RESTPubSubConfig.CONNECT_TIMEOUT, 51L);
98+
another.set(RESTPubSubConfig.SUBSCRIBER_CONNECT_TIMEOUT, 51L);
9999
// This is a bad setting
100100
another.set(RESTPubSubConfig.AGGREGATION_MAX_SIZE, -1);
101101
// Some other non-Bullet setting
@@ -104,7 +104,7 @@ public void testMerging() {
104104
config.merge(another);
105105

106106
Assert.assertEquals(config.getAll(Optional.empty()).size(), configSize + 1);
107-
Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), 51);
107+
Assert.assertEquals(config.get(RESTPubSubConfig.SUBSCRIBER_CONNECT_TIMEOUT), 51);
108108
// Bad setting gets defaulted.
109109
Assert.assertEquals(config.get(RESTPubSubConfig.AGGREGATION_MAX_SIZE), RESTPubSubConfig.DEFAULT_AGGREGATION_MAX_SIZE);
110110
// Other setting is preserved.
@@ -113,7 +113,7 @@ public void testMerging() {
113113
// Test null and verify it is unchanged
114114
config.merge(null);
115115
Assert.assertEquals(config.getAll(Optional.empty()).size(), configSize + 1);
116-
Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), 51);
116+
Assert.assertEquals(config.get(RESTPubSubConfig.SUBSCRIBER_CONNECT_TIMEOUT), 51);
117117
Assert.assertEquals(config.get(RESTPubSubConfig.AGGREGATION_MAX_SIZE), RESTPubSubConfig.DEFAULT_AGGREGATION_MAX_SIZE);
118118
Assert.assertEquals(config.get("pi"), 3.14);
119119
}
@@ -125,7 +125,7 @@ public void testPropertiesWithPrefix() {
125125
String fieldValue = "com.yahoo.bullet.pubsub.MockPubSub";
126126

127127
int configSize = config.getAllWithPrefix(Optional.empty(), prefix, false).size();
128-
Assert.assertEquals(configSize, 8);
128+
Assert.assertEquals(configSize, 9);
129129

130130
Map<String, Object> properties = config.getAllWithPrefix(Optional.empty(), prefix, false);
131131
Assert.assertEquals(properties.get(RESTPubSubConfig.PUBSUB_CLASS_NAME), fieldValue);
@@ -139,7 +139,7 @@ public void testPropertiesStripPrefix() {
139139
String fieldValue = "com.yahoo.bullet.pubsub.MockPubSub";
140140

141141
int configSize = config.getAllWithPrefix(Optional.empty(), prefix, true).size();
142-
Assert.assertEquals(configSize, 8);
142+
Assert.assertEquals(configSize, 9);
143143

144144
Map<String, Object> properties = config.getAllWithPrefix(Optional.empty(), prefix, true);
145145
Assert.assertNull(properties.get(RESTPubSubConfig.PUBSUB_CLASS_NAME));
@@ -196,9 +196,9 @@ public void testValidate() {
196196
Assert.assertEquals(config.get(BulletConfig.AGGREGATION_DEFAULT_SIZE), BulletConfig.DEFAULT_AGGREGATION_SIZE);
197197

198198
// Test validate() corrects RESTPubSubConfig settings
199-
config.set(RESTPubSubConfig.CONNECT_TIMEOUT, -88);
200-
Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), -88);
199+
config.set(RESTPubSubConfig.SUBSCRIBER_CONNECT_TIMEOUT, -88);
200+
Assert.assertEquals(config.get(RESTPubSubConfig.SUBSCRIBER_CONNECT_TIMEOUT), -88);
201201
config.validate();
202-
Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), RESTPubSubConfig.DEFAULT_CONNECT_TIMEOUT);
202+
Assert.assertEquals(config.get(RESTPubSubConfig.SUBSCRIBER_CONNECT_TIMEOUT), RESTPubSubConfig.DEFAULT_SUBSCRIBER_CONNECT_TIMEOUT);
203203
}
204204
}

src/test/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisherTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public void testSendResultUrlPutInMetadataAckPreserved() throws Exception {
3434
doReturn(200).when(mockStatusLine).getStatusCode();
3535
doReturn(mockStatusLine).when(mockResponse).getStatusLine();
3636
doReturn(mockResponse).when(mockClient).execute(any());
37-
RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/url");
37+
RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/url", 5000);
3838
publisher.send(new PubSubMessage("foo", "bar", Metadata.Signal.ACKNOWLEDGE));
3939

4040
ArgumentCaptor<HttpPost> argumentCaptor = ArgumentCaptor.forClass(HttpPost.class);
@@ -52,7 +52,7 @@ public void testSendResultUrlPutInMetadataAckPreserved() throws Exception {
5252
@Test
5353
public void testSendResultUrlPutInMetadataCompletePreserved() throws Exception {
5454
CloseableHttpClient mockClient = mock(CloseableHttpClient.class);
55-
RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/url");
55+
RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/url", 5000);
5656
publisher.send(new PubSubMessage("foo", "bar", Metadata.Signal.COMPLETE));
5757

5858
ArgumentCaptor<HttpPost> argumentCaptor = ArgumentCaptor.forClass(HttpPost.class);
@@ -70,7 +70,7 @@ public void testSendResultUrlPutInMetadataCompletePreserved() throws Exception {
7070
@Test
7171
public void testSendMetadataCreated() throws Exception {
7272
CloseableHttpClient mockClient = mock(CloseableHttpClient.class);
73-
RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/url");
73+
RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/url", 5000);
7474
publisher.send("foo", "bar");
7575

7676
ArgumentCaptor<HttpPost> argumentCaptor = ArgumentCaptor.forClass(HttpPost.class);
@@ -89,7 +89,7 @@ public void testSendMetadataCreated() throws Exception {
8989
public void testClose() throws Exception {
9090
CloseableHttpClient mockClient = mock(CloseableHttpClient.class);
9191
doNothing().when(mockClient).close();
92-
RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/url");
92+
RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/url", 5000);
9393
publisher.close();
9494
verify(mockClient).close();
9595
}
@@ -98,7 +98,7 @@ public void testClose() throws Exception {
9898
public void testCloseDoesNotThrow() throws Exception {
9999
CloseableHttpClient mockClient = mock(CloseableHttpClient.class);
100100
doThrow(new IOException("error!")).when(mockClient).close();
101-
RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, null, null);
101+
RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, null, null, 5000);
102102

103103
publisher.close();
104104
verify(mockClient).close();
@@ -107,7 +107,7 @@ public void testCloseDoesNotThrow() throws Exception {
107107
@Test
108108
public void testBadResponseDoesNotThrow() throws Exception {
109109
CloseableHttpClient mockClient = mock(CloseableHttpClient.class);
110-
RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/result/url");
110+
RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/result/url", 5000);
111111
PubSubMessage message = mock(PubSubMessage.class);
112112
// This will compel the HttpPost object to throw an exception in RESTPublisher.sendToURL()
113113
doReturn(null).when(message).asJSON();

0 commit comments

Comments
 (0)