Skip to content

Commit a5a0034

Browse files
committed
Merge pull request #14 from civitaspo/avoid_create_0byte_files
Avoid create 0byte files
2 parents 6fa4524 + 29283f9 commit a5a0034

File tree

2 files changed

+64
-11
lines changed

2 files changed

+64
-11
lines changed
Lines changed: 55 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,55 @@
1+
hdfs_example: &hdfs_example
2+
config_files:
3+
- /etc/hadoop/conf/core-site.xml
4+
- /etc/hadoop/conf/hdfs-site.xml
5+
config:
6+
fs.defaultFS: 'hdfs://hadoop-nn1:8020'
7+
fs.hdfs.impl: 'org.apache.hadoop.hdfs.DistributedFileSystem'
8+
fs.file.impl: 'org.apache.hadoop.fs.LocalFileSystem'
9+
10+
local_fs_example: &local_fs_example
11+
config:
12+
fs.defaultFS: 'file:///'
13+
fs.hdfs.impl: 'org.apache.hadoop.fs.RawLocalFileSystem'
14+
fs.file.impl: 'org.apache.hadoop.fs.RawLocalFileSystem'
15+
io.compression.codecs: 'org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.BZip2Codec'
16+
17+
exec:
18+
min_output_tasks: 10
19+
20+
in:
21+
type: file
22+
path_prefix: example/data
23+
parser:
24+
charset: UTF-8
25+
newline: CRLF
26+
type: csv
27+
delimiter: ','
28+
quote: '"'
29+
header_line: true
30+
stop_on_invalid_record: true
31+
columns:
32+
- {name: id, type: long}
33+
- {name: account, type: long}
34+
- {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
35+
- {name: purchase, type: timestamp, format: '%Y%m%d'}
36+
- {name: comment, type: string}
37+
38+
39+
out:
40+
type: hdfs
41+
<<: *local_fs_example
42+
path_prefix: /tmp/embulk-output-hdfs_example/file_
43+
file_ext: csv
44+
delete_in_advance: FILE_ONLY
45+
formatter:
46+
type: csv
47+
newline: CRLF
48+
newline_in_field: LF
49+
header_line: false
50+
charset: UTF-8
51+
quote_policy: NONE
52+
quote: '"'
53+
escape: '\'
54+
null_string: ''
55+
default_timezone: UTC

src/main/java/org/embulk/output/hdfs/HdfsFileOutputPlugin.java

Lines changed: 9 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -122,30 +122,28 @@ public TransactionalFileOutput open(TaskSource taskSource, final int taskIndex)
122122
{
123123
private final List<String> hdfsFileNames = new ArrayList<>();
124124
private int fileIndex = 0;
125+
private Path currentPath = null;
125126
private OutputStream output = null;
126127

127128
@Override
128129
public void nextFile()
129130
{
130131
closeCurrentStream();
131-
Path path = new Path(pathPrefix + String.format(sequenceFormat, taskIndex, fileIndex) + pathSuffix);
132-
try {
133-
FileSystem fs = getFs(task);
134-
output = fs.create(path, task.getOverwrite());
135-
logger.info("Uploading '{}'", path);
136-
}
137-
catch (IOException e) {
138-
logger.error(e.getMessage());
139-
throw new RuntimeException(e);
140-
}
141-
hdfsFileNames.add(path.toString());
132+
currentPath = new Path(pathPrefix + String.format(sequenceFormat, taskIndex, fileIndex) + pathSuffix);
142133
fileIndex++;
143134
}
144135

145136
@Override
146137
public void add(Buffer buffer)
147138
{
148139
try {
140+
// Create the file only when there is data to write, so that no 0-byte files are created.
141+
if (output == null) {
142+
FileSystem fs = getFs(task);
143+
output = fs.create(currentPath, task.getOverwrite());
144+
logger.info("Uploading '{}'", currentPath);
145+
hdfsFileNames.add(currentPath.toString());
146+
}
149147
output.write(buffer.array(), buffer.offset(), buffer.limit());
150148
}
151149
catch (IOException e) {

0 commit comments

Comments
 (0)