
Commit afbb6d9

Author: XuQianJin-Stars (committed)

Support structured streaming read for Iceberg

1 parent 56940b0
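
For orientation, with this commit applied, a structured streaming read of an Iceberg table from Spark 2.4 would be wired up roughly as follows. This is a minimal sketch, assuming the "iceberg" DataSourceV2 format that this module registers; the table path and checkpoint location are placeholders, not values taken from the commit.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public class StreamingReadExample {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder()
        .appName("iceberg-streaming-read")
        .master("local[2]")
        .getOrCreate();

    // Read the Iceberg table as an unbounded stream of appended rows.
    Dataset<Row> stream = spark.readStream()
        .format("iceberg")
        .load("/tmp/warehouse/db/table");  // placeholder path

    // Echo each micro-batch to the console.
    StreamingQuery query = stream.writeStream()
        .format("console")
        .option("checkpointLocation", "/tmp/checkpoints/iceberg-read")  // placeholder
        .start();

    query.awaitTermination();
  }
}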

File tree

4 files changed: +26 −19 lines


core/src/main/java/org/apache/iceberg/MicroBatches.java

+1 −1

@@ -153,7 +153,7 @@ private static List<Pair<ManifestFile, Integer>> indexManifests(List<ManifestFil
    * startFileIndex.
    */
   private static List<Pair<ManifestFile, Integer>> skipManifests(List<Pair<ManifestFile, Integer>> indexedManifests,
-      long startFileIndex) {
+                                                                 long startFileIndex) {
     if (startFileIndex == 0) {
       return indexedManifests;
     }

core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java

+1 −14

@@ -20,7 +20,6 @@
 package org.apache.iceberg.util;
 
 import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.Snapshot;
@@ -70,20 +69,8 @@ public static List<Long> currentAncestors(Table table) {
    * This method assumes that fromSnapshotId is an ancestor of toSnapshotId.
    */
   public static List<Long> snapshotIdsBetween(Table table, long fromSnapshotId, long toSnapshotId) {
-    AtomicBoolean isAncestor = new AtomicBoolean(false);
     List<Long> snapshotIds = Lists.newArrayList(ancestorIds(table.snapshot(toSnapshotId),
-        snapshotId -> {
-          if (snapshotId == fromSnapshotId) {
-            isAncestor.set(true);
-            return null;
-          } else {
-            return table.snapshot(snapshotId);
-          }
-        }));
-    if (!isAncestor.get()) {
-      throw new IllegalStateException(fromSnapshotId + " is not an ancestor of " + toSnapshotId);
-    }
-
+        snapshotId -> snapshotId != fromSnapshotId ? table.snapshot(snapshotId) : null));
     return snapshotIds;
   }
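
The simplified lambda works because ancestorIds stops walking the snapshot lineage as soon as the lookup function returns null, so returning null at fromSnapshotId excludes that snapshot and everything older. Below is a minimal sketch of that assumed walk contract; SnapshotLike and AncestorWalk are illustrative stand-ins, not Iceberg types.

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Illustrative stand-in for org.apache.iceberg.Snapshot: only the two
// accessors the walk needs.
interface SnapshotLike {
  long snapshotId();
  Long parentId();  // null when this snapshot has no parent
}

class AncestorWalk {
  // Sketch of the ancestorIds contract assumed by snapshotIdsBetween:
  // follow parent pointers, asking `lookup` for each parent; a null
  // result ends the walk. Returning null when snapshotId == fromSnapshotId
  // therefore drops fromSnapshotId and everything older.
  static List<Long> ancestorIds(SnapshotLike start, Function<Long, SnapshotLike> lookup) {
    List<Long> ids = new ArrayList<>();
    SnapshotLike current = start;
    while (current != null) {
      ids.add(current.snapshotId());
      Long parent = current.parentId();
      current = (parent == null) ? null : lookup.apply(parent);
    }
    return ids;
  }
}

Note the trade-off visible in the diff: the removed AtomicBoolean used to verify at runtime that fromSnapshotId really is an ancestor, while the new code relies on the documented assumption in the Javadoc above.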

spark2/src/main/java/org/apache/iceberg/spark/source/StreamingReader.java

+24 −3

@@ -167,7 +167,7 @@ public void stop() {
 
   @Override
   public boolean enableBatchRead() {
-    return readUsingBatch == null ? false : readUsingBatch;
+    return readUsingBatch != null && readUsingBatch;
   }
 
   @Override
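
Note on the enableBatchRead change: with a boxed Boolean, readUsingBatch != null && readUsingBatch null-checks and unboxes in one expression, equivalent to the removed ternary but simpler.
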
@@ -215,6 +215,12 @@ protected List<CombinedScanTask> tasks() {
     return combinedScanTasks;
   }
 
+  /**
+   * Calculates the start offset. If startSnapshotId is set, construction starts from that
+   * snapshot; otherwise it starts from the beginning of the table.
+   *
+   * @return The start offset to scan from.
+   */
   private StreamingOffset calculateStartingOffset() {
     StreamingOffset startingOffset;
     if (startSnapshotId != null) {
@@ -232,6 +238,12 @@ private StreamingOffset calculateStartingOffset() {
     return startingOffset;
   }
 
+  /**
+   * Calculates the end offset for a batch beginning at the given start offset.
+   *
+   * @param start The start offset to scan from
+   * @return The end offset to scan to
+   */
   private StreamingOffset calculateEndOffset(StreamingOffset start) {
     if (start.equals(StreamingOffset.START_OFFSET)) {
       return StreamingOffset.START_OFFSET;
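
Both documented helpers feed Spark 2.4's MicroBatchReader lifecycle, where the engine passes the source an optional offset range before planning each batch. A compressed sketch of that interaction follows, with String standing in for Spark's Offset type and both calculate methods stubbed; only the Optional fallback pattern is the point here.

import java.util.Optional;

class OffsetRangeSketch {
  private String startOffset;
  private String endOffset;

  // Mirrors MicroBatchReader.setOffsetRange(Optional, Optional): when Spark
  // has no checkpointed start (first batch), the source derives one; when no
  // end is given, the source decides how far this batch may read.
  void setOffsetRange(Optional<String> start, Optional<String> end) {
    this.startOffset = start.orElseGet(this::calculateStartingOffset);
    this.endOffset = end.orElseGet(() -> calculateEndOffset(this.startOffset));
  }

  private String calculateStartingOffset() {
    return "START_OFFSET";  // stub: the real method picks a snapshot to begin from
  }

  private String calculateEndOffset(String start) {
    return start;  // stub: the real method scans forward under the rate limit
  }
}
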
@@ -255,6 +267,13 @@ private StreamingOffset calculateEndOffset(StreamingOffset start) {
     }
   }
 
+  /**
+   * Rate-limits the streaming read by startOffset and a maxSize byte cap on the returned micro-batches.
+   *
+   * @param startOffset The start offset to scan from
+   * @param maxSize The maximum total bytes covered by the returned micro-batches
+   * @return A list of MicroBatch
+   */
   @VisibleForTesting
   @SuppressWarnings("checkstyle:HiddenField")
   List<MicroBatch> getChangesWithRateLimit(StreamingOffset startOffset, long maxSize) {
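
To make the maxSize contract concrete, here is a sketch of byte-capped batch selection. Batch is a simplified stand-in for Iceberg's MicroBatch, and always taking at least one batch is this sketch's choice so the stream can progress; it is not necessarily the commit's exact policy.

import java.util.ArrayList;
import java.util.List;

class RateLimitSketch {
  // Simplified stand-in for an Iceberg MicroBatch: just a size in bytes.
  static class Batch {
    final long sizeInBytes;
    Batch(long sizeInBytes) { this.sizeInBytes = sizeInBytes; }
  }

  // Keep taking pending batches until adding the next one would exceed
  // maxSize; at least one batch is always returned so the stream advances.
  static List<Batch> takeWithRateLimit(List<Batch> pending, long maxSize) {
    List<Batch> taken = new ArrayList<>();
    long total = 0;
    for (Batch batch : pending) {
      if (!taken.isEmpty() && total + batch.sizeInBytes > maxSize) {
        break;
      }
      taken.add(batch);
      total += batch.sizeInBytes;
    }
    return taken;
  }
}
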
@@ -335,9 +354,11 @@ private boolean shouldGenerateFromStartOffset(StreamingOffset startOffset) {
   }
 
   private static void assertNoOverwrite(Snapshot snapshot) {
-    if (snapshot.operation().equals(DataOperations.OVERWRITE)) {
+    if (snapshot.operation().equals(DataOperations.OVERWRITE) ||
+        snapshot.operation().equals(DataOperations.REPLACE) ||
+        snapshot.operation().equals(DataOperations.DELETE)) {
       throw new UnsupportedOperationException(String.format("Found %s operation, cannot support incremental data for " +
-          "snapshot %d", DataOperations.OVERWRITE, snapshot.snapshotId()));
+          "snapshot %d", snapshot.operation(), snapshot.snapshotId()));
     }
   }
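
The widened guard now rejects any snapshot whose operation rewrites or removes data, not just overwrites. The same check could be phrased against a set of operation names; the sketch below uses the string values "overwrite", "replace", and "delete" held by Iceberg's DataOperations constants, but it is an alternative phrasing, not what the commit does.

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

class OperationCheckSketch {
  // Sketch: these names mirror Iceberg's DataOperations constants.
  private static final Set<String> UNSUPPORTED_OPS =
      new HashSet<>(Arrays.asList("overwrite", "replace", "delete"));

  static void assertIncrementalReadable(String operation, long snapshotId) {
    if (UNSUPPORTED_OPS.contains(operation)) {
      throw new UnsupportedOperationException(String.format(
          "Found %s operation, cannot support incremental data for snapshot %d",
          operation, snapshotId));
    }
  }
}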

spark2/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead.java

−1

@@ -36,7 +36,6 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.SnapshotUtil;
