Skip to content

Commit 8bff026

Browse files
committed
remove RetryableInputStream and use org.embulk.spi.util.ResumableInputStream
1 parent 1511054 commit 8bff026

File tree

3 files changed

+5
-132
lines changed

3 files changed

+5
-132
lines changed

build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ configurations {
1616
version = "0.1.7"
1717

1818
dependencies {
19-
compile "org.embulk:embulk-core:0.5.3"
20-
provided "org.embulk:embulk-core:0.5.3"
19+
compile "org.embulk:embulk-core:0.6.13"
20+
provided "org.embulk:embulk-core:0.6.13"
2121
compile "com.amazonaws:aws-java-sdk-s3:1.9.22"
2222
testCompile "junit:junit:4.+"
2323
testCompile "org.mockito:mockito-core:1.+"

src/main/java/org/embulk/input/s3/RetryableInputStream.java

Lines changed: 0 additions & 128 deletions
This file was deleted.

src/main/java/org/embulk/input/s3/S3FileInputPlugin.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.embulk.spi.FileInputPlugin;
3535
import org.embulk.spi.TransactionalFileInput;
3636
import org.embulk.spi.util.InputStreamFileInput;
37+
import org.embulk.spi.util.ResumableInputStream;
3738
import org.embulk.input.s3.RetryExecutor.Retryable;
3839
import org.embulk.input.s3.RetryExecutor.RetryGiveupException;
3940
import static org.embulk.input.s3.RetryExecutor.retryExecutor;
@@ -207,7 +208,7 @@ public TransactionalFileInput open(TaskSource taskSource, int taskIndex)
207208
}
208209

209210
private static class S3InputStreamReopener
210-
implements RetryableInputStream.Reopener
211+
implements ResumableInputStream.Reopener
211212
{
212213
private final Logger log = Exec.getLogger(S3FileInputPlugin.class);
213214

@@ -302,7 +303,7 @@ public InputStream openNext() throws IOException
302303
opened = true;
303304
GetObjectRequest request = new GetObjectRequest(bucket, key);
304305
S3Object obj = client.getObject(request);
305-
return new RetryableInputStream(obj.getObjectContent(), new S3InputStreamReopener(client, request, obj.getObjectMetadata().getContentLength()));
306+
return new ResumableInputStream(obj.getObjectContent(), new S3InputStreamReopener(client, request, obj.getObjectMetadata().getContentLength()));
306307
}
307308

308309
@Override

0 commit comments

Comments
 (0)