Skip to content

Commit f778b1f

Browse files
committed
[fix] DVC-6373: use address pinning in Local Bucketing
1 parent f05f0bc commit f778b1f

File tree

2 files changed

+107
-29
lines changed

2 files changed

+107
-29
lines changed
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package com.devcycle.sdk.server.common.model;
2+
3+
import com.devcycle.sdk.server.common.model.Variable.TypeEnum;
4+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
5+
import com.fasterxml.jackson.annotation.JsonProperty;
6+
import io.swagger.v3.oas.annotations.media.Schema;
7+
import lombok.AllArgsConstructor;
8+
import lombok.Data;
9+
10+
@Data
11+
@AllArgsConstructor
12+
@JsonIgnoreProperties(ignoreUnknown = true)
13+
public class VariableSet<T> {
14+
@Schema(required = false, description = "unique database id")
15+
@JsonProperty("_id")
16+
private String id;
17+
18+
@Schema(required = true, description = "Unique key by Project, can be used in the SDK / API to reference by 'key' rather than _id.")
19+
private String key;
20+
21+
@Schema(required = true, description = "Variable type")
22+
private TypeEnum type;
23+
24+
@Schema(required = true, description = "Variable value can be a string, number, boolean, or JSON")
25+
private T value;
26+
}

src/main/java/com/devcycle/sdk/server/local/bucketing/LocalBucketing.java

