use manifest index for skipping complete data file
Prashant Singh committed Apr 12, 2022
1 parent b9b1de5 commit 471b53d
Showing 5 changed files with 159 additions and 94 deletions.
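
The heart of this change is the manifest index: each data manifest is paired with the global index of its first data file, so any manifest whose file range ends before the requested start index can be skipped without opening it. Below is a minimal, self-contained sketch of that arithmetic — plain strings and ints standing in for ManifestFile; an illustration of the technique, not the Iceberg API:

import java.util.ArrayList;
import java.util.List;

class ManifestSkipSketch {
  // Pairs a manifest name with the global index of its first data file.
  record IndexedManifest(String name, int firstFileIndex) {}

  // Mirrors indexManifests: a running total of file counts becomes the index.
  static List<IndexedManifest> index(String[] names, int[] fileCounts) {
    List<IndexedManifest> indexes = new ArrayList<>();
    int current = 0;
    for (int i = 0; i < names.length; i++) {
      indexes.add(new IndexedManifest(names[i], current));
      current += fileCounts[i];
    }
    return indexes;
  }

  // Mirrors skipManifests: drop manifests that end before startFileIndex.
  static List<IndexedManifest> skip(List<IndexedManifest> indexed, int startFileIndex) {
    if (startFileIndex == 0) {
      return indexed;
    }
    int pos = 0;
    for (IndexedManifest m : indexed) {
      if (m.firstFileIndex() > startFileIndex) {
        break;
      }
      pos++;
    }
    return indexed.subList(pos - 1, indexed.size());
  }

  public static void main(String[] args) {
    // m1 has 3 files, m2 has 2, m3 has 1 -> (m1, 0), (m2, 3), (m3, 5)
    List<IndexedManifest> indexed = index(new String[] {"m1", "m2", "m3"}, new int[] {3, 2, 1});
    // startFileIndex 4 falls inside m2, so m1 is skipped entirely
    System.out.println(skip(indexed, 4)); // m2 and m3 remain
  }
}
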
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/GenericDataFile.java
@@ -26,7 +26,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Types;

public class GenericDataFile extends BaseFile<DataFile> implements DataFile {
class GenericDataFile extends BaseFile<DataFile> implements DataFile {
/**
* Used by Avro reflection to instantiate this class when reading manifest files.
*/
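
GenericDataFile can drop from public to package-private because its one external caller, the Spark streaming source updated below, now reads file metadata through the public DataFile interface exposed by FileScanTask instead of casting to the concrete class. Roughly the before and after — a sketch, with dataFile and task assumed to be in scope:

// Before: a cast to the concrete core class kept GenericDataFile public.
GenericDataFile genericDataFile = (GenericDataFile) dataFile;
long recordCount = genericDataFile.recordCount();

// After: the DataFile interface reached via FileScanTask is sufficient.
long recordCountNow = task.file().recordCount();
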
78 changes: 5 additions & 73 deletions core/src/main/java/org/apache/iceberg/MicroBatches.java
@@ -23,12 +23,10 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.Pair;
import org.slf4j.Logger;
@@ -110,65 +108,12 @@ public MicroBatchBuilder specsById(Map<Integer, PartitionSpec> specs) {
}

public MicroBatch generate(long startFileIndex, long endFileIndex, long targetSizeInBytes, boolean scanAllFiles) {
Preconditions.checkArgument(startFileIndex >= 0, "startFileIndex is unexpectedly smaller than 0");
Preconditions.checkArgument(endFileIndex >= 0, "endFileIndex is unexpectedly smaller than 0");
Preconditions.checkArgument(targetSizeInBytes > 0, "targetSizeInBytes should be larger than 0");

List<ManifestFile> manifests = scanAllFiles ? snapshot.dataManifests() :
snapshot.dataManifests().stream().filter(m -> m.snapshotId().equals(snapshot.snapshotId()))
.collect(Collectors.toList());

List<Pair<ManifestFile, Integer>> manifestIndexes = indexManifests(manifests);
List<Pair<ManifestFile, Integer>> skippedManifestIndexes = skipManifests(manifestIndexes, startFileIndex);

return generateMicroBatch(skippedManifestIndexes, startFileIndex, endFileIndex, targetSizeInBytes, scanAllFiles);
}

/**
* Method to index the data files for each manifest. For example, if manifest m1 has 3 data files, manifest
* m2 has 2 data files, manifest m3 has 1 data file, then the index will be (m1, 0), (m2, 3), (m3, 5).
*
* @param manifestFiles List of input manifests used to index.
* @return a list of pairs with each manifest file as key and the global index of its first data file as value.
*/
private static List<Pair<ManifestFile, Integer>> indexManifests(List<ManifestFile> manifestFiles) {
int currentFileIndex = 0;
List<Pair<ManifestFile, Integer>> manifestIndexes = Lists.newArrayList();

for (ManifestFile manifest : manifestFiles) {
manifestIndexes.add(Pair.of(manifest, currentFileIndex));
currentFileIndex += manifest.addedFilesCount() + manifest.existingFilesCount();
}

return manifestIndexes;
}

/**
* Method to skip the manifests whose file-index ranges end before startFileIndex. For example, if the
* index list is (m1, 0), (m2, 3), (m3, 5) and startFileIndex is 4, then the returned manifest index list is
* (m2, 3), (m3, 5).
*
* @param indexedManifests List of input manifests paired with the global index of each manifest's first file.
* @param startFileIndex Index used to skip the already-processed manifests.
* @return the sub-list of manifest index pairs starting at the manifest that contains startFileIndex.
*/
private static List<Pair<ManifestFile, Integer>> skipManifests(List<Pair<ManifestFile, Integer>> indexedManifests,
long startFileIndex) {
if (startFileIndex == 0) {
return indexedManifests;
}

int manifestIndex = 0;
for (Pair<ManifestFile, Integer> manifest : indexedManifests) {
if (manifest.second() > startFileIndex) {
break;
}

manifestIndex++;
}

return indexedManifests.subList(manifestIndex - 1, indexedManifests.size());
return generateMicroBatch(
MicroBatchesUtil.skippedManifestIndexesFromSnapshot(snapshot, startFileIndex, scanAllFiles),
startFileIndex, endFileIndex, targetSizeInBytes, scanAllFiles);
}

/**
@@ -199,7 +144,8 @@ private MicroBatch generateMicroBatch(
for (int idx = 0; idx < indexedManifests.size(); idx++) {
currentFileIndex = indexedManifests.get(idx).second();

try (CloseableIterable<FileScanTask> taskIterable = open(indexedManifests.get(idx).first(), scanAllFiles);
try (CloseableIterable<FileScanTask> taskIterable = MicroBatchesUtil.openManifestFile(io, specsById,
caseSensitive, snapshot, indexedManifests.get(idx).first(), scanAllFiles);
CloseableIterator<FileScanTask> taskIter = taskIterable.iterator()) {
while (taskIter.hasNext()) {
FileScanTask task = taskIter.next();
@@ -243,19 +189,5 @@ private MicroBatch generateMicroBatch(
return new MicroBatch(snapshot.snapshotId(), startFileIndex, currentFileIndex, currentSizeInBytes,
tasks, isLastIndex);
}

private CloseableIterable<FileScanTask> open(ManifestFile manifestFile, boolean scanAllFiles) {
ManifestGroup manifestGroup = new ManifestGroup(io, ImmutableList.of(manifestFile))
.specsById(specsById)
.caseSensitive(caseSensitive);
if (!scanAllFiles) {
manifestGroup = manifestGroup
.filterManifestEntries(entry ->
entry.snapshotId() == snapshot.snapshotId() && entry.status() == ManifestEntry.Status.ADDED)
.ignoreDeleted();
}

return manifestGroup.planFiles();
}
}
}
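
With the indexing and skipping moved into MicroBatchesUtil, generate() keeps its contract but becomes a thin delegation. A hedged usage sketch of the builder follows — it assumes a Table named table, a known snapshotId, and the existing MicroBatches.from(snapshot, io) factory and caseSensitive(...) setter from the surrounding codebase; the generate(...) signature is the one shown above:

// Plan one micro-batch worth of file scan tasks from a snapshot.
MicroBatch batch = MicroBatches.from(table.snapshot(snapshotId), table.io())
    .specsById(table.specs())
    .caseSensitive(true)
    .generate(
        0L,                  // startFileIndex: resume from the first data file
        Long.MAX_VALUE,      // endFileIndex: no upper bound in this sketch
        128L * 1024 * 1024,  // targetSizeInBytes: cap the batch at ~128 MB
        false);              // scanAllFiles: only files added by this snapshot
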
112 changes: 112 additions & 0 deletions core/src/main/java/org/apache/iceberg/MicroBatchesUtil.java
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/


package org.apache.iceberg;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.Pair;

public class MicroBatchesUtil {

private MicroBatchesUtil() {
}

public static List<Pair<ManifestFile, Integer>> skippedManifestIndexesFromSnapshot(
Snapshot snapshot, long startFileIndex, boolean scanAllFiles) {
Preconditions.checkArgument(startFileIndex >= 0, "startFileIndex is unexpectedly smaller than 0");
List<ManifestFile> manifests = scanAllFiles ? snapshot.dataManifests() :
snapshot.dataManifests().stream().filter(m -> m.snapshotId().equals(snapshot.snapshotId()))
.collect(Collectors.toList());

List<Pair<ManifestFile, Integer>> manifestIndexes = MicroBatchesUtil.indexManifests(manifests);

return MicroBatchesUtil.skipManifests(manifestIndexes, startFileIndex);
}

public static CloseableIterable<FileScanTask> openManifestFile(FileIO io, Map<Integer, PartitionSpec> specsById,
boolean caseSensitive, Snapshot snapshot, ManifestFile manifestFile, boolean scanAllFiles) {

ManifestGroup manifestGroup = new ManifestGroup(io, ImmutableList.of(manifestFile))
.specsById(specsById)
.caseSensitive(caseSensitive);
if (!scanAllFiles) {
manifestGroup = manifestGroup
.filterManifestEntries(entry ->
entry.snapshotId() == snapshot.snapshotId() && entry.status() == ManifestEntry.Status.ADDED)
.ignoreDeleted();
}

return manifestGroup.planFiles();
}

/**
* Method to index the data files for each manifest. For example, if manifest m1 has 3 data files, manifest
* m2 has 2 data files, manifest m3 has 1 data file, then the index will be (m1, 0), (m2, 3), (m3, 5).
*
* @param manifestFiles List of input manifests used to index.
* @return a list of pairs with each manifest file as key and the global index of its first data file as value.
*/
private static List<Pair<ManifestFile, Integer>> indexManifests(List<ManifestFile> manifestFiles) {
int currentFileIndex = 0;
List<Pair<ManifestFile, Integer>> manifestIndexes = Lists.newArrayList();

for (ManifestFile manifest : manifestFiles) {
manifestIndexes.add(Pair.of(manifest, currentFileIndex));
currentFileIndex += manifest.addedFilesCount() + manifest.existingFilesCount();
}

return manifestIndexes;
}

/**
* Method to skip the manifests whose file-index ranges end before startFileIndex. For example, if the
* index list is (m1, 0), (m2, 3), (m3, 5) and startFileIndex is 4, then the returned manifest index list is
* (m2, 3), (m3, 5).
*
* @param indexedManifests List of input manifests paired with the global index of each manifest's first file.
* @param startFileIndex Index used to skip the already-processed manifests.
* @return the sub-list of manifest index pairs starting at the manifest that contains startFileIndex.
*/
private static List<Pair<ManifestFile, Integer>> skipManifests(List<Pair<ManifestFile, Integer>> indexedManifests,
long startFileIndex) {
if (startFileIndex == 0) {
return indexedManifests;
}

int manifestIndex = 0;
for (Pair<ManifestFile, Integer> manifest : indexedManifests) {
if (manifest.second() > startFileIndex) {
break;
}

manifestIndex++;
}

return indexedManifests.subList(manifestIndex - 1, indexedManifests.size());
}
}
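
The two public helpers are meant to be composed, exactly as the Spark streaming source below does: first compute the skipped manifest index list, then open each remaining manifest and walk its FileScanTasks from the recorded position. A minimal sketch, assuming a Snapshot snapshot, FileIO io, and Map<Integer, PartitionSpec> specsById are already in scope:

// Count data files at or past a global start index using the new helpers.
long startFileIndex = 4;
boolean scanAllFiles = false;
int filesSeen = 0;

for (Pair<ManifestFile, Integer> indexed :
    MicroBatchesUtil.skippedManifestIndexesFromSnapshot(snapshot, startFileIndex, scanAllFiles)) {
  int pos = indexed.second(); // global index of this manifest's first file
  try (CloseableIterable<FileScanTask> tasks = MicroBatchesUtil.openManifestFile(
      io, specsById, true /* caseSensitive */, snapshot, indexed.first(), scanAllFiles);
      CloseableIterator<FileScanTask> taskIter = tasks.iterator()) {
    while (taskIter.hasNext()) {
      taskIter.next();
      if (pos >= startFileIndex) {
        filesSeen += 1; // files before the start index are walked but not counted
      }
      pos++;
    }
  } catch (java.io.IOException e) {
    throw new java.io.UncheckedIOException(e);
  }
}
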
57 changes: 39 additions & 18 deletions spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -29,18 +29,19 @@
import java.util.List;
import java.util.Locale;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataOperations;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.GenericDataFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.MicroBatches;
import org.apache.iceberg.MicroBatches.MicroBatch;
import org.apache.iceberg.MicroBatchesUtil;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
@@ -52,6 +53,7 @@
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.source.SparkScan.ReadTask;
import org.apache.iceberg.spark.source.SparkScan.ReaderFactory;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.TableScanUtil;
import org.apache.iceberg.util.Tasks;
@@ -274,6 +276,7 @@ private static StreamingOffset determineStartingOffset(Table table, Long fromTimestamp) {
}

@Override
@SuppressWarnings("checkstyle:CyclomaticComplexity")
public Offset latestOffset(Offset startOffset, ReadLimit limit) {
// calculate the end offset; get the snapshotId from the startOffset
Preconditions.checkArgument(
@@ -298,31 +301,47 @@ public Offset latestOffset(Offset startOffset, ReadLimit limit) {
Snapshot curSnapshot = table.snapshot(startingOffset.snapshotId());
int startPosOfSnapOffset = (int) startingOffset.position();

boolean scanAllFiles = startingOffset.shouldScanAllFiles();

List<Pair<ManifestFile, Integer>> indexedManifests = MicroBatchesUtil.skippedManifestIndexesFromSnapshot(
curSnapshot, startPosOfSnapOffset, scanAllFiles);

boolean isOk = true;
int curFilesAdded = 0;
int curRecordCount = 0;
int curPos = 0;

// Note: we produce nextOffset with pos as non-inclusive
while (isOk) {
// start iterating the current snapshot's data files (added files only, unless scanAllFiles)
// this is under the assumption that at least 1 file can be added in the new offset
curPos = 0;
for (DataFile dataFile : curSnapshot.addedFiles()) {
GenericDataFile genericDataFile = (GenericDataFile) dataFile;
if (curPos >= startPosOfSnapOffset) {
// TODO : use readLimit provided in function param, the readLimits are derived from these 2 properties.
if ((curFilesAdded + 1) > maxFilesPerMicroBatch ||
(curRecordCount + genericDataFile.recordCount()) > maxRecordsPerMicroBatch) {
isOk = false;
break;
for (int idx = 0; idx < indexedManifests.size() && isOk; idx++) {
// curPos starts at this manifest's first file index, which may still trail startPosOfSnapOffset
curPos = indexedManifests.get(idx).second();
try (CloseableIterable<FileScanTask> taskIterable = MicroBatchesUtil.openManifestFile(table.io(),
table.specs(),
caseSensitive,
curSnapshot,
indexedManifests.get(idx).first(),
scanAllFiles);
CloseableIterator<FileScanTask> taskIter = taskIterable.iterator()) {
while (taskIter.hasNext()) {
FileScanTask task = taskIter.next();
if (curPos >= startPosOfSnapOffset) {
// TODO: use the readLimit provided in the function param; the read limits are derived from these 2 properties.
if ((curFilesAdded + 1) > maxFilesPerMicroBatch ||
(curRecordCount + task.file().recordCount()) > maxRecordsPerMicroBatch) {
isOk = false;
break;
}

curFilesAdded += 1;
curRecordCount += task.file().recordCount();
}
++curPos;
}

curFilesAdded += 1;
curRecordCount += genericDataFile.recordCount();
} catch (IOException ioe) {
LOG.warn("Failed to close task iterable", ioe);
}

++curPos;
}
// if the current snapshot is also the most recent snapshot, then break
if (curSnapshot.snapshotId() == table.currentSnapshot().snapshotId()) {
Expand All @@ -333,10 +352,12 @@ public Offset latestOffset(Offset startOffset, ReadLimit limit) {
if (isOk) {
startPosOfSnapOffset = -1;
curSnapshot = SnapshotUtil.snapshotAfter(table, curSnapshot.snapshotId());
// when moving to the next snapshot, only its added files should be scanned
scanAllFiles = false;
}
}

StreamingOffset latestStreamingOffset = new StreamingOffset(curSnapshot.snapshotId(), curPos, false);
StreamingOffset latestStreamingOffset = new StreamingOffset(curSnapshot.snapshotId(), curPos, scanAllFiles);

// if no new data arrived, then return null.
return latestStreamingOffset.equals(startingOffset) ? null : latestStreamingOffset;
4 changes: 2 additions & 2 deletions spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
@@ -170,15 +170,15 @@ public void testReadStreamOnIcebergThenAddData() throws Exception {
public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfFiles_1() throws Exception {
appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);

Assert.assertEquals(6,
Assert.assertEquals(4,
microBatchCount(ImmutableMap.of(SparkReadOptions.MAX_FILES_PER_MICRO_BATCH, "1")));
}

@Test
public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfFiles_2() throws Exception {
appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);

Assert.assertEquals(3,
Assert.assertEquals(2,
microBatchCount(ImmutableMap.of(SparkReadOptions.MAX_FILES_PER_MICRO_BATCH, "2")));
}

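
The updated expectations exercise the file-count limit end to end. For reference, a sketch of how such a bounded streaming read is wired up from Spark — the option constant is the one used in the tests above, while the table name and checkpoint path are placeholders, and the snippet is assumed to run inside a method declared throws Exception since start() can throw:

// Bound an Iceberg streaming read to 2 data files per micro-batch.
Dataset<Row> stream = spark.readStream()
    .format("iceberg")
    .option(SparkReadOptions.MAX_FILES_PER_MICRO_BATCH, "2")
    .load("db.events");

StreamingQuery query = stream.writeStream()
    .format("memory")
    .queryName("events_sink")
    .option("checkpointLocation", "/tmp/checkpoints/events")
    .start();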