Skip to content

Commit

Permalink
Spark 3.3: support rate limit in Spark Streaming (apache#4479)
Browse files Browse the repository at this point in the history
  • Loading branch information
singhpk234 authored Apr 22, 2023
1 parent a36d120 commit 029622b
Show file tree
Hide file tree
Showing 6 changed files with 384 additions and 113 deletions.
209 changes: 121 additions & 88 deletions core/src/main/java/org/apache/iceberg/MicroBatches.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.Pair;
import org.slf4j.Logger;
Expand All @@ -36,6 +37,96 @@
public class MicroBatches {
private MicroBatches() {}

public static List<Pair<ManifestFile, Integer>> skippedManifestIndexesFromSnapshot(
FileIO io, Snapshot snapshot, long startFileIndex, boolean scanAllFiles) {
List<ManifestFile> manifests =
scanAllFiles
? snapshot.dataManifests(io)
: snapshot.dataManifests(io).stream()
.filter(m -> m.snapshotId().equals(snapshot.snapshotId()))
.collect(Collectors.toList());

List<Pair<ManifestFile, Integer>> manifestIndexes = indexManifests(manifests);

return skipManifests(manifestIndexes, startFileIndex);
}

public static CloseableIterable<FileScanTask> openManifestFile(
FileIO io,
Map<Integer, PartitionSpec> specsById,
boolean caseSensitive,
Snapshot snapshot,
ManifestFile manifestFile,
boolean scanAllFiles) {

ManifestGroup manifestGroup =
new ManifestGroup(io, ImmutableList.of(manifestFile))
.specsById(specsById)
.caseSensitive(caseSensitive);
if (!scanAllFiles) {
manifestGroup =
manifestGroup
.filterManifestEntries(
entry ->
entry.snapshotId() == snapshot.snapshotId()
&& entry.status() == ManifestEntry.Status.ADDED)
.ignoreDeleted();
}

return manifestGroup.planFiles();
}

/**
* Method to index the data files for each manifest. For example, if manifest m1 has 3 data files,
* manifest m2 has 2 data files, manifest m3 has 1 data file, then the index will be (m1, 0), (m2,
* 3), (m3, 5).
*
* @param manifestFiles List of input manifests used to index.
* @return a list pairing each manifest with the index number of the first data file entry in that
* manifest.
*/
private static List<Pair<ManifestFile, Integer>> indexManifests(
List<ManifestFile> manifestFiles) {
int currentFileIndex = 0;
List<Pair<ManifestFile, Integer>> manifestIndexes = Lists.newArrayList();

for (ManifestFile manifest : manifestFiles) {
manifestIndexes.add(Pair.of(manifest, currentFileIndex));
currentFileIndex += manifest.addedFilesCount() + manifest.existingFilesCount();
}

return manifestIndexes;
}

/**
* Method to skip the manifest file whose index is smaller than startFileIndex. For example, if
* the index list is : (m1, 0), (m2, 3), (m3, 5), and startFileIndex is 4, then the returned
* manifest index list is: (m2, 3), (m3, 5).
*
* @param indexedManifests List of input manifests.
* @param startFileIndex Index used to skip all manifests with an index less than or equal to this
* value.
* @return a sub-list of manifest file index which only contains the manifest indexes larger than
* the startFileIndex.
*/
private static List<Pair<ManifestFile, Integer>> skipManifests(
List<Pair<ManifestFile, Integer>> indexedManifests, long startFileIndex) {
if (startFileIndex == 0) {
return indexedManifests;
}

int manifestIndex = 0;
for (Pair<ManifestFile, Integer> manifest : indexedManifests) {
if (manifest.second() > startFileIndex) {
break;
}

manifestIndex++;
}

return indexedManifests.subList(Math.max(manifestIndex - 1, 0), indexedManifests.size());
}

public static class MicroBatch {
private final long snapshotId;
private final long startFileIndex;
Expand Down Expand Up @@ -113,73 +204,27 @@ public MicroBatchBuilder specsById(Map<Integer, PartitionSpec> specs) {
}

public MicroBatch generate(long startFileIndex, long targetSizeInBytes, boolean scanAllFiles) {
return generate(
startFileIndex,
Iterables.size(snapshot.addedDataFiles(io)),
targetSizeInBytes,
scanAllFiles);
}

public MicroBatch generate(
long startFileIndex, long endFileIndex, long targetSizeInBytes, boolean scanAllFiles) {
Preconditions.checkArgument(endFileIndex >= 0, "endFileIndex is unexpectedly smaller than 0");
Preconditions.checkArgument(
startFileIndex >= 0, "startFileIndex is unexpectedly smaller than 0");
Preconditions.checkArgument(
targetSizeInBytes > 0, "targetSizeInBytes should be larger than 0");

List<ManifestFile> manifests =
scanAllFiles
? snapshot.dataManifests(io)
: snapshot.dataManifests(io).stream()
.filter(m -> m.snapshotId().equals(snapshot.snapshotId()))
.collect(Collectors.toList());

List<Pair<ManifestFile, Integer>> manifestIndexes = indexManifests(manifests);
List<Pair<ManifestFile, Integer>> skippedManifestIndexes =
skipManifests(manifestIndexes, startFileIndex);

return generateMicroBatch(
skippedManifestIndexes, startFileIndex, targetSizeInBytes, scanAllFiles);
}

/**
* Method to index the data files for each manifest. For example, if manifest m1 has 3 data
* files, manifest m2 has 2 data files, manifest m3 has 1 data file, then the index will be (m1,
* 0), (m2, 3), (m3, 5).
*
* @param manifestFiles List of input manifests used to index.
* @return a list of manifest index with key as manifest file, value as file counts.
*/
private static List<Pair<ManifestFile, Integer>> indexManifests(
List<ManifestFile> manifestFiles) {
int currentFileIndex = 0;
List<Pair<ManifestFile, Integer>> manifestIndexes = Lists.newArrayList();

for (ManifestFile manifest : manifestFiles) {
manifestIndexes.add(Pair.of(manifest, currentFileIndex));
currentFileIndex += manifest.addedFilesCount() + manifest.existingFilesCount();
}

return manifestIndexes;
}

/**
* Method to skip the manifest file in which the index is smaller than startFileIndex. For
* example, if the index list is : (m1, 0), (m2, 3), (m3, 5), and startFileIndex is 4, then the
* returned manifest index list is: (m2, 3), (m3, 5).
*
* @param indexedManifests List of input manifests.
* @param startFileIndex Index used to skip the processed manifests.
* @return a sub-list of manifest file index which only contains the manifest indexes larger
* than the startFileIndex.
*/
private static List<Pair<ManifestFile, Integer>> skipManifests(
List<Pair<ManifestFile, Integer>> indexedManifests, long startFileIndex) {
if (startFileIndex == 0) {
return indexedManifests;
}

int manifestIndex = 0;
for (Pair<ManifestFile, Integer> manifest : indexedManifests) {
if (manifest.second() > startFileIndex) {
break;
}

manifestIndex++;
}

return indexedManifests.subList(manifestIndex - 1, indexedManifests.size());
skippedManifestIndexesFromSnapshot(io, snapshot, startFileIndex, scanAllFiles),
startFileIndex,
endFileIndex,
targetSizeInBytes,
scanAllFiles);
}

/**
Expand All @@ -188,25 +233,23 @@ private static List<Pair<ManifestFile, Integer>> skipManifests(
*
* @param indexedManifests A list of indexed manifests to generate MicroBatch
* @param startFileIndex A startFileIndex used to skip processed files.
* @param endFileIndex An endFileIndex used to find files to include, exclusive.
* @param targetSizeInBytes Used to control the size of MicroBatch, the processed file bytes
* must be smaller than this size.
* @param scanAllFiles Used to check whether all the data files should be processed, or only
* added files.
* @return A MicroBatch.
*/
@SuppressWarnings("checkstyle:CyclomaticComplexity")
private MicroBatch generateMicroBatch(
List<Pair<ManifestFile, Integer>> indexedManifests,
long startFileIndex,
long endFileIndex,
long targetSizeInBytes,
boolean scanAllFiles) {
if (indexedManifests.isEmpty()) {
return new MicroBatch(
snapshot.snapshotId(),
startFileIndex,
startFileIndex + 1,
0L,
Collections.emptyList(),
true);
snapshot.snapshotId(), startFileIndex, endFileIndex, 0L, Collections.emptyList(), true);
}

long currentSizeInBytes = 0L;
Expand All @@ -218,11 +261,18 @@ private MicroBatch generateMicroBatch(
currentFileIndex = indexedManifests.get(idx).second();

try (CloseableIterable<FileScanTask> taskIterable =
open(indexedManifests.get(idx).first(), scanAllFiles);
openManifestFile(
io,
specsById,
caseSensitive,
snapshot,
indexedManifests.get(idx).first(),
scanAllFiles);
CloseableIterator<FileScanTask> taskIter = taskIterable.iterator()) {
while (taskIter.hasNext()) {
FileScanTask task = taskIter.next();
if (currentFileIndex >= startFileIndex) {
// want to read [startFileIndex ... endFileIndex)
if (currentFileIndex >= startFileIndex && currentFileIndex < endFileIndex) {
// Make sure there's at least one task in each MicroBatch to void job to be stuck,
// always add task
// firstly.
Expand All @@ -231,7 +281,7 @@ private MicroBatch generateMicroBatch(
}

currentFileIndex++;
if (currentSizeInBytes >= targetSizeInBytes) {
if (currentSizeInBytes >= targetSizeInBytes || currentFileIndex >= endFileIndex) {
break;
}
}
Expand Down Expand Up @@ -259,6 +309,7 @@ private MicroBatch generateMicroBatch(
}
}

// [startFileIndex ....currentFileIndex)
return new MicroBatch(
snapshot.snapshotId(),
startFileIndex,
Expand All @@ -267,23 +318,5 @@ private MicroBatch generateMicroBatch(
tasks,
isLastIndex);
}

private CloseableIterable<FileScanTask> open(ManifestFile manifestFile, boolean scanAllFiles) {
ManifestGroup manifestGroup =
new ManifestGroup(io, ImmutableList.of(manifestFile))
.specsById(specsById)
.caseSensitive(caseSensitive);
if (!scanAllFiles) {
manifestGroup =
manifestGroup
.filterManifestEntries(
entry ->
entry.snapshotId() == snapshot.snapshotId()
&& entry.status() == ManifestEntry.Status.ADDED)
.ignoreDeleted();
}

return manifestGroup.planFiles();
}
}
}
20 changes: 10 additions & 10 deletions core/src/test/java/org/apache/iceberg/TestMicroBatchBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void testGenerateMicroBatch() {
MicroBatch batch =
MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(0, Long.MAX_VALUE, true);
.generate(0, 6, Long.MAX_VALUE, true);
Assert.assertEquals(batch.snapshotId(), 1L);
Assert.assertEquals(batch.startFileIndex(), 0);
Assert.assertEquals(batch.endFileIndex(), 5);
Expand All @@ -63,7 +63,7 @@ public void testGenerateMicroBatch() {
MicroBatch batch1 =
MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(0, 15L, true);
.generate(0, 1, 15L, true);
Assert.assertEquals(batch1.endFileIndex(), 1);
Assert.assertEquals(batch1.sizeInBytes(), 10);
Assert.assertFalse(batch1.lastIndexOfSnapshot());
Expand All @@ -72,7 +72,7 @@ public void testGenerateMicroBatch() {
MicroBatch batch2 =
MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(batch1.endFileIndex(), 30L, true);
.generate(batch1.endFileIndex(), 4, 30L, true);
Assert.assertEquals(batch2.endFileIndex(), 4);
Assert.assertEquals(batch2.sizeInBytes(), 30);
Assert.assertFalse(batch2.lastIndexOfSnapshot());
Expand All @@ -81,7 +81,7 @@ public void testGenerateMicroBatch() {
MicroBatch batch3 =
MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(batch2.endFileIndex(), 50L, true);
.generate(batch2.endFileIndex(), 5, 50L, true);
Assert.assertEquals(batch3.endFileIndex(), 5);
Assert.assertEquals(batch3.sizeInBytes(), 10);
Assert.assertTrue(batch3.lastIndexOfSnapshot());
Expand All @@ -95,7 +95,7 @@ public void testGenerateMicroBatchWithSmallTargetSize() {
MicroBatch batch =
MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(0, 10L, true);
.generate(0, 1, 10L, true);
Assert.assertEquals(batch.snapshotId(), 1L);
Assert.assertEquals(batch.startFileIndex(), 0);
Assert.assertEquals(batch.endFileIndex(), 1);
Expand All @@ -106,7 +106,7 @@ public void testGenerateMicroBatchWithSmallTargetSize() {
MicroBatch batch1 =
MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(batch.endFileIndex(), 5L, true);
.generate(batch.endFileIndex(), 2, 5L, true);
Assert.assertEquals(batch1.endFileIndex(), 2);
Assert.assertEquals(batch1.sizeInBytes(), 10);
filesMatch(Lists.newArrayList("B"), filesToScan(batch1.tasks()));
Expand All @@ -115,7 +115,7 @@ public void testGenerateMicroBatchWithSmallTargetSize() {
MicroBatch batch2 =
MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(batch1.endFileIndex(), 10L, true);
.generate(batch1.endFileIndex(), 3, 10L, true);
Assert.assertEquals(batch2.endFileIndex(), 3);
Assert.assertEquals(batch2.sizeInBytes(), 10);
filesMatch(Lists.newArrayList("C"), filesToScan(batch2.tasks()));
Expand All @@ -124,7 +124,7 @@ public void testGenerateMicroBatchWithSmallTargetSize() {
MicroBatch batch3 =
MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(batch2.endFileIndex(), 10L, true);
.generate(batch2.endFileIndex(), 4, 10L, true);
Assert.assertEquals(batch3.endFileIndex(), 4);
Assert.assertEquals(batch3.sizeInBytes(), 10);
filesMatch(Lists.newArrayList("D"), filesToScan(batch3.tasks()));
Expand All @@ -133,7 +133,7 @@ public void testGenerateMicroBatchWithSmallTargetSize() {
MicroBatch batch4 =
MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(batch3.endFileIndex(), 5L, true);
.generate(batch3.endFileIndex(), 5, 5L, true);
Assert.assertEquals(batch4.endFileIndex(), 5);
Assert.assertEquals(batch4.sizeInBytes(), 10);
filesMatch(Lists.newArrayList("E"), filesToScan(batch4.tasks()));
Expand All @@ -142,7 +142,7 @@ public void testGenerateMicroBatchWithSmallTargetSize() {
MicroBatch batch5 =
MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(batch4.endFileIndex(), 5L, true);
.generate(batch4.endFileIndex(), 5, 5L, true);
Assert.assertEquals(batch5.endFileIndex(), 5);
Assert.assertEquals(batch5.sizeInBytes(), 0);
Assert.assertTrue(Iterables.isEmpty(batch5.tasks()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,22 @@ public Long endTimestamp() {
return confParser.longConf().option(SparkReadOptions.END_TIMESTAMP).parseOptional();
}

public Integer maxFilesPerMicroBatch() {
return confParser
.intConf()
.option(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH)
.defaultValue(Integer.MAX_VALUE)
.parse();
}

public Integer maxRecordsPerMicroBatch() {
return confParser
.intConf()
.option(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH)
.defaultValue(Integer.MAX_VALUE)
.parse();
}

public boolean preserveDataGrouping() {
return confParser
.booleanConf()
Expand Down
Loading

0 comments on commit 029622b

Please sign in to comment.