Skip to content

Commit be280a7

Browse files
Add memory pubsub (#18)
1 parent 6e88dbf commit be280a7

22 files changed

+1168
-36
lines changed

pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,11 @@
102102
<artifactId>jvyaml</artifactId>
103103
<version>0.2.1</version>
104104
</dependency>
105+
<dependency>
106+
<groupId>org.asynchttpclient</groupId>
107+
<artifactId>async-http-client</artifactId>
108+
<version>2.0.37</version>
109+
</dependency>
105110
</dependencies>
106111

107112
<build>

src/main/java/com/yahoo/bullet/common/BulletConfig.java

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121
@Slf4j
2222
public class BulletConfig extends Config {
2323
// Field names
24-
public static final String QUERY_DEFAULT_DURATION = "bullet.query.default.duration";
25-
public static final String QUERY_MAX_DURATION = "bullet.query.max.duration";
24+
public static final String QUERY_DEFAULT_DURATION = "bullet.query.default.duration.ms";
25+
public static final String QUERY_MAX_DURATION = "bullet.query.max.duration.ms";
2626
public static final String RECORD_INJECT_TIMESTAMP = "bullet.record.inject.timestamp.enable";
2727
public static final String RECORD_INJECT_TIMESTAMP_KEY = "bullet.record.inject.timestamp.key";
2828

@@ -55,7 +55,7 @@ public class BulletConfig extends Config {
5555
public static final String RESULT_METADATA_METRICS_NAME_KEY = "key";
5656

5757
public static final String WINDOW_DISABLE = "bullet.query.window.disable";
58-
public static final String WINDOW_MIN_EMIT_EVERY = "bullet.query.window.min.emit.every";
58+
public static final String WINDOW_MIN_EMIT_EVERY = "bullet.query.window.min.emit.every.ms";
5959

6060
public static final String RATE_LIMIT_ENABLE = "bullet.query.rate.limit.enable";
6161
public static final String RATE_LIMIT_MAX_EMIT_COUNT = "bullet.query.rate.limit.max.emit.count";
@@ -294,15 +294,15 @@ public class BulletConfig extends Config {
294294
*/
295295
public BulletConfig(String file) {
296296
super(file, DEFAULT_CONFIGURATION_NAME);
297-
validate(this);
297+
VALIDATOR.validate(this);
298298
}
299299

300300
/**
301301
* Constructor that loads just the defaults.
302302
*/
303303
public BulletConfig() {
304304
super(DEFAULT_CONFIGURATION_NAME);
305-
validate(this);
305+
VALIDATOR.validate(this);
306306
}
307307

308308
/**
@@ -318,7 +318,7 @@ public BulletConfig() {
318318
* @return This config for chaining.
319319
*/
320320
public BulletConfig validate() {
321-
validate(this);
321+
VALIDATOR.validate(this);
322322
return this;
323323
}
324324

@@ -332,20 +332,10 @@ public static Validator getValidator() {
332332
return VALIDATOR.copy();
333333
}
334334

335-
/**
336-
* Validates and fixes configuration for a given {@link BulletConfig}. This method checks, defaults and fixes the
337-
* various settings defined in this class.
338-
*
339-
* @param config The {@link BulletConfig} to validate.
340-
*/
341-
public static void validate(BulletConfig config) {
342-
VALIDATOR.validate(config);
343-
}
344-
345335
@Override
346336
public void merge(Config other) {
347337
super.merge(other);
348-
validate(this);
338+
validate();
349339
}
350340

351341
@SuppressWarnings("unchecked")

src/main/java/com/yahoo/bullet/common/Validator.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ void normalize(BulletConfig config) {
137137
Object value = config.get(key);
138138
boolean shouldGuard = guard.test(value);
139139
if (shouldGuard) {
140-
log.info("Guard satisfied for Key: {}. Using current value: {}", key, value);
140+
log.debug("Guard satisfied for Key: {}. Using current value: {}", key, value);
141141
return;
142142
}
143143
boolean isValid = validation.test(value);
@@ -147,7 +147,7 @@ void normalize(BulletConfig config) {
147147
}
148148
if (adapter != null) {
149149
value = adapter.apply(value);
150-
log.info("Changed the type for {}: {}", key, value);
150+
log.debug("Changed the type for {}: {}", key, value);
151151
}
152152
config.set(key, value);
153153
}
@@ -483,6 +483,16 @@ public static boolean isPowerOfTwo(Object value) {
483483
return (toCheck & toCheck - 1) == 0;
484484
}
485485

486+
/**
487+
* Checks to see if the value is a non-empty {@link List}.
488+
*
489+
* @param value The object to check.
490+
* @return A boolean denoting if the value was a non-empty List.
491+
*/
492+
public static boolean isNonEmptyList(Object value) {
493+
return isType(value, List.class) && !((List) value).isEmpty();
494+
}
495+
486496
// Unary Predicate Generators
487497

488498
/**

src/main/java/com/yahoo/bullet/pubsub/PubSub.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import com.yahoo.bullet.common.BulletConfig;
99
import lombok.Getter;
10+
import lombok.extern.slf4j.Slf4j;
1011

1112
import java.lang.reflect.Constructor;
1213
import java.util.List;
@@ -17,6 +18,7 @@
1718
* Implementations of PubSub should take in a {@link BulletConfig} and use the information to wire up and return
1819
* Publishers and Subscribers.
1920
*/
21+
@Slf4j
2022
public abstract class PubSub {
2123
/**
2224
* The context determines how the {@link Publisher} and {@link Subscriber} returned by PubSub behave. For example,
@@ -59,6 +61,7 @@ public void switchContext(Context context, BulletConfig config) throws PubSubExc
5961
if (this.context != context) {
6062
this.config.merge(config);
6163
this.context = context;
64+
log.info("Switched to context: {}", context);
6265
}
6366
}
6467

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Copyright 2018, Yahoo Inc.
3+
* Licensed under the terms of the Apache License, Version 2.0.
4+
* See the LICENSE file associated with the project for terms.
5+
*/
6+
package com.yahoo.bullet.pubsub.rest;
7+
8+
import com.yahoo.bullet.common.BulletConfig;
9+
import com.yahoo.bullet.pubsub.PubSub;
10+
import com.yahoo.bullet.pubsub.PubSubException;
11+
import com.yahoo.bullet.pubsub.Publisher;
12+
import com.yahoo.bullet.pubsub.Subscriber;
13+
import lombok.extern.slf4j.Slf4j;
14+
import org.asynchttpclient.AsyncHttpClient;
15+
import org.asynchttpclient.AsyncHttpClientConfig;
16+
import org.asynchttpclient.DefaultAsyncHttpClient;
17+
import org.asynchttpclient.DefaultAsyncHttpClientConfig;
18+
19+
import java.util.Collections;
20+
import java.util.List;
21+
import java.util.stream.Collectors;
22+
import java.util.stream.IntStream;
23+
24+
@Slf4j
25+
public class RESTPubSub extends PubSub {
26+
private static final int NO_TIMEOUT = -1;
27+
public static final int OK_200 = 200;
28+
public static final int NO_CONTENT_204 = 204;
29+
30+
/**
31+
* Create a RESTPubSub from a {@link BulletConfig}.
32+
*
33+
* @param config The config.
34+
* @throws PubSubException
35+
*/
36+
public RESTPubSub(BulletConfig config) throws PubSubException {
37+
super(config);
38+
this.config = new RESTPubSubConfig(config);
39+
}
40+
41+
@Override
42+
public Publisher getPublisher() {
43+
if (context == Context.QUERY_PROCESSING) {
44+
return new RESTResultPublisher(getClient());
45+
} else {
46+
String queryURL = ((List<String>) config.getAs(RESTPubSubConfig.QUERY_URLS, List.class)).get(0);
47+
String resultURL = config.getAs(RESTPubSubConfig.RESULT_URL, String.class);
48+
return new RESTQueryPublisher(getClient(), queryURL, resultURL);
49+
}
50+
}
51+
52+
@Override
53+
public List<Publisher> getPublishers(int n) {
54+
return IntStream.range(0, n).mapToObj(i -> getPublisher()).collect(Collectors.toList());
55+
}
56+
57+
@Override
58+
public Subscriber getSubscriber() {
59+
int maxUncommittedMessages = config.getAs(RESTPubSubConfig.MAX_UNCOMMITTED_MESSAGES, Integer.class);
60+
AsyncHttpClient client = getClient();
61+
List<String> urls;
62+
Long minWait;
63+
64+
if (context == Context.QUERY_PROCESSING) {
65+
urls = (List<String>) config.getAs(RESTPubSubConfig.QUERY_URLS, List.class);
66+
minWait = config.getAs(RESTPubSubConfig.QUERY_SUBSCRIBER_MIN_WAIT, Long.class);
67+
} else {
68+
urls = Collections.singletonList(config.getAs(RESTPubSubConfig.RESULT_URL, String.class));
69+
minWait = config.getAs(RESTPubSubConfig.RESULT_SUBSCRIBER_MIN_WAIT, Long.class);
70+
}
71+
return new RESTSubscriber(maxUncommittedMessages, urls, client, minWait);
72+
}
73+
74+
@Override
75+
public List<Subscriber> getSubscribers(int n) {
76+
return IntStream.range(0, n).mapToObj(i -> getSubscriber()).collect(Collectors.toList());
77+
}
78+
79+
private AsyncHttpClient getClient() {
80+
Long connectTimeout = config.getAs(RESTPubSubConfig.CONNECT_TIMEOUT, Long.class);
81+
int retryLimit = config.getAs(RESTPubSubConfig.CONNECT_RETRY_LIMIT, Integer.class);
82+
AsyncHttpClientConfig clientConfig =
83+
new DefaultAsyncHttpClientConfig.Builder().setConnectTimeout(connectTimeout.intValue())
84+
.setMaxRequestRetry(retryLimit)
85+
.setReadTimeout(NO_TIMEOUT)
86+
.setRequestTimeout(NO_TIMEOUT)
87+
.build();
88+
return new DefaultAsyncHttpClient(clientConfig);
89+
}
90+
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Copyright 2018, Yahoo Inc.
3+
* Licensed under the terms of the Apache License, Version 2.0.
4+
* See the LICENSE file associated with the project for terms.
5+
*/
6+
package com.yahoo.bullet.pubsub.rest;
7+
8+
import com.yahoo.bullet.common.BulletConfig;
9+
import com.yahoo.bullet.common.Config;
10+
import com.yahoo.bullet.common.Validator;
11+
import lombok.extern.slf4j.Slf4j;
12+
import java.util.Arrays;
13+
import java.util.List;
14+
15+
@Slf4j
16+
public class RESTPubSubConfig extends BulletConfig {
17+
// Field names
18+
public static final String PREFIX = "bullet.pubsub.rest.";
19+
public static final String CONNECT_TIMEOUT = PREFIX + "connect.timeout.ms";
20+
public static final String CONNECT_RETRY_LIMIT = PREFIX + "connect.retry.limit";
21+
public static final String MAX_UNCOMMITTED_MESSAGES = PREFIX + "subscriber.max.uncommitted.messages";
22+
public static final String QUERY_URLS = PREFIX + "query.urls";
23+
public static final String RESULT_URL = PREFIX + "result.url";
24+
public static final String RESULT_SUBSCRIBER_MIN_WAIT = PREFIX + "result.subscriber.min.wait.ms";
25+
public static final String QUERY_SUBSCRIBER_MIN_WAIT = PREFIX + "query.subscriber.min.wait.ms";
26+
27+
// Defaults
28+
public static final long DEFAULT_CONNECT_TIMEOUT = 5000L;
29+
public static final int DEFAULT_CONNECT_RETRY_LIMIT = 3;
30+
public static final int DEFAULT_MAX_UNCOMMITTED_MESSAGES = 100;
31+
public static final List<String> DEFAULT_QUERY_URLS = Arrays.asList("http://localhost:9901/api/bullet/pubsub/query",
32+
"http://localhost:9902/api/bullet/pubsub/query");
33+
public static final String DEFAULT_RESULT_URL = "http://localhost:9901/api/bullet/pubsub/result";
34+
public static final Long DEFAULT_RESULT_MIN_WAIT = 10L;
35+
public static final Long DEFAULT_QUERY_MIN_WAIT = 10L;
36+
37+
public static final String DEFAULT_REST_PUBSUB_CONFIGURATION_NAME = "rest_pubsub_defaults.yaml";
38+
39+
private static final Validator VALIDATOR = new Validator();
40+
static {
41+
VALIDATOR.define(CONNECT_TIMEOUT)
42+
.defaultTo(DEFAULT_CONNECT_TIMEOUT)
43+
.checkIf(Validator::isPositiveInt)
44+
.castTo(Validator::asLong);
45+
VALIDATOR.define(CONNECT_RETRY_LIMIT)
46+
.defaultTo(DEFAULT_CONNECT_RETRY_LIMIT)
47+
.checkIf(Validator::isPositiveInt)
48+
.castTo(Validator::asInt);
49+
VALIDATOR.define(MAX_UNCOMMITTED_MESSAGES)
50+
.defaultTo(DEFAULT_MAX_UNCOMMITTED_MESSAGES)
51+
.checkIf(Validator::isPositiveInt)
52+
.castTo(Validator::asInt);
53+
VALIDATOR.define(QUERY_URLS)
54+
.defaultTo(DEFAULT_QUERY_URLS)
55+
.checkIf(Validator::isNonEmptyList);
56+
VALIDATOR.define(RESULT_URL)
57+
.defaultTo(DEFAULT_RESULT_URL)
58+
.checkIf(Validator::isString)
59+
.castTo(Validator::asString);
60+
VALIDATOR.define(RESULT_SUBSCRIBER_MIN_WAIT)
61+
.defaultTo(DEFAULT_RESULT_MIN_WAIT)
62+
.checkIf(Validator::isPositiveInt)
63+
.castTo(Validator::asLong);
64+
VALIDATOR.define(QUERY_SUBSCRIBER_MIN_WAIT)
65+
.defaultTo(DEFAULT_QUERY_MIN_WAIT)
66+
.checkIf(Validator::isPositiveInt)
67+
.castTo(Validator::asLong);
68+
}
69+
70+
/**
71+
* Constructor that loads specific file augmented with defaults.
72+
*
73+
* @param file YAML file to load.
74+
*/
75+
public RESTPubSubConfig(String file) {
76+
this(new BulletConfig(file));
77+
}
78+
79+
/**
80+
* Constructor that loads the defaults and augments it with defaults.
81+
*
82+
* @param other The other config to wrap.
83+
*/
84+
public RESTPubSubConfig(Config other) {
85+
super(DEFAULT_REST_PUBSUB_CONFIGURATION_NAME);
86+
merge(other);
87+
VALIDATOR.validate(this);
88+
log.info("Merged settings:\n {}", this);
89+
}
90+
91+
@Override
92+
public BulletConfig validate() {
93+
super.validate();
94+
VALIDATOR.validate(this);
95+
return this;
96+
}
97+
}

0 commit comments

Comments
 (0)