Skip to content

Commit 16be91e

Browse files
SK-2258 fix default concurrency limit logic
1 parent 6f765f6 commit 16be91e

File tree

11 files changed

+1198
-74
lines changed

11 files changed

+1198
-74
lines changed

v3/src/main/java/com/skyflow/utils/Utils.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -103,16 +103,16 @@ public static com.skyflow.vault.data.InsertResponse formatResponse(InsertRespons
103103
int recordsSize = response.getRecords().get().size();
104104
for (int index = 0; index < recordsSize; index++) {
105105
if (record.get(index).getError().isPresent()) {
106-
ErrorRecord errorRecord = new ErrorRecord();
107-
errorRecord.setIndex(indexNumber);
108-
errorRecord.setError(record.get(index).getError().get());
109-
errorRecord.setCode(record.get(index).getHttpCode().get());
106+
ErrorRecord errorRecord = new ErrorRecord(indexNumber, record.get(index).getError().get(), record.get(index).getHttpCode().get());
107+
// errorRecord.setIndex(indexNumber);
108+
// errorRecord.setError(record.get(index).getError().get());
109+
// errorRecord.setCode(record.get(index).getHttpCode().get());
110110
errorRecords.add(errorRecord);
111111
// errorRecord.setCode(record.get(index).getError().get().getCode());
112112
} else {
113-
Success success = new Success();
114-
success.setIndex(indexNumber);
115-
success.setSkyflowId(record.get(index).getSkyflowId().get());
113+
Success success = new Success(index, record.get(index).getSkyflowId().get(), null, null);
114+
// success.setIndex(indexNumber);
115+
// success.setSkyflowId(record.get(index).getSkyflowId().get());
116116
// success.setData(record.get(index).getData().get());
117117
if (record.get(index).getTokens().isPresent()) {
118118
List<Token> tokens = null;

v3/src/main/java/com/skyflow/utils/validations/Validations.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ private Validations() {
1818
super();
1919
}
2020

21+
// add validations specific to v3 SDK
2122
public static void validateInsertRequest(InsertRequest insertRequest) throws SkyflowException {
2223
String table = insertRequest.getTable();
2324
ArrayList<HashMap<String, Object>> values = insertRequest.getValues();
@@ -43,6 +44,10 @@ public static void validateInsertRequest(InsertRequest insertRequest) throws Sky
4344
ErrorLogs.EMPTY_VALUES.getLog(), InterfaceName.INSERT.getName()
4445
));
4546
throw new SkyflowException(ErrorCode.INVALID_INPUT.getCode(), ErrorMessage.EmptyValues.getMessage());
47+
} else if (upsert != null && upsert.isEmpty()){
48+
LogUtil.printErrorLog(Utils.parameterizedString(
49+
ErrorLogs.EMPTY_UPSERT.getLog(), InterfaceName.INSERT.getName()
50+
));
4651
}
4752
// upsert
4853

v3/src/main/java/com/skyflow/vault/controller/VaultController.java

Lines changed: 33 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -203,43 +203,50 @@ private InsertResponse insertBatch(List<InsertRecordData> batch, String tableNam
203203
}
204204

205205
private void configureConcurrencyAndBatchSize(int totalRequests) {
206-
Dotenv dotenv = Dotenv.load();
207-
String userProvidedBatchSize = dotenv.get("BATCH_SIZE");
208-
String userProvidedConcurrencyLimit = dotenv.get("CONCURRENCY_LIMIT");
209-
210-
if (userProvidedBatchSize != null) {
211-
try {
212-
int batchSize = Integer.parseInt(userProvidedBatchSize);
213-
if (batchSize > 0) {
214-
this.batchSize = batchSize;
215-
} else {
206+
try {
207+
Dotenv dotenv = Dotenv.load();
208+
String userProvidedBatchSize = dotenv.get("BATCH_SIZE");
209+
String userProvidedConcurrencyLimit = dotenv.get("CONCURRENCY_LIMIT");
210+
211+
if (userProvidedBatchSize != null) {
212+
try {
213+
int batchSize = Integer.parseInt(userProvidedBatchSize);
214+
if (batchSize > 0) {
215+
this.batchSize = batchSize;
216+
} else {
217+
LogUtil.printWarningLog(WarningLogs.INVALID_BATCH_SIZE_PROVIDED.getLog());
218+
this.batchSize = Constants.BATCH_SIZE;
219+
}
220+
} catch (NumberFormatException e) {
216221
LogUtil.printWarningLog(WarningLogs.INVALID_BATCH_SIZE_PROVIDED.getLog());
217222
this.batchSize = Constants.BATCH_SIZE;
218223
}
219-
} catch (NumberFormatException e) {
220-
LogUtil.printWarningLog(WarningLogs.INVALID_BATCH_SIZE_PROVIDED.getLog());
221-
this.batchSize = Constants.BATCH_SIZE;
222224
}
223-
}
224225

225-
// Max no of threads required to run all batches concurrently at once
226-
int maxConcurrencyNeeded = (totalRequests + this.batchSize - 1) / this.batchSize;
227-
228-
if (userProvidedConcurrencyLimit != null) {
229-
try {
230-
int concurrencyLimit = Integer.parseInt(userProvidedConcurrencyLimit);
231-
if (concurrencyLimit > 0) {
232-
this.concurrencyLimit = Math.min(concurrencyLimit, maxConcurrencyNeeded);
233-
} else {
226+
// Max no of threads required to run all batches concurrently at once
227+
int maxConcurrencyNeeded = (totalRequests + this.batchSize - 1) / this.batchSize;
228+
229+
if (userProvidedConcurrencyLimit != null) {
230+
try {
231+
int concurrencyLimit = Integer.parseInt(userProvidedConcurrencyLimit);
232+
if (concurrencyLimit > 0) {
233+
this.concurrencyLimit = Math.min(concurrencyLimit, maxConcurrencyNeeded);
234+
} else {
235+
LogUtil.printWarningLog(WarningLogs.INVALID_CONCURRENCY_LIMIT_PROVIDED.getLog());
236+
this.concurrencyLimit = Math.min(Constants.CONCURRENCY_LIMIT, maxConcurrencyNeeded);
237+
}
238+
} catch (NumberFormatException e) {
234239
LogUtil.printWarningLog(WarningLogs.INVALID_CONCURRENCY_LIMIT_PROVIDED.getLog());
235240
this.concurrencyLimit = Math.min(Constants.CONCURRENCY_LIMIT, maxConcurrencyNeeded);
236241
}
237-
} catch (NumberFormatException e) {
238-
LogUtil.printWarningLog(WarningLogs.INVALID_CONCURRENCY_LIMIT_PROVIDED.getLog());
242+
} else {
239243
this.concurrencyLimit = Math.min(Constants.CONCURRENCY_LIMIT, maxConcurrencyNeeded);
240244
}
241-
} else {
245+
} catch (Exception e) {
246+
this.batchSize = Constants.BATCH_SIZE;
247+
int maxConcurrencyNeeded = (totalRequests + this.batchSize - 1) / this.batchSize;
242248
this.concurrencyLimit = Math.min(Constants.CONCURRENCY_LIMIT, maxConcurrencyNeeded);
243249
}
244250
}
251+
245252
}

v3/src/main/java/com/skyflow/vault/data/ErrorRecord.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,28 @@
11
package com.skyflow.vault.data;
22

33
import com.google.gson.Gson;
4+
import com.google.gson.annotations.Expose;
45

56
public class ErrorRecord {
7+
@Expose(serialize = true)
68
private int index;
9+
@Expose(serialize = true)
710
private String error;
8-
11+
@Expose(serialize = true)
912
private int code;
1013
public int getIndex() {
1114
return index;
1215
}
1316

17+
public ErrorRecord() {
18+
}
19+
20+
public ErrorRecord(int index, String error, int code) {
21+
this.index = index;
22+
this.error = error;
23+
this.code = code;
24+
}
25+
1426
public void setIndex(int index) {
1527
this.index = index;
1628
}

v3/src/main/java/com/skyflow/vault/data/InsertResponse.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,11 @@
1111

1212
public class InsertResponse {
1313
// These members will be included in the toString() output
14-
@Expose
14+
@Expose(serialize = true)
1515
private Summary summary;
16-
@Expose
16+
@Expose(serialize = true)
1717
private List<Success> success;
18-
@Expose
18+
@Expose(serialize = true)
1919
private List<ErrorRecord> errors;
2020

2121
// Internal fields. Should not be included in toString() output
@@ -39,28 +39,28 @@ public InsertResponse(
3939
}
4040

4141
public Summary getSummary() {
42-
return summary;
42+
return this.summary;
4343
}
4444

45-
public void setSummary(Summary summary) {
46-
this.summary = summary;
47-
}
45+
// public void setSummary(Summary summary) {
46+
// this.summary = summary;
47+
// }
4848

4949
public List<Success> getSuccess() {
50-
return success;
50+
return this.success;
5151
}
5252

53-
public void setSuccess(List<Success> success) {
54-
this.success = success;
55-
}
53+
// public void setSuccess(List<Success> success) {
54+
// this.success = success;
55+
// }
5656

5757
public List<ErrorRecord> getErrors() {
58-
return errors;
58+
return this.errors;
5959
}
6060

61-
public void setErrors(List<ErrorRecord> errors) {
62-
this.errors = errors;
63-
}
61+
// public void setErrors(List<ErrorRecord> errors) {
62+
// this.errors = errors;
63+
// }
6464

6565
public List<Map<String, Object>> getRecordsToRetry() {
6666
if (recordsToRetry == null) {
@@ -70,7 +70,7 @@ public List<Map<String, Object>> getRecordsToRetry() {
7070
recordsToRetry.add(originalPayload.get(index));
7171
}
7272
}
73-
return recordsToRetry;
73+
return this.recordsToRetry;
7474
}
7575

7676
@Override

v3/src/main/java/com/skyflow/vault/data/Success.java

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,46 +1,58 @@
11
package com.skyflow.vault.data;
22
import com.google.gson.Gson;
3+
import com.google.gson.annotations.Expose;
34

45
import java.util.List;
56
import java.util.Map;
67

78
public class Success {
9+
@Expose(serialize = true)
810
private int index;
11+
@Expose(serialize = true)
912
private String skyflow_id;
13+
@Expose(serialize = true)
1014
private Map<String, List<Token>> tokens;
15+
@Expose(serialize = true)
1116
private Map<String, Object> data;
1217

1318
public int getIndex() {
1419
return index;
1520
}
1621

17-
public void setIndex(int index) {
22+
public Success(int index, String skyflow_id, Map<String, List<Token>> tokens, Map<String, Object> data) {
1823
this.index = index;
24+
this.skyflow_id = skyflow_id;
25+
this.tokens = tokens;
26+
this.data = data;
1927
}
2028

29+
// public void setIndex(int index) {
30+
// this.index = index;
31+
// }
32+
2133
public String getSkyflowId() {
2234
return skyflow_id;
2335
}
2436

25-
public void setSkyflowId(String skyflow_id) {
26-
this.skyflow_id = skyflow_id;
27-
}
37+
// public void setSkyflowId(String skyflow_id) {
38+
// this.skyflow_id = skyflow_id;
39+
// }
2840

2941
public Map<String, List<Token>> getTokens() {
3042
return tokens;
3143
}
3244

33-
public void setTokens(Map<String, List<Token>> tokens) {
34-
this.tokens = tokens;
35-
}
45+
// public void setTokens(Map<String, List<Token>> tokens) {
46+
// this.tokens = tokens;
47+
// }
3648

3749
public Map<String, Object> getData() {
3850
return data;
3951
}
4052

41-
public void setData(Map<String, Object> data) {
42-
this.data = data;
43-
}
53+
// public void setData(Map<String, Object> data) {
54+
// this.data = data;
55+
// }
4456
@Override
4557
public String toString() {
4658
Gson gson = new Gson();

v3/src/main/java/com/skyflow/vault/data/Summary.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
package com.skyflow.vault.data;
22

33
import com.google.gson.Gson;
4+
import com.google.gson.annotations.Expose;
45

56
public class Summary {
7+
@Expose(serialize = true)
68
private int totalRecords;
9+
@Expose(serialize = true)
710
private int totalInserted;
11+
@Expose(serialize = true)
812
private int totalFailed;
913

1014
public Summary() {
@@ -20,25 +24,25 @@ public int getTotalRecords() {
2024
return totalRecords;
2125
}
2226

23-
public void setTotalRecords(int totalRecords) {
24-
this.totalRecords = totalRecords;
25-
}
27+
// public void setTotalRecords(int totalRecords) {
28+
// this.totalRecords = totalRecords;
29+
// }
2630

2731
public int getTotalInserted() {
2832
return totalInserted;
2933
}
3034

31-
public void setTotalInserted(int totalInserted) {
32-
this.totalInserted = totalInserted;
33-
}
35+
// public void setTotalInserted(int totalInserted) {
36+
// this.totalInserted = totalInserted;
37+
// }
3438

3539
public int getTotalFailed() {
3640
return totalFailed;
3741
}
3842

39-
public void setTotalFailed(int totalFailed) {
40-
this.totalFailed = totalFailed;
41-
}
43+
// public void setTotalFailed(int totalFailed) {
44+
// this.totalFailed = totalFailed;
45+
// }
4246

4347
@Override
4448
public String toString() {
Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,20 @@
11
package com.skyflow.vault.data;
22

3+
import com.google.gson.annotations.Expose;
4+
35
public class Token {
6+
@Expose(serialize = true)
47
private String token;
8+
@Expose(serialize = true)
59
private String tokenGroupName;
610

711
public String getToken() {
812
return token;
913
}
1014

11-
public void setToken(String token) {
12-
this.token = token;
13-
}
15+
// public void setToken(String token) {
16+
// this.token = token;
17+
// }
1418

1519
public String getTokenGroupName() {
1620
return tokenGroupName;
@@ -19,4 +23,6 @@ public String getTokenGroupName() {
1923
public void setTokenGroupName(String tokenGroupName) {
2024
this.tokenGroupName = tokenGroupName;
2125
}
26+
27+
2228
}

0 commit comments

Comments
 (0)