Skip to content

Commit c0672b0

Browse files
cherry pick batch event processor changes
1 parent 38d6606 commit c0672b0

File tree

2 files changed

+31
-18
lines changed

2 files changed

+31
-18
lines changed

core-api/src/main/java/com/optimizely/ab/event/BatchEventProcessor.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public class BatchEventProcessor implements EventProcessor, AutoCloseable {
4848
public static final String CONFIG_CLOSE_TIMEOUT = "event.processor.close.timeout";
4949

5050
public static final int DEFAULT_QUEUE_CAPACITY = 1000;
51+
public static final int DEFAULT_EMPTY_COUNT = 2;
5152
public static final int DEFAULT_BATCH_SIZE = 10;
5253
public static final long DEFAULT_BATCH_INTERVAL = TimeUnit.SECONDS.toMillis(30);
5354
public static final long DEFAULT_TIMEOUT_INTERVAL = TimeUnit.SECONDS.toMillis(5);
@@ -139,19 +140,26 @@ public class EventConsumer implements Runnable {
139140
@Override
140141
public void run() {
141142
try {
143+
int emptyCount = 0;
144+
142145
while (true) {
143-
if (System.currentTimeMillis() > deadline) {
146+
if (System.currentTimeMillis() >= deadline) {
144147
logger.debug("Deadline exceeded flushing current batch.");
145148
flush();
149+
deadline = System.currentTimeMillis() + flushInterval;
146150
}
147151

148-
Object item = eventQueue.poll(50, TimeUnit.MILLISECONDS);
152+
long timeout = deadline - System.currentTimeMillis();
153+
Object item = emptyCount > DEFAULT_EMPTY_COUNT ? eventQueue.take() : eventQueue.poll(timeout, TimeUnit.MILLISECONDS);
154+
149155
if (item == null) {
150-
logger.debug("Empty item, sleeping for 50ms.");
151-
Thread.sleep(50);
156+
logger.debug("Empty item after waiting flush interval.");
157+
emptyCount++;
152158
continue;
153159
}
154160

161+
emptyCount = 0;
162+
155163
if (item == SHUTDOWN_SIGNAL) {
156164
logger.info("Received shutdown signal.");
157165
break;

core-api/src/test/java/com/optimizely/ab/event/BatchEventProcessorTest.java

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -81,42 +81,47 @@ public void testDrainOnClose() throws Exception {
8181
}
8282

8383
@Test
84-
public void testFlushOnMaxTimeout() throws Exception {
84+
public void testFlushMaxBatchSize() throws Exception {
8585
CountDownLatch countDownLatch = new CountDownLatch(1);
8686
setEventProcessor(logEvent -> {
87+
assertEquals(MAX_BATCH_SIZE, logEvent.getEventBatch().getVisitors().size());
8788
eventHandlerRule.dispatchEvent(logEvent);
8889
countDownLatch.countDown();
8990
});
9091

91-
UserEvent userEvent = buildConversionEvent(EVENT_NAME);
92-
eventProcessor.process(userEvent);
93-
eventHandlerRule.expectConversion(EVENT_NAME, USER_ID);
92+
for (int i = 0; i < MAX_BATCH_SIZE; i++) {
93+
String eventName = EVENT_NAME + i;
94+
UserEvent userEvent = buildConversionEvent(eventName);
95+
eventProcessor.process(userEvent);
96+
eventHandlerRule.expectConversion(eventName, USER_ID);
97+
}
9498

9599
if (!countDownLatch.await(MAX_DURATION_MS * 3, TimeUnit.MILLISECONDS)) {
96100
fail("Exceeded timeout waiting for notification.");
97101
}
98102

99-
eventProcessor.close();
100103
assertEquals(0, eventQueue.size());
101104
}
102105

103106
@Test
104-
public void testFlushMaxBatchSize() throws Exception {
107+
public void testFlushOnMaxTimeout() throws Exception {
108+
UserEvent userEvent = buildConversionEvent(EVENT_NAME);
109+
105110
CountDownLatch countDownLatch = new CountDownLatch(1);
106111
setEventProcessor(logEvent -> {
107-
assertEquals(MAX_BATCH_SIZE, logEvent.getEventBatch().getVisitors().size());
108112
eventHandlerRule.dispatchEvent(logEvent);
109113
countDownLatch.countDown();
110114
});
111115

112-
for (int i = 0; i < MAX_BATCH_SIZE; i++) {
113-
String eventName = EVENT_NAME + i;
114-
UserEvent userEvent = buildConversionEvent(eventName);
115-
eventProcessor.process(userEvent);
116-
eventHandlerRule.expectConversion(eventName, USER_ID);
116+
eventProcessor.process(userEvent);
117+
eventHandlerRule.expectConversion(EVENT_NAME, USER_ID);
118+
119+
eventProcessor.close();
120+
121+
if (!countDownLatch.await( TIMEOUT_MS * 3, TimeUnit.MILLISECONDS)) {
122+
fail("Exceeded timeout waiting for events to flush.");
117123
}
118124

119-
countDownLatch.await();
120125
assertEquals(0, eventQueue.size());
121126
}
122127

@@ -285,4 +290,4 @@ private static ConversionEvent buildConversionEvent(String eventName, ProjectCon
285290
return UserEventFactory.createConversionEvent(projectConfig, USER_ID, EVENT_ID, eventName,
286291
Collections.emptyMap(), Collections.emptyMap());
287292
}
288-
}
293+
}

0 commit comments

Comments
 (0)