Skip to content

Commit 742a05b

Browse files
Merge pull request #206 from skyflowapi/bharti/SK-2258-implement-insert-public-interface-for-v-3-flow-db-in-java-sdk
SK-2258 fix the error response handling
2 parents a206fde + c82ec17 commit 742a05b

File tree

2 files changed

+15
-11
lines changed

2 files changed

+15
-11
lines changed

v3/src/main/java/com/skyflow/utils/Utils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ public static com.skyflow.vault.data.InsertResponse formatResponse(InsertRespons
125125
tokensMap.put(key, tokenList);
126126
}
127127
}
128-
Success success = new Success(indexNumber, record.get(index).getSkyflowId().get(), tokensMap, null);
128+
Success success = new Success(indexNumber, record.get(index).getSkyflowId().get(), tokensMap, record.get(index).getData().isPresent() ? record.get(index).getData().get() : null);
129129
successRecords.add(success);
130130
}
131131
indexNumber++;

v3/src/main/java/com/skyflow/vault/controller/VaultController.java

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.github.cdimascio.dotenv.Dotenv;
2323

2424
import java.util.ArrayList;
25+
import java.util.Collections;
2526
import java.util.HashMap;
2627
import java.util.List;
2728
import java.util.concurrent.CompletableFuture;
@@ -77,13 +78,13 @@ public CompletableFuture<com.skyflow.vault.data.InsertResponse> bulkInsertAsync(
7778
setBearerToken();
7879
configureInsertConcurrencyAndBatchSize(insertRequest.getValues().size());
7980
com.skyflow.generated.rest.resources.recordservice.requests.InsertRequest request = super.getBulkInsertRequestBody(insertRequest, super.getVaultConfig());
80-
81-
List<CompletableFuture<com.skyflow.vault.data.InsertResponse>> futures = this.insertBatchFutures(request);
81+
List<ErrorRecord> errorRecords = Collections.synchronizedList(new ArrayList<>());
82+
List<CompletableFuture<com.skyflow.vault.data.InsertResponse>> futures = this.insertBatchFutures(request, errorRecords);
8283

8384
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
8485
.thenApply(v -> {
8586
List<Success> successRecords = new ArrayList<>();
86-
List<ErrorRecord> errorRecords = new ArrayList<>();
87+
// List<ErrorRecord> errorRecords = new ArrayList<>();
8788

8889
for (CompletableFuture<com.skyflow.vault.data.InsertResponse> future : futures) {
8990
com.skyflow.vault.data.InsertResponse futureResponse = future.join();
@@ -111,10 +112,10 @@ private com.skyflow.vault.data.InsertResponse processSync(
111112
ArrayList<HashMap<String, Object>> originalPayload
112113
) throws ExecutionException, InterruptedException {
113114
LogUtil.printInfoLog(InfoLogs.PROCESSING_BATCHES.getLog());
114-
List<ErrorRecord> errorRecords = new ArrayList<>();
115+
// List<ErrorRecord> errorRecords = new ArrayList<>();
115116
List<Success> successRecords = new ArrayList<>();
116-
117-
List<CompletableFuture<com.skyflow.vault.data.InsertResponse>> futures = this.insertBatchFutures(insertRequest);
117+
List<ErrorRecord> errorRecords = Collections.synchronizedList(new ArrayList<>());
118+
List<CompletableFuture<com.skyflow.vault.data.InsertResponse>> futures = this.insertBatchFutures(insertRequest, errorRecords);
118119

119120
CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
120121
allFutures.join();
@@ -137,8 +138,8 @@ private com.skyflow.vault.data.InsertResponse processSync(
137138

138139

139140
private List<CompletableFuture<com.skyflow.vault.data.InsertResponse>> insertBatchFutures(
140-
com.skyflow.generated.rest.resources.recordservice.requests.InsertRequest insertRequest
141-
) {
141+
com.skyflow.generated.rest.resources.recordservice.requests.InsertRequest insertRequest,
142+
List<ErrorRecord> errorRecords) {
142143
List<InsertRecordData> records = insertRequest.getRecords().get();
143144

144145
ExecutorService executor = Executors.newFixedThreadPool(insertConcurrencyLimit);
@@ -152,7 +153,10 @@ private List<CompletableFuture<com.skyflow.vault.data.InsertResponse>> insertBat
152153
CompletableFuture<com.skyflow.vault.data.InsertResponse> future = CompletableFuture
153154
.supplyAsync(() -> insertBatch(batch, insertRequest.getTableName().get()), executor)
154155
.thenApply(response -> formatResponse(response, batchNumber, insertBatchSize))
155-
.exceptionally(ex -> new com.skyflow.vault.data.InsertResponse(null, handleBatchException(ex, batch, batchNumber, batches)));
156+
.exceptionally(ex -> {
157+
errorRecords.addAll(handleBatchException(ex, batch, batchNumber, batches));
158+
return null;
159+
});
156160
futures.add(future);
157161
}
158162
} finally {
@@ -181,7 +185,7 @@ private void configureInsertConcurrencyAndBatchSize(int totalRequests) {
181185
int batchSize = Integer.parseInt(userProvidedBatchSize);
182186
int maxBatchSize = Math.min(batchSize, Constants.MAX_INSERT_BATCH_SIZE);
183187
if (maxBatchSize > 0) {
184-
this.insertBatchSize = batchSize;
188+
this.insertBatchSize = maxBatchSize;
185189
} else {
186190
LogUtil.printWarningLog(WarningLogs.INVALID_BATCH_SIZE_PROVIDED.getLog());
187191
this.insertBatchSize = Constants.INSERT_BATCH_SIZE;

0 commit comments

Comments
 (0)