Skip to content

Commit 5554232

Browse files
NathanSpeidelakshaisarma
authored andcommitted
Bug fix for not closing the RESTPubsub connections (#32)
1 parent 9172f84 commit 5554232

File tree

5 files changed

+18
-24
lines changed

5 files changed

+18
-24
lines changed

src/main/java/com/yahoo/bullet/common/BulletConfig.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ public class BulletConfig extends Config {
126126
public static final int DEFAULT_WINDOW_MIN_EMIT_EVERY = 1000;
127127

128128
public static final boolean DEFAULT_RATE_LIMIT_ENABLE = true;
129-
public static final long DEFAULT_RATE_LIMIT_MAX_EMIT_COUNT = 100;
129+
public static final long DEFAULT_RATE_LIMIT_MAX_EMIT_COUNT = 50;
130130
public static final long DEFAULT_RATE_LIMIT_TIME_INTERVAL = 100;
131131

132132
public static final String DEFAULT_PUBSUB_CONTEXT_NAME = Context.QUERY_PROCESSING.name();
@@ -255,7 +255,7 @@ public class BulletConfig extends Config {
255255

256256
VALIDATOR.define(PUBSUB_CONTEXT_NAME)
257257
.defaultTo(DEFAULT_PUBSUB_CONTEXT_NAME)
258-
.checkIf(Validator.isIn(String.class, Context.QUERY_PROCESSING.name(), Context.QUERY_SUBMISSION.name()));
258+
.checkIf(Validator.isIn(Context.QUERY_PROCESSING.name(), Context.QUERY_SUBMISSION.name()));
259259
VALIDATOR.define(PUBSUB_CLASS_NAME)
260260
.defaultTo(DEFAULT_PUBSUB_CLASS_NAME)
261261
.checkIf(Validator::isString);

src/main/java/com/yahoo/bullet/pubsub/rest/RESTPublisher.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@
99
import com.yahoo.bullet.pubsub.Publisher;
1010
import lombok.extern.slf4j.Slf4j;
1111
import org.apache.http.client.config.RequestConfig;
12+
import org.apache.http.client.methods.CloseableHttpResponse;
1213
import org.apache.http.client.methods.HttpPost;
13-
import org.apache.http.HttpResponse;
1414
import org.apache.http.entity.StringEntity;
1515
import org.apache.http.impl.client.CloseableHttpClient;
1616
import java.io.IOException;
@@ -51,13 +51,10 @@ public void close() {
5151
*/
5252
protected void sendToURL(String url, PubSubMessage message) {
5353
log.debug("Sending message: {} to url: {}", message, url);
54-
try {
55-
HttpResponse response = client.execute(makeHttpPost(url, message));
54+
try (CloseableHttpResponse response = client.execute(makeHttpPost(url, message))) {
5655
if (response == null || response.getStatusLine().getStatusCode() != RESTPubSub.OK_200) {
57-
log.error("Couldn't reach REST pubsub server. Got response: {}", response);
58-
return;
56+
log.error("Couldn't POST to REST pubsub server. Got response: {}", response);
5957
}
60-
log.debug("Successfully wrote message with status code {}. Response was: {}", response.getStatusLine().getStatusCode(), response);
6158
} catch (Exception e) {
6259
log.error("Error when trying to POST. Message was: {}. Error was: ", message.asJSON(), e);
6360
}
@@ -68,9 +65,7 @@ private HttpPost makeHttpPost(String url, PubSubMessage message) throws Unsuppor
6865
httpPost.setEntity(new StringEntity(message.asJSON()));
6966
httpPost.setHeader(CONTENT_TYPE, APPLICATION_JSON);
7067
RequestConfig requestConfig =
71-
RequestConfig.custom().setConnectTimeout(connectTimeout)
72-
.setSocketTimeout(connectTimeout)
73-
.build();
68+
RequestConfig.custom().setConnectTimeout(connectTimeout).setSocketTimeout(connectTimeout).build();
7469
httpPost.setConfig(requestConfig);
7570
return httpPost;
7671
}

src/main/java/com/yahoo/bullet/pubsub/rest/RESTSubscriber.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@
1111
import lombok.Getter;
1212
import lombok.Setter;
1313
import lombok.extern.slf4j.Slf4j;
14-
import org.apache.http.HttpResponse;
14+
import org.apache.http.HttpEntity;
1515
import org.apache.http.client.config.RequestConfig;
16+
import org.apache.http.client.methods.CloseableHttpResponse;
1617
import org.apache.http.client.methods.HttpGet;
1718
import org.apache.http.impl.client.CloseableHttpClient;
1819
import org.apache.http.util.EntityUtils;
@@ -58,20 +59,20 @@ public List<PubSubMessage> getMessages() {
5859
}
5960
lastRequest = currentTime;
6061
for (String url : urls) {
61-
try {
62-
log.debug("Getting messages from url: {}", url);
63-
HttpResponse response = client.execute(makeHttpGet(url));
62+
try (CloseableHttpResponse response = client.execute(makeHttpGet(url))) {
6463
int statusCode = response.getStatusLine().getStatusCode();
6564
if (statusCode == RESTPubSub.OK_200) {
66-
String message = EntityUtils.toString(response.getEntity(), RESTPubSub.UTF_8);
65+
HttpEntity httpEntity = response.getEntity();
66+
String message = EntityUtils.toString(httpEntity, RESTPubSub.UTF_8);
6767
log.debug("Received message from url: {}. Message was {}", url, message);
6868
messages.add(PubSubMessage.fromJSON(message));
69+
EntityUtils.consume(httpEntity);
6970
} else if (statusCode != RESTPubSub.NO_CONTENT_204) {
7071
// NO_CONTENT_204 indicates there are no new messages - anything else indicates a problem
71-
log.error("Http call failed with status code {} and response {}.", statusCode, response);
72+
log.error("HTTP call to {} failed with status code {} and response {}.", url, statusCode, response);
7273
}
7374
} catch (Exception e) {
74-
log.error("Http call to {} failed with error:", url, e);
75+
log.error("HTTP call to {} failed with error:", url, e);
7576
}
7677
}
7778
return messages;
@@ -82,16 +83,14 @@ public void close() {
8283
try {
8384
client.close();
8485
} catch (IOException e) {
85-
log.warn("Caught exception when closing AsyncHttpClient: ", e);
86+
log.warn("Caught exception when closing HTTP client: ", e);
8687
}
8788
}
8889

8990
private HttpGet makeHttpGet(String url) {
9091
HttpGet httpGet = new HttpGet(url);
9192
RequestConfig requestConfig =
92-
RequestConfig.custom().setConnectTimeout(connectTimeout)
93-
.setSocketTimeout(connectTimeout)
94-
.build();
93+
RequestConfig.custom().setConnectTimeout(connectTimeout).setSocketTimeout(connectTimeout).build();
9594
httpGet.setConfig(requestConfig);
9695
return httpGet;
9796
}

src/main/resources/bullet_defaults.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ bullet.query.window.min.emit.every.ms: 1000
214214
bullet.query.rate.limit.enable: true
215215
# This is the maximum amount of times data can be retrieved for a query in a given time interval (bullet.query.rate.limit.time.interval)
216216
# before it is considered as exceeding the rate limit.
217-
bullet.query.rate.limit.max.emit.count: 100
217+
bullet.query.rate.limit.max.emit.count: 50
218218
# This is the smallest interval in ms at which the check for whether the rate limit is being exceeded check is done if your backend uses rate limiting.
219219
bullet.query.rate.limit.time.interval: 100
220220

src/test/java/com/yahoo/bullet/querying/RateLimitErrorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public void testMetaAndRateConversion() {
2828
Assert.assertTrue(((List<BulletError>) actual.get(Meta.ERROR_KEY)).get(0) == error);
2929

3030
String asJSON = error.asJSON();
31-
double defaultRate = (DEFAULT_RATE_LIMIT_MAX_EMIT_COUNT / DEFAULT_RATE_LIMIT_TIME_INTERVAL) * RateLimiter.SECOND;
31+
double defaultRate = ((double) DEFAULT_RATE_LIMIT_MAX_EMIT_COUNT / DEFAULT_RATE_LIMIT_TIME_INTERVAL) * RateLimiter.SECOND;
3232
double actualRate = 19.34 * RateLimiter.SECOND;
3333
assertJSONEquals(asJSON, "{'error': '" + String.format(RateLimitError.ERROR_FORMAT, defaultRate, actualRate) + "', " +
3434
"'resolutions': ['" + RateLimitError.NARROW_FILTER + "', '" + RateLimitError.TIME_WINDOW + "']" +

0 commit comments

Comments
 (0)