Lines changed: 81 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,15 @@ public class LocalBucketing {
2828
AtomicReference<Memory> memRef; // reference to start of WASM's memory
2929
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
3030

31+
private Set<Integer> pinnedAddresses;
32+
private HashMap<String, Integer> sdkKeyAddresses;
33+
3134
public LocalBucketing() {
3235
OBJECT_MAPPER.setSerializationInclusion(JsonInclude.Include.NON_NULL);
3336

37+
pinnedAddresses = new HashSet<>();
38+
sdkKeyAddresses = new HashMap<>();
39+
3440
store = Store.withoutData();
3541
linker = new Linker(store.engine());
3642
memRef = new AtomicReference<>();
@@ -122,78 +128,85 @@ private static long getUnsignedInt(byte[] data) {
122128
}
123129

124130

125-
public void storeConfig(String token, String config) {
126-
int tokenAddress = newWasmString(token);
131+
public void storeConfig(String sdkKey, String config) {
132+
unpinAll();
133+
int sdkKeyAddress = getSDKKeyAddress(sdkKey);
127134
int configAddress = newWasmString(config);
128135

129136
Func setConfigDataPtr = linker.get(store, "", "setConfigData").get().func();
130137
WasmFunctions.Consumer2<Integer, Integer> fn = WasmFunctions.consumer(store, setConfigDataPtr, I32, I32);
131-
fn.accept(tokenAddress, configAddress);
138+
fn.accept(sdkKeyAddress, configAddress);
132139
}
133140

134141
public void setPlatformData(String platformData) {
142+
unpinAll();
135143
int platformDataAddress = newWasmString(platformData);
136144

137145
Func setPlatformDataPtr = linker.get(store, "", "setPlatformData").get().func();
138146
WasmFunctions.Consumer1<Integer> fn = WasmFunctions.consumer(store, setPlatformDataPtr, I32);
139147
fn.accept(platformDataAddress);
140148
}
141149

142-
public BucketedUserConfig generateBucketedConfig(String token, User user) throws JsonProcessingException {
150+
public BucketedUserConfig generateBucketedConfig(String sdkKey, User user) throws JsonProcessingException {
151+
unpinAll();
143152
String userString = OBJECT_MAPPER.writeValueAsString(user);
144153

145-
int tokenAddress = newWasmString(token);
154+
int sdkKeyAddress = getSDKKeyAddress(sdkKey);
146155
int userAddress = newWasmString(userString);
147156

148157
Func generateBucketedConfigForUserPtr = linker.get(store, "", "generateBucketedConfigForUser").get().func();
149158
WasmFunctions.Function2<Integer, Integer, Integer> generateBucketedConfigForUser = WasmFunctions.func(
150159
store, generateBucketedConfigForUserPtr, I32, I32, I32);
151160

152-
int resultAddress = generateBucketedConfigForUser.call(tokenAddress, userAddress);
161+
int resultAddress = generateBucketedConfigForUser.call(sdkKeyAddress, userAddress);
153162
String bucketedConfigString = readWasmString(resultAddress);
154163

155164
ObjectMapper objectMapper = new ObjectMapper();
156165
BucketedUserConfig config = objectMapper.readValue(bucketedConfigString, BucketedUserConfig.class);
157166
return config;
158167
}
159168

160-
public void initEventQueue(String token, String options) {
161-
int tokenAddress = newWasmString(token);
169+
public void initEventQueue(String sdkKey, String options) {
170+
unpinAll();
171+
int sdkKeyAddress = getSDKKeyAddress(sdkKey);
162172
int optionsAddress = newWasmString(options);
163173

164174
Func initEventQueuePtr = linker.get(store, "", "initEventQueue").get().func();
165175
WasmFunctions.Consumer2<Integer, Integer> fn = WasmFunctions.consumer(store, initEventQueuePtr, I32, I32);
166-
fn.accept(tokenAddress, optionsAddress);
176+
fn.accept(sdkKeyAddress, optionsAddress);
167177
}
168178

169-
public void queueEvent(String token, String user, String event) {
170-
int tokenAddress = newWasmString(token);
171-
int userAddress = newWasmString(user);
179+
public void queueEvent(String sdkKey, String user, String event) {
180+
unpinAll();
181+
int sdkKeyAddress = newWasmString(sdkKey);
182+
int userAddress = getPinnedParameter(user);
172183
int eventAddress = newWasmString(event);
173184

174185
Func queueEventPtr = linker.get(store, "", "queueEvent").get().func();
175186
WasmFunctions.Consumer3<Integer, Integer, Integer> fn = WasmFunctions.consumer(store, queueEventPtr, I32, I32, I32);
176-
fn.accept(tokenAddress, userAddress, eventAddress);
187+
fn.accept(sdkKeyAddress, userAddress, eventAddress);
177188
}
178189

179-
public void queueAggregateEvent(String token, String event, String variableVariationMap) {
180-
int tokenAddress = newWasmString(token);
181-
int eventAddress = newWasmString(event);
190+
public void queueAggregateEvent(String sdkKey, String event, String variableVariationMap) {
191+
unpinAll();
192+
int sdkKeyAddress = getSDKKeyAddress(sdkKey);
193+
int eventAddress = getPinnedParameter(event);
182194
int variableVariationMapAddress = newWasmString(variableVariationMap);
183195

184196
Func queueAggregateEventPtr = linker.get(store, "", "queueAggregateEvent").get().func();
185197
WasmFunctions.Consumer3<Integer, Integer, Integer> fn = WasmFunctions.consumer(store, queueAggregateEventPtr, I32, I32, I32);
186-
fn.accept(tokenAddress, eventAddress, variableVariationMapAddress);
198+
fn.accept(sdkKeyAddress, eventAddress, variableVariationMapAddress);
187199
}
188200

189-
public FlushPayload[] flushEventQueue(String token) throws JsonProcessingException {
190-
int tokenAddress = newWasmString(token);
201+
public FlushPayload[] flushEventQueue(String sdkKey) throws JsonProcessingException {
202+
unpinAll();
203+
int sdkKeyAddress = getSDKKeyAddress(sdkKey);
191204

192205
Func flushEventQueuePtr = linker.get(store, "", "flushEventQueue").get().func();
193206
WasmFunctions.Function1<Integer, Integer> fn = WasmFunctions.func(
194207
store, flushEventQueuePtr, I32, I32);
195208

196-
int resultAddress = fn.call(tokenAddress);
209+
int resultAddress = fn.call(sdkKeyAddress);
197210
String flushPayloadsStr = readWasmString(resultAddress);
198211

199212
ObjectMapper objectMapper = new ObjectMapper();
@@ -208,32 +221,71 @@ public FlushPayload[] flushEventQueue(String token) throws JsonProcessingExcepti
208221
return payloads;
209222
}
210223

211-
public void onPayloadFailure(String token, String payloadId, boolean retryable) {
212-
int tokenAddress = newWasmString(token);
224+
public void onPayloadFailure(String sdkKey, String payloadId, boolean retryable) {
225+
unpinAll();
226+
int sdkKeyAddress = getSDKKeyAddress(sdkKey);
213227
int payloadIdAddress = newWasmString(payloadId);
214228

215229
Func onPayloadFailurePtr = linker.get(store, "", "onPayloadFailure").get().func();
216230
WasmFunctions.Consumer3<Integer, Integer, Integer> fn = WasmFunctions.consumer(store, onPayloadFailurePtr, I32, I32, I32);
217-
fn.accept(tokenAddress, payloadIdAddress, retryable ? 1 : 0);
231+
fn.accept(sdkKeyAddress, payloadIdAddress, retryable ? 1 : 0);
218232
}
219233

220-
public void onPayloadSuccess(String token, String payloadId) {
221-
int tokenAddress = newWasmString(token);
234+
public void onPayloadSuccess(String sdkKey, String payloadId) {
235+
unpinAll();
236+
int sdkKeyAddress = getSDKKeyAddress(sdkKey);
222237
int payloadIdAddress = newWasmString(payloadId);
223238

224239
Func onPayloadSuccessPtr = linker.get(store, "", "onPayloadSuccess").get().func();
225240
WasmFunctions.Consumer2<Integer, Integer> fn = WasmFunctions.consumer(store, onPayloadSuccessPtr, I32, I32);
226-
fn.accept(tokenAddress, payloadIdAddress);
241+
fn.accept(sdkKeyAddress, payloadIdAddress);
227242
}
228243

229-
public int getEventQueueSize(String token) {
230-
int tokenAddress = newWasmString(token);
244+
public int getEventQueueSize(String sdkKey) {
245+
unpinAll();
246+
int sdkKeyAddress = getSDKKeyAddress(sdkKey);
231247

232248
Func getEventQueueSizePtr = linker.get(store, "", "eventQueueSize").get().func();
233249
WasmFunctions.Function1<Integer, Integer> getEventQueueSize = WasmFunctions.func(
234250
store, getEventQueueSizePtr, I32, I32);
235251

236-
return getEventQueueSize.call(tokenAddress);
252+
return getEventQueueSize.call(sdkKeyAddress);
253+
}
254+
255+
private void pinParameter(int address) {
256+
Func pinPtr = linker.get(store, "", "__pin").get().func();
257+
WasmFunctions.Consumer1<Integer> pin = WasmFunctions.consumer(store, pinPtr, I32);
258+
pin.accept(address);
259+
}
260+
261+
private void unpinParameter(int address) {
262+
Func unpinPtr = linker.get(store, "", "__unpin").get().func();
263+
WasmFunctions.Consumer1<Integer> unpin = WasmFunctions.consumer(store, unpinPtr, I32);
264+
unpin.accept(address);
265+
}
266+
267+
private void unpinAll() {
268+
for(int address : pinnedAddresses) {
269+
unpinParameter(address);
270+
}
271+
pinnedAddresses.clear();
272+
}
273+
274+
private int getPinnedParameter(String param) {
275+
int address = newWasmString(param);
276+
pinParameter(address);
277+
pinnedAddresses.add(address);
278+
return address;
279+
}
280+
281+
private int getSDKKeyAddress(String sdkKey) {
282+
if(!sdkKeyAddresses.containsKey(sdkKey)) {
283+
int sdkKeyAddress = newWasmString(sdkKey);
284+
pinParameter(sdkKeyAddress);
285+
sdkKeyAddresses.put(sdkKey, sdkKeyAddress);
286+
}
287+
288+
return sdkKeyAddresses.get(sdkKey);
237289
}
238290
}
239291

0 commit comments

Comments
 (0)