Skip to content

Commit 3e1cd20

Browse files
author
XuQianJin-Stars
committed
Support structured streaming read for Iceberg
1 parent 01995a9 commit 3e1cd20

File tree

1 file changed

+9
-10
lines changed

1 file changed

+9
-10
lines changed

spark2/src/main/java/org/apache/iceberg/spark/source/StreamingReader.java

+9-10
Original file line numberDiff line numberDiff line change
@@ -271,23 +271,22 @@ private StreamingOffset calculateEndOffset(StreamingOffset start) {
271271
}
272272

273273
/**
274-
* Streaming Read control is performed by changing the startOffset and maxSize.
274+
* Streaming Read control is performed by changing the offset and maxSize.
275275
*
276-
* @param startOffset The start offset to scan from
276+
* @param offset The start offset to scan from
277277
* @param maxSize The maximum size in bytes, used to limit how many batches are returned
278278
* @return a list of MicroBatch
279279
*/
280280
@VisibleForTesting
281-
@SuppressWarnings("checkstyle:HiddenField")
282-
List<MicroBatch> getChangesWithRateLimit(StreamingOffset startOffset, long maxSize) {
281+
List<MicroBatch> getChangesWithRateLimit(StreamingOffset offset, long maxSize) {
283282
List<MicroBatch> batches = Lists.newArrayList();
284283
long currentLeftSize = maxSize;
285284
MicroBatch lastBatch = null;
286285

287-
assertNoOverwrite(table.snapshot(startOffset.snapshotId()));
288-
if (shouldGenerateFromStartOffset(startOffset)) {
289-
MicroBatch batch = generateMicroBatch(startOffset.snapshotId(), startOffset.position(),
290-
startOffset.shouldScanAllFiles(), currentLeftSize);
286+
assertNoOverwrite(table.snapshot(offset.snapshotId()));
287+
if (shouldGenerateFromStartOffset(offset)) {
288+
MicroBatch batch = generateMicroBatch(offset.snapshotId(), offset.position(),
289+
offset.shouldScanAllFiles(), currentLeftSize);
291290
if (!batch.tasks().isEmpty()) {
292291
batches.add(batch);
293292
currentLeftSize -= batch.sizeInBytes();
@@ -301,13 +300,13 @@ List<MicroBatch> getChangesWithRateLimit(StreamingOffset startOffset, long maxSi
301300
}
302301

303302
long currentSnapshotId = table.currentSnapshot().snapshotId();
304-
if (currentSnapshotId == startOffset.snapshotId()) {
303+
if (currentSnapshotId == offset.snapshotId()) {
305304
// the snapshot of current offset is already the latest snapshot of this table.
306305
return batches;
307306
}
308307

309308
ImmutableList<Long> snapshotIds = ImmutableList.<Long>builder()
310-
.addAll(SnapshotUtil.snapshotIdsBetween(table, startOffset.snapshotId(), currentSnapshotId))
309+
.addAll(SnapshotUtil.snapshotIdsBetween(table, offset.snapshotId(), currentSnapshotId))
311310
.build()
312311
.reverse();
313312

0 commit comments

Comments (0)