Skip to content

Commit

Permalink
rebase changes
Browse files Browse the repository at this point in the history
  • Loading branch information
singhpk234 committed Jan 8, 2023
1 parent b3d2e07 commit 149f406
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 8 deletions.
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/MicroBatches.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public MicroBatch generate(

return generateMicroBatch(
MicroBatchesUtil.skippedManifestIndexesFromSnapshot(
snapshot, startFileIndex, scanAllFiles),
io, snapshot, startFileIndex, scanAllFiles),
startFileIndex,
endFileIndex,
targetSizeInBytes,
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/java/org/apache/iceberg/MicroBatchesUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ public class MicroBatchesUtil {
private MicroBatchesUtil() {}

public static List<Pair<ManifestFile, Integer>> skippedManifestIndexesFromSnapshot(
Snapshot snapshot, long startFileIndex, boolean scanAllFiles) {
FileIO io, Snapshot snapshot, long startFileIndex, boolean scanAllFiles) {
// Preconditions.checkArgument(startFileIndex >= 0, "startFileIndex is unexpectedly smaller
// than 0");
List<ManifestFile> manifests =
scanAllFiles
? snapshot.dataManifests()
: snapshot.dataManifests().stream()
? snapshot.dataManifests(io)
: snapshot.dataManifests(io).stream()
.filter(m -> m.snapshotId().equals(snapshot.snapshotId()))
.collect(Collectors.toList());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ private List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffse
.specsById(table.specs())
.generate(
currentOffset.position(),
Iterables.size(currentSnapshot.addedFiles()),
Iterables.size(currentSnapshot.addedDataFiles(table.io())),
Long.MAX_VALUE,
currentOffset.shouldScanAllFiles());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ public Offset latestOffset(Offset startOffset, ReadLimit limit) {
// generate manifest index for the curSnapshot
List<Pair<ManifestFile, Integer>> indexedManifests =
MicroBatchesUtil.skippedManifestIndexesFromSnapshot(
curSnapshot, startPosOfSnapOffset, scanAllFiles);
table.io(), curSnapshot, startPosOfSnapOffset, scanAllFiles);
// this is under the assumption that we will be able to add at least 1 file in the new offset
for (int idx = 0; idx < indexedManifests.size() && isOk; idx++) {
// rest assured, curPos >= startFileIndex
Expand Down Expand Up @@ -410,7 +410,9 @@ private long addedFilesCount(Snapshot snapshot) {
PropertyUtil.propertyAsLong(snapshot.summary(), SnapshotSummary.ADDED_FILES_PROP, -1);
// If snapshotSummary doesn't have SnapshotSummary.ADDED_FILES_PROP,
// iterate through the snapshot's added data files to find addedFilesCount.
return addedFilesCount == -1 ? Iterables.size(snapshot.addedFiles()) : addedFilesCount;
return addedFilesCount == -1
? Iterables.size(snapshot.addedDataFiles(table.io()))
: addedFilesCount;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ private List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffse
.specsById(table.specs())
.generate(
currentOffset.position(),
Iterables.size(currentSnapshot.addedFiles()),
Iterables.size(currentSnapshot.addedDataFiles(table.io())),
Long.MAX_VALUE,
currentOffset.shouldScanAllFiles());
fileScanTasks.addAll(latestMicroBatch.tasks());
Expand Down

0 comments on commit 149f406

Please sign in to comment.