Skip to content

Commit d826b5c

Browse files
committed
[fix] DVC-6373: use address pinning in Local Bucketing
1 parent f05f0bc commit d826b5c

File tree

2 files changed

+106
-29
lines changed

2 files changed

+106
-29
lines changed
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package com.devcycle.sdk.server.common.model;
2+
3+
import com.devcycle.sdk.server.common.model.Variable.TypeEnum;
4+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
5+
import com.fasterxml.jackson.annotation.JsonProperty;
6+
import io.swagger.v3.oas.annotations.media.Schema;
7+
import lombok.AllArgsConstructor;
8+
import lombok.Data;
9+
10+
@Data
11+
@AllArgsConstructor
12+
@JsonIgnoreProperties(ignoreUnknown = true)
13+
public class VariableSet<T> {
14+
@Schema(required = false, description = "unique database id")
15+
@JsonProperty("_id")
16+
private String id;
17+
18+
@Schema(required = true, description = "Unique key by Project, can be used in the SDK / API to reference by 'key' rather than _id.")
19+
private String key;
20+
21+
@Schema(required = true, description = "Variable type")
22+
private TypeEnum type;
23+
24+
@Schema(required = true, description = "Variable value can be a string, number, boolean, or JSON")
25+
private T value;
26+
}

src/main/java/com/devcycle/sdk/server/local/bucketing/LocalBucketing.java

