Skip to content

Commit 4ce1fa0

Browse files
(chore): refactor batch event processor to use blocking queue poll and take so as not to spin too much. (#343)
* need to reset the deadline after it is hit once. * use flush interval for event queue polling * use take if you have timed out enough * refactor to only reset interval in one place. * remove lambda after discussion with miked * fix log message * check if deadline is equal or passed since we set timeout at deadline
1 parent 2cddbac commit 4ce1fa0

File tree

2 files changed

+28
-18
lines changed

2 files changed

+28
-18
lines changed

core-api/src/main/java/com/optimizely/ab/event/BatchEventProcessor.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public class BatchEventProcessor implements EventProcessor, AutoCloseable {
4848
public static final String CONFIG_CLOSE_TIMEOUT = "event.processor.close.timeout";
4949

5050
public static final int DEFAULT_QUEUE_CAPACITY = 1000;
51+
public static final int DEFAULT_EMPTY_COUNT = 2;
5152
public static final int DEFAULT_BATCH_SIZE = 10;
5253
public static final long DEFAULT_BATCH_INTERVAL = TimeUnit.SECONDS.toMillis(30);
5354
public static final long DEFAULT_TIMEOUT_INTERVAL = TimeUnit.SECONDS.toMillis(5);
@@ -129,19 +130,26 @@ public class EventConsumer implements Runnable {
129130
@Override
130131
public void run() {
131132
try {
133+
int emptyCount = 0;
134+
132135
while (true) {
133-
if (System.currentTimeMillis() > deadline) {
136+
if (System.currentTimeMillis() >= deadline) {
134137
logger.debug("Deadline exceeded flushing current batch.");
135138
flush();
139+
deadline = System.currentTimeMillis() + flushInterval;
136140
}
137141

138-
Object item = eventQueue.poll(50, TimeUnit.MILLISECONDS);
142+
long timeout = deadline - System.currentTimeMillis();
143+
Object item = emptyCount > DEFAULT_EMPTY_COUNT ? eventQueue.take() : eventQueue.poll(timeout, TimeUnit.MILLISECONDS);
144+
139145
if (item == null) {
140-
logger.debug("Empty item, sleeping for 50ms.");
141-
Thread.sleep(50);
146+
logger.debug("Empty item after waiting flush interval.");
147+
emptyCount++;
142148
continue;
143149
}
144150

151+
emptyCount = 0;
152+
145153
if (item == SHUTDOWN_SIGNAL) {
146154
logger.info("Received shutdown signal.");
147155
break;

core-api/src/test/java/com/optimizely/ab/event/BatchEventProcessorTest.java

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -84,43 +84,45 @@ public void testDrainOnClose() throws Exception {
8484
}
8585

8686
@Test
87-
public void testFlushOnMaxTimeout() throws Exception {
87+
public void testFlushMaxBatchSize() throws Exception {
8888
CountDownLatch countDownLatch = new CountDownLatch(1);
8989
setEventProcessor(logEvent -> {
90+
assertEquals(MAX_BATCH_SIZE, logEvent.getEventBatch().getVisitors().size());
9091
eventHandlerRule.dispatchEvent(logEvent);
9192
countDownLatch.countDown();
9293
});
9394

94-
UserEvent userEvent = buildConversionEvent(EVENT_NAME);
95-
eventProcessor.process(userEvent);
96-
eventHandlerRule.expectConversion(EVENT_NAME, USER_ID);
95+
for (int i = 0; i < MAX_BATCH_SIZE; i++) {
96+
String eventName = EVENT_NAME + i;
97+
UserEvent userEvent = buildConversionEvent(eventName);
98+
eventProcessor.process(userEvent);
99+
eventHandlerRule.expectConversion(eventName, USER_ID);
100+
}
97101

98102
if (!countDownLatch.await(MAX_DURATION_MS * 3, TimeUnit.MILLISECONDS)) {
99103
fail("Exceeded timeout waiting for events to flush.");
100104
}
101105

102-
eventProcessor.close();
103106
assertEquals(0, eventQueue.size());
104107
eventHandlerRule.expectCalls(1);
105108
}
106109

107110
@Test
108-
public void testFlushMaxBatchSize() throws Exception {
111+
public void testFlushOnMaxTimeout() throws Exception {
112+
UserEvent userEvent = buildConversionEvent(EVENT_NAME);
113+
109114
CountDownLatch countDownLatch = new CountDownLatch(1);
110115
setEventProcessor(logEvent -> {
111-
assertEquals(MAX_BATCH_SIZE, logEvent.getEventBatch().getVisitors().size());
112116
eventHandlerRule.dispatchEvent(logEvent);
113117
countDownLatch.countDown();
114118
});
115119

116-
for (int i = 0; i < MAX_BATCH_SIZE; i++) {
117-
String eventName = EVENT_NAME + i;
118-
UserEvent userEvent = buildConversionEvent(eventName);
119-
eventProcessor.process(userEvent);
120-
eventHandlerRule.expectConversion(eventName, USER_ID);
121-
}
120+
eventProcessor.process(userEvent);
121+
eventHandlerRule.expectConversion(EVENT_NAME, USER_ID);
122122

123-
if (!countDownLatch.await(MAX_DURATION_MS * 3, TimeUnit.MILLISECONDS)) {
123+
eventProcessor.close();
124+
125+
if (!countDownLatch.await( TIMEOUT_MS * 3, TimeUnit.MILLISECONDS)) {
124126
fail("Exceeded timeout waiting for events to flush.");
125127
}
126128

0 commit comments

Comments
 (0)