Skip to content

Commit

Permalink
spotless apply
Browse files Browse the repository at this point in the history
  • Loading branch information
Prashant Singh authored and singhpk234 committed Jan 8, 2023
1 parent dd0ca59 commit b3d2e07
Show file tree
Hide file tree
Showing 9 changed files with 197 additions and 149 deletions.
63 changes: 30 additions & 33 deletions core/src/main/java/org/apache/iceberg/MicroBatches.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,21 @@ public MicroBatchBuilder specsById(Map<Integer, PartitionSpec> specs) {
return this;
}

public MicroBatch generate(long startFileIndex, long targetSizeInBytes, boolean scanAllFiles) {
public MicroBatch generate(
long startFileIndex, long endFileIndex, long targetSizeInBytes, boolean scanAllFiles) {
Preconditions.checkArgument(endFileIndex >= 0, "endFileIndex is unexpectedly smaller than 0");
Preconditions.checkArgument(
startFileIndex >= 0, "startFileIndex is unexpectedly smaller than 0");
Preconditions.checkArgument(
targetSizeInBytes > 0, "targetSizeInBytes should be larger than 0");

return generateMicroBatch(
MicroBatchesUtil.skippedManifestIndexesFromSnapshot(snapshot, startFileIndex, scanAllFiles),
startFileIndex, endFileIndex, targetSizeInBytes, scanAllFiles);
MicroBatchesUtil.skippedManifestIndexesFromSnapshot(
snapshot, startFileIndex, scanAllFiles),
startFileIndex,
endFileIndex,
targetSizeInBytes,
scanAllFiles);
}

