Skip to content

Commit 17d2570

Browse files
committed
Add FileList unit which support limiting number of input files
1 parent 5dd8ffe commit 17d2570

File tree

3 files changed

+332
-35
lines changed

3 files changed

+332
-35
lines changed

embulk-input-s3/src/main/java/org/embulk/input/s3/AbstractS3FileInputPlugin.java

Lines changed: 22 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.util.List;
44
import java.util.ArrayList;
55
import java.util.Collections;
6+
import java.util.Iterator;
67
import java.io.IOException;
78
import java.io.InterruptedIOException;
89
import java.io.InputStream;
@@ -46,7 +47,7 @@ public abstract class AbstractS3FileInputPlugin
4647
private final Logger log = Exec.getLogger(S3FileInputPlugin.class);
4748

4849
public interface PluginTask
49-
extends AwsCredentialsTask, Task
50+
extends AwsCredentialsTask, FileList.Task, Task
5051
{
5152
@Config("bucket")
5253
public String getBucket();
@@ -64,8 +65,8 @@ public interface PluginTask
6465

6566
// TODO timeout, ssl, etc
6667

67-
public List<String> getFiles();
68-
public void setFiles(List<String> files);
68+
public FileList getFiles();
69+
public void setFiles(FileList files);
6970

7071
@ConfigInject
7172
public BufferAllocator getBufferAllocator();
@@ -82,7 +83,7 @@ public ConfigDiff transaction(ConfigSource config, FileInputPlugin.Control contr
8283
task.setFiles(listFiles(task));
8384

8485
// number of processors is same with number of files
85-
return resume(task.dump(), task.getFiles().size(), control);
86+
return resume(task.dump(), task.getFiles().getTaskCount(), control);
8687
}
8788

8889
@Override
@@ -101,16 +102,7 @@ public ConfigDiff resume(TaskSource taskSource,
101102
ConfigDiff configDiff = Exec.newConfigDiff();
102103

103104
// last_path
104-
if (task.getFiles().isEmpty()) {
105-
// keep the last value
106-
if (task.getLastPath().isPresent()) {
107-
configDiff.set("last_path", task.getLastPath().get());
108-
}
109-
} else {
110-
List<String> files = new ArrayList<String>(task.getFiles());
111-
Collections.sort(files);
112-
configDiff.set("last_path", files.get(files.size() - 1));
113-
}
105+
configDiff.set("last_path", task.getFiles().getLastPath(task.getLastPath()));
114106

115107
return configDiff;
116108
}
@@ -145,7 +137,7 @@ protected ClientConfiguration getClientConfiguration(PluginTask task)
145137
return clientConfig;
146138
}
147139

148-
private List<String> listFiles(PluginTask task)
140+
private FileList listFiles(PluginTask task)
149141
{
150142
AmazonS3Client client = newS3Client(task);
151143
String bucketName = task.getBucket();
@@ -154,32 +146,35 @@ private List<String> listFiles(PluginTask task)
154146
log.info("Listing files with prefix \"/\". This doesn't mean all files in a bucket. If you intend to read all files, use \"path_prefix: ''\" (empty string) instead.");
155147
}
156148

157-
return listS3FilesByPrefix(client, bucketName, task.getPathPrefix(), task.getLastPath());
149+
FileList.Builder builder = new FileList.Builder(task);
150+
listS3FilesByPrefix(builder, client, bucketName,
151+
task.getPathPrefix(), task.getLastPath());
152+
return builder.build();
158153
}
159154

160155
/**
161156
* Lists S3 filenames filtered by prefix.
162157
*
163158
* The resulting list does not include the file that's size == 0.
164159
*/
165-
public static List<String> listS3FilesByPrefix(AmazonS3Client client, String bucketName,
160+
public static void listS3FilesByPrefix(FileList.Builder builder,
161+
AmazonS3Client client, String bucketName,
166162
String prefix, Optional<String> lastPath)
167163
{
168-
ImmutableList.Builder<String> builder = ImmutableList.builder();
169-
170164
String lastKey = lastPath.orNull();
171165
do {
172166
ListObjectsRequest req = new ListObjectsRequest(bucketName, prefix, lastKey, null, 1024);
173167
ObjectListing ol = client.listObjects(req);
174-
for(S3ObjectSummary s : ol.getObjectSummaries()) {
168+
for (S3ObjectSummary s : ol.getObjectSummaries()) {
175169
if (s.getSize() > 0) {
176-
builder.add(s.getKey());
170+
builder.add(s.getKey(), s.getSize());
171+
if (!builder.more()) {
172+
return;
173+
}
177174
}
178175
}
179176
lastKey = ol.getNextMarker();
180177
} while(lastKey != null);
181-
182-
return builder.build();
183178
}
184179

185180
@Override
@@ -283,24 +278,22 @@ private class SingleFileProvider
283278
{
284279
private AmazonS3Client client;
285280
private final String bucket;
286-
private final String key;
287-
private boolean opened = false;
281+
private final Iterator<String> iterator;
288282

289283
public SingleFileProvider(PluginTask task, int taskIndex)
290284
{
291285
this.client = newS3Client(task);
292286
this.bucket = task.getBucket();
293-
this.key = task.getFiles().get(taskIndex);
287+
this.iterator = task.getFiles().get(taskIndex).iterator();
294288
}
295289

296290
@Override
297291
public InputStream openNext() throws IOException
298292
{
299-
if (opened) {
293+
if (!iterator.hasNext()) {
300294
return null;
301295
}
302-
opened = true;
303-
GetObjectRequest request = new GetObjectRequest(bucket, key);
296+
GetObjectRequest request = new GetObjectRequest(bucket, iterator.next());
304297
S3Object obj = client.getObject(request);
305298
return new ResumableInputStream(obj.getObjectContent(), new S3InputStreamReopener(client, request, obj.getObjectMetadata().getContentLength()));
306299
}

0 commit comments

Comments
 (0)