Lines changed: 80 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,15 @@ public class LocalBucketing {
2828
AtomicReference<Memory> memRef; // reference to start of WASM's memory
2929
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
3030

31+
private Set<Integer> pinnedAddresses;
32+
private HashMap<String, Integer> sdkKeyAddresses;
33+
3134
public LocalBucketing() {
3235
OBJECT_MAPPER.setSerializationInclusion(JsonInclude.Include.NON_NULL);
3336

37+
pinnedAddresses = new HashSet<>();
38+
sdkKeyAddresses = new HashMap<>();
39+
3440
store = Store.withoutData();
3541
linker = new Linker(store.engine());
3642
memRef = new AtomicReference<>();
@@ -122,13 +128,14 @@ private static long getUnsignedInt(byte[] data) {
122128
}
123129

124130

125-
public void storeConfig(String token, String config) {
126-
int tokenAddress = newWasmString(token);
131+
public void storeConfig(String sdkKey, String config) {
132+
unpinAll();
133+
int sdkKeyAddress = getSDKKeyAddress(sdkKey);
127134
int configAddress = newWasmString(config);
128135

129136
Func setConfigDataPtr = linker.get(store, "", "setConfigData").get().func();
130137
WasmFunctions.Consumer2<Integer, Integer> fn = WasmFunctions.consumer(store, setConfigDataPtr, I32, I32);
131-
fn.accept(tokenAddress, configAddress);
138+
fn.accept(sdkKeyAddress, configAddress);
132139
}
133140

134141
public void setPlatformData(String platformData) {
@@ -139,61 +146,66 @@ public void setPlatformData(String platformData) {
139146
fn.accept(platformDataAddress);
140147
}
141148

142-
public BucketedUserConfig generateBucketedConfig(String token, User user) throws JsonProcessingException {
149+
public BucketedUserConfig generateBucketedConfig(String sdkKey, User user) throws JsonProcessingException {
150+
unpinAll();
143151
String userString = OBJECT_MAPPER.writeValueAsString(user);
144152

145-
int tokenAddress = newWasmString(token);
153+
int sdkKeyAddress = getSDKKeyAddress(sdkKey);
146154
int userAddress = newWasmString(userString);
147155

148156
Func generateBucketedConfigForUserPtr = linker.get(store, "", "generateBucketedConfigForUser").get().func();
149157
WasmFunctions.Function2<Integer, Integer, Integer> generateBucketedConfigForUser = WasmFunctions.func(
150158
store, generateBucketedConfigForUserPtr, I32, I32, I32);
151159

152-
int resultAddress = generateBucketedConfigForUser.call(tokenAddress, userAddress);
160+
int resultAddress = generateBucketedConfigForUser.call(sdkKeyAddress, userAddress);
153161
String bucketedConfigString = readWasmString(resultAddress);
154162

155163
ObjectMapper objectMapper = new ObjectMapper();
156164
BucketedUserConfig config = objectMapper.readValue(bucketedConfigString, BucketedUserConfig.class);
157165
return config;
158166
}
159167

160-
public void initEventQueue(String token, String options) {
161-
int tokenAddress = newWasmString(token);
168+
public void initEventQueue(String sdkKey, String options) {
169+
unpinAll();
170+
int sdkKeyAddress = getSDKKeyAddress(sdkKey);
162171
int optionsAddress = newWasmString(options);
163172

164173
Func initEventQueuePtr = linker.get(store, "", "initEventQueue").get().func();
165174
WasmFunctions.Consumer2<Integer, Integer> fn = WasmFunctions.consumer(store, initEventQueuePtr, I32, I32);
166-
fn.accept(tokenAddress, optionsAddress);
175+
fn.accept(sdkKeyAddress, optionsAddress);
167176
}
168177

169-
public void queueEvent(String token, String user, String event) {
170-
int tokenAddress = newWasmString(token);
171-
int userAddress = newWasmString(user);
178+
public void queueEvent(String sdkKey, String user, String event) {
179+
unpinAll();
180+
int sdkKeyAddress = newWasmString(sdkKey);
181+
int userAddress = getPinnedParameter(user);
172182
int eventAddress = newWasmString(event);
173183

174184
Func queueEventPtr = linker.get(store, "", "queueEvent").get().func();
175185
WasmFunctions.Consumer3<Integer, Integer, Integer> fn = WasmFunctions.consumer(store, queueEventPtr, I32, I32, I32);
176-
fn.accept(tokenAddress, userAddress, eventAddress);
186+
fn.accept(sdkKeyAddress, userAddress, eventAddress);
177187
}
178188

179-
public void queueAggregateEvent(String token, String event, String variableVariationMap) {
180-
int tokenAddress = newWasmString(token);
181-
int eventAddress = newWasmString(event);
189+
public void queueAggregateEvent(String sdkKey, String event, String variableVariationMap) {
190+
unpinAll();
191+
int sdkKeyAddress = getSDKKeyAddress(sdkKey);
192+
int eventAddress = getPinnedParameter(event);
182193
int variableVariationMapAddress = newWasmString(variableVariationMap);
183194

184195
Func queueAggregateEventPtr = linker.get(store, "", "queueAggregateEvent").get().func();
185196
WasmFunctions.Consumer3<Integer, Integer, Integer> fn = WasmFunctions.consumer(store, queueAggregateEventPtr, I32, I32, I32);
186-
fn.accept(tokenAddress, eventAddress, variableVariationMapAddress);
197+
fn.accept(sdkKeyAddress, eventAddress, variableVariationMapAddress);
187198
}
188199

189-
public FlushPayload[] flushEventQueue(String token) throws JsonProcessingException {
190-
int tokenAddress = newWasmString(token);
200+
public FlushPayload[] flushEventQueue(String sdkKey) throws JsonProcessingException {
201+
unpinAll();
202+
int sdkKeyAddress = getSDKKeyAddress(sdkKey);
191203

192204
Func flushEventQueuePtr = linker.get(store, "", "flushEventQueue").get().func();
193205
WasmFunctions.Function1<Integer, Integer> fn = WasmFunctions.func(
194206
store, flushEventQueuePtr, I32, I32);
195207

196-
int resultAddress = fn.call(tokenAddress);
208+
int resultAddress = fn.call(sdkKeyAddress);
197209
String flushPayloadsStr = readWasmString(resultAddress);
198210

199211
ObjectMapper objectMapper = new ObjectMapper();
@@ -208,32 +220,71 @@ public FlushPayload[] flushEventQueue(String token) throws JsonProcessingExcepti
208220
return payloads;
209221
}
210222

211-
public void onPayloadFailure(String token, String payloadId, boolean retryable) {
212-
int tokenAddress = newWasmString(token);
223+
public void onPayloadFailure(String sdkKey, String payloadId, boolean retryable) {
224+
unpinAll();
225+
int sdkKeyAddress = getSDKKeyAddress(sdkKey);
213226
int payloadIdAddress = newWasmString(payloadId);
214227

215228
Func onPayloadFailurePtr = linker.get(store, "", "onPayloadFailure").get().func();
216229
WasmFunctions.Consumer3<Integer, Integer, Integer> fn = WasmFunctions.consumer(store, onPayloadFailurePtr, I32, I32, I32);
217-
fn.accept(tokenAddress, payloadIdAddress, retryable ? 1 : 0);
230+
fn.accept(sdkKeyAddress, payloadIdAddress, retryable ? 1 : 0);
218231
}
219232

220-
public void onPayloadSuccess(String token, String payloadId) {
221-
int tokenAddress = newWasmString(token);
233+
public void onPayloadSuccess(String sdkKey, String payloadId) {
234+
unpinAll();
235+
int sdkKeyAddress = getSDKKeyAddress(sdkKey);
222236
int payloadIdAddress = newWasmString(payloadId);
223237

224238
Func onPayloadSuccessPtr = linker.get(store, "", "onPayloadSuccess").get().func();
225239
WasmFunctions.Consumer2<Integer, Integer> fn = WasmFunctions.consumer(store, onPayloadSuccessPtr, I32, I32);
226-
fn.accept(tokenAddress, payloadIdAddress);
240+
fn.accept(sdkKeyAddress, payloadIdAddress);
227241
}
228242

229-
public int getEventQueueSize(String token) {
230-
int tokenAddress = newWasmString(token);
243+
public int getEventQueueSize(String sdkKey) {
244+
unpinAll();
245+
int sdkKeyAddress = getSDKKeyAddress(sdkKey);
231246

232247
Func getEventQueueSizePtr = linker.get(store, "", "eventQueueSize").get().func();
233248
WasmFunctions.Function1<Integer, Integer> getEventQueueSize = WasmFunctions.func(
234249
store, getEventQueueSizePtr, I32, I32);
235250

236-
return getEventQueueSize.call(tokenAddress);
251+
return getEventQueueSize.call(sdkKeyAddress);
252+
}
253+
254+
private void pinParameter(int address) {
255+
Func pinPtr = linker.get(store, "", "__pin").get().func();
256+
WasmFunctions.Consumer1<Integer> pin = WasmFunctions.consumer(store, pinPtr, I32);
257+
pin.accept(address);
258+
}
259+
260+
private void unpinParameter(int address) {
261+
Func unpinPtr = linker.get(store, "", "__unpin").get().func();
262+
WasmFunctions.Consumer1<Integer> unpin = WasmFunctions.consumer(store, unpinPtr, I32);
263+
unpin.accept(address);
264+
}
265+
266+
private void unpinAll() {
267+
for(int address : pinnedAddresses) {
268+
unpinParameter(address);
269+
}
270+
pinnedAddresses.clear();
271+
}
272+
273+
private int getPinnedParameter(String param) {
274+
int address = newWasmString(param);
275+
pinParameter(address);
276+
pinnedAddresses.add(address);
277+
return address;
278+
}
279+
280+
private int getSDKKeyAddress(String sdkKey) {
281+
if(!sdkKeyAddresses.containsKey(sdkKey)) {
282+
int sdkKeyAddress = newWasmString(sdkKey);
283+
pinParameter(sdkKeyAddress);
284+
sdkKeyAddresses.put(sdkKey, sdkKeyAddress);
285+
}
286+
287+
return sdkKeyAddresses.get(sdkKey);
237288
}
238289
}
239290

0 commit comments

Comments
 (0)