Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,11 @@ build {

ext {
retrofit_version = "2.9.0"
jackson_version = "2.13.3"
jackson_version = "2.14.2"
swagger_annotations_version = "2.2.0"
lombok_version = "1.18.24"
okhttp_version = "4.9.3"
wasmtime_version = "0.11.0"
wasmtime_version = "0.14.0"

junit_version = "4.13.2"
mockito_core_version = "4.6.1"
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,8 @@
<swagger-core-version>2.2.0</swagger-core-version>
<retrofit-version>2.9.0</retrofit-version>
<lombok-version>1.18.24</lombok-version>
<wasmtime-version>0.11.0</wasmtime-version>
<jackson-version>2.13.3</jackson-version>
<wasmtime-version>0.14.0</wasmtime-version>
<jackson-version>2.14.2</jackson-version>
<junit-version>4.13.2</junit-version>
<mockito-core-version>4.6.1</mockito-core-version>
</properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,15 @@ public class LocalBucketing {
AtomicReference<Memory> memRef; // reference to start of WASM's memory
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

private Set<Integer> pinnedAddresses;
private HashMap<String, Integer> sdkKeyAddresses;

public LocalBucketing() {
OBJECT_MAPPER.setSerializationInclusion(JsonInclude.Include.NON_NULL);

pinnedAddresses = new HashSet<>();
sdkKeyAddresses = new HashMap<>();

store = Store.withoutData();
linker = new Linker(store.engine());
memRef = new AtomicReference<>();
Expand Down Expand Up @@ -122,78 +128,85 @@ private static long getUnsignedInt(byte[] data) {
}


public void storeConfig(String token, String config) {
int tokenAddress = newWasmString(token);
public void storeConfig(String sdkKey, String config) {
unpinAll();
int sdkKeyAddress = getSDKKeyAddress(sdkKey);
int configAddress = newWasmString(config);

Func setConfigDataPtr = linker.get(store, "", "setConfigData").get().func();
WasmFunctions.Consumer2<Integer, Integer> fn = WasmFunctions.consumer(store, setConfigDataPtr, I32, I32);
fn.accept(tokenAddress, configAddress);
fn.accept(sdkKeyAddress, configAddress);
}

public void setPlatformData(String platformData) {
unpinAll();
int platformDataAddress = newWasmString(platformData);

Func setPlatformDataPtr = linker.get(store, "", "setPlatformData").get().func();
WasmFunctions.Consumer1<Integer> fn = WasmFunctions.consumer(store, setPlatformDataPtr, I32);
fn.accept(platformDataAddress);
}

public BucketedUserConfig generateBucketedConfig(String token, User user) throws JsonProcessingException {
public BucketedUserConfig generateBucketedConfig(String sdkKey, User user) throws JsonProcessingException {
unpinAll();
String userString = OBJECT_MAPPER.writeValueAsString(user);

int tokenAddress = newWasmString(token);
int sdkKeyAddress = getSDKKeyAddress(sdkKey);
int userAddress = newWasmString(userString);

Func generateBucketedConfigForUserPtr = linker.get(store, "", "generateBucketedConfigForUser").get().func();
WasmFunctions.Function2<Integer, Integer, Integer> generateBucketedConfigForUser = WasmFunctions.func(
store, generateBucketedConfigForUserPtr, I32, I32, I32);

int resultAddress = generateBucketedConfigForUser.call(tokenAddress, userAddress);
int resultAddress = generateBucketedConfigForUser.call(sdkKeyAddress, userAddress);
String bucketedConfigString = readWasmString(resultAddress);

ObjectMapper objectMapper = new ObjectMapper();
BucketedUserConfig config = objectMapper.readValue(bucketedConfigString, BucketedUserConfig.class);
return config;
}

public void initEventQueue(String token, String options) {
int tokenAddress = newWasmString(token);
public void initEventQueue(String sdkKey, String options) {
unpinAll();
int sdkKeyAddress = getSDKKeyAddress(sdkKey);
int optionsAddress = newWasmString(options);

Func initEventQueuePtr = linker.get(store, "", "initEventQueue").get().func();
WasmFunctions.Consumer2<Integer, Integer> fn = WasmFunctions.consumer(store, initEventQueuePtr, I32, I32);
fn.accept(tokenAddress, optionsAddress);
fn.accept(sdkKeyAddress, optionsAddress);
}

public void queueEvent(String token, String user, String event) {
int tokenAddress = newWasmString(token);
int userAddress = newWasmString(user);
public void queueEvent(String sdkKey, String user, String event) {
unpinAll();
int sdkKeyAddress = newWasmString(sdkKey);
int userAddress = getPinnedParameter(user);
int eventAddress = newWasmString(event);

Func queueEventPtr = linker.get(store, "", "queueEvent").get().func();
WasmFunctions.Consumer3<Integer, Integer, Integer> fn = WasmFunctions.consumer(store, queueEventPtr, I32, I32, I32);
fn.accept(tokenAddress, userAddress, eventAddress);
fn.accept(sdkKeyAddress, userAddress, eventAddress);
}

public void queueAggregateEvent(String token, String event, String variableVariationMap) {
int tokenAddress = newWasmString(token);
int eventAddress = newWasmString(event);
public void queueAggregateEvent(String sdkKey, String event, String variableVariationMap) {
unpinAll();
int sdkKeyAddress = getSDKKeyAddress(sdkKey);
int eventAddress = getPinnedParameter(event);
int variableVariationMapAddress = newWasmString(variableVariationMap);

Func queueAggregateEventPtr = linker.get(store, "", "queueAggregateEvent").get().func();
WasmFunctions.Consumer3<Integer, Integer, Integer> fn = WasmFunctions.consumer(store, queueAggregateEventPtr, I32, I32, I32);
fn.accept(tokenAddress, eventAddress, variableVariationMapAddress);
fn.accept(sdkKeyAddress, eventAddress, variableVariationMapAddress);
}

public FlushPayload[] flushEventQueue(String token) throws JsonProcessingException {
int tokenAddress = newWasmString(token);
public FlushPayload[] flushEventQueue(String sdkKey) throws JsonProcessingException {
unpinAll();
int sdkKeyAddress = getSDKKeyAddress(sdkKey);

Func flushEventQueuePtr = linker.get(store, "", "flushEventQueue").get().func();
WasmFunctions.Function1<Integer, Integer> fn = WasmFunctions.func(
store, flushEventQueuePtr, I32, I32);

int resultAddress = fn.call(tokenAddress);
int resultAddress = fn.call(sdkKeyAddress);
String flushPayloadsStr = readWasmString(resultAddress);

ObjectMapper objectMapper = new ObjectMapper();
Expand All @@ -208,32 +221,71 @@ public FlushPayload[] flushEventQueue(String token) throws JsonProcessingExcepti
return payloads;
}

public void onPayloadFailure(String token, String payloadId, boolean retryable) {
int tokenAddress = newWasmString(token);
public void onPayloadFailure(String sdkKey, String payloadId, boolean retryable) {
unpinAll();
int sdkKeyAddress = getSDKKeyAddress(sdkKey);
int payloadIdAddress = newWasmString(payloadId);

Func onPayloadFailurePtr = linker.get(store, "", "onPayloadFailure").get().func();
WasmFunctions.Consumer3<Integer, Integer, Integer> fn = WasmFunctions.consumer(store, onPayloadFailurePtr, I32, I32, I32);
fn.accept(tokenAddress, payloadIdAddress, retryable ? 1 : 0);
fn.accept(sdkKeyAddress, payloadIdAddress, retryable ? 1 : 0);
}

public void onPayloadSuccess(String token, String payloadId) {
int tokenAddress = newWasmString(token);
public void onPayloadSuccess(String sdkKey, String payloadId) {
unpinAll();
int sdkKeyAddress = getSDKKeyAddress(sdkKey);
int payloadIdAddress = newWasmString(payloadId);

Func onPayloadSuccessPtr = linker.get(store, "", "onPayloadSuccess").get().func();
WasmFunctions.Consumer2<Integer, Integer> fn = WasmFunctions.consumer(store, onPayloadSuccessPtr, I32, I32);
fn.accept(tokenAddress, payloadIdAddress);
fn.accept(sdkKeyAddress, payloadIdAddress);
}

public int getEventQueueSize(String token) {
int tokenAddress = newWasmString(token);
public int getEventQueueSize(String sdkKey) {
unpinAll();
int sdkKeyAddress = getSDKKeyAddress(sdkKey);

Func getEventQueueSizePtr = linker.get(store, "", "eventQueueSize").get().func();
WasmFunctions.Function1<Integer, Integer> getEventQueueSize = WasmFunctions.func(
store, getEventQueueSizePtr, I32, I32);

return getEventQueueSize.call(tokenAddress);
return getEventQueueSize.call(sdkKeyAddress);
}

private void pinParameter(int address) {
Func pinPtr = linker.get(store, "", "__pin").get().func();
WasmFunctions.Consumer1<Integer> pin = WasmFunctions.consumer(store, pinPtr, I32);
pin.accept(address);
}

private void unpinParameter(int address) {
Func unpinPtr = linker.get(store, "", "__unpin").get().func();
WasmFunctions.Consumer1<Integer> unpin = WasmFunctions.consumer(store, unpinPtr, I32);
unpin.accept(address);
}

private void unpinAll() {
for(int address : pinnedAddresses) {
unpinParameter(address);
}
pinnedAddresses.clear();
}

private int getPinnedParameter(String param) {
int address = newWasmString(param);
pinParameter(address);
pinnedAddresses.add(address);
return address;
}

private int getSDKKeyAddress(String sdkKey) {
if(!sdkKeyAddresses.containsKey(sdkKey)) {
int sdkKeyAddress = newWasmString(sdkKey);
pinParameter(sdkKeyAddress);
sdkKeyAddresses.put(sdkKey, sdkKeyAddress);
}

return sdkKeyAddresses.get(sdkKey);
}
}