
Commit 2f70273

Can choose a policy in delete_in_advance option
1 parent c3ba201 commit 2f70273

3 files changed (+89, -29 lines)

README.md

Lines changed: 5 additions & 2 deletions
```diff
@@ -23,10 +23,13 @@ A File Output Plugin for Embulk to write HDFS.
 - **overwrite** overwrite files when the same filenames already exists (boolean, default: `false`)
   - *caution*: even if this property is `true`, this does not mean ensuring the idempotence. if you want to ensure the idempotence, you need the procedures to remove output files after or before running.
 - **doas** username which access to Hdfs (string, default: executed user)
-- **delete_in_advance** delete files and directories having `path_prefix` in advance (boolean, default: `false`)
+- **delete_in_advance** delete files and directories having `path_prefix` in advance (enum, default: `NONE`)
+  - `NONE`: do nothing
+  - `FILE_ONLY`: delete files
+  - `RECURSIVE`: delete files and directories
 
 ## CAUTION
-If you use `hadoop` user (hdfs admin user) as `doas`, and if `delete_in_advance` is true,
+If you use `hadoop` user (hdfs admin user) as `doas`, and if `delete_in_advance` is `RECURSIVE`,
 `embulk-output-hdfs` can delete any files and directories you indicate as `path_prefix`,
 this means `embulk-output-hdfs` can destroy your hdfs.
 So, please be careful when you use `delete_in_advance` option and `doas` option ...
```
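To make the three policies concrete, below is a minimal standalone sketch, not part of the plugin or this commit; the `/tmp/embulk-demo` paths and the `DeleteInAdvanceDemo` class name are invented for illustration. It runs the same `<path_prefix>*` glob the plugin uses against Hadoop's local filesystem and applies the `FILE_ONLY` and `RECURSIVE` behaviours by hand:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DeleteInAdvanceDemo
{
    public static void main(String[] args) throws Exception
    {
        FileSystem fs = FileSystem.getLocal(new Configuration());
        Path base = new Path("/tmp/embulk-demo");
        fs.mkdirs(new Path(base, "out_dir"));              // a directory matching the prefix
        fs.create(new Path(base, "out_file.txt")).close(); // a file matching the prefix

        Path glob = new Path(base, "out*");                // same "<path_prefix>*" glob as the plugin

        // FILE_ONLY: only plain files are removed, directories survive
        for (FileStatus status : fs.globStatus(glob)) {
            if (status.isFile()) {
                fs.delete(status.getPath(), false);        // removes out_file.txt
            }
        }

        // RECURSIVE: everything matching the glob is removed, directories included
        for (FileStatus status : fs.globStatus(glob)) {
            fs.delete(status.getPath(), true);             // removes out_dir
        }

        fs.delete(base, true);                             // clean up the demo directory
    }
}
```

The only difference between the two policies is the `isFile()` guard and the recursive flag passed to `FileSystem.delete`; `NONE` simply skips the glob altogether.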

src/main/java/org/embulk/output/hdfs/HdfsFileOutputPlugin.java

Lines changed: 42 additions & 17 deletions
```diff
@@ -9,6 +9,7 @@
 import org.embulk.config.Config;
 import org.embulk.config.ConfigDefault;
 import org.embulk.config.ConfigDiff;
+import org.embulk.config.ConfigException;
 import org.embulk.config.ConfigSource;
 import org.embulk.config.Task;
 import org.embulk.config.TaskReport;
@@ -28,6 +29,8 @@
 import java.util.List;
 import java.util.Map;
 
+import static org.embulk.output.hdfs.HdfsFileOutputPlugin.PluginTask.*;
+
 public class HdfsFileOutputPlugin
         implements FileOutputPlugin
 {
@@ -48,7 +51,7 @@ public interface PluginTask
         String getPathPrefix();
 
         @Config("file_ext")
-        String getFileNameExtension();
+        String getFileExt();
 
         @Config("sequence_format")
         @ConfigDefault("\"%03d.%02d.\"")
@@ -66,9 +69,10 @@ public interface PluginTask
         @ConfigDefault("null")
         Optional<String> getDoas();
 
+        enum DeleteInAdvancePolicy { NONE, FILE_ONLY, RECURSIVE }
         @Config("delete_in_advance")
-        @ConfigDefault("false")
-        boolean getDeleteInAdvance();
+        @ConfigDefault("\"NONE\"")
+        DeleteInAdvancePolicy getDeleteInAdvance();
     }
 
     @Override
@@ -77,19 +81,13 @@ public ConfigDiff transaction(ConfigSource config, int taskCount,
     {
         PluginTask task = config.loadConfig(PluginTask.class);
 
-        if (task.getDeleteInAdvance()) {
-            final String pathPrefix = strftime(task.getPathPrefix(), task.getRewindSeconds());
-            final Path globPath = new Path(pathPrefix + "*");
-            try {
-                FileSystem fs = getFs(task);
-                for (FileStatus status : fs.globStatus(globPath)) {
-                    logger.debug("delete in advance: {}", status.getPath());
-                    fs.delete(status.getPath(), true);
-                }
-            }
-            catch (IOException e) {
-                throw Throwables.propagate(e);
-            }
+        try {
+            String pathPrefix = strftime(task.getPathPrefix(), task.getRewindSeconds());
+            FileSystem fs = getFs(task);
+            deleteInAdvance(fs, pathPrefix, task.getDeleteInAdvance());
+        }
+        catch (IOException e) {
+            throw Throwables.propagate(e);
         }
 
         control.run(task.dump());
@@ -117,7 +115,7 @@ public TransactionalFileOutput open(TaskSource taskSource, final int taskIndex)
         final PluginTask task = taskSource.loadTask(PluginTask.class);
 
         final String pathPrefix = strftime(task.getPathPrefix(), task.getRewindSeconds());
-        final String pathSuffix = task.getFileNameExtension();
+        final String pathSuffix = task.getFileExt();
         final String sequenceFormat = task.getSequenceFormat();
 
         return new TransactionalFileOutput()
@@ -231,4 +229,31 @@ private String strftime(final String raw, final int rewind_seconds)
                 String.format("(Time.now - %s).strftime('%s')", String.valueOf(rewind_seconds), raw));
         return resolved.toString();
     }
+
+    private void deleteInAdvance(FileSystem fs, String pathPrefix, DeleteInAdvancePolicy deleteInAdvancePolicy)
+            throws IOException
+    {
+        final Path globPath = new Path(pathPrefix + "*");
+        switch (deleteInAdvancePolicy) {
+            case NONE:
+                // do nothing
+                break;
+            case FILE_ONLY:
+                for (FileStatus status : fs.globStatus(globPath)) {
+                    if (status.isFile()) {
+                        logger.debug("delete in advance: {}", status.getPath());
+                        fs.delete(status.getPath(), false);
+                    }
+                }
+                break;
+            case RECURSIVE:
+                for (FileStatus status : fs.globStatus(globPath)) {
+                    logger.debug("delete in advance: {}", status.getPath());
+                    fs.delete(status.getPath(), true);
+                }
+                break;
+            default:
+                throw new ConfigException("`delete_in_advance` must not be null.");
+        }
+    }
 }
```
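A note on the new task declaration above: `@ConfigDefault` takes a JSON literal (the same diff already uses `"null"`, `"false"`, and `"\"%03d.%02d.\""`), so the old boolean default was written as `"false"` while the new string default has to carry its own quotes, `"\"NONE\""`, and the config value must match one of the enum constant names. The sketch below restates that fragment with the reasoning spelled out in comments; the comments and the trimming of the interface to this single option are the editor's, not part of the plugin source.

```java
import org.embulk.config.Config;
import org.embulk.config.ConfigDefault;
import org.embulk.config.Task;

public interface PluginTask
        extends Task
{
    // The Embulk config value must match one of these constant names,
    // e.g. `delete_in_advance: FILE_ONLY` (the tests below set "RECURSIVE" and "FILE_ONLY").
    enum DeleteInAdvancePolicy { NONE, FILE_ONLY, RECURSIVE }

    // @ConfigDefault takes a JSON literal: a boolean default is "false",
    // a string default has to carry its own quotes, hence "\"NONE\"".
    @Config("delete_in_advance")
    @ConfigDefault("\"NONE\"")
    DeleteInAdvancePolicy getDeleteInAdvance();
}
```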

src/test/java/org/embulk/output/hdfs/TestHdfsFileOutputPlugin.java

Lines changed: 42 additions & 10 deletions
```diff
@@ -102,14 +102,14 @@ public void testDefaultValues()
         ConfigSource config = getBaseConfigSource();
         PluginTask task = config.loadConfig(PluginTask.class);
         assertEquals(pathPrefix, task.getPathPrefix());
-        assertEquals("csv", task.getFileNameExtension());
+        assertEquals("csv", task.getFileExt());
         assertEquals("%03d.%02d.", task.getSequenceFormat());
         assertEquals(Lists.newArrayList(), task.getConfigFiles());
         assertEquals(Maps.newHashMap(), task.getConfig());
         assertEquals(0, task.getRewindSeconds());
         assertEquals(false, task.getOverwrite());
         assertEquals(Optional.absent(), task.getDoas());
-        assertEquals(false, task.getDeleteInAdvance());
+        assertEquals(PluginTask.DeleteInAdvancePolicy.NONE, task.getDeleteInAdvance());
     }
 
     @Test(expected = ConfigException.class)
@@ -119,24 +119,25 @@ public void testRequiredValues()
         PluginTask task = config.loadConfig(PluginTask.class);
     }
 
-    private List<String> lsR(List<String> fileNames, java.nio.file.Path dir)
+    private List<String> lsR(List<String> names, java.nio.file.Path dir)
     {
         try (DirectoryStream<java.nio.file.Path> stream = Files.newDirectoryStream(dir)) {
             for (java.nio.file.Path path : stream) {
                 if (path.toFile().isDirectory()) {
                     logger.debug("[lsR] find a directory: {}", path.toAbsolutePath().toString());
-                    lsR(fileNames, path);
+                    names.add(path.toAbsolutePath().toString());
+                    lsR(names, path);
                 }
                 else {
                     logger.debug("[lsR] find a file: {}", path.toAbsolutePath().toString());
-                    fileNames.add(path.toAbsolutePath().toString());
+                    names.add(path.toAbsolutePath().toString());
                 }
             }
         }
         catch (IOException e) {
             logger.debug(e.getMessage(), e);
         }
-        return fileNames;
+        return names;
     }
 
     private void run(ConfigSource config)
@@ -218,12 +219,12 @@ public void testBulkLoad()
     }
 
     @Test
-    public void testDeleteInAdvance()
+    public void testDeleteRECURSIVEInAdvance()
             throws IOException
     {
         for (int n = 0; n <= 10; n++) {
-            tmpFolder.newFile("embulk-output-hdfs_" + n + ".txt");
-            tmpFolder.newFolder("embulk-output-hdfs_" + n);
+            tmpFolder.newFile("embulk-output-hdfs_file_" + n + ".txt");
+            tmpFolder.newFolder("embulk-output-hdfs_directory_" + n);
         }
 
         List<String> fileListBeforeRun = lsR(Lists.<String>newArrayList(), Paths.get(tmpFolder.getRoot().getAbsolutePath()));
@@ -233,16 +234,47 @@ public void testDeleteInAdvance()
                 .set("fs.hdfs.impl", "org.apache.hadoop.fs.RawLocalFileSystem")
                 .set("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem")
                 .set("fs.defaultFS", "file:///"))
-                .set("delete_in_advance", true);
+                .set("delete_in_advance", "RECURSIVE");
 
         run(config);
 
         List<String> fileListAfterRun = lsR(Lists.<String>newArrayList(), Paths.get(tmpFolder.getRoot().getAbsolutePath()));
         assertNotEquals(fileListBeforeRun, fileListAfterRun);
+        assertThat(fileListAfterRun, not(hasItem(containsString("embulk-output-hdfs_directory"))));
         assertThat(fileListAfterRun, not(hasItem(containsString("txt"))));
         assertThat(fileListAfterRun, hasItem(containsString(pathPrefix + "001.00.csv")));
         assertRecordsInFile(String.format("%s/%s001.00.csv",
                 tmpFolder.getRoot().getAbsolutePath(),
                 pathPrefix));
     }
+
+    @Test
+    public void testDeleteFILE_ONLYInAdvance()
+            throws IOException
+    {
+        for (int n = 0; n <= 10; n++) {
+            tmpFolder.newFile("embulk-output-hdfs_file_" + n + ".txt");
+            tmpFolder.newFolder("embulk-output-hdfs_directory_" + n);
+        }
+
+        List<String> fileListBeforeRun = lsR(Lists.<String>newArrayList(), Paths.get(tmpFolder.getRoot().getAbsolutePath()));
+
+        ConfigSource config = getBaseConfigSource()
+                .setNested("config", Exec.newConfigSource()
+                        .set("fs.hdfs.impl", "org.apache.hadoop.fs.RawLocalFileSystem")
+                        .set("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem")
+                        .set("fs.defaultFS", "file:///"))
+                .set("delete_in_advance", "FILE_ONLY");
+
+        run(config);
+
+        List<String> fileListAfterRun = lsR(Lists.<String>newArrayList(), Paths.get(tmpFolder.getRoot().getAbsolutePath()));
+        assertNotEquals(fileListBeforeRun, fileListAfterRun);
+        assertThat(fileListAfterRun, not(hasItem(containsString("txt"))));
+        assertThat(fileListAfterRun, hasItem(containsString("embulk-output-hdfs_directory")));
+        assertThat(fileListAfterRun, hasItem(containsString(pathPrefix + "001.00.csv")));
+        assertRecordsInFile(String.format("%s/%s001.00.csv",
+                tmpFolder.getRoot().getAbsolutePath(),
+                pathPrefix));
+    }
 }
```
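The commit adds coverage for `RECURSIVE` and `FILE_ONLY`; a companion test for the `NONE` policy could look like the sketch below. It is hypothetical (not part of this commit) and assumes it sits inside `TestHdfsFileOutputPlugin`, reusing `tmpFolder`, `getBaseConfigSource()`, `lsR()`, `run()`, `pathPrefix`, and the Hamcrest matchers already used above.

```java
// Hypothetical companion test: with the NONE policy, nothing that existed
// before the run should be deleted, and the output file is still written.
@Test
public void testDeleteNONEInAdvance()
        throws IOException
{
    for (int n = 0; n <= 10; n++) {
        tmpFolder.newFile("embulk-output-hdfs_file_" + n + ".txt");
        tmpFolder.newFolder("embulk-output-hdfs_directory_" + n);
    }

    ConfigSource config = getBaseConfigSource()
            .setNested("config", Exec.newConfigSource()
                    .set("fs.hdfs.impl", "org.apache.hadoop.fs.RawLocalFileSystem")
                    .set("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem")
                    .set("fs.defaultFS", "file:///"))
            .set("delete_in_advance", "NONE");

    run(config);

    List<String> fileListAfterRun = lsR(Lists.<String>newArrayList(), Paths.get(tmpFolder.getRoot().getAbsolutePath()));
    // the pre-existing files and directories survive, and the new output appears
    assertThat(fileListAfterRun, hasItem(containsString("txt")));
    assertThat(fileListAfterRun, hasItem(containsString("embulk-output-hdfs_directory")));
    assertThat(fileListAfterRun, hasItem(containsString(pathPrefix + "001.00.csv")));
}
```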
