Skip to content

Commit bacf28c

Browse files
author
XuQianJin-Stars
committed
Support structured streaming read for Iceberg
1 parent 7a68114 commit bacf28c

File tree

3 files changed: +8 −31 lines

core/src/main/java/org/apache/iceberg/MicroBatches.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@

 import java.io.IOException;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -198,7 +198,7 @@ private MicroBatch generateMicroBatch(List<Pair<ManifestFile, Integer>> indexedManifests,

     try (CloseableIterable<FileScanTask> taskIterable = open(indexedManifests.get(idx).first(),
         scanAllFiles)) {
-      CloseableIterator<FileScanTask> taskIter = taskIterable.iterator();
+      Iterator<FileScanTask> taskIter = taskIterable.iterator();
       while (taskIter.hasNext()) {
         FileScanTask task = taskIter.next();
         if (currentFileIndex >= startFileIndex) {

spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java

+1-27
Original file line numberDiff line numberDiff line change
@@ -329,33 +329,7 @@ public Statistics estimateStatistics() {

   @Override
   public boolean enableBatchRead() {
-    if (readUsingBatch == null) {
-      boolean allParquetFileScanTasks =
-          tasks().stream()
-              .allMatch(combinedScanTask -> !combinedScanTask.isDataTask() && combinedScanTask.files()
-                  .stream()
-                  .allMatch(fileScanTask -> fileScanTask.file().format().equals(
-                      FileFormat.PARQUET)));
-
-      boolean allOrcFileScanTasks =
-          tasks().stream()
-              .allMatch(combinedScanTask -> !combinedScanTask.isDataTask() && combinedScanTask.files()
-                  .stream()
-                  .allMatch(fileScanTask -> fileScanTask.file().format().equals(
-                      FileFormat.ORC)));
-
-      boolean atLeastOneColumn = lazySchema().columns().size() > 0;
-
-      boolean onlyPrimitives = lazySchema().columns().stream().allMatch(c -> c.type().isPrimitiveType());
-
-      boolean hasNoDeleteFiles = tasks().stream().noneMatch(TableScanUtil::hasDeletes);
-
-      boolean batchReadsEnabled = batchReadsEnabled(allParquetFileScanTasks, allOrcFileScanTasks);
-
-      this.readUsingBatch = batchReadsEnabled && hasNoDeleteFiles && (allOrcFileScanTasks ||
-          (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives));
-    }
-    return readUsingBatch;
+    return readUsingBatch == null ? checkEnableBatchRead(tasks()) : readUsingBatch;
   }

   private boolean batchReadsEnabled(boolean isParquetOnly, boolean isOrcOnly) {

spark2/src/main/java/org/apache/iceberg/spark/source/StreamingReader.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@

 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.stream.Collectors;
 import org.apache.commons.collections.CollectionUtils;
@@ -80,7 +81,9 @@ class StreamingReader extends Reader implements MicroBatchReader {
   private final long splitOpenFileCost;
   private Boolean readUsingBatch = null;

-  // Used to cache the pending batches for this streaming batch interval.
+  /**
+   * Used to cache the pending batches for this streaming batch interval.
+   */
   private Pair<StreamingOffset, List<MicroBatch>> cachedPendingBatches = null;

   StreamingReader(Table table, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
@@ -324,7 +327,7 @@ List<MicroBatch> getChangesWithRateLimit(StreamingOffset startOffset, long maxSize) {

       // If the current request size is already satisfied, or none of the DataFile size can satisfy the current left
       // size, break the current loop.
-      if (currentLeftSize <= 0L || !lastBatch.lastIndexOfSnapshot()) {
+      if (currentLeftSize <= 0L || !Objects.requireNonNull(lastBatch).lastIndexOfSnapshot()) {
        break;
       }
     }

0 commit comments

Comments (0)