Skip to content

Commit 2a38338

Browse files
authored
feat: Added ODPEventManager implementation (#487)
## Summary Added `ODPEventManager` implementation. ## Test plan - Unit tests pending. ## Jira [OASIS-8386](https://optimizely.atlassian.net/browse/OASIS-8386)
1 parent 8b8a983 commit 2a38338

File tree

3 files changed

+439
-3
lines changed

3 files changed

+439
-3
lines changed

core-api/src/main/java/com/optimizely/ab/odp/ODPEvent.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
*/
1616
package com.optimizely.ab.odp;
1717

18+
import javax.annotation.Nonnull;
19+
import javax.annotation.Nullable;
20+
import java.util.Collections;
1821
import java.util.Map;
1922

2023
public class ODPEvent {
@@ -23,11 +26,11 @@ public class ODPEvent {
2326
private Map<String, String > identifiers;
2427
private Map<String, Object> data;
2528

26-
public ODPEvent(String type, String action, Map<String, String> identifiers, Map<String, Object> data) {
29+
public ODPEvent(@Nonnull String type, @Nonnull String action, @Nullable Map<String, String> identifiers, @Nullable Map<String, Object> data) {
2730
this.type = type;
2831
this.action = action;
29-
this.identifiers = identifiers;
30-
this.data = data;
32+
this.identifiers = identifiers != null ? identifiers : Collections.emptyMap();
33+
this.data = data != null ? data : Collections.emptyMap();
3134
}
3235

3336
public String getType() {
Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
/**
2+
*
3+
* Copyright 2022, Optimizely
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package com.optimizely.ab.odp;
18+
19+
import com.optimizely.ab.event.internal.BuildVersionInfo;
20+
import com.optimizely.ab.event.internal.ClientEngineInfo;
21+
import com.optimizely.ab.odp.serializer.ODPJsonSerializerFactory;
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
24+
25+
import javax.annotation.Nonnull;
26+
import javax.annotation.Nullable;
27+
import java.util.*;
28+
import java.util.concurrent.*;
29+
30+
public class ODPEventManager {
31+
private static final Logger logger = LoggerFactory.getLogger(ODPEventManager.class);
32+
private static final int DEFAULT_BATCH_SIZE = 10;
33+
private static final int DEFAULT_QUEUE_SIZE = 10000;
34+
private static final int DEFAULT_FLUSH_INTERVAL = 1000;
35+
private static final int MAX_RETRIES = 3;
36+
private static final String EVENT_URL_PATH = "/v3/events";
37+
38+
private final int queueSize;
39+
private final int batchSize;
40+
private final int flushInterval;
41+
42+
private Boolean isRunning = false;
43+
44+
// This needs to be volatile because it will be updated in the main thread and the event dispatcher thread
45+
// needs to see the change immediately.
46+
private volatile ODPConfig odpConfig;
47+
private EventDispatcherThread eventDispatcherThread;
48+
49+
private final ODPApiManager apiManager;
50+
51+
// The eventQueue needs to be thread safe. We are not doing anything extra for thread safety here
52+
// because `LinkedBlockingQueue` itself is thread safe.
53+
private final BlockingQueue<ODPEvent> eventQueue = new LinkedBlockingQueue<>();
54+
55+
public ODPEventManager(@Nonnull ODPConfig odpConfig, @Nonnull ODPApiManager apiManager) {
56+
this(odpConfig, apiManager, null, null, null);
57+
}
58+
59+
public ODPEventManager(@Nonnull ODPConfig odpConfig, @Nonnull ODPApiManager apiManager, @Nullable Integer batchSize, @Nullable Integer queueSize, @Nullable Integer flushInterval) {
60+
this.odpConfig = odpConfig;
61+
this.apiManager = apiManager;
62+
this.batchSize = (batchSize != null && batchSize > 1) ? batchSize : DEFAULT_BATCH_SIZE;
63+
this.queueSize = queueSize != null ? queueSize : DEFAULT_QUEUE_SIZE;
64+
this.flushInterval = (flushInterval != null && flushInterval > 0) ? flushInterval : DEFAULT_FLUSH_INTERVAL;
65+
}
66+
67+
public void start() {
68+
isRunning = true;
69+
eventDispatcherThread = new EventDispatcherThread();
70+
eventDispatcherThread.start();
71+
}
72+
73+
public void updateSettings(ODPConfig odpConfig) {
74+
this.odpConfig = odpConfig;
75+
}
76+
77+
public void identifyUser(@Nullable String vuid, String userId) {
78+
Map<String, String> identifiers = new HashMap<>();
79+
if (vuid != null) {
80+
identifiers.put(ODPUserKey.VUID.getKeyString(), vuid);
81+
}
82+
identifiers.put(ODPUserKey.FS_USER_ID.getKeyString(), userId);
83+
ODPEvent event = new ODPEvent("fullstack", "client_initialized", identifiers, null);
84+
sendEvent(event);
85+
}
86+
87+
public void sendEvent(ODPEvent event) {
88+
event.setData(augmentCommonData(event.getData()));
89+
processEvent(event);
90+
}
91+
92+
private Map<String, Object> augmentCommonData(Map<String, Object> sourceData) {
93+
Map<String, Object> data = new HashMap<>();
94+
data.put("idempotence_id", UUID.randomUUID().toString());
95+
data.put("data_source_type", "sdk");
96+
data.put("data_source", ClientEngineInfo.getClientEngine().getClientEngineValue());
97+
data.put("data_source_version", BuildVersionInfo.getClientVersion());
98+
data.putAll(sourceData);
99+
return data;
100+
}
101+
102+
private void processEvent(ODPEvent event) {
103+
if (!isRunning) {
104+
logger.warn("Failed to Process ODP Event. ODPEventManager is not running");
105+
return;
106+
}
107+
108+
if (!odpConfig.isReady()) {
109+
logger.debug("Unable to Process ODP Event. ODPConfig is not ready.");
110+
return;
111+
}
112+
113+
if (eventQueue.size() >= queueSize) {
114+
logger.warn("Failed to Process ODP Event. Event Queue full. queueSize = " + queueSize);
115+
return;
116+
}
117+
118+
if (!eventQueue.offer(event)) {
119+
logger.error("Failed to Process ODP Event. Event Queue is not accepting any more events");
120+
}
121+
}
122+
123+
public void stop() {
124+
logger.debug("Sending stop signal to ODP Event Dispatcher Thread");
125+
eventDispatcherThread.signalStop();
126+
}
127+
128+
private class EventDispatcherThread extends Thread {
129+
130+
private volatile boolean shouldStop = false;
131+
132+
private final List<ODPEvent> currentBatch = new ArrayList<>();
133+
134+
private long nextFlushTime = new Date().getTime();
135+
136+
@Override
137+
public void run() {
138+
while (true) {
139+
try {
140+
ODPEvent nextEvent;
141+
142+
// If batch has events, set the timeout to remaining time for flush interval,
143+
// otherwise wait for the new event indefinitely
144+
if (currentBatch.size() > 0) {
145+
nextEvent = eventQueue.poll(nextFlushTime - new Date().getTime(), TimeUnit.MILLISECONDS);
146+
} else {
147+
nextEvent = eventQueue.poll();
148+
}
149+
150+
if (nextEvent == null) {
151+
// null means no new events received and flush interval is over, dispatch whatever is in the batch.
152+
if (!currentBatch.isEmpty()) {
153+
flush();
154+
}
155+
if (shouldStop) {
156+
break;
157+
}
158+
continue;
159+
}
160+
161+
if (currentBatch.size() == 0) {
162+
// Batch starting, create a new flush time
163+
nextFlushTime = new Date().getTime() + flushInterval;
164+
}
165+
166+
currentBatch.add(nextEvent);
167+
168+
if (currentBatch.size() >= batchSize) {
169+
flush();
170+
}
171+
} catch (InterruptedException e) {
172+
Thread.currentThread().interrupt();
173+
}
174+
}
175+
176+
logger.debug("Exiting ODP Event Dispatcher Thread.");
177+
}
178+
179+
private void flush() {
180+
if (odpConfig.isReady()) {
181+
String payload = ODPJsonSerializerFactory.getSerializer().serializeEvents(currentBatch);
182+
String endpoint = odpConfig.getApiHost() + EVENT_URL_PATH;
183+
Integer statusCode;
184+
int numAttempts = 0;
185+
do {
186+
statusCode = apiManager.sendEvents(odpConfig.getApiKey(), endpoint, payload);
187+
numAttempts ++;
188+
} while (numAttempts < MAX_RETRIES && statusCode != null && (statusCode == 0 || statusCode >= 500));
189+
} else {
190+
logger.debug("ODPConfig not ready, discarding event batch");
191+
}
192+
currentBatch.clear();
193+
}
194+
195+
public void signalStop() {
196+
shouldStop = true;
197+
}
198+
}
199+
}

0 commit comments

Comments
 (0)