Skip to content

Commit 10935c0

Browse files
authored
Merge pull request #54 from trocco-io/retry-upload-files
Add file upload retry handling
2 parents 38a2382 + ad34c64 commit 10935c0

File tree

5 files changed

+51
-8
lines changed

5 files changed

+51
-8
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ dependencies {
5151
testImplementation "org.embulk:embulk-parser-csv:0.10.31"
5252

5353
compile "org.embulk:embulk-output-jdbc:0.10.2"
54-
compile "net.snowflake:snowflake-jdbc:3.13.14"
54+
compile "net.snowflake:snowflake-jdbc:3.13.26"
5555
}
5656
embulkPlugin {
5757
mainClass = "org.embulk.output.SnowflakeOutputPlugin"

gradle/dependency-locks/embulkPluginRuntime.lockfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ com.fasterxml.jackson.core:jackson-core:2.6.7
66
com.fasterxml.jackson.core:jackson-databind:2.6.7
77
com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.6.7
88
javax.validation:validation-api:1.1.0.Final
9-
net.snowflake:snowflake-jdbc:3.13.14
9+
net.snowflake:snowflake-jdbc:3.13.26
1010
org.embulk:embulk-output-jdbc:0.10.2
1111
org.embulk:embulk-util-config:0.3.0
1212
org.embulk:embulk-util-json:0.1.1

makefile

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
build: gradlew/build
2+
gem: gradlew/gem
3+
check: gradlew/check
4+
lint-auto: gradlew/spotlessApply
5+
test: gradlew/test
6+
update-dependencies:
7+
./gradlew dependencies --write-locks
8+
gradlew/%:
9+
./gradlew $(@F)

src/main/java/org/embulk/output/SnowflakeOutputPlugin.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ public interface SnowflakePluginTask extends PluginTask {
5151
@Config("delete_stage")
5252
@ConfigDefault("false")
5353
public boolean getDeleteStage();
54+
55+
@Config("max_upload_retries")
56+
@ConfigDefault("3")
57+
public int getMaxUploadRetries();
5458
}
5559

5660
@Override
@@ -145,7 +149,11 @@ protected BatchInsert newBatchInsert(PluginTask task, Optional<MergeConfig> merg
145149
snowflakeCon.runCreateStage(this.stageIdentifier);
146150
}
147151

148-
return new SnowflakeCopyBatchInsert(getConnector(task, true), this.stageIdentifier, false);
152+
return new SnowflakeCopyBatchInsert(
153+
getConnector(task, true),
154+
this.stageIdentifier,
155+
false,
156+
((SnowflakePluginTask) task).getMaxUploadRetries());
149157
}
150158

151159
@Override

src/main/java/org/embulk/output/snowflake/SnowflakeCopyBatchInsert.java

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ public class SnowflakeCopyBatchInsert implements BatchInsert {
2626
protected static final String nullString = "\\N";
2727
protected static final String newLineString = "\n";
2828
protected static final String delimiterString = "\t";
29+
private final int maxUploadRetries;
2930

3031
private SnowflakeOutputConnection connection = null;
3132
private TableIdentifier tableIdentifier = null;
@@ -39,7 +40,10 @@ public class SnowflakeCopyBatchInsert implements BatchInsert {
3940
private List<Future<Void>> uploadAndCopyFutures;
4041

4142
public SnowflakeCopyBatchInsert(
42-
JdbcOutputConnector connector, StageIdentifier stageIdentifier, boolean deleteStageFile)
43+
JdbcOutputConnector connector,
44+
StageIdentifier stageIdentifier,
45+
boolean deleteStageFile,
46+
int maxUploadRetries)
4347
throws IOException {
4448
this.index = 0;
4549
openNewFile();
@@ -48,6 +52,7 @@ public SnowflakeCopyBatchInsert(
4852
this.executorService = Executors.newCachedThreadPool();
4953
this.deleteStageFile = deleteStageFile;
5054
this.uploadAndCopyFutures = new ArrayList();
55+
this.maxUploadRetries = maxUploadRetries;
5156
}
5257

5358
@Override
@@ -251,7 +256,7 @@ public void flush() throws IOException, SQLException {
251256
String snowflakeStageFileName = "embulk_snowflake_" + SnowflakeUtils.randomString(8);
252257

253258
UploadTask uploadTask =
254-
new UploadTask(file, batchRows, stageIdentifier, snowflakeStageFileName);
259+
new UploadTask(file, batchRows, stageIdentifier, snowflakeStageFileName, maxUploadRetries);
255260
Future<Void> uploadFuture = executorService.submit(uploadTask);
256261
uploadAndCopyFutures.add(uploadFuture);
257262

@@ -330,28 +335,49 @@ private class UploadTask implements Callable<Void> {
330335
private final int batchRows;
331336
private final String snowflakeStageFileName;
332337
private final StageIdentifier stageIdentifier;
338+
private final int maxUploadRetries;
333339

334340
public UploadTask(
335-
File file, int batchRows, StageIdentifier stageIdentifier, String snowflakeStageFileName) {
341+
File file,
342+
int batchRows,
343+
StageIdentifier stageIdentifier,
344+
String snowflakeStageFileName,
345+
int maxUploadRetries) {
336346
this.file = file;
337347
this.batchRows = batchRows;
338348
this.snowflakeStageFileName = snowflakeStageFileName;
339349
this.stageIdentifier = stageIdentifier;
350+
this.maxUploadRetries = maxUploadRetries;
340351
}
341352

342-
public Void call() throws IOException, SQLException {
353+
public Void call() throws IOException, SQLException, InterruptedException {
343354
logger.info(
344355
String.format(
345356
"Uploading file id %s to Snowflake (%,d bytes %,d rows)",
346357
snowflakeStageFileName, file.length(), batchRows));
347358

359+
int retries = 0;
348360
try {
349361
long startTime = System.currentTimeMillis();
350362
// put file to snowflake internal storage
351363
SnowflakeOutputConnection con = (SnowflakeOutputConnection) connector.connect(true);
352364

353365
FileInputStream fileInputStream = new FileInputStream(file);
354-
con.runUploadFile(stageIdentifier, snowflakeStageFileName, fileInputStream);
366+
do {
367+
try {
368+
con.runUploadFile(stageIdentifier, snowflakeStageFileName, fileInputStream);
369+
} catch (SQLException e) {
370+
retries++;
371+
if (retries > this.maxUploadRetries) {
372+
throw e;
373+
}
374+
logger.warn(
375+
String.format(
376+
"Upload error %s file %s retries: %d", e, snowflakeStageFileName, retries));
377+
Thread.sleep(retries * retries * 1000);
378+
}
379+
break;
380+
} while (retries < this.maxUploadRetries);
355381

356382
double seconds = (System.currentTimeMillis() - startTime) / 1000.0;
357383

0 commit comments

Comments
 (0)