From efd33cc43061e54d8fe1e16415157de21b904d90 Mon Sep 17 00:00:00 2001 From: igorbernstein2 Date: Thu, 26 Jan 2017 14:57:48 -0500 Subject: [PATCH] =?UTF-8?q?Fix=20HadoopFileSource=E2=80=99s=20split=20size?= =?UTF-8?q?=20estimate=20(#534)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Fix HadoopFileSource’s split size estimate * Properly set interrupted state --- .../contrib/hadoop/HadoopFileSource.java | 11 ++++++++- .../contrib/hadoop/HadoopFileSourceTest.java | 23 +++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/contrib/hadoop/src/main/java/com/google/cloud/dataflow/contrib/hadoop/HadoopFileSource.java b/contrib/hadoop/src/main/java/com/google/cloud/dataflow/contrib/hadoop/HadoopFileSource.java index e8981d2d6a..cffc475d71 100644 --- a/contrib/hadoop/src/main/java/com/google/cloud/dataflow/contrib/hadoop/HadoopFileSource.java +++ b/contrib/hadoop/src/main/java/com/google/cloud/dataflow/contrib/hadoop/HadoopFileSource.java @@ -239,12 +239,21 @@ private Coder getDefaultCoder(Class c) { public long getEstimatedSizeBytes(PipelineOptions options) { long size = 0; try { + // If this source represents a split from splitIntoBundles, then return the size of the split, + // rather then the entire input + if (serializableSplit != null) { + return serializableSplit.getSplit().getLength(); + } + Job job = Job.getInstance(); // new instance for (FileStatus st : listStatus(createFormat(job), job)) { size += st.getLen(); } } catch (IOException | NoSuchMethodException | InvocationTargetException - | IllegalAccessException | InstantiationException e) { + | IllegalAccessException | InstantiationException) { + // ignore, and return 0 + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // ignore, and return 0 } return size; diff --git a/contrib/hadoop/src/test/java/com/google/cloud/dataflow/contrib/hadoop/HadoopFileSourceTest.java b/contrib/hadoop/src/test/java/com/google/cloud/dataflow/contrib/hadoop/HadoopFileSourceTest.java index cef3c08348..eac54a1e31 100644 --- a/contrib/hadoop/src/test/java/com/google/cloud/dataflow/contrib/hadoop/HadoopFileSourceTest.java +++ b/contrib/hadoop/src/test/java/com/google/cloud/dataflow/contrib/hadoop/HadoopFileSourceTest.java @@ -152,6 +152,29 @@ public void testSplits() throws Exception { assertTrue(nonEmptySplits > 2); } + @Test + public void testSplitEstimatedSize() throws Exception { + PipelineOptions options = PipelineOptionsFactory.create(); + + List> expectedResults = createRandomRecords(3, 10000, 0); + File file = createFileWithData("tmp.avro", expectedResults); + + HadoopFileSource source = HadoopFileSource.from( + file.toString(), SequenceFileInputFormat.class, IntWritable.class, Text.class + ); + + long originalSize = source.getEstimatedSizeBytes(options); + long splitTotalSize = 0; + List>> splits = source.splitIntoBundles( + SequenceFile.SYNC_INTERVAL, options + ); + for (BoundedSource> splitSource : splits) { + splitTotalSize += splitSource.getEstimatedSizeBytes(options); + } + // Assert that the estimated size of the whole is the sum of its parts + assertEquals(originalSize, splitTotalSize); + } + private File createFileWithData(String filename, List> records) throws IOException { File tmpFile = tmpFolder.newFile(filename);