handle spark 3.0 / 3.1
Prashant Singh committed Apr 12, 2022
1 parent e4eb52a commit b9b1de5
Showing 6 changed files with 33 additions and 24 deletions.
1 change: 1 addition & 0 deletions core/src/main/java/org/apache/iceberg/MicroBatches.java
@@ -182,6 +182,7 @@ private static List<Pair<ManifestFile, Integer>> skipManifests(List<Pair<Manifes
     * @param scanAllFiles Used to check whether all the data files should be processed, or only added files.
     * @return A MicroBatch.
     */
+   @SuppressWarnings("checkstyle:CyclomaticComplexity")
    private MicroBatch generateMicroBatch(
        List<Pair<ManifestFile, Integer>> indexedManifests,
        long startFileIndex, long endFileIndex, long targetSizeInBytes, boolean scanAllFiles) {
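For orientation, the method annotated above sits behind the builder's four-argument generate(startFileIndex, endFileIndex, targetSizeInBytes, scanAllFiles) call that the remaining diffs migrate to. A minimal, hedged sketch of driving that builder, reusing only calls that appear elsewhere in this commit (the table setup and wrapper class are assumptions):

// Sketch only: assumes an Iceberg Table handle with at least one committed snapshot.
import org.apache.iceberg.MicroBatches;
import org.apache.iceberg.MicroBatches.MicroBatch;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

class MicroBatchBuilderSketch {
  static MicroBatch firstBatch(Table table) {
    Snapshot snapshot = table.currentSnapshot();
    long startFileIndex = 0L;       // begin at the first added file of the snapshot
    long endFileIndex = 4L;         // illustrative upper bound on the file index
    long targetSizeInBytes = 10L;   // cap on the combined size of the selected files
    boolean scanAllFiles = true;    // read all data files, not only newly added ones
    return MicroBatches.from(snapshot, table.io())
        .specsById(table.specs())
        .generate(startFileIndex, endFileIndex, targetSizeInBytes, scanAllFiles);
  }
}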
@@ -117,7 +117,7 @@ public void testGenerateMicroBatchWithSmallTargetSize() {

    MicroBatch batch3 = MicroBatches.from(table.snapshot(1L), table.io())
        .specsById(table.specs())
-       .generate(batch2.endFileIndex(), 4,10L, true);
+       .generate(batch2.endFileIndex(), 4, 10L, true);
    Assert.assertEquals(batch3.endFileIndex(), 4);
    Assert.assertEquals(batch3.sizeInBytes(), 10);
    filesMatch(Lists.newArrayList("D"), filesToScan(batch3.tasks()));
@@ -189,10 +189,13 @@ private List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffse
        continue;
      }

-     MicroBatch latestMicroBatch = MicroBatches.from(table.snapshot(currentOffset.snapshotId()), table.io())
+     Snapshot currentSnapshot = table.snapshot(currentOffset.snapshotId());
+
+     MicroBatch latestMicroBatch = MicroBatches.from(currentSnapshot, table.io())
          .caseSensitive(caseSensitive)
          .specsById(table.specs())
-         .generate(currentOffset.position(), Long.MAX_VALUE, currentOffset.shouldScanAllFiles());
+         .generate(currentOffset.position(), Iterables.size(currentSnapshot.addedFiles()), Long.MAX_VALUE,
+             currentOffset.shouldScanAllFiles());

      fileScanTasks.addAll(latestMicroBatch.tasks());
    } while (currentOffset.snapshotId() != endOffset.snapshotId());
@@ -189,10 +189,12 @@ private List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffse
        continue;
      }

-     MicroBatch latestMicroBatch = MicroBatches.from(table.snapshot(currentOffset.snapshotId()), table.io())
+     Snapshot currentSnapshot = table.snapshot(currentOffset.snapshotId());
+     MicroBatch latestMicroBatch = MicroBatches.from(currentSnapshot, table.io())
          .caseSensitive(caseSensitive)
          .specsById(table.specs())
-         .generate(currentOffset.position(), Long.MAX_VALUE, currentOffset.shouldScanAllFiles());
+         .generate(currentOffset.position(), Iterables.size(currentSnapshot.addedFiles()), Long.MAX_VALUE,
+             currentOffset.shouldScanAllFiles());

      fileScanTasks.addAll(latestMicroBatch.tasks());
    } while (currentOffset.snapshotId() != endOffset.snapshotId());
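In both the Spark 3.0 and 3.1 copies above, the new second argument bounds the batch at the number of files the snapshot added, so a whole snapshot is still consumed by a single generate(...) call, matching the old three-argument behaviour. A hedged sketch of that computation (the plain Guava import and wrapper class are assumptions kept only to make the snippet self-contained; the Iceberg build normally uses its relocated Guava copy):

import com.google.common.collect.Iterables;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Snapshot;

class EndFileIndexSketch {
  static int endFileIndex(Snapshot snapshot) {
    Iterable<DataFile> addedFiles = snapshot.addedFiles();
    // One index per file added by this snapshot, so the generated micro-batch can span
    // the entire snapshot when the caller passes Long.MAX_VALUE as the size target.
    return Iterables.size(addedFiles);
  }
}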
@@ -186,11 +186,9 @@ private List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffse

    // [(startOffset : startFileIndex), (endOffset : endFileIndex) )
    do {
-     long startFileIndex = 0;
      long endFileIndex;
      if (currentOffset == null) {
        currentOffset = batchStartOffset;
-       startFileIndex = batchStartOffset.position();
      } else {
        Snapshot snapshotAfter = SnapshotUtil.snapshotAfter(table, currentOffset.snapshotId());
        // it may happen that we need to read this snapshot partially in case it's equal to endOffset.
@@ -216,7 +214,7 @@ private List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffse
      MicroBatch latestMicroBatch = MicroBatches.from(currentSnapshot, table.io())
          .caseSensitive(caseSensitive)
          .specsById(table.specs())
-         .generate(startFileIndex, endFileIndex, Long.MAX_VALUE, currentOffset.shouldScanAllFiles());
+         .generate(currentOffset.position(), endFileIndex, Long.MAX_VALUE, currentOffset.shouldScanAllFiles());

      fileScanTasks.addAll(latestMicroBatch.tasks());
    } while (currentOffset.snapshotId() != endOffset.snapshotId());
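With the Spark 3.2 change above, the separate startFileIndex bookkeeping disappears: a batch always resumes from the position carried by the current StreamingOffset. A small hedged illustration of that contract, using only the constructor and accessors visible in this diff (the package and the wrapper class are assumptions):

import org.apache.iceberg.spark.source.StreamingOffset;

class OffsetResumeSketch {
  // The micro-batch starts at the offset's own file position, both for the very first batch
  // (currentOffset == batchStartOffset) and when resuming inside a partially read snapshot.
  static long startFileIndex(StreamingOffset currentOffset) {
    return currentOffset.position();
  }

  // Shape of the offset produced by latestOffset() further down: snapshot id, file position
  // within that snapshot, and whether all data files (not only newly added ones) are scanned.
  static StreamingOffset endOfBatch(long snapshotId, int filePosition) {
    return new StreamingOffset(snapshotId, filePosition, false);
  }
}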
@@ -291,14 +289,14 @@ public Offset latestOffset(Offset startOffset, ReadLimit limit) {
    }

    // end offset can expand to multiple snapshots
-   StreamingOffset curOffset = (StreamingOffset) startOffset;
+   StreamingOffset startingOffset = (StreamingOffset) startOffset;

    if (startOffset.equals(StreamingOffset.START_OFFSET)) {
-     curOffset = determineStartingOffset(table, fromTimestamp);
+     startingOffset = determineStartingOffset(table, fromTimestamp);
    }

-   Snapshot curSnapshot = table.snapshot(curOffset.snapshotId());
-   int startPosOfSnapOffset = (int) curOffset.position();
+   Snapshot curSnapshot = table.snapshot(startingOffset.snapshotId());
+   int startPosOfSnapOffset = (int) startingOffset.position();

    boolean isOk = true;
    int curFilesAdded = 0;
@@ -312,10 +310,10 @@ public Offset latestOffset(Offset startOffset, ReadLimit limit) {
      curPos = 0;
      for (DataFile dataFile : curSnapshot.addedFiles()) {
        GenericDataFile genericDataFile = (GenericDataFile) dataFile;
-       if (curPos > startPosOfSnapOffset) {
+       if (curPos >= startPosOfSnapOffset) {
          // TODO : use readLimit provided in function param, the readLimits are derived from these 2 properties.
-         if (curFilesAdded + 1 > maxFilesPerMicroBatch ||
-             curRecordCount + genericDataFile.recordCount() > maxRecordsPerMicroBatch) {
+         if ((curFilesAdded + 1) > maxFilesPerMicroBatch ||
+             (curRecordCount + genericDataFile.recordCount()) > maxRecordsPerMicroBatch) {
            isOk = false;
            break;
          }
@@ -338,7 +336,10 @@ public Offset latestOffset(Offset startOffset, ReadLimit limit) {
      }
    }

-   return new StreamingOffset(curSnapshot.snapshotId(), curPos, false);
+   StreamingOffset latestStreamingOffset = new StreamingOffset(curSnapshot.snapshotId(), curPos, false);
+
+   // if no new data arrived, then return null.
+   return latestStreamingOffset.equals(startingOffset) ? null : latestStreamingOffset;
  }

  @Override
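The maxFilesPerMicroBatch and maxRecordsPerMicroBatch limits consulted in latestOffset() above map to the read options exercised by the tests below. A hedged sketch of wiring them onto a streaming read (the SparkSession, table identifier, option values, and wrapper class are placeholders, not part of this commit):

import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

class StreamingReadOptionsSketch {
  static Dataset<Row> openStream(SparkSession spark, String tableIdent) {
    return spark.readStream()
        .format("iceberg")
        // Cap each micro-batch; latestOffset() stops adding files once either limit would be exceeded.
        .option(SparkReadOptions.MAX_FILES_PER_MICRO_BATCH, "1")
        .option(SparkReadOptions.MAX_ROWS_PER_MICRO_BATCH, "2")
        .load(tableIdent);
  }
}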
@@ -170,34 +170,36 @@ public void testReadStreamOnIcebergThenAddData() throws Exception {
  public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfFiles_1() throws Exception {
    appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);

-   Assert.assertEquals(3,
+   Assert.assertEquals(6,
        microBatchCount(ImmutableMap.of(SparkReadOptions.MAX_FILES_PER_MICRO_BATCH, "1")));
  }

  @Test
  public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfFiles_2() throws Exception {
    appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);

-   Assert.assertEquals(2,
+   Assert.assertEquals(3,
        microBatchCount(ImmutableMap.of(SparkReadOptions.MAX_FILES_PER_MICRO_BATCH, "2")));
  }

  @Test
  public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfRows_1() throws Exception {
    appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);

-   Assert.assertEquals(4,
+   // only 1 micro-batch will be formed and we will read data partially
+   Assert.assertEquals(1,
        microBatchCount(ImmutableMap.of(SparkReadOptions.MAX_ROWS_PER_MICRO_BATCH, "1")));

-   StreamingQuery query = startStream();
+   StreamingQuery query = startStream(SparkReadOptions.MAX_ROWS_PER_MICRO_BATCH, "1");

-   // check answer correctness
+   // check answer correctness only 1 record read the micro-batch will be stuck
    List<SimpleRecord> actual = rowsAvailable(query);
-   Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(TEST_DATA_MULTIPLE_SNAPSHOTS));
+   Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(
+       Lists.newArrayList(TEST_DATA_MULTIPLE_SNAPSHOTS.get(0).get(0)));
  }

  @Test
- public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfFiles_4() throws Exception {
+ public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfRows_4() throws Exception {
    appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);

    Assert.assertEquals(2,
@@ -597,7 +599,7 @@ private StreamingQuery startStream() throws TimeoutException {
  }

  private StreamingQuery startStream(String key, String value) throws TimeoutException {
-   return startStream(ImmutableMap.of(key, value));
+   return startStream(ImmutableMap.of(key, value, SparkReadOptions.MAX_FILES_PER_MICRO_BATCH, "1"));
  }

  private List<SimpleRecord> rowsAvailable(StreamingQuery query) {
