From 029622b42091a17ca386977dba4a68f511ffba21 Mon Sep 17 00:00:00 2001 From: Prashant Singh <35593236+singhpk234@users.noreply.github.com> Date: Sat, 22 Apr 2023 10:24:49 -0700 Subject: [PATCH] Spark 3.3: support rate limit in Spark Streaming (#4479) --- .../java/org/apache/iceberg/MicroBatches.java | 209 ++++++++++-------- .../apache/iceberg/TestMicroBatchBuilder.java | 20 +- .../apache/iceberg/spark/SparkReadConf.java | 16 ++ .../iceberg/spark/SparkReadOptions.java | 7 + .../spark/source/SparkMicroBatchStream.java | 169 ++++++++++++-- .../source/TestStructuredStreamingRead3.java | 76 ++++++- 6 files changed, 384 insertions(+), 113 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/MicroBatches.java b/core/src/main/java/org/apache/iceberg/MicroBatches.java index e066e1b31493..d96246f15b02 100644 --- a/core/src/main/java/org/apache/iceberg/MicroBatches.java +++ b/core/src/main/java/org/apache/iceberg/MicroBatches.java @@ -28,6 +28,7 @@ import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.util.Pair; import org.slf4j.Logger; @@ -36,6 +37,96 @@ public class MicroBatches { private MicroBatches() {} + public static List> skippedManifestIndexesFromSnapshot( + FileIO io, Snapshot snapshot, long startFileIndex, boolean scanAllFiles) { + List manifests = + scanAllFiles + ? snapshot.dataManifests(io) + : snapshot.dataManifests(io).stream() + .filter(m -> m.snapshotId().equals(snapshot.snapshotId())) + .collect(Collectors.toList()); + + List> manifestIndexes = indexManifests(manifests); + + return skipManifests(manifestIndexes, startFileIndex); + } + + public static CloseableIterable openManifestFile( + FileIO io, + Map specsById, + boolean caseSensitive, + Snapshot snapshot, + ManifestFile manifestFile, + boolean scanAllFiles) { + + ManifestGroup manifestGroup = + new ManifestGroup(io, ImmutableList.of(manifestFile)) + .specsById(specsById) + .caseSensitive(caseSensitive); + if (!scanAllFiles) { + manifestGroup = + manifestGroup + .filterManifestEntries( + entry -> + entry.snapshotId() == snapshot.snapshotId() + && entry.status() == ManifestEntry.Status.ADDED) + .ignoreDeleted(); + } + + return manifestGroup.planFiles(); + } + + /** + * Method to index the data files for each manifest. For example, if manifest m1 has 3 data files, + * manifest m2 has 2 data files, manifest m3 has 1 data file, then the index will be (m1, 0), (m2, + * 3), (m3, 5). + * + * @param manifestFiles List of input manifests used to index. + * @return a list pairing each manifest with the index number of the first data file entry in that + * manifest. + */ + private static List> indexManifests( + List manifestFiles) { + int currentFileIndex = 0; + List> manifestIndexes = Lists.newArrayList(); + + for (ManifestFile manifest : manifestFiles) { + manifestIndexes.add(Pair.of(manifest, currentFileIndex)); + currentFileIndex += manifest.addedFilesCount() + manifest.existingFilesCount(); + } + + return manifestIndexes; + } + + /** + * Method to skip the manifest file whose index is smaller than startFileIndex. For example, if + * the index list is : (m1, 0), (m2, 3), (m3, 5), and startFileIndex is 4, then the returned + * manifest index list is: (m2, 3), (m3, 5). + * + * @param indexedManifests List of input manifests. 
+ * @param startFileIndex Index used to skip all manifests with an index less than or equal to this + * value. + * @return a sub-list of manifest file index which only contains the manifest indexes larger than + * the startFileIndex. + */ + private static List> skipManifests( + List> indexedManifests, long startFileIndex) { + if (startFileIndex == 0) { + return indexedManifests; + } + + int manifestIndex = 0; + for (Pair manifest : indexedManifests) { + if (manifest.second() > startFileIndex) { + break; + } + + manifestIndex++; + } + + return indexedManifests.subList(Math.max(manifestIndex - 1, 0), indexedManifests.size()); + } + public static class MicroBatch { private final long snapshotId; private final long startFileIndex; @@ -113,73 +204,27 @@ public MicroBatchBuilder specsById(Map specs) { } public MicroBatch generate(long startFileIndex, long targetSizeInBytes, boolean scanAllFiles) { + return generate( + startFileIndex, + Iterables.size(snapshot.addedDataFiles(io)), + targetSizeInBytes, + scanAllFiles); + } + + public MicroBatch generate( + long startFileIndex, long endFileIndex, long targetSizeInBytes, boolean scanAllFiles) { + Preconditions.checkArgument(endFileIndex >= 0, "endFileIndex is unexpectedly smaller than 0"); Preconditions.checkArgument( startFileIndex >= 0, "startFileIndex is unexpectedly smaller than 0"); Preconditions.checkArgument( targetSizeInBytes > 0, "targetSizeInBytes should be larger than 0"); - List manifests = - scanAllFiles - ? snapshot.dataManifests(io) - : snapshot.dataManifests(io).stream() - .filter(m -> m.snapshotId().equals(snapshot.snapshotId())) - .collect(Collectors.toList()); - - List> manifestIndexes = indexManifests(manifests); - List> skippedManifestIndexes = - skipManifests(manifestIndexes, startFileIndex); - return generateMicroBatch( - skippedManifestIndexes, startFileIndex, targetSizeInBytes, scanAllFiles); - } - - /** - * Method to index the data files for each manifest. For example, if manifest m1 has 3 data - * files, manifest m2 has 2 data files, manifest m3 has 1 data file, then the index will be (m1, - * 0), (m2, 3), (m3, 5). - * - * @param manifestFiles List of input manifests used to index. - * @return a list of manifest index with key as manifest file, value as file counts. - */ - private static List> indexManifests( - List manifestFiles) { - int currentFileIndex = 0; - List> manifestIndexes = Lists.newArrayList(); - - for (ManifestFile manifest : manifestFiles) { - manifestIndexes.add(Pair.of(manifest, currentFileIndex)); - currentFileIndex += manifest.addedFilesCount() + manifest.existingFilesCount(); - } - - return manifestIndexes; - } - - /** - * Method to skip the manifest file in which the index is smaller than startFileIndex. For - * example, if the index list is : (m1, 0), (m2, 3), (m3, 5), and startFileIndex is 4, then the - * returned manifest index list is: (m2, 3), (m3, 5). - * - * @param indexedManifests List of input manifests. - * @param startFileIndex Index used to skip the processed manifests. - * @return a sub-list of manifest file index which only contains the manifest indexes larger - * than the startFileIndex. 
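
To make the indexing and skipping arithmetic above concrete, here is a minimal, self-contained sketch of the indexManifests/skipManifests behaviour for the javadoc example, using Pair<String, Integer> as a stand-in for Pair<ManifestFile, Integer>. The class name and manifest names are illustrative only; only Pair.of/first/second from the Iceberg util package are real API.

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.iceberg.util.Pair;

    public class ManifestIndexSketch {
      public static void main(String[] args) {
        // m1 has 3 data files, m2 has 2, m3 has 1 -> first-file indexes 0, 3, 5
        String[] names = {"m1", "m2", "m3"};
        int[] fileCounts = {3, 2, 1};

        // indexManifests: pair each manifest with the index of its first data file
        List<Pair<String, Integer>> indexed = new ArrayList<>();
        int currentFileIndex = 0;
        for (int i = 0; i < names.length; i++) {
          indexed.add(Pair.of(names[i], currentFileIndex));
          currentFileIndex += fileCounts[i];
        }
        // indexed = (m1, 0), (m2, 3), (m3, 5)

        // skipManifests with startFileIndex = 4: m1 ends before file 4 and is dropped,
        // m2 is kept because file 4 lives inside it, and m3 follows it
        long startFileIndex = 4;
        int manifestIndex = 0;
        for (Pair<String, Integer> manifest : indexed) {
          if (manifest.second() > startFileIndex) {
            break;
          }
          manifestIndex++;
        }
        List<Pair<String, Integer>> kept =
            indexed.subList(Math.max(manifestIndex - 1, 0), indexed.size());
        // kept = (m2, 3), (m3, 5)
        kept.forEach(pair -> System.out.println(pair.first() + " -> " + pair.second()));
      }
    }
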
- */ - private static List> skipManifests( - List> indexedManifests, long startFileIndex) { - if (startFileIndex == 0) { - return indexedManifests; - } - - int manifestIndex = 0; - for (Pair manifest : indexedManifests) { - if (manifest.second() > startFileIndex) { - break; - } - - manifestIndex++; - } - - return indexedManifests.subList(manifestIndex - 1, indexedManifests.size()); + skippedManifestIndexesFromSnapshot(io, snapshot, startFileIndex, scanAllFiles), + startFileIndex, + endFileIndex, + targetSizeInBytes, + scanAllFiles); } /** @@ -188,25 +233,23 @@ private static List> skipManifests( * * @param indexedManifests A list of indexed manifests to generate MicroBatch * @param startFileIndex A startFileIndex used to skip processed files. + * @param endFileIndex An endFileIndex used to find files to include, exclusive. * @param targetSizeInBytes Used to control the size of MicroBatch, the processed file bytes * must be smaller than this size. * @param scanAllFiles Used to check whether all the data files should be processed, or only * added files. * @return A MicroBatch. */ + @SuppressWarnings("checkstyle:CyclomaticComplexity") private MicroBatch generateMicroBatch( List> indexedManifests, long startFileIndex, + long endFileIndex, long targetSizeInBytes, boolean scanAllFiles) { if (indexedManifests.isEmpty()) { return new MicroBatch( - snapshot.snapshotId(), - startFileIndex, - startFileIndex + 1, - 0L, - Collections.emptyList(), - true); + snapshot.snapshotId(), startFileIndex, endFileIndex, 0L, Collections.emptyList(), true); } long currentSizeInBytes = 0L; @@ -218,11 +261,18 @@ private MicroBatch generateMicroBatch( currentFileIndex = indexedManifests.get(idx).second(); try (CloseableIterable taskIterable = - open(indexedManifests.get(idx).first(), scanAllFiles); + openManifestFile( + io, + specsById, + caseSensitive, + snapshot, + indexedManifests.get(idx).first(), + scanAllFiles); CloseableIterator taskIter = taskIterable.iterator()) { while (taskIter.hasNext()) { FileScanTask task = taskIter.next(); - if (currentFileIndex >= startFileIndex) { + // want to read [startFileIndex ... endFileIndex) + if (currentFileIndex >= startFileIndex && currentFileIndex < endFileIndex) { // Make sure there's at least one task in each MicroBatch to void job to be stuck, // always add task // firstly. 
@@ -231,7 +281,7 @@ private MicroBatch generateMicroBatch( } currentFileIndex++; - if (currentSizeInBytes >= targetSizeInBytes) { + if (currentSizeInBytes >= targetSizeInBytes || currentFileIndex >= endFileIndex) { break; } } @@ -259,6 +309,7 @@ private MicroBatch generateMicroBatch( } } + // [startFileIndex ....currentFileIndex) return new MicroBatch( snapshot.snapshotId(), startFileIndex, @@ -267,23 +318,5 @@ private MicroBatch generateMicroBatch( tasks, isLastIndex); } - - private CloseableIterable open(ManifestFile manifestFile, boolean scanAllFiles) { - ManifestGroup manifestGroup = - new ManifestGroup(io, ImmutableList.of(manifestFile)) - .specsById(specsById) - .caseSensitive(caseSensitive); - if (!scanAllFiles) { - manifestGroup = - manifestGroup - .filterManifestEntries( - entry -> - entry.snapshotId() == snapshot.snapshotId() - && entry.status() == ManifestEntry.Status.ADDED) - .ignoreDeleted(); - } - - return manifestGroup.planFiles(); - } } } diff --git a/core/src/test/java/org/apache/iceberg/TestMicroBatchBuilder.java b/core/src/test/java/org/apache/iceberg/TestMicroBatchBuilder.java index b907a5031964..deb6e7c8ad32 100644 --- a/core/src/test/java/org/apache/iceberg/TestMicroBatchBuilder.java +++ b/core/src/test/java/org/apache/iceberg/TestMicroBatchBuilder.java @@ -52,7 +52,7 @@ public void testGenerateMicroBatch() { MicroBatch batch = MicroBatches.from(table.snapshot(1L), table.io()) .specsById(table.specs()) - .generate(0, Long.MAX_VALUE, true); + .generate(0, 6, Long.MAX_VALUE, true); Assert.assertEquals(batch.snapshotId(), 1L); Assert.assertEquals(batch.startFileIndex(), 0); Assert.assertEquals(batch.endFileIndex(), 5); @@ -63,7 +63,7 @@ public void testGenerateMicroBatch() { MicroBatch batch1 = MicroBatches.from(table.snapshot(1L), table.io()) .specsById(table.specs()) - .generate(0, 15L, true); + .generate(0, 1, 15L, true); Assert.assertEquals(batch1.endFileIndex(), 1); Assert.assertEquals(batch1.sizeInBytes(), 10); Assert.assertFalse(batch1.lastIndexOfSnapshot()); @@ -72,7 +72,7 @@ public void testGenerateMicroBatch() { MicroBatch batch2 = MicroBatches.from(table.snapshot(1L), table.io()) .specsById(table.specs()) - .generate(batch1.endFileIndex(), 30L, true); + .generate(batch1.endFileIndex(), 4, 30L, true); Assert.assertEquals(batch2.endFileIndex(), 4); Assert.assertEquals(batch2.sizeInBytes(), 30); Assert.assertFalse(batch2.lastIndexOfSnapshot()); @@ -81,7 +81,7 @@ public void testGenerateMicroBatch() { MicroBatch batch3 = MicroBatches.from(table.snapshot(1L), table.io()) .specsById(table.specs()) - .generate(batch2.endFileIndex(), 50L, true); + .generate(batch2.endFileIndex(), 5, 50L, true); Assert.assertEquals(batch3.endFileIndex(), 5); Assert.assertEquals(batch3.sizeInBytes(), 10); Assert.assertTrue(batch3.lastIndexOfSnapshot()); @@ -95,7 +95,7 @@ public void testGenerateMicroBatchWithSmallTargetSize() { MicroBatch batch = MicroBatches.from(table.snapshot(1L), table.io()) .specsById(table.specs()) - .generate(0, 10L, true); + .generate(0, 1, 10L, true); Assert.assertEquals(batch.snapshotId(), 1L); Assert.assertEquals(batch.startFileIndex(), 0); Assert.assertEquals(batch.endFileIndex(), 1); @@ -106,7 +106,7 @@ public void testGenerateMicroBatchWithSmallTargetSize() { MicroBatch batch1 = MicroBatches.from(table.snapshot(1L), table.io()) .specsById(table.specs()) - .generate(batch.endFileIndex(), 5L, true); + .generate(batch.endFileIndex(), 2, 5L, true); Assert.assertEquals(batch1.endFileIndex(), 2); Assert.assertEquals(batch1.sizeInBytes(), 10); 
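
The new generate overload treats the extra argument as an exclusive upper bound, so one batch covers data files in [startFileIndex, endFileIndex), exactly as the tests around this hunk exercise it. Below is a hedged sketch of how a caller could walk one snapshot in size-bounded windows with this API; the class and method names are invented, and a populated `table` handle with a committed snapshot is assumed.

    import org.apache.iceberg.MicroBatches;
    import org.apache.iceberg.MicroBatches.MicroBatch;
    import org.apache.iceberg.Snapshot;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.relocated.com.google.common.collect.Iterables;

    public class MicroBatchWindowSketch {
      // Walks snapshotId in batches of at most targetSizeInBytes, never reading
      // past the snapshot's added-file count (the exclusive endFileIndex).
      static void walkSnapshot(Table table, long snapshotId, long targetSizeInBytes) {
        Snapshot snapshot = table.snapshot(snapshotId);
        long endFileIndex = Iterables.size(snapshot.addedDataFiles(table.io()));

        long start = 0;
        MicroBatch batch;
        do {
          batch =
              MicroBatches.from(snapshot, table.io())
                  .caseSensitive(true)
                  .specsById(table.specs())
                  .generate(start, endFileIndex, targetSizeInBytes, /* scanAllFiles */ false);
          // batch.tasks() covers the files in [start, batch.endFileIndex())
          batch.tasks().forEach(task -> System.out.println(task.file().path()));
          start = batch.endFileIndex();
        } while (!batch.lastIndexOfSnapshot());
      }
    }
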
filesMatch(Lists.newArrayList("B"), filesToScan(batch1.tasks())); @@ -115,7 +115,7 @@ public void testGenerateMicroBatchWithSmallTargetSize() { MicroBatch batch2 = MicroBatches.from(table.snapshot(1L), table.io()) .specsById(table.specs()) - .generate(batch1.endFileIndex(), 10L, true); + .generate(batch1.endFileIndex(), 3, 10L, true); Assert.assertEquals(batch2.endFileIndex(), 3); Assert.assertEquals(batch2.sizeInBytes(), 10); filesMatch(Lists.newArrayList("C"), filesToScan(batch2.tasks())); @@ -124,7 +124,7 @@ public void testGenerateMicroBatchWithSmallTargetSize() { MicroBatch batch3 = MicroBatches.from(table.snapshot(1L), table.io()) .specsById(table.specs()) - .generate(batch2.endFileIndex(), 10L, true); + .generate(batch2.endFileIndex(), 4, 10L, true); Assert.assertEquals(batch3.endFileIndex(), 4); Assert.assertEquals(batch3.sizeInBytes(), 10); filesMatch(Lists.newArrayList("D"), filesToScan(batch3.tasks())); @@ -133,7 +133,7 @@ public void testGenerateMicroBatchWithSmallTargetSize() { MicroBatch batch4 = MicroBatches.from(table.snapshot(1L), table.io()) .specsById(table.specs()) - .generate(batch3.endFileIndex(), 5L, true); + .generate(batch3.endFileIndex(), 5, 5L, true); Assert.assertEquals(batch4.endFileIndex(), 5); Assert.assertEquals(batch4.sizeInBytes(), 10); filesMatch(Lists.newArrayList("E"), filesToScan(batch4.tasks())); @@ -142,7 +142,7 @@ public void testGenerateMicroBatchWithSmallTargetSize() { MicroBatch batch5 = MicroBatches.from(table.snapshot(1L), table.io()) .specsById(table.specs()) - .generate(batch4.endFileIndex(), 5L, true); + .generate(batch4.endFileIndex(), 5, 5L, true); Assert.assertEquals(batch5.endFileIndex(), 5); Assert.assertEquals(batch5.sizeInBytes(), 0); Assert.assertTrue(Iterables.isEmpty(batch5.tasks())); diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java index 1c1182c4da60..dbd2613dded7 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java @@ -255,6 +255,22 @@ public Long endTimestamp() { return confParser.longConf().option(SparkReadOptions.END_TIMESTAMP).parseOptional(); } + public Integer maxFilesPerMicroBatch() { + return confParser + .intConf() + .option(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH) + .defaultValue(Integer.MAX_VALUE) + .parse(); + } + + public Integer maxRecordsPerMicroBatch() { + return confParser + .intConf() + .option(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH) + .defaultValue(Integer.MAX_VALUE) + .parse(); + } + public boolean preserveDataGrouping() { return confParser .booleanConf() diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java index 9063e0f9aba6..80d60cf872f3 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java @@ -84,6 +84,13 @@ private SparkReadOptions() {} // Timestamp in milliseconds; start a stream from the snapshot that occurs after this timestamp public static final String STREAM_FROM_TIMESTAMP = "stream-from-timestamp"; + // maximum file per micro_batch + public static final String STREAMING_MAX_FILES_PER_MICRO_BATCH = + "streaming-max-files-per-micro-batch"; + // maximum rows per micro_batch + public static final String 
STREAMING_MAX_ROWS_PER_MICRO_BATCH = + "streaming-max-rows-per-micro-batch"; + // Table path public static final String PATH = "path"; diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java index 6e03dd69a850..bb419e9951b9 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java @@ -30,6 +30,7 @@ import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.DataOperations; import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.ManifestFile; import org.apache.iceberg.MicroBatches; import org.apache.iceberg.MicroBatches.MicroBatch; import org.apache.iceberg.Schema; @@ -38,6 +39,7 @@ import org.apache.iceberg.SnapshotSummary; import org.apache.iceberg.Table; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; @@ -48,6 +50,7 @@ import org.apache.iceberg.spark.SparkReadConf; import org.apache.iceberg.spark.SparkReadOptions; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.SnapshotUtil; import org.apache.iceberg.util.TableScanUtil; @@ -59,10 +62,12 @@ import org.apache.spark.sql.connector.read.PartitionReaderFactory; import org.apache.spark.sql.connector.read.streaming.MicroBatchStream; import org.apache.spark.sql.connector.read.streaming.Offset; +import org.apache.spark.sql.connector.read.streaming.ReadLimit; +import org.apache.spark.sql.connector.read.streaming.SupportsAdmissionControl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class SparkMicroBatchStream implements MicroBatchStream { +public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissionControl { private static final Joiner SLASH = Joiner.on("/"); private static final Logger LOG = LoggerFactory.getLogger(SparkMicroBatchStream.class); private static final Types.StructType EMPTY_GROUPING_KEY_TYPE = Types.StructType.of(); @@ -80,6 +85,8 @@ public class SparkMicroBatchStream implements MicroBatchStream { private final boolean skipDelete; private final boolean skipOverwrite; private final Long fromTimestamp; + private final Integer maxFilesPerMicroBatch; + private final Integer maxRecordsPerMicroBatch; SparkMicroBatchStream( JavaSparkContext sparkContext, @@ -97,6 +104,8 @@ public class SparkMicroBatchStream implements MicroBatchStream { this.splitLookback = readConf.splitLookback(); this.splitOpenFileCost = readConf.splitOpenFileCost(); this.fromTimestamp = readConf.streamFromTimestamp(); + this.maxFilesPerMicroBatch = readConf.maxFilesPerMicroBatch(); + this.maxRecordsPerMicroBatch = readConf.maxRecordsPerMicroBatch(); InitialOffsetStore initialOffsetStore = new InitialOffsetStore(table, checkpointLocation, fromTimestamp); @@ -118,16 +127,8 @@ public Offset latestOffset() { } Snapshot latestSnapshot = table.currentSnapshot(); - long addedFilesCount = - PropertyUtil.propertyAsLong(latestSnapshot.summary(), SnapshotSummary.ADDED_FILES_PROP, -1); - // if the latest snapshot summary doesn't contain SnapshotSummary.ADDED_FILES_PROP, - // iterate through addedDataFiles to compute addedFilesCount - addedFilesCount = - addedFilesCount == -1 - ? 
Iterables.size(latestSnapshot.addedDataFiles(table.io())) - : addedFilesCount; - - return new StreamingOffset(latestSnapshot.snapshotId(), addedFilesCount, false); + + return new StreamingOffset(latestSnapshot.snapshotId(), addedFilesCount(latestSnapshot), false); } @Override @@ -204,12 +205,20 @@ private List planFiles(StreamingOffset startOffset, StreamingOffse StreamingOffset currentOffset = null; + // [(startOffset : startFileIndex), (endOffset : endFileIndex) ) do { + long endFileIndex; if (currentOffset == null) { currentOffset = batchStartOffset; } else { Snapshot snapshotAfter = SnapshotUtil.snapshotAfter(table, currentOffset.snapshotId()); - currentOffset = new StreamingOffset(snapshotAfter.snapshotId(), 0L, false); + // it may happen that we need to read this snapshot partially in case it's equal to + // endOffset. + if (currentOffset.snapshotId() != endOffset.snapshotId()) { + currentOffset = new StreamingOffset(snapshotAfter.snapshotId(), 0L, false); + } else { + currentOffset = endOffset; + } } Snapshot snapshot = table.snapshot(currentOffset.snapshotId()); @@ -226,12 +235,22 @@ private List planFiles(StreamingOffset startOffset, StreamingOffse continue; } + Snapshot currentSnapshot = table.snapshot(currentOffset.snapshotId()); + if (currentOffset.snapshotId() == endOffset.snapshotId()) { + endFileIndex = endOffset.position(); + } else { + endFileIndex = addedFilesCount(currentSnapshot); + } + MicroBatch latestMicroBatch = - MicroBatches.from(table.snapshot(currentOffset.snapshotId()), table.io()) + MicroBatches.from(currentSnapshot, table.io()) .caseSensitive(caseSensitive) .specsById(table.specs()) .generate( - currentOffset.position(), Long.MAX_VALUE, currentOffset.shouldScanAllFiles()); + currentOffset.position(), + endFileIndex, + Long.MAX_VALUE, + currentOffset.shouldScanAllFiles()); fileScanTasks.addAll(latestMicroBatch.tasks()); } while (currentOffset.snapshotId() != endOffset.snapshotId()); @@ -295,6 +314,128 @@ private static StreamingOffset determineStartingOffset(Table table, Long fromTim } } + @Override + @SuppressWarnings("checkstyle:CyclomaticComplexity") + public Offset latestOffset(Offset startOffset, ReadLimit limit) { + // calculate end offset get snapshotId from the startOffset + Preconditions.checkArgument( + startOffset instanceof StreamingOffset, + "Invalid start offset: %s is not a StreamingOffset", + startOffset); + + table.refresh(); + if (table.currentSnapshot() == null) { + return StreamingOffset.START_OFFSET; + } + + if (table.currentSnapshot().timestampMillis() < fromTimestamp) { + return StreamingOffset.START_OFFSET; + } + + // end offset can expand to multiple snapshots + StreamingOffset startingOffset = (StreamingOffset) startOffset; + + if (startOffset.equals(StreamingOffset.START_OFFSET)) { + startingOffset = determineStartingOffset(table, fromTimestamp); + } + + Snapshot curSnapshot = table.snapshot(startingOffset.snapshotId()); + int startPosOfSnapOffset = (int) startingOffset.position(); + + boolean scanAllFiles = startingOffset.shouldScanAllFiles(); + + boolean shouldContinueReading = true; + int curFilesAdded = 0; + int curRecordCount = 0; + int curPos = 0; + + // Note : we produce nextOffset with pos as non-inclusive + while (shouldContinueReading) { + // generate manifest index for the curSnapshot + List> indexedManifests = + MicroBatches.skippedManifestIndexesFromSnapshot( + table.io(), curSnapshot, startPosOfSnapOffset, scanAllFiles); + // this is under assumption we will be able to add at-least 1 file in the new offset + for (int 
idx = 0; idx < indexedManifests.size() && shouldContinueReading; idx++) { + // be rest assured curPos >= startFileIndex + curPos = indexedManifests.get(idx).second(); + try (CloseableIterable taskIterable = + MicroBatches.openManifestFile( + table.io(), + table.specs(), + caseSensitive, + curSnapshot, + indexedManifests.get(idx).first(), + scanAllFiles); + CloseableIterator taskIter = taskIterable.iterator()) { + while (taskIter.hasNext()) { + FileScanTask task = taskIter.next(); + if (curPos >= startPosOfSnapOffset) { + // TODO : use readLimit provided in function param, the readLimits are derived from + // these 2 properties. + if ((curFilesAdded + 1) > maxFilesPerMicroBatch + || (curRecordCount + task.file().recordCount()) > maxRecordsPerMicroBatch) { + shouldContinueReading = false; + break; + } + + curFilesAdded += 1; + curRecordCount += task.file().recordCount(); + } + ++curPos; + } + } catch (IOException ioe) { + LOG.warn("Failed to close task iterable", ioe); + } + } + // if the currentSnapShot was also the mostRecentSnapshot then break + if (curSnapshot.snapshotId() == table.currentSnapshot().snapshotId()) { + break; + } + + // if everything was OK and we consumed complete snapshot then move to next snapshot + if (shouldContinueReading) { + startPosOfSnapOffset = -1; + curSnapshot = SnapshotUtil.snapshotAfter(table, curSnapshot.snapshotId()); + // if anyhow we are moving to next snapshot we should only scan addedFiles + scanAllFiles = false; + } + } + + StreamingOffset latestStreamingOffset = + new StreamingOffset(curSnapshot.snapshotId(), curPos, scanAllFiles); + + // if no new data arrived, then return null. + return latestStreamingOffset.equals(startingOffset) ? null : latestStreamingOffset; + } + + private long addedFilesCount(Snapshot snapshot) { + long addedFilesCount = + PropertyUtil.propertyAsLong(snapshot.summary(), SnapshotSummary.ADDED_FILES_PROP, -1); + // If snapshotSummary doesn't have SnapshotSummary.ADDED_FILES_PROP, + // iterate through addedFiles iterator to find addedFilesCount. + return addedFilesCount == -1 + ? 
Iterables.size(snapshot.addedDataFiles(table.io())) + : addedFilesCount; + } + + @Override + public ReadLimit getDefaultReadLimit() { + if (maxFilesPerMicroBatch != Integer.MAX_VALUE + && maxRecordsPerMicroBatch != Integer.MAX_VALUE) { + ReadLimit[] readLimits = new ReadLimit[2]; + readLimits[0] = ReadLimit.maxFiles(maxFilesPerMicroBatch); + readLimits[1] = ReadLimit.maxRows(maxFilesPerMicroBatch); + return ReadLimit.compositeLimit(readLimits); + } else if (maxFilesPerMicroBatch != Integer.MAX_VALUE) { + return ReadLimit.maxFiles(maxFilesPerMicroBatch); + } else if (maxRecordsPerMicroBatch != Integer.MAX_VALUE) { + return ReadLimit.maxRows(maxRecordsPerMicroBatch); + } else { + return ReadLimit.allAvailable(); + } + } + private static class InitialOffsetStore { private final Table table; private final FileIO io; diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java index dd456f22371e..a2d0c9acaf48 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.IntStream; import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.BaseTable; @@ -47,6 +48,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.SparkCatalogTestBase; import org.apache.iceberg.spark.SparkReadOptions; +import org.apache.spark.api.java.function.VoidFunction2; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; @@ -70,6 +72,8 @@ public TestStructuredStreamingRead3( private Table table; + private final AtomicInteger microBatches = new AtomicInteger(); + /** * test data to be used by multiple writes each write creates a snapshot and writes a list of * records @@ -115,6 +119,7 @@ public void setupTable() { + "PARTITIONED BY (bucket(3, id))", tableName); this.table = validationCatalog.loadTable(tableIdent); + microBatches.set(0); } @After @@ -140,6 +145,57 @@ public void testReadStreamOnIcebergTableWithMultipleSnapshots() throws Exception Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected)); } + @Test + public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfFiles_1() + throws Exception { + appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS); + + Assert.assertEquals( + 6, + microBatchCount( + ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1"))); + } + + @Test + public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfFiles_2() + throws Exception { + appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS); + + Assert.assertEquals( + 3, + microBatchCount( + ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "2"))); + } + + @Test + public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfRows_1() + throws Exception { + appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS); + + // only 1 micro-batch will be formed and we will read data partially + Assert.assertEquals( + 1, + microBatchCount(ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "1"))); + + 
StreamingQuery query = startStream(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "1"); + + // check answer correctness only 1 record read the micro-batch will be stuck + List actual = rowsAvailable(query); + Assertions.assertThat(actual) + .containsExactlyInAnyOrderElementsOf( + Lists.newArrayList(TEST_DATA_MULTIPLE_SNAPSHOTS.get(0).get(0))); + } + + @Test + public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfRows_4() + throws Exception { + appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS); + + Assert.assertEquals( + 2, + microBatchCount(ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "4"))); + } + @Test public void testReadStreamOnIcebergThenAddData() throws Exception { List> expected = TEST_DATA_MULTIPLE_SNAPSHOTS; @@ -561,7 +617,25 @@ private StreamingQuery startStream() throws TimeoutException { } private StreamingQuery startStream(String key, String value) throws TimeoutException { - return startStream(ImmutableMap.of(key, value)); + return startStream( + ImmutableMap.of(key, value, SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")); + } + + private int microBatchCount(Map options) throws TimeoutException { + Dataset ds = spark.readStream().options(options).format("iceberg").load(tableName); + + ds.writeStream() + .options(options) + .foreachBatch( + (VoidFunction2, Long>) + (dataset, batchId) -> { + microBatches.getAndIncrement(); + }) + .start() + .processAllAvailable(); + + stopStreams(); + return microBatches.get(); } private List rowsAvailable(StreamingQuery query) {
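
From the user side, the two new read options are ordinary DataStreamReader options. A minimal sketch of a rate-limited streaming read, assuming an Iceberg catalog is already configured; the table name, checkpoint path, and console sink are placeholders.

    import org.apache.iceberg.spark.SparkReadOptions;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.streaming.StreamingQuery;

    public class RateLimitedStreamSketch {
      public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder().getOrCreate();

        // Admit at most 10 data files and 10000 rows per micro-batch; both keys are
        // the SparkReadOptions constants introduced in this patch.
        Dataset<Row> stream =
            spark
                .readStream()
                .format("iceberg")
                .option(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "10")
                .option(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "10000")
                .load("db.table");

        StreamingQuery query =
            stream
                .writeStream()
                .format("console")
                .option("checkpointLocation", "/tmp/iceberg-stream-checkpoint")
                .start();

        query.awaitTermination();
      }
    }
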
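
For reference, getDefaultReadLimit() is how these options reach Spark's admission control. The sketch below restates that mapping with Spark's ReadLimit factories; it assumes the composite branch is meant to pass maxRecordsPerMicroBatch to ReadLimit.maxRows (the hunk above passes maxFilesPerMicroBatch there, which reads like a copy-and-paste slip).

    import org.apache.spark.sql.connector.read.streaming.ReadLimit;

    public class ReadLimitMappingSketch {
      // Each configured option becomes a ReadLimit; when both are set they are
      // combined into a composite limit, otherwise Spark reads all available data.
      static ReadLimit defaultReadLimit(int maxFilesPerMicroBatch, int maxRecordsPerMicroBatch) {
        boolean hasFileLimit = maxFilesPerMicroBatch != Integer.MAX_VALUE;
        boolean hasRowLimit = maxRecordsPerMicroBatch != Integer.MAX_VALUE;

        if (hasFileLimit && hasRowLimit) {
          return ReadLimit.compositeLimit(
              new ReadLimit[] {
                ReadLimit.maxFiles(maxFilesPerMicroBatch),
                ReadLimit.maxRows(maxRecordsPerMicroBatch)
              });
        } else if (hasFileLimit) {
          return ReadLimit.maxFiles(maxFilesPerMicroBatch);
        } else if (hasRowLimit) {
          return ReadLimit.maxRows(maxRecordsPerMicroBatch);
        } else {
          return ReadLimit.allAvailable();
        }
      }
    }
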