@@ -77,14 +77,13 @@ public CompletableFuture<com.skyflow.vault.data.InsertResponse> bulkInsertAsync(
7777 setBearerToken ();
7878 configureInsertConcurrencyAndBatchSize (insertRequest .getValues ().size ());
7979 com .skyflow .generated .rest .resources .recordservice .requests .InsertRequest request = super .getBulkInsertRequestBody (insertRequest , super .getVaultConfig ());
80- ExecutorService executor = Executors .newFixedThreadPool (insertConcurrencyLimit );
8180
82- List <ErrorRecord > errorRecords = new ArrayList <>();
83- List <CompletableFuture <com .skyflow .vault .data .InsertResponse >> futures = this .insertBatchFutures (request , errorRecords , executor );
81+ List <CompletableFuture <com .skyflow .vault .data .InsertResponse >> futures = this .insertBatchFutures (request );
8482
8583 return CompletableFuture .allOf (futures .toArray (new CompletableFuture [0 ]))
8684 .thenApply (v -> {
8785 List <Success > successRecords = new ArrayList <>();
86+ List <ErrorRecord > errorRecords = new ArrayList <>();
8887
8988 for (CompletableFuture <com .skyflow .vault .data .InsertResponse > future : futures ) {
9089 com .skyflow .vault .data .InsertResponse futureResponse = future .join ();
@@ -97,7 +96,6 @@ public CompletableFuture<com.skyflow.vault.data.InsertResponse> bulkInsertAsync(
9796 }
9897 }
9998 }
100- executor .shutdown (); // Shutdown the executor after all tasks are completed
10199
102100 return new com .skyflow .vault .data .InsertResponse (successRecords , errorRecords , insertRequest .getValues ());
103101 });
@@ -115,9 +113,9 @@ private com.skyflow.vault.data.InsertResponse processSync(
115113 LogUtil .printInfoLog (InfoLogs .PROCESSING_BATCHES .getLog ());
116114 List <ErrorRecord > errorRecords = new ArrayList <>();
117115 List <Success > successRecords = new ArrayList <>();
118- ExecutorService executor = Executors .newFixedThreadPool (insertConcurrencyLimit );
119116
120- List <CompletableFuture <com .skyflow .vault .data .InsertResponse >> futures = this .insertBatchFutures (insertRequest , errorRecords , executor );
117+ List <CompletableFuture <com .skyflow .vault .data .InsertResponse >> futures = this .insertBatchFutures (insertRequest );
118+
121119 CompletableFuture <Void > allFutures = CompletableFuture .allOf (futures .toArray (new CompletableFuture [0 ]));
122120 allFutures .join ();
123121
@@ -131,7 +129,6 @@ private com.skyflow.vault.data.InsertResponse processSync(
131129 errorRecords .addAll (futureResponse .getErrors ());
132130 }
133131 }
134- executor .shutdown (); // Shutdown the executor after all tasks are completed
135132 }
136133 com .skyflow .vault .data .InsertResponse response = new com .skyflow .vault .data .InsertResponse (successRecords , errorRecords , originalPayload );
137134 LogUtil .printInfoLog (InfoLogs .INSERT_REQUEST_RESOLVED .getLog ());
@@ -140,9 +137,11 @@ private com.skyflow.vault.data.InsertResponse processSync(
140137
141138
142139 private List <CompletableFuture <com .skyflow .vault .data .InsertResponse >> insertBatchFutures (
143- com .skyflow .generated .rest .resources .recordservice .requests .InsertRequest insertRequest , List <ErrorRecord > errorRecords , ExecutorService executor ) {
140+ com .skyflow .generated .rest .resources .recordservice .requests .InsertRequest insertRequest
141+ ) {
144142 List <InsertRecordData > records = insertRequest .getRecords ().get ();
145143
144+ ExecutorService executor = Executors .newFixedThreadPool (insertConcurrencyLimit );
146145 List <List <InsertRecordData >> batches = Utils .createBatches (records , insertBatchSize );
147146 List <CompletableFuture <com .skyflow .vault .data .InsertResponse >> futures = new ArrayList <>();
148147
@@ -153,12 +152,7 @@ private List<CompletableFuture<com.skyflow.vault.data.InsertResponse>> insertBat
153152 CompletableFuture <com .skyflow .vault .data .InsertResponse > future = CompletableFuture
154153 .supplyAsync (() -> insertBatch (batch , insertRequest .getTableName ().get ()), executor )
155154 .thenApply (response -> formatResponse (response , batchNumber , insertBatchSize ))
156- .exceptionally (ex -> {
157- synchronized (errorRecords ){
158- errorRecords .addAll (handleBatchException (ex , batch , batchNumber , batches ));
159- }
160- return null ;
161- });
155+ .exceptionally (ex -> new com .skyflow .vault .data .InsertResponse (null , handleBatchException (ex , batch , batchNumber , batches )));
162156 futures .add (future );
163157 }
164158 } finally {
@@ -225,4 +219,4 @@ private void configureInsertConcurrencyAndBatchSize(int totalRequests) {
225219 this .insertConcurrencyLimit = Math .min (Constants .INSERT_CONCURRENCY_LIMIT , maxConcurrencyNeeded );
226220 }
227221 }
228- }
222+ }
0 commit comments