
Commit efd33cc

igorbernstein2 authored and dhalperi committed
Fix HadoopFileSource’s split size estimate (GoogleCloudPlatform#534)
* Fix HadoopFileSource’s split size estimate
* Properly set interrupted state
1 parent 2e57ab1 · commit efd33cc

File tree

2 files changed: +33 −1 lines changed


contrib/hadoop/src/main/java/com/google/cloud/dataflow/contrib/hadoop/HadoopFileSource.java

Lines changed: 10 additions & 1 deletion
@@ -239,12 +239,21 @@ private <T> Coder<T> getDefaultCoder(Class<T> c) {
   public long getEstimatedSizeBytes(PipelineOptions options) {
     long size = 0;
     try {
+      // If this source represents a split from splitIntoBundles, then return the size of the split,
+      // rather then the entire input
+      if (serializableSplit != null) {
+        return serializableSplit.getSplit().getLength();
+      }
+
       Job job = Job.getInstance(); // new instance
       for (FileStatus st : listStatus(createFormat(job), job)) {
         size += st.getLen();
       }
     } catch (IOException | NoSuchMethodException | InvocationTargetException
-        | IllegalAccessException | InstantiationException | InterruptedException e) {
+        | IllegalAccessException | InstantiationException e) {
+      // ignore, and return 0
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
       // ignore, and return 0
     }
     return size;
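The second part of the fix is the separate catch of InterruptedException: the old multi-catch swallowed it, which silently discards the thread's interrupt status. Below is a minimal, self-contained sketch of the idiom the commit adopts; the class and method names here are ours for illustration, not part of the SDK.

import java.util.concurrent.Callable;

public class InterruptStateExample {
  // Runs a size computation, treating any failure as "size unknown" (0)
  // while still preserving the thread's interrupted status.
  static long sizeOrZero(Callable<Long> sizer) {
    try {
      return sizer.call();
    } catch (InterruptedException e) {
      // Re-set the interrupt flag so callers further up the stack can
      // still observe the interruption instead of it being swallowed.
      Thread.currentThread().interrupt();
      return 0;
    } catch (Exception e) {
      // Ignore, and return 0, mirroring the method above.
      return 0;
    }
  }

  public static void main(String[] args) {
    System.out.println(sizeOrZero(() -> 42L)); // prints 42
  }
}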

contrib/hadoop/src/test/java/com/google/cloud/dataflow/contrib/hadoop/HadoopFileSourceTest.java

Lines changed: 23 additions & 0 deletions
@@ -152,6 +152,29 @@ public void testSplits() throws Exception {
     assertTrue(nonEmptySplits > 2);
   }
 
+  @Test
+  public void testSplitEstimatedSize() throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.create();
+
+    List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10000, 0);
+    File file = createFileWithData("tmp.avro", expectedResults);
+
+    HadoopFileSource<IntWritable, Text> source = HadoopFileSource.from(
+        file.toString(), SequenceFileInputFormat.class, IntWritable.class, Text.class
+    );
+
+    long originalSize = source.getEstimatedSizeBytes(options);
+    long splitTotalSize = 0;
+    List<? extends BoundedSource<KV<IntWritable, Text>>> splits = source.splitIntoBundles(
+        SequenceFile.SYNC_INTERVAL, options
+    );
+    for (BoundedSource<KV<IntWritable, Text>> splitSource : splits) {
+      splitTotalSize += splitSource.getEstimatedSizeBytes(options);
+    }
+    // Assert that the estimated size of the whole is the sum of its parts
+    assertEquals(originalSize, splitTotalSize);
+  }
+
   private File createFileWithData(String filename, List<KV<IntWritable, Text>> records)
       throws IOException {
     File tmpFile = tmpFolder.newFile(filename);
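The new test pins down the invariant this commit restores: the estimated size of the whole source should equal the sum of the estimates of its bundles. Below is a minimal sketch of that check as a reusable helper, assuming the Dataflow SDK 1.x BoundedSource API used in the diff above; the class and helper names are hypothetical, not part of the SDK.

import static org.junit.Assert.assertEquals;

import com.google.cloud.dataflow.sdk.io.BoundedSource;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;

public class SizeInvariantCheck {
  // Hypothetical helper: splits a bounded source into bundles and asserts
  // that splitting does not change the total estimated size.
  static <T> void assertSizeIsSumOfParts(
      BoundedSource<T> source, long desiredBundleSizeBytes, PipelineOptions options)
      throws Exception {
    long whole = source.getEstimatedSizeBytes(options);
    long parts = 0;
    for (BoundedSource<T> split : source.splitIntoBundles(desiredBundleSizeBytes, options)) {
      parts += split.getEstimatedSizeBytes(options);
    }
    assertEquals(whole, parts);
  }
}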
