Skip to content

Commit fe68ea5

Browse files
author
Mike Davis
authored
Prevent negative batch config values. (#331)
1 parent 80a1019 commit fe68ea5

File tree

3 files changed

+132
-60
lines changed

3 files changed

+132
-60
lines changed

core-api/src/main/java/com/optimizely/ab/event/BatchEventProcessor.java

Lines changed: 56 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,9 @@ public class BatchEventProcessor implements EventProcessor, AutoCloseable {
5858
private final BlockingQueue<Object> eventQueue;
5959
private final EventHandler eventHandler;
6060

61-
private final int batchSize;
62-
private final long flushInterval;
63-
private final long timeoutMillis;
61+
final int batchSize;
62+
final long flushInterval;
63+
final long timeoutMillis;
6464
private final ExecutorService executor;
6565
private final NotificationCenter notificationCenter;
6666

@@ -70,21 +70,11 @@ public class BatchEventProcessor implements EventProcessor, AutoCloseable {
7070
private BatchEventProcessor(BlockingQueue<Object> eventQueue, EventHandler eventHandler, Integer batchSize, Long flushInterval, Long timeoutMillis, ExecutorService executor, NotificationCenter notificationCenter) {
7171
this.eventHandler = eventHandler;
7272
this.eventQueue = eventQueue;
73-
this.batchSize = batchSize == null ? PropertyUtils.getInteger(CONFIG_BATCH_SIZE, DEFAULT_BATCH_SIZE) : batchSize;
74-
this.flushInterval = flushInterval == null ? PropertyUtils.getLong(CONFIG_BATCH_INTERVAL, DEFAULT_BATCH_INTERVAL) : flushInterval;
75-
this.timeoutMillis = timeoutMillis == null ? PropertyUtils.getLong(CONFIG_CLOSE_TIMEOUT, DEFAULT_TIMEOUT_INTERVAL) : timeoutMillis;
73+
this.batchSize = batchSize;
74+
this.flushInterval = flushInterval;
75+
this.timeoutMillis = timeoutMillis;
7676
this.notificationCenter = notificationCenter;
77-
78-
if (executor == null) {
79-
final ThreadFactory threadFactory = Executors.defaultThreadFactory();
80-
this.executor = Executors.newSingleThreadExecutor(runnable -> {
81-
Thread thread = threadFactory.newThread(runnable);
82-
thread.setDaemon(true);
83-
return thread;
84-
});
85-
} else {
86-
this.executor = executor;
87-
}
77+
this.executor = executor;
8878
}
8979

9080
public synchronized void start() {
@@ -240,42 +230,64 @@ public static Builder builder() {
240230
public static class Builder {
241231
private BlockingQueue<Object> eventQueue = new ArrayBlockingQueue<>(DEFAULT_QUEUE_CAPACITY);
242232
private EventHandler eventHandler = null;
243-
private Integer batchSize = null;
244-
private Long flushInterval = null;
233+
private Integer batchSize = PropertyUtils.getInteger(CONFIG_BATCH_SIZE, DEFAULT_BATCH_SIZE);
234+
private Long flushInterval = PropertyUtils.getLong(CONFIG_BATCH_INTERVAL, DEFAULT_BATCH_INTERVAL);
235+
private Long timeoutMillis = PropertyUtils.getLong(CONFIG_CLOSE_TIMEOUT, DEFAULT_TIMEOUT_INTERVAL);
245236
private ExecutorService executor = null;
246237
private NotificationCenter notificationCenter = null;
247-
private Long timeoutMillis = null;
248238

239+
/**
240+
* {@link EventHandler} implementation used to dispatch events to Optimizely.
241+
*/
249242
public Builder withEventHandler(EventHandler eventHandler) {
250243
this.eventHandler = eventHandler;
251244
return this;
252245
}
253246

247+
/**
248+
* EventQueue is the underlying BlockingQueue used to buffer events before being added to the batch payload.
249+
*/
254250
public Builder withEventQueue(BlockingQueue<Object> eventQueue) {
255251
this.eventQueue = eventQueue;
256252
return this;
257253
}
258254

255+
/**
256+
* BatchSize is the maximum number of events contained within a single event batch.
257+
*/
259258
public Builder withBatchSize(Integer batchSize) {
260259
this.batchSize = batchSize;
261260
return this;
262261
}
263262

263+
/**
264+
* FlushInterval is the maximum duration, in milliseconds, that an event will remain in flight before
265+
* being flushed to the event dispatcher.
266+
*/
264267
public Builder withFlushInterval(Long flushInterval) {
265268
this.flushInterval = flushInterval;
266269
return this;
267270
}
268271

272+
/**
273+
* ExecutorService used to execute the {@link EventConsumer} thread.
274+
*/
269275
public Builder withExecutor(ExecutorService executor) {
270276
this.executor = executor;
271277
return this;
272278
}
273279

280+
/**
281+
* Timeout is the maximum time to wait for the EventProcessor to close.
282+
*/
274283
public Builder withTimeout(long duration, TimeUnit timeUnit) {
275284
this.timeoutMillis = timeUnit.toMillis(duration);
276285
return this;
277286
}
278287

288+
/**
289+
* NotificationCenter used to notify when event batches are flushed.
290+
*/
279291
public Builder withNotificationCenter(NotificationCenter notificationCenter) {
280292
this.notificationCenter = notificationCenter;
281293
return this;
@@ -286,6 +298,30 @@ public BatchEventProcessor build() {
286298
}
287299

288300
public BatchEventProcessor build(boolean shouldStart) {
301+
if (batchSize < 0) {
302+
logger.warn("Invalid batchSize of {}, Defaulting to {}", batchSize, DEFAULT_BATCH_SIZE);
303+
batchSize = DEFAULT_BATCH_SIZE;
304+
}
305+
306+
if (flushInterval < 0) {
307+
logger.warn("Invalid flushInterval of {}, Defaulting to {}", flushInterval, DEFAULT_BATCH_INTERVAL);
308+
flushInterval = DEFAULT_BATCH_INTERVAL;
309+
}
310+
311+
if (timeoutMillis < 0) {
312+
logger.warn("Invalid timeoutMillis of {}, Defaulting to {}", timeoutMillis, DEFAULT_TIMEOUT_INTERVAL);
313+
timeoutMillis = DEFAULT_TIMEOUT_INTERVAL;
314+
}
315+
316+
if (executor == null) {
317+
final ThreadFactory threadFactory = Executors.defaultThreadFactory();
318+
executor = Executors.newSingleThreadExecutor(runnable -> {
319+
Thread thread = threadFactory.newThread(runnable);
320+
thread.setDaemon(true);
321+
return thread;
322+
});
323+
}
324+
289325
BatchEventProcessor batchEventProcessor = new BatchEventProcessor(eventQueue, eventHandler, batchSize, flushInterval, timeoutMillis, executor, notificationCenter);
290326

291327
if (shouldStart) {

core-api/src/test/java/com/optimizely/ab/EventHandlerRule.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ public class EventHandlerRule implements EventHandler, TestRule {
5151

5252
private List<CanonicalEvent> expectedEvents;
5353
private LinkedList<CanonicalEvent> actualEvents;
54+
private int actualCalls;
55+
private Integer expectedCalls;
5456

5557
@Override
5658
public Statement apply(final Statement base, Description description) {
@@ -71,12 +73,19 @@ public void evaluate() throws Throwable {
7173
private void before() {
7274
expectedEvents = new LinkedList<>();
7375
actualEvents = new LinkedList<>();
76+
77+
expectedCalls = null;
78+
actualCalls = 0;
7479
}
7580

7681
private void after() {
7782
}
7883

7984
private void verify() {
85+
if (expectedCalls != null) {
86+
assertEquals(expectedCalls.intValue(), actualCalls);
87+
}
88+
8089
assertEquals(expectedEvents.size(), actualEvents.size());
8190

8291
ListIterator<CanonicalEvent> expectedIterator = expectedEvents.listIterator();
@@ -90,6 +99,10 @@ private void verify() {
9099
}
91100
}
92101

102+
public void expectCalls(int expected) {
103+
expectedCalls = expected;
104+
}
105+
93106
public void expectImpression(String experientId, String variationId, String userId) {
94107
expectImpression(experientId, variationId, userId, Collections.emptyMap());
95108
}
@@ -119,6 +132,7 @@ public void expect(String experientId, String variationId, String eventName, Str
119132
@Override
120133
public void dispatchEvent(LogEvent logEvent) {
121134
logger.info("Receiving event: {}", logEvent);
135+
actualCalls++;
122136

123137
List<Visitor> visitors = logEvent.getEventBatch().getVisitors();
124138

core-api/src/test/java/com/optimizely/ab/event/BatchEventProcessorTest.java

Lines changed: 62 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
import java.util.Collections;
3232
import java.util.concurrent.*;
33+
import java.util.concurrent.atomic.AtomicInteger;
3334

3435
import static org.junit.Assert.*;
3536
import static org.mockito.Mockito.*;
@@ -93,11 +94,12 @@ public void testFlushOnMaxTimeout() throws Exception {
9394
eventHandlerRule.expectConversion(EVENT_NAME, USER_ID);
9495

9596
if (!countDownLatch.await(MAX_DURATION_MS * 3, TimeUnit.MILLISECONDS)) {
96-
fail("Exceeded timeout waiting for notification.");
97+
fail("Exceeded timeout waiting for events to flush.");
9798
}
9899

99100
eventProcessor.close();
100101
assertEquals(0, eventQueue.size());
102+
eventHandlerRule.expectCalls(1);
101103
}
102104

103105
@Test
@@ -116,17 +118,17 @@ public void testFlushMaxBatchSize() throws Exception {
116118
eventHandlerRule.expectConversion(eventName, USER_ID);
117119
}
118120

119-
countDownLatch.await();
121+
if (!countDownLatch.await(MAX_DURATION_MS * 3, TimeUnit.MILLISECONDS)) {
122+
fail("Exceeded timeout waiting for events to flush.");
123+
}
124+
120125
assertEquals(0, eventQueue.size());
126+
eventHandlerRule.expectCalls(1);
121127
}
122128

123129
@Test
124130
public void testFlush() throws Exception {
125-
CountDownLatch countDownLatch = new CountDownLatch(2);
126-
setEventProcessor(logEvent -> {
127-
eventHandlerRule.dispatchEvent(logEvent);
128-
countDownLatch.countDown();
129-
});
131+
setEventProcessor(logEvent -> eventHandlerRule.dispatchEvent(logEvent));
130132

131133
UserEvent userEvent = buildConversionEvent(EVENT_NAME);
132134
eventProcessor.process(userEvent);
@@ -137,18 +139,12 @@ public void testFlush() throws Exception {
137139
eventProcessor.flush();
138140
eventHandlerRule.expectConversion(EVENT_NAME, USER_ID);
139141

140-
if (!countDownLatch.await(MAX_DURATION_MS / 2, TimeUnit.MILLISECONDS)) {
141-
fail("Exceeded timeout waiting for notification.");
142-
}
142+
eventHandlerRule.expectCalls(2);
143143
}
144144

145145
@Test
146146
public void testFlushOnMismatchRevision() throws Exception {
147-
CountDownLatch countDownLatch = new CountDownLatch(2);
148-
setEventProcessor(logEvent -> {
149-
eventHandlerRule.dispatchEvent(logEvent);
150-
countDownLatch.countDown();
151-
});
147+
setEventProcessor(logEvent -> eventHandlerRule.dispatchEvent(logEvent));
152148

153149
ProjectConfig projectConfig1 = mock(ProjectConfig.class);
154150
when(projectConfig1.getRevision()).thenReturn("1");
@@ -165,18 +161,12 @@ public void testFlushOnMismatchRevision() throws Exception {
165161
eventHandlerRule.expectConversion(EVENT_NAME, USER_ID);
166162

167163
eventProcessor.close();
168-
if (!countDownLatch.await(MAX_DURATION_MS * 3, TimeUnit.MILLISECONDS)) {
169-
fail("Exceeded timeout waiting for notification.");
170-
}
164+
eventHandlerRule.expectCalls(2);
171165
}
172166

173167
@Test
174168
public void testFlushOnMismatchProjectId() throws Exception {
175-
CountDownLatch countDownLatch = new CountDownLatch(2);
176-
setEventProcessor(logEvent -> {
177-
eventHandlerRule.dispatchEvent(logEvent);
178-
countDownLatch.countDown();
179-
});
169+
setEventProcessor(logEvent -> eventHandlerRule.dispatchEvent(logEvent));
180170

181171
ProjectConfig projectConfig1 = mock(ProjectConfig.class);
182172
when(projectConfig1.getRevision()).thenReturn("1");
@@ -193,18 +183,12 @@ public void testFlushOnMismatchProjectId() throws Exception {
193183
eventHandlerRule.expectConversion(EVENT_NAME, USER_ID);
194184

195185
eventProcessor.close();
196-
if (!countDownLatch.await(MAX_DURATION_MS * 3, TimeUnit.MILLISECONDS)) {
197-
fail("Exceeded timeout waiting for notification.");
198-
}
186+
eventHandlerRule.expectCalls(2);
199187
}
200188

201189
@Test
202190
public void testStopAndStart() throws Exception {
203-
CountDownLatch countDownLatch = new CountDownLatch(2);
204-
setEventProcessor(logEvent -> {
205-
eventHandlerRule.dispatchEvent(logEvent);
206-
countDownLatch.countDown();
207-
});
191+
setEventProcessor(logEvent -> eventHandlerRule.dispatchEvent(logEvent));
208192

209193
UserEvent userEvent = buildConversionEvent(EVENT_NAME);
210194
eventProcessor.process(userEvent);
@@ -218,31 +202,27 @@ public void testStopAndStart() throws Exception {
218202
eventProcessor.start();
219203

220204
eventProcessor.close();
221-
if (!countDownLatch.await(MAX_DURATION_MS * 3, TimeUnit.MILLISECONDS)) {
222-
fail("Exceeded timeout waiting for notification.");
223-
}
205+
eventHandlerRule.expectCalls(2);
224206
}
225207

226208
@Test
227209
public void testNotificationCenter() throws Exception {
228-
CountDownLatch countDownLatch = new CountDownLatch(1);
229-
notificationCenter.addNotificationHandler(LogEvent.class, x -> countDownLatch.countDown());
210+
AtomicInteger counter = new AtomicInteger();
211+
notificationCenter.addNotificationHandler(LogEvent.class, x -> counter.incrementAndGet());
230212
setEventProcessor(logEvent -> {});
231213

232214
UserEvent userEvent = buildConversionEvent(EVENT_NAME);
233215
eventProcessor.process(userEvent);
234216
eventProcessor.close();
235217

236-
if (!countDownLatch.await(MAX_DURATION_MS * 3, TimeUnit.MILLISECONDS)) {
237-
fail("Exceeded timeout waiting for notification.");
238-
}
218+
assertEquals(1, counter.intValue());
239219
}
240220

241221
@Test
242222
public void testCloseTimeout() throws Exception {
243223
CountDownLatch countDownLatch = new CountDownLatch(1);
244224
setEventProcessor(logEvent -> {
245-
if (!countDownLatch.await(TIMEOUT_MS * 2, TimeUnit.SECONDS)) {
225+
if (!countDownLatch.await(TIMEOUT_MS * 2, TimeUnit.MILLISECONDS)) {
246226
fail("Exceeded timeout waiting for close.");
247227
}
248228
});
@@ -266,6 +246,48 @@ public void testCloseEventHandler() throws Exception {
266246
verify((AutoCloseable) mockEventHandler).close();
267247
}
268248

249+
@Test
250+
public void testInvalidBatchSizeUsesDefault() {
251+
eventProcessor = BatchEventProcessor.builder()
252+
.withEventQueue(eventQueue)
253+
.withBatchSize(-1)
254+
.withFlushInterval(MAX_DURATION_MS)
255+
.withEventHandler(new NoopEventHandler())
256+
.withNotificationCenter(notificationCenter)
257+
.withTimeout(TIMEOUT_MS, TimeUnit.MILLISECONDS)
258+
.build();
259+
260+
assertEquals(eventProcessor.batchSize, BatchEventProcessor.DEFAULT_BATCH_SIZE);
261+
}
262+
263+
@Test
264+
public void testInvalidFlushIntervalUsesDefault() {
265+
eventProcessor = BatchEventProcessor.builder()
266+
.withEventQueue(eventQueue)
267+
.withBatchSize(MAX_BATCH_SIZE)
268+
.withFlushInterval(-1L)
269+
.withEventHandler(new NoopEventHandler())
270+
.withNotificationCenter(notificationCenter)
271+
.withTimeout(TIMEOUT_MS, TimeUnit.MILLISECONDS)
272+
.build();
273+
274+
assertEquals(eventProcessor.flushInterval, BatchEventProcessor.DEFAULT_BATCH_INTERVAL);
275+
}
276+
277+
@Test
278+
public void testInvalidTimeoutUsesDefault() {
279+
eventProcessor = BatchEventProcessor.builder()
280+
.withEventQueue(eventQueue)
281+
.withBatchSize(MAX_BATCH_SIZE)
282+
.withFlushInterval(MAX_DURATION_MS)
283+
.withEventHandler(new NoopEventHandler())
284+
.withNotificationCenter(notificationCenter)
285+
.withTimeout(-1L, TimeUnit.MILLISECONDS)
286+
.build();
287+
288+
assertEquals(eventProcessor.timeoutMillis, BatchEventProcessor.DEFAULT_TIMEOUT_INTERVAL);
289+
}
290+
269291
private void setEventProcessor(EventHandler eventHandler) {
270292
eventProcessor = BatchEventProcessor.builder()
271293
.withEventQueue(eventQueue)

0 commit comments

Comments
 (0)