diff --git a/contrib/hadoop/src/main/java/com/google/cloud/dataflow/contrib/hadoop/HadoopFileSource.java b/contrib/hadoop/src/main/java/com/google/cloud/dataflow/contrib/hadoop/HadoopFileSource.java
index e8981d2d6a..cffc475d71 100644
--- a/contrib/hadoop/src/main/java/com/google/cloud/dataflow/contrib/hadoop/HadoopFileSource.java
+++ b/contrib/hadoop/src/main/java/com/google/cloud/dataflow/contrib/hadoop/HadoopFileSource.java
@@ -239,12 +239,21 @@ private <T> Coder<T> getDefaultCoder(Class<T> c) {
   public long getEstimatedSizeBytes(PipelineOptions options) {
     long size = 0;
     try {
+      // If this source represents a split from splitIntoBundles, then return the size of the split,
+      // rather than the entire input
+      if (serializableSplit != null) {
+        return serializableSplit.getSplit().getLength();
+      }
+
       Job job = Job.getInstance(); // new instance
       for (FileStatus st : listStatus(createFormat(job), job)) {
         size += st.getLen();
       }
     } catch (IOException | NoSuchMethodException | InvocationTargetException
-        | IllegalAccessException | InstantiationException e) {
+        | IllegalAccessException | InstantiationException e) {
+      // ignore, and return 0
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
       // ignore, and return 0
     }
     return size;
diff --git a/contrib/hadoop/src/test/java/com/google/cloud/dataflow/contrib/hadoop/HadoopFileSourceTest.java b/contrib/hadoop/src/test/java/com/google/cloud/dataflow/contrib/hadoop/HadoopFileSourceTest.java
index cef3c08348..eac54a1e31 100644
--- a/contrib/hadoop/src/test/java/com/google/cloud/dataflow/contrib/hadoop/HadoopFileSourceTest.java
+++ b/contrib/hadoop/src/test/java/com/google/cloud/dataflow/contrib/hadoop/HadoopFileSourceTest.java
@@ -152,6 +152,29 @@ public void testSplits() throws Exception {
     assertTrue(nonEmptySplits > 2);
   }
 
+  @Test
+  public void testSplitEstimatedSize() throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.create();
+
+    List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10000, 0);
+    File file = createFileWithData("tmp.avro", expectedResults);
+
+    HadoopFileSource<IntWritable, Text> source = HadoopFileSource.from(
+        file.toString(), SequenceFileInputFormat.class, IntWritable.class, Text.class
+    );
+
+    long originalSize = source.getEstimatedSizeBytes(options);
+    long splitTotalSize = 0;
+    List<? extends BoundedSource<KV<IntWritable, Text>>> splits = source.splitIntoBundles(
+        SequenceFile.SYNC_INTERVAL, options
+    );
+    for (BoundedSource<KV<IntWritable, Text>> splitSource : splits) {
+      splitTotalSize += splitSource.getEstimatedSizeBytes(options);
+    }
+    // Assert that the estimated size of the whole is the sum of its parts
+    assertEquals(originalSize, splitTotalSize);
+  }
+
   private File createFileWithData(String filename, List<KV<IntWritable, Text>> records)
       throws IOException {
     File tmpFile = tmpFolder.newFile(filename);
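
A note on the exception handling above: InputSplit.getLength() declares InterruptedException, which is why the patch gives it its own catch arm rather than folding it into the existing multi-catch; swallowing an interrupt without re-asserting it would hide the interruption from callers further up the stack. A minimal standalone sketch of that restore-the-interrupt idiom (the estimateOrZero method and its Thread.sleep stand-in are hypothetical, for illustration only):

    public class InterruptIdiom {
      // Hypothetical stand-in for an interruptible size estimate.
      static long estimateOrZero() {
        try {
          Thread.sleep(10); // throws InterruptedException if the thread is interrupted
          return 42;
        } catch (InterruptedException e) {
          // Restore the interrupt status so callers can still observe it,
          // then fall back to 0, mirroring the patch's "ignore, and return 0" path.
          Thread.currentThread().interrupt();
          return 0;
        }
      }

      public static void main(String[] args) {
        Thread.currentThread().interrupt(); // simulate a pending interrupt
        long size = estimateOrZero();       // returns 0 immediately
        // Prints "0 interrupted=true": the flag survives because the catch block re-asserted it.
        System.out.println(size + " interrupted=" + Thread.currentThread().isInterrupted());
      }
    }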