Skip to content

Commit c120bb1

Browse files
committed
Add path_match_pattern option
This option is used to skip files if path name doesn't match with pattern.
1 parent 952a469 commit c120bb1

File tree

3 files changed

+51
-3
lines changed

3 files changed

+51
-3
lines changed

README.md

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@
5353
5454
- **session_token**: session token (string, required)
5555
56+
* **path_match_pattern**: regexp to match file paths. If a file path doesn't match with this pattern, the file will be skipped (regexp string, optional)
57+
58+
* **total_file_count_limit**: maximum number of files to read (integer, optional)
5659
5760
## Example
5861
@@ -66,6 +69,22 @@ in:
6669
secret_access_key: AbCxYz123aBcXyZ123
6770
```
6871

72+
To skip files using regexp:
73+
74+
```yaml
75+
in:
76+
type: s3
77+
bucket: my-s3-bucket
78+
path_prefix: logs/csv-
79+
# ...
80+
path_match_pattern: \.csv$ # a file will be skipped if its path doesn't match with this pattern
81+
82+
## some examples of regexp:
83+
#path_match_pattern: /archive/ # match files in .../archive/... directory
84+
#path_match_pattern: /data1/|/data2/ # match files in .../data1/... or .../data2/... directory
85+
#path_match_pattern: .csv$|.csv.gz$ # match files whose suffix is .csv or .csv.gz
86+
```
87+
6988
To use AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables:
7089

7190
```yaml
@@ -100,4 +119,4 @@ To run unit tests, we need to configure the following environment variables.
100119
EMBULK_S3_TEST_BUCKET
101120
EMBULK_S3_TEST_ACCESS_KEY_ID
102121
EMBULK_S3_TEST_SECRET_ACCESS_KEY
103-
```
122+
```

embulk-input-s3/src/main/java/org/embulk/input/s3/FileList.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
import java.util.ArrayList;
66
import java.util.zip.GZIPInputStream;
77
import java.util.zip.GZIPOutputStream;
8+
import java.util.regex.Pattern;
9+
import java.util.regex.Matcher;
810
import java.io.InputStream;
911
import java.io.OutputStream;
1012
import java.io.BufferedOutputStream;
@@ -29,6 +31,10 @@ public class FileList
2931
{
3032
public interface Task
3133
{
34+
@Config("path_match_pattern")
35+
@ConfigDefault("\".*\"")
36+
String getPathMatchPattern();
37+
3238
@Config("total_file_count_limit")
3339
@ConfigDefault("2147483647")
3440
int getTotalFileCountLimit();
@@ -63,17 +69,21 @@ public static class Builder
6369
private String last = null;
6470

6571
private int limitCount = Integer.MAX_VALUE;
72+
private Pattern pathMatchPattern;
73+
6674
private final ByteBuffer castBuffer = ByteBuffer.allocate(4);
6775

6876
public Builder(Task task)
6977
{
7078
this();
7179
this.limitCount = task.getTotalFileCountLimit();
80+
this.pathMatchPattern = Pattern.compile(task.getPathMatchPattern());
7281
}
7382

7483
public Builder(ConfigSource config)
7584
{
7685
this();
86+
this.pathMatchPattern = Pattern.compile(config.get(String.class, "path_match_pattern", ".*"));
7787
this.limitCount = config.get(int.class, "total_file_count_limit", Integer.MAX_VALUE);
7888
}
7989

@@ -94,6 +104,12 @@ public Builder limitTotalFileCount(int limitCount)
94104
return this;
95105
}
96106

107+
public Builder pathMatchPattern(String pattern)
108+
{
109+
this.pathMatchPattern = Pattern.compile(pattern);
110+
return this;
111+
}
112+
97113
public int size()
98114
{
99115
return entries.size();
@@ -104,6 +120,7 @@ public boolean needsMore()
104120
return size() < limitCount;
105121
}
106122

123+
// returns true if this file is used
107124
public synchronized boolean add(String path, long size)
108125
{
109126
// TODO throw IllegalStateException if stream is already closed
@@ -112,8 +129,9 @@ public synchronized boolean add(String path, long size)
112129
return false;
113130
}
114131

115-
// TODO in the future, support some other filtering parameters (file name suffix filter, regex filter, etc)
116-
// and return false if filtered out.
132+
if (!pathMatchPattern.matcher(path).matches()) {
133+
return false;
134+
}
117135

118136
int index = entries.size();
119137
entries.add(new Entry(index, size));

embulk-input-s3/src/test/java/org/embulk/input/s3/TestS3FileInputPlugin.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,17 @@ public void useTotalFileCountLimit()
115115
assertEquals(0, getRecords(config, output).size());
116116
}
117117

118+
@Test
119+
public void usePathMatchPattern()
120+
throws Exception
121+
{
122+
ConfigSource config = this.config.deepCopy().set("path_match_pattern", "/match/");
123+
ConfigDiff configDiff = runner.transaction(config, new Control(runner, output));
124+
125+
assertNull(configDiff.get(String.class, "last_path"));
126+
assertEquals(0, getRecords(config, output).size());
127+
}
128+
118129
static class Control
119130
implements InputPlugin.Control
120131
{

0 commit comments

Comments
 (0)