Skip to content

Commit b546158

Browse files
committed
Add min_task_size option
This option is useful to combine multiple input files into a single task so that output plugins or executor plugin can maximize their performance.
1 parent 4211292 commit b546158

File tree

1 file changed

+27
-3
lines changed

1 file changed

+27
-3
lines changed

embulk-input-s3/src/main/java/org/embulk/input/s3/FileList.java

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@ public interface Task
3838
@Config("total_file_count_limit")
3939
@ConfigDefault("2147483647")
4040
int getTotalFileCountLimit();
41+
42+
// TODO support more algorithms to combine tasks
43+
@Config("min_task_size")
44+
@ConfigDefault("0")
45+
long getMinTaskSize();
4146
}
4247

4348
public static class Entry
@@ -69,22 +74,25 @@ public static class Builder
6974
private String last = null;
7075

7176
private int limitCount = Integer.MAX_VALUE;
77+
private long minTaskSize = 1;
7278
private Pattern pathMatchPattern;
7379

7480
private final ByteBuffer castBuffer = ByteBuffer.allocate(4);
7581

7682
public Builder(Task task)
7783
{
7884
this();
79-
this.limitCount = task.getTotalFileCountLimit();
8085
this.pathMatchPattern = Pattern.compile(task.getPathMatchPattern());
86+
this.limitCount = task.getTotalFileCountLimit();
87+
this.minTaskSize = task.getMinTaskSize();
8188
}
8289

8390
public Builder(ConfigSource config)
8491
{
8592
this();
8693
this.pathMatchPattern = Pattern.compile(config.get(String.class, "path_match_pattern", ".*"));
8794
this.limitCount = config.get(int.class, "total_file_count_limit", Integer.MAX_VALUE);
95+
this.minTaskSize = config.get(long.class, "min_task_size", 0L);
8896
}
8997

9098
public Builder()
@@ -104,6 +112,12 @@ public Builder limitTotalFileCount(int limitCount)
104112
return this;
105113
}
106114

115+
public Builder minTaskSize(long bytes)
116+
{
117+
this.minTaskSize = bytes;
118+
return this;
119+
}
120+
107121
public Builder pathMatchPattern(String pattern)
108122
{
109123
this.pathMatchPattern = Pattern.compile(pattern);
@@ -163,10 +177,20 @@ public FileList build()
163177

164178
private List<List<Entry>> getSplits(List<Entry> all)
165179
{
166-
// TODO combine multiple entries into one task using some configuration parameters
167180
List<List<Entry>> tasks = new ArrayList<>();
181+
long currentTaskSize = 0;
182+
List<Entry> currentTask = new ArrayList<>();
168183
for (Entry entry : all) {
169-
tasks.add(ImmutableList.of(entry));
184+
currentTask.add(entry);
185+
currentTaskSize += entry.getSize(); // TODO consider to multiply the size by cost_per_byte, and add cost_per_file
186+
if (currentTaskSize >= minTaskSize) {
187+
tasks.add(currentTask);
188+
currentTask = new ArrayList<>();
189+
currentTaskSize = 0;
190+
}
191+
}
192+
if (!currentTask.isEmpty()) {
193+
tasks.add(currentTask);
170194
}
171195
return tasks;
172196
}

0 commit comments

Comments
 (0)