From 14983d3d08bc33853232e8af159bb6109ef9ddbe Mon Sep 17 00:00:00 2001
From: Prashant Singh
Date: Mon, 4 Apr 2022 23:35:23 +0530
Subject: [PATCH] minor refactor

---
 .../java/org/apache/iceberg/MicroBatches.java |  8 +++---
 .../apache/iceberg/TestMicroBatchBuilder.java | 14 +++++-----
 .../spark/source/SparkMicroBatchStream.java   | 27 ++++++++-----------
 3 files changed, 23 insertions(+), 26 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/MicroBatches.java b/core/src/main/java/org/apache/iceberg/MicroBatches.java
index a70dabec22b1..884ffc85be6d 100644
--- a/core/src/main/java/org/apache/iceberg/MicroBatches.java
+++ b/core/src/main/java/org/apache/iceberg/MicroBatches.java
@@ -111,6 +111,7 @@ public MicroBatchBuilder specsById(Map<Integer, PartitionSpec> specs) {
     public MicroBatch generate(long startFileIndex, long endFileIndex, long targetSizeInBytes,
                                boolean scanAllFiles) {
       Preconditions.checkArgument(startFileIndex >= 0, "startFileIndex is unexpectedly smaller than 0");
+      Preconditions.checkArgument(endFileIndex >= 0, "endFileIndex is unexpectedly smaller than 0");
       Preconditions.checkArgument(targetSizeInBytes > 0, "targetSizeInBytes should be larger than 0");
 
       List<ManifestFile> manifests = scanAllFiles ? snapshot.dataManifests() :
@@ -175,14 +176,15 @@ private static List<Pair<ManifestFile, Integer>> skipManifests(List<Pair<Manife
-    private MicroBatch generateMicroBatch(List<Pair<ManifestFile, Integer>> indexedManifests,
-                                          long startFileIndex, long endFileIndex,long targetSizeInBytes,
-                                          boolean scanAllFiles) {
+    private MicroBatch generateMicroBatch(
+        List<Pair<ManifestFile, Integer>> indexedManifests,
+        long startFileIndex, long endFileIndex, long targetSizeInBytes, boolean scanAllFiles) {
       if (indexedManifests.isEmpty()) {
         return new MicroBatch(snapshot.snapshotId(), startFileIndex, endFileIndex, 0L,
             Collections.emptyList(), true);
diff --git a/core/src/test/java/org/apache/iceberg/TestMicroBatchBuilder.java b/core/src/test/java/org/apache/iceberg/TestMicroBatchBuilder.java
index 17576b30ba15..1b2c8352d4be 100644
--- a/core/src/test/java/org/apache/iceberg/TestMicroBatchBuilder.java
+++ b/core/src/test/java/org/apache/iceberg/TestMicroBatchBuilder.java
@@ -62,7 +62,7 @@ public void testGenerateMicroBatch() {
 
     MicroBatch batch1 = MicroBatches.from(table.snapshot(1L), table.io())
         .specsById(table.specs())
-        .generate(0, 1,15L, true);
+        .generate(0, 1, 15L, true);
     Assert.assertEquals(batch1.endFileIndex(), 1);
     Assert.assertEquals(batch1.sizeInBytes(), 10);
     Assert.assertFalse(batch1.lastIndexOfSnapshot());
@@ -70,7 +70,7 @@ public void testGenerateMicroBatch() {
 
     MicroBatch batch2 = MicroBatches.from(table.snapshot(1L), table.io())
         .specsById(table.specs())
-        .generate(batch1.endFileIndex(), 4,30L, true);
+        .generate(batch1.endFileIndex(), 4, 30L, true);
     Assert.assertEquals(batch2.endFileIndex(), 4);
     Assert.assertEquals(batch2.sizeInBytes(), 30);
     Assert.assertFalse(batch2.lastIndexOfSnapshot());
@@ -78,7 +78,7 @@ public void testGenerateMicroBatch() {
 
     MicroBatch batch3 = MicroBatches.from(table.snapshot(1L), table.io())
         .specsById(table.specs())
-        .generate(batch2.endFileIndex(), 5,50L, true);
+        .generate(batch2.endFileIndex(), 5, 50L, true);
     Assert.assertEquals(batch3.endFileIndex(), 5);
     Assert.assertEquals(batch3.sizeInBytes(), 10);
     Assert.assertTrue(batch3.lastIndexOfSnapshot());
@@ -101,7 +101,7 @@ public void testGenerateMicroBatchWithSmallTargetSize() {
 
     MicroBatch batch1 = MicroBatches.from(table.snapshot(1L), table.io())
         .specsById(table.specs())
-        .generate(batch.endFileIndex(), 2,5L, true);
+        .generate(batch.endFileIndex(), 2, 5L, true);
     Assert.assertEquals(batch1.endFileIndex(), 2);
     Assert.assertEquals(batch1.sizeInBytes(), 10);
     filesMatch(Lists.newArrayList("B"), filesToScan(batch1.tasks()));
@@ -109,7 +109,7 @@ public void testGenerateMicroBatchWithSmallTargetSize() {
 
     MicroBatch batch2 = MicroBatches.from(table.snapshot(1L), table.io())
         .specsById(table.specs())
-        .generate(batch1.endFileIndex(), 3,10L, true);
+        .generate(batch1.endFileIndex(), 3, 10L, true);
     Assert.assertEquals(batch2.endFileIndex(), 3);
     Assert.assertEquals(batch2.sizeInBytes(), 10);
     filesMatch(Lists.newArrayList("C"), filesToScan(batch2.tasks()));
@@ -125,7 +125,7 @@ public void testGenerateMicroBatchWithSmallTargetSize() {
 
     MicroBatch batch4 = MicroBatches.from(table.snapshot(1L), table.io())
         .specsById(table.specs())
-        .generate(batch3.endFileIndex(), 5,5L, true);
+        .generate(batch3.endFileIndex(), 5, 5L, true);
     Assert.assertEquals(batch4.endFileIndex(), 5);
     Assert.assertEquals(batch4.sizeInBytes(), 10);
     filesMatch(Lists.newArrayList("E"), filesToScan(batch4.tasks()));
@@ -180,4 +180,4 @@ private static void filesMatch(List<String> expected, List<String> actual) {
     Collections.sort(actual);
     Assert.assertEquals(expected, actual);
   }
-}
\ No newline at end of file
+}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index 174f2772ac1a..fd8297194984 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -180,13 +180,11 @@ public void stop() {
   private List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffset endOffset) {
     List<FileScanTask> fileScanTasks = Lists.newArrayList();
     StreamingOffset batchStartOffset = StreamingOffset.START_OFFSET.equals(startOffset) ?
-        determineStartingOffset(table, fromTimestamp, 0) : startOffset;
+        determineStartingOffset(table, fromTimestamp) : startOffset;
 
     StreamingOffset currentOffset = null;
 
-    boolean isEndOffset;
-    // (11, 20), (21, 25) .... (26 ...40)
-    // [startFileIndex, endFileIndex)
+    // [(startOffset : startFileIndex), (endOffset : endFileIndex) )
     do {
       long startFileIndex = 0;
       long endFileIndex;
@@ -195,6 +193,7 @@ private List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffse
         startFileIndex = batchStartOffset.position();
       } else {
         Snapshot snapshotAfter = SnapshotUtil.snapshotAfter(table, currentOffset.snapshotId());
+        // it may happen that we need to read this snapshot partially in case it's equal to endOffset.
         if (currentOffset.snapshotId() != endOffset.snapshotId()) {
           currentOffset = new StreamingOffset(snapshotAfter.snapshotId(), 0L, false);
         } else {
@@ -249,14 +248,14 @@ private boolean shouldProcess(Snapshot snapshot) {
     }
   }
 
-  private static StreamingOffset determineStartingOffset(Table table, Long fromTimestamp, int startPos) {
+  private static StreamingOffset determineStartingOffset(Table table, Long fromTimestamp) {
     if (table.currentSnapshot() == null) {
       return StreamingOffset.START_OFFSET;
     }
 
     if (fromTimestamp == null) {
       // match existing behavior and start from the oldest snapshot
-      return new StreamingOffset(SnapshotUtil.oldestAncestor(table).snapshotId(), startPos, false);
+      return new StreamingOffset(SnapshotUtil.oldestAncestor(table).snapshotId(), 0, false);
     }
 
     if (table.currentSnapshot().timestampMillis() < fromTimestamp) {
@@ -266,13 +265,13 @@ private static StreamingOffset determineStartingOffset(Table table, Long fromTim
     try {
       Snapshot snapshot = SnapshotUtil.oldestAncestorAfter(table, fromTimestamp);
       if (snapshot != null) {
-        return new StreamingOffset(snapshot.snapshotId(), startPos, false);
+        return new StreamingOffset(snapshot.snapshotId(), 0, false);
       } else {
         return StreamingOffset.START_OFFSET;
       }
     } catch (IllegalStateException e) {
       // could not determine the first snapshot after the timestamp. use the oldest ancestor instead
-      return new StreamingOffset(SnapshotUtil.oldestAncestor(table).snapshotId(), startPos, false);
+      return new StreamingOffset(SnapshotUtil.oldestAncestor(table).snapshotId(), 0, false);
     }
   }
@@ -295,8 +294,7 @@ public Offset latestOffset(Offset startOffset, ReadLimit limit) {
     StreamingOffset curOffset = (StreamingOffset) startOffset;
 
     if (startOffset.equals(StreamingOffset.START_OFFSET)) {
-      // find the oldest ancestor in this case of the table
-      curOffset = determineStartingOffset(table, fromTimestamp, 0);
+      curOffset = determineStartingOffset(table, fromTimestamp);
     }
 
     Snapshot curSnapshot = table.snapshot(curOffset.snapshotId());
@@ -307,9 +305,7 @@ public Offset latestOffset(Offset startOffset, ReadLimit limit) {
     int curRecordCount = 0;
     int curPos = 0;
 
-    // >= pos
-    // (>= pos && <= endPos)
-
+    // Note : we produce nextOffset with pos as non-inclusive
     while (isOk) {
       // start iterating the current Snapshot's added Files
      // this is under assumption we will be able to add at-least 1 file in the new offset
@@ -342,8 +338,7 @@ public Offset latestOffset(Offset startOffset, ReadLimit limit) {
       }
     }
 
-    StreamingOffset so = new StreamingOffset(curSnapshot.snapshotId(), curPos, false);
-    return so;
+    return new StreamingOffset(curSnapshot.snapshotId(), curPos, false);
   }
 
   @Override
@@ -382,7 +377,7 @@ public StreamingOffset initialOffset() {
     }
 
     table.refresh();
-    StreamingOffset offset = determineStartingOffset(table, fromTimestamp, 0);
+    StreamingOffset offset = determineStartingOffset(table, fromTimestamp);
     OutputFile outputFile = io.newOutputFile(initialOffsetLocation);
     writeOffset(offset, outputFile);
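
Note for reviewers: the four-argument generate(startFileIndex, endFileIndex, targetSizeInBytes, scanAllFiles) contract touched above treats endFileIndex as a non-inclusive upper bound, per the "[startFileIndex, endFileIndex)" comment in planFiles. Below is a minimal sketch of how a caller might chain batches the way TestMicroBatchBuilder does; the MicroBatchWalker class, its parameters, and the loop guard are illustrative assumptions and not part of this patch — only the MicroBatches.from(...).specsById(...).generate(...) calls mirror the API as changed here.

    import java.util.List;
    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.MicroBatches;
    import org.apache.iceberg.MicroBatches.MicroBatch;
    import org.apache.iceberg.Snapshot;
    import org.apache.iceberg.Table;

    // Hypothetical helper, not part of this patch: drives the refactored
    // generate(startFileIndex, endFileIndex, targetSizeInBytes, scanAllFiles)
    // the same way the tests above do.
    public class MicroBatchWalker {
      static void walk(Table table, long snapshotId, long endFileIndex, long targetSizeInBytes) {
        Snapshot snapshot = table.snapshot(snapshotId);
        long start = 0;  // first batch starts at file index 0, as in determineStartingOffset
        MicroBatch batch;
        do {
          batch = MicroBatches.from(snapshot, table.io())
              .specsById(table.specs())
              .generate(start, endFileIndex, targetSizeInBytes, true /* scanAllFiles */);
          List<FileScanTask> tasks = batch.tasks();  // hand these to the reader
          start = batch.endFileIndex();              // next batch begins where this one ended
          // stop at the snapshot's last index, or once the non-inclusive end bound is reached
        } while (!batch.lastIndexOfSnapshot() && start < endFileIndex);
      }
    }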