/**
Expand All @@ -128,18 +134,22 @@ public MicroBatch generate(long startFileIndex, long targetSizeInBytes, boolean
* @param indexedManifests A list of indexed manifests to generate MicroBatch
* @param startFileIndex A startFileIndex used to skip processed files.
* @param endFileIndex An endFileIndex used to find files to include, it not inclusive.
* @param targetSizeInBytes Used to control the size of MicroBatch, the processed file bytes must be smaller than
* this size.
* @param scanAllFiles Used to check whether all the data files should be processed, or only added files.
* @param targetSizeInBytes Used to control the size of MicroBatch, the processed file bytes
* must be smaller than this size.
* @param scanAllFiles Used to check whether all the data files should be processed, or only
* added files.
* @return A MicroBatch.
*/
@SuppressWarnings("checkstyle:CyclomaticComplexity")
private MicroBatch generateMicroBatch(
List<Pair<ManifestFile, Integer>> indexedManifests,
long startFileIndex, long endFileIndex, long targetSizeInBytes, boolean scanAllFiles) {
long startFileIndex,
long endFileIndex,
long targetSizeInBytes,
boolean scanAllFiles) {
if (indexedManifests.isEmpty()) {
return new MicroBatch(snapshot.snapshotId(), startFileIndex, endFileIndex, 0L,
Collections.emptyList(), true);
return new MicroBatch(
snapshot.snapshotId(), startFileIndex, endFileIndex, 0L, Collections.emptyList(), true);
}

long currentSizeInBytes = 0L;
Expand All @@ -150,14 +160,21 @@ private MicroBatch generateMicroBatch(
for (int idx = 0; idx < indexedManifests.size(); idx++) {
currentFileIndex = indexedManifests.get(idx).second();

try (CloseableIterable<FileScanTask> taskIterable = MicroBatchesUtil.openManifestFile(io, specsById,
caseSensitive, snapshot, indexedManifests.get(idx).first(), scanAllFiles);
try (CloseableIterable<FileScanTask> taskIterable =
MicroBatchesUtil.openManifestFile(
io,
specsById,
caseSensitive,
snapshot,
indexedManifests.get(idx).first(),
scanAllFiles);
CloseableIterator<FileScanTask> taskIter = taskIterable.iterator()) {
while (taskIter.hasNext()) {
FileScanTask task = taskIter.next();
// want to read [startFileIndex ... endFileIndex)
if (currentFileIndex >= startFileIndex && currentFileIndex < endFileIndex) {
// Make sure there's at least one task in each MicroBatch to void job to be stuck, always add task
// Make sure there's at least one task in each MicroBatch to void job to be stuck,
// always add task
// firstly.
tasks.add(task);
currentSizeInBytes += task.length();
Expand Down Expand Up @@ -192,34 +209,14 @@ private MicroBatch generateMicroBatch(
}
}

// [startFileIndex ....currentFileIndex)
return new MicroBatch(
snapshot.snapshotId(),
startFileIndex,
currentFileIndex,
currentSizeInBytes,
tasks,
isLastIndex);
// [startFileIndex ....currentFileIndex)
return new MicroBatch(snapshot.snapshotId(), startFileIndex, currentFileIndex, currentSizeInBytes,
tasks, isLastIndex);
}

private CloseableIterable<FileScanTask> open(ManifestFile manifestFile, boolean scanAllFiles) {
ManifestGroup manifestGroup =
new ManifestGroup(io, ImmutableList.of(manifestFile))
.specsById(specsById)
.caseSensitive(caseSensitive);
if (!scanAllFiles) {
manifestGroup =
manifestGroup
.filterManifestEntries(
entry ->
entry.snapshotId() == snapshot.snapshotId()
&& entry.status() == ManifestEntry.Status.ADDED)
.ignoreDeleted();
}

return manifestGroup.planFiles();
}
}
}
68 changes: 40 additions & 28 deletions core/src/main/java/org/apache/iceberg/MicroBatchesUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/


package org.apache.iceberg;

import java.util.List;
Expand All @@ -31,45 +29,59 @@

public class MicroBatchesUtil {

private MicroBatchesUtil() {
}
private MicroBatchesUtil() {}

public static List<Pair<ManifestFile, Integer>> skippedManifestIndexesFromSnapshot(
Snapshot snapshot, long startFileIndex, boolean scanAllFiles) {
// Preconditions.checkArgument(startFileIndex >= 0, "startFileIndex is unexpectedly smaller than 0");
List<ManifestFile> manifests = scanAllFiles ? snapshot.dataManifests() :
snapshot.dataManifests().stream().filter(m -> m.snapshotId().equals(snapshot.snapshotId()))
.collect(Collectors.toList());
// Preconditions.checkArgument(startFileIndex >= 0, "startFileIndex is unexpectedly smaller
// than 0");
List<ManifestFile> manifests =
scanAllFiles
? snapshot.dataManifests()
: snapshot.dataManifests().stream()
.filter(m -> m.snapshotId().equals(snapshot.snapshotId()))
.collect(Collectors.toList());

List<Pair<ManifestFile, Integer>> manifestIndexes = MicroBatchesUtil.indexManifests(manifests);

return MicroBatchesUtil.skipManifests(manifestIndexes, startFileIndex);
}

public static CloseableIterable<FileScanTask> openManifestFile(FileIO io, Map<Integer, PartitionSpec> specsById,
boolean caseSensitive, Snapshot snapshot, ManifestFile manifestFile, boolean scanAllFiles) {

ManifestGroup manifestGroup = new ManifestGroup(io, ImmutableList.of(manifestFile))
.specsById(specsById)
.caseSensitive(caseSensitive);
public static CloseableIterable<FileScanTask> openManifestFile(
FileIO io,
Map<Integer, PartitionSpec> specsById,
boolean caseSensitive,
Snapshot snapshot,
ManifestFile manifestFile,
boolean scanAllFiles) {

ManifestGroup manifestGroup =
new ManifestGroup(io, ImmutableList.of(manifestFile))
.specsById(specsById)
.caseSensitive(caseSensitive);
if (!scanAllFiles) {
manifestGroup = manifestGroup
.filterManifestEntries(entry ->
entry.snapshotId() == snapshot.snapshotId() && entry.status() == ManifestEntry.Status.ADDED)
.ignoreDeleted();
manifestGroup =
manifestGroup
.filterManifestEntries(
entry ->
entry.snapshotId() == snapshot.snapshotId()
&& entry.status() == ManifestEntry.Status.ADDED)
.ignoreDeleted();
}

return manifestGroup.planFiles();
}

/**
* Method to index the data files for each manifest. For example, if manifest m1 has 3 data files, manifest
* m2 has 2 data files, manifest m3 has 1 data file, then the index will be (m1, 0), (m2, 3), (m3, 5).
* Method to index the data files for each manifest. For example, if manifest m1 has 3 data files,
* manifest m2 has 2 data files, manifest m3 has 1 data file, then the index will be (m1, 0), (m2,
* 3), (m3, 5).
*
* @param manifestFiles List of input manifests used to index.
* @return a list of manifest index with key as manifest file, value as file counts.
*/
private static List<Pair<ManifestFile, Integer>> indexManifests(List<ManifestFile> manifestFiles) {
private static List<Pair<ManifestFile, Integer>> indexManifests(
List<ManifestFile> manifestFiles) {
int currentFileIndex = 0;
List<Pair<ManifestFile, Integer>> manifestIndexes = Lists.newArrayList();

Expand All @@ -82,17 +94,17 @@ private static List<Pair<ManifestFile, Integer>> indexManifests(List<ManifestFil
}

/**
* Method to skip the manifest file in which the index is smaller than startFileIndex. For example, if the
* index list is : (m1, 0), (m2, 3), (m3, 5), and startFileIndex is 4, then the returned manifest index list is:
* (m2, 3), (m3, 5).
* Method to skip the manifest file in which the index is smaller than startFileIndex. For
* example, if the index list is : (m1, 0), (m2, 3), (m3, 5), and startFileIndex is 4, then the
* returned manifest index list is: (m2, 3), (m3, 5).
*
* @param indexedManifests List of input manifests.
* @param startFileIndex Index used to skip the processed manifests.
* @return a sub-list of manifest file index which only contains the manifest indexes larger than the
* startFileIndex.
* @return a sub-list of manifest file index which only contains the manifest indexes larger than
* the startFileIndex.
*/
private static List<Pair<ManifestFile, Integer>> skipManifests(List<Pair<ManifestFile, Integer>> indexedManifests,
long startFileIndex) {
private static List<Pair<ManifestFile, Integer>> skipManifests(
List<Pair<ManifestFile, Integer>> indexedManifests, long startFileIndex) {
if (startFileIndex == 0) {
return indexedManifests;
}
Expand Down
70 changes: 40 additions & 30 deletions core/src/test/java/org/apache/iceberg/TestMicroBatchBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,35 +49,39 @@ public void setupTableProperties() {
public void testGenerateMicroBatch() {
add(table.newAppend(), files("A", "B", "C", "D", "E"));

MicroBatch batch = MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(0, 6, Long.MAX_VALUE, true);
MicroBatch batch =
MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(0, 6, Long.MAX_VALUE, true);
Assert.assertEquals(batch.snapshotId(), 1L);
Assert.assertEquals(batch.startFileIndex(), 0);
Assert.assertEquals(batch.endFileIndex(), 5);
Assert.assertEquals(batch.sizeInBytes(), 50);
Assert.assertTrue(batch.lastIndexOfSnapshot());
filesMatch(Lists.newArrayList("A", "B", "C", "D", "E"), filesToScan(batch.tasks()));

MicroBatch batch1 = MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(0, 1, 15L, true);
MicroBatch batch1 =
MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(0, 1, 15L, true);
Assert.assertEquals(batch1.endFileIndex(), 1);
Assert.assertEquals(batch1.sizeInBytes(), 10);
Assert.assertFalse(batch1.lastIndexOfSnapshot());
filesMatch(Lists.newArrayList("A"), filesToScan(batch1.tasks()));

MicroBatch batch2 = MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(batch1.endFileIndex(), 4, 30L, true);
MicroBatch batch2 =
MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(batch1.endFileIndex(), 4, 30L, true);
Assert.assertEquals(batch2.endFileIndex(), 4);
Assert.assertEquals(batch2.sizeInBytes(), 30);
Assert.assertFalse(batch2.lastIndexOfSnapshot());
filesMatch(Lists.newArrayList("B", "C", "D"), filesToScan(batch2.tasks()));

MicroBatch batch3 = MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(batch2.endFileIndex(), 5, 50L, true);
MicroBatch batch3 =
MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(batch2.endFileIndex(), 5, 50L, true);
Assert.assertEquals(batch3.endFileIndex(), 5);
Assert.assertEquals(batch3.sizeInBytes(), 10);
Assert.assertTrue(batch3.lastIndexOfSnapshot());
Expand All @@ -88,51 +92,57 @@ public void testGenerateMicroBatch() {
public void testGenerateMicroBatchWithSmallTargetSize() {
add(table.newAppend(), files("A", "B", "C", "D", "E"));

MicroBatch batch = MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(0, 1, 10L, true);
MicroBatch batch =
MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(0, 1, 10L, true);
Assert.assertEquals(batch.snapshotId(), 1L);
Assert.assertEquals(batch.startFileIndex(), 0);
Assert.assertEquals(batch.endFileIndex(), 1);
Assert.assertEquals(batch.sizeInBytes(), 10);
Assert.assertFalse(batch.lastIndexOfSnapshot());
filesMatch(Lists.newArrayList("A"), filesToScan(batch.tasks()));

MicroBatch batch1 = MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(batch.endFileIndex(), 2, 5L, true);
MicroBatch batch1 =
MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(batch.endFileIndex(), 2, 5L, true);
Assert.assertEquals(batch1.endFileIndex(), 2);
Assert.assertEquals(batch1.sizeInBytes(), 10);
filesMatch(Lists.newArrayList("B"), filesToScan(batch1.tasks()));
Assert.assertFalse(batch1.lastIndexOfSnapshot());

MicroBatch batch2 = MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(batch1.endFileIndex(), 3, 10L, true);
MicroBatch batch2 =
MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(batch1.endFileIndex(), 3, 10L, true);
Assert.assertEquals(batch2.endFileIndex(), 3);
Assert.assertEquals(batch2.sizeInBytes(), 10);
filesMatch(Lists.newArrayList("C"), filesToScan(batch2.tasks()));
Assert.assertFalse(batch2.lastIndexOfSnapshot());

MicroBatch batch3 = MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(batch2.endFileIndex(), 4, 10L, true);
MicroBatch batch3 =
MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(batch2.endFileIndex(), 4, 10L, true);
Assert.assertEquals(batch3.endFileIndex(), 4);
Assert.assertEquals(batch3.sizeInBytes(), 10);
filesMatch(Lists.newArrayList("D"), filesToScan(batch3.tasks()));
Assert.assertFalse(batch3.lastIndexOfSnapshot());

MicroBatch batch4 = MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(batch3.endFileIndex(), 5, 5L, true);
MicroBatch batch4 =
MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(batch3.endFileIndex(), 5, 5L, true);
Assert.assertEquals(batch4.endFileIndex(), 5);
Assert.assertEquals(batch4.sizeInBytes(), 10);
filesMatch(Lists.newArrayList("E"), filesToScan(batch4.tasks()));
Assert.assertTrue(batch4.lastIndexOfSnapshot());

MicroBatch batch5 = MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(batch4.endFileIndex(), 5, 5L, true);
MicroBatch batch5 =
MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(batch4.endFileIndex(), 5, 5L, true);
Assert.assertEquals(batch5.endFileIndex(), 5);
Assert.assertEquals(batch5.sizeInBytes(), 0);
Assert.assertTrue(Iterables.isEmpty(batch5.tasks()));
Expand Down
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,15 @@ private List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffse
}

Snapshot currentSnapshot = table.snapshot(currentOffset.snapshotId());
MicroBatch latestMicroBatch = MicroBatches.from(currentSnapshot, table.io())
.caseSensitive(caseSensitive)
.specsById(table.specs())
.generate(currentOffset.position(), Iterables.size(currentSnapshot.addedFiles()), Long.MAX_VALUE,
currentOffset.shouldScanAllFiles());
MicroBatch latestMicroBatch =
MicroBatches.from(currentSnapshot, table.io())
.caseSensitive(caseSensitive)
.specsById(table.specs())
.generate(
currentOffset.position(),
Iterables.size(currentSnapshot.addedFiles()),
Long.MAX_VALUE,
currentOffset.shouldScanAllFiles());

fileScanTasks.addAll(latestMicroBatch.tasks());
} while (currentOffset.snapshotId() != endOffset.snapshotId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,14 +230,16 @@ public Long endTimestamp() {
}

public Integer maxFilesPerMicroBatch() {
return confParser.intConf()
return confParser
.intConf()
.option(SparkReadOptions.MAX_FILES_PER_MICRO_BATCH)
.defaultValue(Integer.MAX_VALUE)
.parse();
}

public Integer maxRecordsPerMicroBatch() {
return confParser.intConf()
return confParser
.intConf()
.option(SparkReadOptions.MAX_ROWS_PER_MICRO_BATCH)
.defaultValue(Integer.MAX_VALUE)
.parse();
Expand Down
Loading

0 comments on commit b3d2e07

Please sign in to comment.