
Commit

minor refactor
Prashant Singh committed Apr 4, 2022
1 parent bf643b3 commit 14983d3
Showing 3 changed files with 23 additions and 26 deletions.
8 changes: 5 additions & 3 deletions core/src/main/java/org/apache/iceberg/MicroBatches.java
@@ -111,6 +111,7 @@ public MicroBatchBuilder specsById(Map<Integer, PartitionSpec> specs) {

public MicroBatch generate(long startFileIndex, long endFileIndex, long targetSizeInBytes, boolean scanAllFiles) {
Preconditions.checkArgument(startFileIndex >= 0, "startFileIndex is unexpectedly smaller than 0");
Preconditions.checkArgument(endFileIndex >= 0, "endFileIndex is unexpectedly smaller than 0");
Preconditions.checkArgument(targetSizeInBytes > 0, "targetSizeInBytes should be larger than 0");

List<ManifestFile> manifests = scanAllFiles ? snapshot.dataManifests() :
@@ -175,14 +176,15 @@ private static List<Pair<ManifestFile, Integer>> skipManifests(List<Pair<Manifes
*
* @param indexedManifests A list of indexed manifests to generate MicroBatch
* @param startFileIndex A startFileIndex used to skip processed files.
* @param endFileIndex An endFileIndex used to find files to include; it is not inclusive.
* @param targetSizeInBytes Used to control the size of MicroBatch; the processed file bytes must be smaller than
* this size.
* @param scanAllFiles Used to check whether all the data files should be processed, or only added files.
* @return A MicroBatch.
*/
private MicroBatch generateMicroBatch(List<Pair<ManifestFile, Integer>> indexedManifests,
long startFileIndex, long endFileIndex,long targetSizeInBytes,
boolean scanAllFiles) {
private MicroBatch generateMicroBatch(
List<Pair<ManifestFile, Integer>> indexedManifests,
long startFileIndex, long endFileIndex, long targetSizeInBytes, boolean scanAllFiles) {
if (indexedManifests.isEmpty()) {
return new MicroBatch(snapshot.snapshotId(), startFileIndex, endFileIndex, 0L,
Collections.emptyList(), true);
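The added check closes a gap where a negative endFileIndex could slip past validation. A minimal caller sketch, assuming `table` is an already-loaded Iceberg table whose snapshot 1 exists and that MicroBatch is the nested result type of MicroBatches (the builder chain mirrors the tests below); Guava's Preconditions.checkArgument throws IllegalArgumentException with the given message when the check fails:

import org.apache.iceberg.MicroBatches;
import org.apache.iceberg.MicroBatches.MicroBatch;
import org.apache.iceberg.Table;

// Sketch only: `table` is assumed to be an already-loaded Iceberg table
// with at least one committed snapshot.
class GenerateExample {
  static MicroBatch firstBatch(Table table) {
    return MicroBatches.from(table.snapshot(1L), table.io())
        .specsById(table.specs())
        // After this commit, generate(0, -1, 15L, true) fails fast with
        // IllegalArgumentException: "endFileIndex is unexpectedly smaller than 0",
        // instead of proceeding with a nonsensical range.
        .generate(0, 1, 15L, true);
  }
}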
14 changes: 7 additions & 7 deletions core/src/test/java/org/apache/iceberg/TestMicroBatchBuilder.java
@@ -62,23 +62,23 @@ public void testGenerateMicroBatch() {

MicroBatch batch1 = MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(0, 1,15L, true);
.generate(0, 1, 15L, true);
Assert.assertEquals(batch1.endFileIndex(), 1);
Assert.assertEquals(batch1.sizeInBytes(), 10);
Assert.assertFalse(batch1.lastIndexOfSnapshot());
filesMatch(Lists.newArrayList("A"), filesToScan(batch1.tasks()));

MicroBatch batch2 = MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(batch1.endFileIndex(), 4,30L, true);
.generate(batch1.endFileIndex(), 4, 30L, true);
Assert.assertEquals(batch2.endFileIndex(), 4);
Assert.assertEquals(batch2.sizeInBytes(), 30);
Assert.assertFalse(batch2.lastIndexOfSnapshot());
filesMatch(Lists.newArrayList("B", "C", "D"), filesToScan(batch2.tasks()));

MicroBatch batch3 = MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(batch2.endFileIndex(), 5,50L, true);
.generate(batch2.endFileIndex(), 5, 50L, true);
Assert.assertEquals(batch3.endFileIndex(), 5);
Assert.assertEquals(batch3.sizeInBytes(), 10);
Assert.assertTrue(batch3.lastIndexOfSnapshot());
@@ -101,15 +101,15 @@ public void testGenerateMicroBatchWithSmallTargetSize() {

MicroBatch batch1 = MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(batch.endFileIndex(), 2,5L, true);
.generate(batch.endFileIndex(), 2, 5L, true);
Assert.assertEquals(batch1.endFileIndex(), 2);
Assert.assertEquals(batch1.sizeInBytes(), 10);
filesMatch(Lists.newArrayList("B"), filesToScan(batch1.tasks()));
Assert.assertFalse(batch1.lastIndexOfSnapshot());

MicroBatch batch2 = MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(batch1.endFileIndex(), 3,10L, true);
.generate(batch1.endFileIndex(), 3, 10L, true);
Assert.assertEquals(batch2.endFileIndex(), 3);
Assert.assertEquals(batch2.sizeInBytes(), 10);
filesMatch(Lists.newArrayList("C"), filesToScan(batch2.tasks()));
@@ -125,7 +125,7 @@ public void testGenerateMicroBatchWithSmallTargetSize() {

MicroBatch batch4 = MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(batch3.endFileIndex(), 5,5L, true);
.generate(batch3.endFileIndex(), 5, 5L, true);
Assert.assertEquals(batch4.endFileIndex(), 5);
Assert.assertEquals(batch4.sizeInBytes(), 10);
filesMatch(Lists.newArrayList("E"), filesToScan(batch4.tasks()));
@@ -180,4 +180,4 @@ private static void filesMatch(List<String> expected, List<String> actual) {
Collections.sort(actual);
Assert.assertEquals(expected, actual);
}
}
}
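As the assertions above show, batches chain: each call feeds the previous batch's endFileIndex() back in as the next startFileIndex, and lastIndexOfSnapshot() flips to true once the snapshot is exhausted. A sketch of that loop, assuming tasks() returns List<FileScanTask>, with `totalFiles` and `process` as placeholder names that are not part of the commit:

import java.util.List;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MicroBatches;
import org.apache.iceberg.MicroBatches.MicroBatch;
import org.apache.iceberg.Table;

// Sketch of the consumption pattern the tests exercise; not part of the commit.
class MicroBatchLoop {
  static void drainSnapshot(Table table, long totalFiles, long targetSizeInBytes) {
    long start = 0;
    MicroBatch batch;
    do {
      batch = MicroBatches.from(table.snapshot(1L), table.io())
          .specsById(table.specs())
          // Half-open range: candidate files are [start, totalFiles),
          // further capped by targetSizeInBytes.
          .generate(start, totalFiles, targetSizeInBytes, true);
      process(batch.tasks());
      start = batch.endFileIndex(); // the next batch resumes where this one ended
    } while (!batch.lastIndexOfSnapshot());
  }

  private static void process(List<FileScanTask> tasks) {
    // placeholder: hand the tasks to the execution engine
  }
}

The remaining hunks are from the Spark micro-batch read path, where the same half-open convention appears as StreamingOffset positions.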
@@ -180,13 +180,11 @@ public void stop() {
private List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffset endOffset) {
List<FileScanTask> fileScanTasks = Lists.newArrayList();
StreamingOffset batchStartOffset = StreamingOffset.START_OFFSET.equals(startOffset) ?
determineStartingOffset(table, fromTimestamp, 0) : startOffset;
determineStartingOffset(table, fromTimestamp) : startOffset;

StreamingOffset currentOffset = null;
boolean isEndOffset;

// (11, 20), (21, 25) .... (26 ...40)
// [startFileIndex, endFileIndex)
// [(startOffset : startFileIndex), (endOffset : endFileIndex) )
do {
long startFileIndex = 0;
long endFileIndex;
@@ -195,6 +193,7 @@ private List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffse
startFileIndex = batchStartOffset.position();
} else {
Snapshot snapshotAfter = SnapshotUtil.snapshotAfter(table, currentOffset.snapshotId());
// It may happen that we need to read this snapshot partially when it is equal to endOffset.
if (currentOffset.snapshotId() != endOffset.snapshotId()) {
currentOffset = new StreamingOffset(snapshotAfter.snapshotId(), 0L, false);
} else {
@@ -249,14 +248,14 @@ private boolean shouldProcess(Snapshot snapshot) {
}
}

private static StreamingOffset determineStartingOffset(Table table, Long fromTimestamp, int startPos) {
private static StreamingOffset determineStartingOffset(Table table, Long fromTimestamp) {
if (table.currentSnapshot() == null) {
return StreamingOffset.START_OFFSET;
}

if (fromTimestamp == null) {
// match existing behavior and start from the oldest snapshot
return new StreamingOffset(SnapshotUtil.oldestAncestor(table).snapshotId(), startPos, false);
return new StreamingOffset(SnapshotUtil.oldestAncestor(table).snapshotId(), 0, false);
}

if (table.currentSnapshot().timestampMillis() < fromTimestamp) {
@@ -266,13 +265,13 @@ private static StreamingOffset determineStartingOffset(Table table, Long fromTim
try {
Snapshot snapshot = SnapshotUtil.oldestAncestorAfter(table, fromTimestamp);
if (snapshot != null) {
return new StreamingOffset(snapshot.snapshotId(), startPos, false);
return new StreamingOffset(snapshot.snapshotId(), 0, false);
} else {
return StreamingOffset.START_OFFSET;
}
} catch (IllegalStateException e) {
// could not determine the first snapshot after the timestamp. use the oldest ancestor instead
return new StreamingOffset(SnapshotUtil.oldestAncestor(table).snapshotId(), startPos, false);
return new StreamingOffset(SnapshotUtil.oldestAncestor(table).snapshotId(), 0, false);
}
}
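
Dropping the startPos parameter removes a degree of freedom no caller used: every call site passed 0, because a freshly determined starting offset always begins at the first file of its snapshot. A condensed sketch of the resulting decision tree, under the assumption that OffsetTriple stands in for the real StreamingOffset and with the START_OFFSET sentinel cases elided:

import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.util.SnapshotUtil;

// Condensed sketch; OffsetTriple stands in for StreamingOffset. The point is
// that the position component is now always 0, so it no longer needs a parameter.
class StartingOffsetSketch {
  static final class OffsetTriple {
    final long snapshotId;
    final long position; // always 0 for a starting offset
    final boolean scanAllFiles;

    OffsetTriple(long snapshotId, long position, boolean scanAllFiles) {
      this.snapshotId = snapshotId;
      this.position = position;
      this.scanAllFiles = scanAllFiles;
    }
  }

  static OffsetTriple determine(Table table, Long fromTimestamp) {
    if (fromTimestamp == null) {
      // match existing behavior and start from the oldest snapshot
      return new OffsetTriple(SnapshotUtil.oldestAncestor(table).snapshotId(), 0, false);
    }
    try {
      Snapshot snapshot = SnapshotUtil.oldestAncestorAfter(table, fromTimestamp);
      // the real method returns START_OFFSET when snapshot is null; elided here
      return new OffsetTriple(snapshot.snapshotId(), 0, false);
    } catch (IllegalStateException e) {
      // could not determine the first snapshot after the timestamp; use the oldest ancestor
      return new OffsetTriple(SnapshotUtil.oldestAncestor(table).snapshotId(), 0, false);
    }
  }
}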

@@ -295,8 +294,7 @@ public Offset latestOffset(Offset startOffset, ReadLimit limit) {
StreamingOffset curOffset = (StreamingOffset) startOffset;

if (startOffset.equals(StreamingOffset.START_OFFSET)) {
// find the oldest ancestor in this case of the table
curOffset = determineStartingOffset(table, fromTimestamp, 0);
curOffset = determineStartingOffset(table, fromTimestamp);
}

Snapshot curSnapshot = table.snapshot(curOffset.snapshotId());
Expand All @@ -307,9 +305,7 @@ public Offset latestOffset(Offset startOffset, ReadLimit limit) {
int curRecordCount = 0;
int curPos = 0;

// >= pos
// (>= pos && <= endPos)

// Note : we produce nextOffset with pos as non-inclusive
while (isOk) {
// start iterating the current Snapshot's added Files
// this is under assumption we will be able to add at-least 1 file in the new offset
@@ -342,8 +338,7 @@
}
}

StreamingOffset so = new StreamingOffset(curSnapshot.snapshotId(), curPos, false);
return so;
return new StreamingOffset(curSnapshot.snapshotId(), curPos, false);
}

@Override
@@ -382,7 +377,7 @@ public StreamingOffset initialOffset() {
}

table.refresh();
StreamingOffset offset = determineStartingOffset(table, fromTimestamp, 0);
StreamingOffset offset = determineStartingOffset(table, fromTimestamp);

OutputFile outputFile = io.newOutputFile(initialOffsetLocation);
writeOffset(offset, outputFile);
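Taken together, the new comments pin down one convention: an offset's pos is exclusive on the end side, so a (startOffset, endOffset) pair describes the half-open file range [startFileIndex, endFileIndex). An illustration with invented snapshot IDs and positions, assuming the StreamingOffset(snapshotId, position, scanAllFiles) constructor shape used in the diff and that this compiles alongside it in the same package:

// Illustration only; all values are invented.
class OffsetRangeExample {
  static void halfOpenRange() {
    StreamingOffset start = new StreamingOffset(11L, 3L, false); // files 0..2 already consumed
    StreamingOffset end = new StreamingOffset(11L, 7L, false);   // pos 7 is exclusive
    // planFiles(start, end) scans files [3, 7) of snapshot 11: indices 3, 4, 5, 6.
    // A later batch resumes at file 7, so no file is read twice or skipped.
  }
}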
