
handle spark 3.0 / 3.1
Prashant Singh committed Apr 4, 2022
1 parent 14983d3 · commit 71ac6bb
Showing 6 changed files with 14 additions and 10 deletions.
core/src/main/java/org/apache/iceberg/MicroBatches.java (3 changes: 2 additions & 1 deletion)
@@ -182,6 +182,7 @@ private static List<Pair<ManifestFile, Integer>> skipManifests(List<Pair<Manifes
* @param scanAllFiles Used to check whether all the data files should be processed, or only added files.
* @return A MicroBatch.
*/
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
private MicroBatch generateMicroBatch(
List<Pair<ManifestFile, Integer>> indexedManifests,
long startFileIndex, long endFileIndex, long targetSizeInBytes, boolean scanAllFiles) {
@@ -203,7 +204,7 @@ private MicroBatch generateMicroBatch(
while (taskIter.hasNext()) {
FileScanTask task = taskIter.next();
// want to read [startFileIndex ... endFileIndex)
-      if (currentFileIndex >= startFileIndex && currentFileIndex < endFileIndex) {
+      if (currentFileIndex >= startFileIndex) {
        // Make sure there's at least one task in each MicroBatch to avoid the job getting stuck;
        // always add the task first.
tasks.add(task);
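
Note on the core change: the upper bound of the window is no longer part of the admission check, so every task from startFileIndex onward is added until the loop cuts off, which is what keeps each micro-batch non-empty. A minimal sketch of the resulting loop shape, assuming the cut-off at the bottom of the loop checks both the byte budget and the end of the window (the cut-off itself is outside this hunk):

    while (taskIter.hasNext()) {
      FileScanTask task = taskIter.next();
      // want to read [startFileIndex ... endFileIndex)
      if (currentFileIndex >= startFileIndex) {
        tasks.add(task);                      // always admit: guarantees a non-empty batch
        currentSizeInBytes += task.length();
      }
      currentFileIndex++;
      // assumed cut-off, not shown in the hunk: stop once the batch is full
      // or the end of the [startFileIndex ... endFileIndex) window is reached
      if (currentSizeInBytes >= targetSizeInBytes || currentFileIndex >= endFileIndex) {
        break;
      }
    }
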
@@ -117,7 +117,7 @@ public void testGenerateMicroBatchWithSmallTargetSize() {

MicroBatch batch3 = MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
-        .generate(batch2.endFileIndex(), 4,10L, true);
+        .generate(batch2.endFileIndex(), 4, 10L, true);
Assert.assertEquals(batch3.endFileIndex(), 4);
Assert.assertEquals(batch3.sizeInBytes(), 10);
filesMatch(Lists.newArrayList("D"), filesToScan(batch3.tasks()));
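
For orientation, generate(...) takes (startFileIndex, endFileIndex, targetSizeInBytes, scanAllFiles), and the test chains batches by feeding each batch's endFileIndex() back in as the next start. A hedged usage sketch, assuming snapshot 1 added four 10-byte files A..D, as the surrounding assertions suggest:

    MicroBatch batch = MicroBatches.from(table.snapshot(1L), table.io())
        .specsById(table.specs())
        .generate(0, 4, 10L, true);           // plan within [0 .. 4), stop after ~10 bytes

    MicroBatch next = MicroBatches.from(table.snapshot(1L), table.io())
        .specsById(table.specs())
        .generate(batch.endFileIndex(), 4, 10L, true);  // resume where the last batch ended
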
@@ -189,10 +189,13 @@ private List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffse
continue;
}

-      MicroBatch latestMicroBatch = MicroBatches.from(table.snapshot(currentOffset.snapshotId()), table.io())
+      Snapshot currentSnapshot = table.snapshot(currentOffset.snapshotId());
+
+      MicroBatch latestMicroBatch = MicroBatches.from(currentSnapshot, table.io())
          .caseSensitive(caseSensitive)
          .specsById(table.specs())
-          .generate(currentOffset.position(), Long.MAX_VALUE, currentOffset.shouldScanAllFiles());
+          .generate(currentOffset.position(), Iterables.size(currentSnapshot.addedFiles()), Long.MAX_VALUE,
+              currentOffset.shouldScanAllFiles());

fileScanTasks.addAll(latestMicroBatch.tasks());
} while (currentOffset.snapshotId() != endOffset.snapshotId());
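
Since this Spark version has no per-trigger rate limit, the new endFileIndex argument is filled with the snapshot's added-file count, so one micro-batch still covers everything from the current position to the end of the snapshot. Reassembled from the hunk above (Iterables is assumed to be the relocated Guava class Iceberg ships):

    // assumes: import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
    Snapshot currentSnapshot = table.snapshot(currentOffset.snapshotId());
    MicroBatch latestMicroBatch = MicroBatches.from(currentSnapshot, table.io())
        .caseSensitive(caseSensitive)
        .specsById(table.specs())
        // endFileIndex = files added by this snapshot, i.e. read [position .. addedFileCount)
        .generate(currentOffset.position(), Iterables.size(currentSnapshot.addedFiles()),
            Long.MAX_VALUE, currentOffset.shouldScanAllFiles());
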
@@ -189,10 +189,12 @@ private List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffse
continue;
}

-      MicroBatch latestMicroBatch = MicroBatches.from(table.snapshot(currentOffset.snapshotId()), table.io())
+      Snapshot currentSnapshot = table.snapshot(currentOffset.snapshotId());
+      MicroBatch latestMicroBatch = MicroBatches.from(currentSnapshot, table.io())
          .caseSensitive(caseSensitive)
          .specsById(table.specs())
-          .generate(currentOffset.position(), Long.MAX_VALUE, currentOffset.shouldScanAllFiles());
+          .generate(currentOffset.position(), Iterables.size(currentSnapshot.addedFiles()), Long.MAX_VALUE,
+              currentOffset.shouldScanAllFiles());

fileScanTasks.addAll(latestMicroBatch.tasks());
} while (currentOffset.snapshotId() != endOffset.snapshotId());
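
The Spark 3.1 module receives the identical fix. Worth noting: Iterables.size(...) walks the whole addedFiles() iterable to produce the count, once per planned snapshot. A hypothetical helper (not in this commit) makes the intent explicit:

    // hypothetical helper, not part of the commit: files added by a snapshot,
    // used as the exclusive endFileIndex when planning the micro-batch
    private static int addedFileCount(Snapshot snapshot) {
      return Iterables.size(snapshot.addedFiles());  // O(n) walk over added data files
    }
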
@@ -186,11 +186,9 @@ private List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffse

// [(startOffset : startFileIndex), (endOffset : endFileIndex) )
do {
-      long startFileIndex = 0;
long endFileIndex;
if (currentOffset == null) {
currentOffset = batchStartOffset;
-        startFileIndex = batchStartOffset.position();
} else {
Snapshot snapshotAfter = SnapshotUtil.snapshotAfter(table, currentOffset.snapshotId());
// it may happen that we need to read this snapshot partially in case it's equal to endOffset.
@@ -216,7 +214,7 @@ private List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffse
MicroBatch latestMicroBatch = MicroBatches.from(currentSnapshot, table.io())
.caseSensitive(caseSensitive)
.specsById(table.specs())
-          .generate(startFileIndex, endFileIndex, Long.MAX_VALUE, currentOffset.shouldScanAllFiles());
+          .generate(currentOffset.position(), endFileIndex, Long.MAX_VALUE, currentOffset.shouldScanAllFiles());

fileScanTasks.addAll(latestMicroBatch.tasks());
} while (currentOffset.snapshotId() != endOffset.snapshotId());
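
With the startFileIndex local removed, the start of every batch comes from currentOffset.position(): on the first iteration currentOffset is set to batchStartOffset, whose position() is exactly the value the deleted local used to carry, and on later iterations position() is the resume point inside the next snapshot. The resulting call, assembled from the two hunks above:

    MicroBatch latestMicroBatch = MicroBatches.from(currentSnapshot, table.io())
        .caseSensitive(caseSensitive)
        .specsById(table.specs())
        // start where the offset points; the first iteration works because
        // currentOffset == batchStartOffset there, so position() is the batch start
        .generate(currentOffset.position(), endFileIndex, Long.MAX_VALUE,
            currentOffset.shouldScanAllFiles());
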
@@ -197,7 +197,7 @@ public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfRows_1
}

@Test
-  public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfFiles_4() throws Exception {
+  public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfRows_4() throws Exception {
appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);

Assert.assertEquals(2,
