Commit 41041f3

Spark3 structured streaming micro_batch read support
This work is an extension of the idea in issue apache#179 and the Spark2 work done in PR apache#2272, except that this one targets Spark3.

**In the current implementation:**
* An Iceberg snapshot is the upper bound for a micro-batch. A given micro-batch spans only a single snapshot; it is never composed of multiple snapshots. BatchSize limits the number of files read within a given snapshot.
* The streaming reader errors out if it encounters any snapshot whose operation is not `APPEND`.
* Handling `DELETE`, `REPLACE`, and `OVERWRITE` operations is left for future work.
* Columnar reads are not enabled; also left for future work.
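
To make the behavior above concrete, here is a minimal, illustrative sketch of consuming an Iceberg table through Spark 3 structured streaming with a micro-batch source like this one. It is not part of this commit; the table location, checkpoint path, and console sink are placeholders.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public class IcebergMicroBatchReadExample {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder()
        .appName("iceberg-micro-batch-read")
        .getOrCreate();

    // Streaming read: each trigger is served one micro-batch, bounded by a single Iceberg snapshot.
    Dataset<Row> events = spark.readStream()
        .format("iceberg")
        .load("hdfs://namenode:8020/warehouse/db/events");  // hypothetical table location

    // Write to the console sink just to exercise the source; the checkpoint path is a placeholder.
    StreamingQuery query = events.writeStream()
        .format("console")
        .option("checkpointLocation", "/tmp/checkpoints/events")
        .start();

    query.awaitTermination();
  }
}
```

Under this commit's semantics, each trigger pulls at most BatchSize files from the current snapshot, and the stream fails fast if it reaches a snapshot whose operation is not `APPEND`.
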
1 parent 1dcd926 commit 41041f3

File tree

4 files changed: +945 -3 lines changed

spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java (+12, -3)
```diff
@@ -52,6 +52,7 @@
 import org.apache.spark.sql.connector.read.Scan;
 import org.apache.spark.sql.connector.read.Statistics;
 import org.apache.spark.sql.connector.read.SupportsReportStatistics;
+import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
@@ -62,6 +63,7 @@ abstract class SparkBatchScan implements Scan, Batch, SupportsReportStatistics {
   private static final Logger LOG = LoggerFactory.getLogger(SparkBatchScan.class);
 
   private final JavaSparkContext sparkContext;
+  private final SparkSession spark;
   private final Table table;
   private final boolean caseSensitive;
   private final boolean localityPreferred;
@@ -76,6 +78,7 @@ abstract class SparkBatchScan implements Scan, Batch, SupportsReportStatistics {
   SparkBatchScan(SparkSession spark, Table table, boolean caseSensitive, Schema expectedSchema,
                  List<Expression> filters, CaseInsensitiveStringMap options) {
     this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
+    this.spark = spark;
     this.table = table;
     this.caseSensitive = caseSensitive;
     this.expectedSchema = expectedSchema;
@@ -108,6 +111,12 @@ public Batch toBatch() {
     return this;
   }
 
+  @Override
+  public MicroBatchStream toMicroBatchStream(String checkpointLocation) {
+    return new SparkMicroBatchStream(
+        spark, sparkContext, table, caseSensitive, expectedSchema, options, checkpointLocation);
+  }
+
   @Override
   public StructType readSchema() {
     if (readSchema == null) {
@@ -213,10 +222,10 @@ public String description() {
     return String.format("%s [filters=%s]", table, filters);
   }
 
-  private static class ReaderFactory implements PartitionReaderFactory {
+  public static class ReaderFactory implements PartitionReaderFactory {
     private final int batchSize;
 
-    private ReaderFactory(int batchSize) {
+    ReaderFactory(int batchSize) {
       this.batchSize = batchSize;
     }
 
@@ -256,7 +265,7 @@ private static class BatchReader extends BatchDataReader implements PartitionRea
     }
   }
 
-  private static class ReadTask implements InputPartition, Serializable {
+  public static class ReadTask implements InputPartition, Serializable {
     private final CombinedScanTask task;
     private final Broadcast<Table> tableBroadcast;
     private final String expectedSchemaString;
```

spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java (new file, +330)

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iceberg.spark.source;

import java.util.List;
import java.util.Optional;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DataOperations;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MicroBatches;
import org.apache.iceberg.MicroBatches.MicroBatch;
import org.apache.iceberg.MicroBatches.MicroBatchBuilder;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.source.SparkBatchScan.ReadTask;
import org.apache.iceberg.spark.source.SparkBatchScan.ReaderFactory;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.TableScanUtil;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.execution.streaming.OffsetSeq;
import org.apache.spark.sql.execution.streaming.OffsetSeqLog;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.collection.JavaConverters;

import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK;
import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT;
import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT;
import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT;

public class SparkMicroBatchStream implements MicroBatchStream {
  private static final Logger LOG = LoggerFactory.getLogger(SparkMicroBatchStream.class);

  private final JavaSparkContext sparkContext;
  private final Table table;
  private final boolean caseSensitive;
  private final Schema expectedSchema;
  private final int batchSize;
  private final Long splitSize;
  private final Integer splitLookback;
  private final Long splitOpenFileCost;
  private final boolean localityPreferred;
  private final OffsetLog offsetLog;

  private StreamingOffset initialOffset = null;
  private PlannedEndOffset previousEndOffset = null;

  SparkMicroBatchStream(SparkSession spark, JavaSparkContext sparkContext,
                        Table table, boolean caseSensitive, Schema expectedSchema,
                        CaseInsensitiveStringMap options, String checkpointLocation) {
    this.sparkContext = sparkContext;
    this.table = table;
    this.caseSensitive = caseSensitive;
    this.expectedSchema = expectedSchema;
    this.batchSize = Spark3Util.batchSize(table.properties(), options);
    this.localityPreferred = Spark3Util.isLocalityEnabled(table.io(), table.location(), options);
    this.splitSize = Optional.ofNullable(Spark3Util.propertyAsLong(options, SparkReadOptions.SPLIT_SIZE, null))
        .orElseGet(() -> PropertyUtil.propertyAsLong(table.properties(), SPLIT_SIZE, SPLIT_SIZE_DEFAULT));
    this.splitLookback = Optional.ofNullable(Spark3Util.propertyAsInt(options, SparkReadOptions.LOOKBACK, null))
        .orElseGet(() -> PropertyUtil.propertyAsInt(table.properties(), SPLIT_LOOKBACK, SPLIT_LOOKBACK_DEFAULT));
    this.splitOpenFileCost = Optional.ofNullable(
        Spark3Util.propertyAsLong(options, SparkReadOptions.FILE_OPEN_COST, null))
        .orElseGet(() -> PropertyUtil.propertyAsLong(table.properties(), SPLIT_OPEN_FILE_COST,
            SPLIT_OPEN_FILE_COST_DEFAULT));
    this.offsetLog = OffsetLog.getInstance(spark, checkpointLocation);
  }

  @Override
  public Offset latestOffset() {
    initialOffset();

    if (isTableEmpty()) {
      return StreamingOffset.START_OFFSET;
    }

    StreamingOffset microBatchStartOffset = isFirstBatch() ? initialOffset : previousEndOffset;
    if (isEndOfSnapshot(microBatchStartOffset)) {
      microBatchStartOffset = getNextAvailableSnapshot(microBatchStartOffset);
    }

    previousEndOffset = calculateEndOffset(microBatchStartOffset);
    return previousEndOffset;
  }

  @Override
  public InputPartition[] planInputPartitions(Offset start, Offset end) {
    if (end.equals(StreamingOffset.START_OFFSET)) {
      return new InputPartition[0];
    }

    // broadcast the table metadata as input partitions will be sent to executors
    Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
    String expectedSchemaString = SchemaParser.toJson(expectedSchema);

    Preconditions.checkState(
        end instanceof PlannedEndOffset,
        "The end offset passed to planInputPartitions() is not the one that is returned by lastOffset()");
    PlannedEndOffset endOffset = (PlannedEndOffset) end;

    List<FileScanTask> fileScanTasks = endOffset.getMicroBatch().tasks();

    CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles(
        CloseableIterable.withNoopClose(fileScanTasks),
        splitSize);
    List<CombinedScanTask> combinedScanTasks = Lists.newArrayList(
        TableScanUtil.planTasks(splitTasks, splitSize, splitLookback, splitOpenFileCost));
    InputPartition[] readTasks = new InputPartition[combinedScanTasks.size()];

    for (int i = 0; i < combinedScanTasks.size(); i++) {
      readTasks[i] = new ReadTask(
          combinedScanTasks.get(i), tableBroadcast, expectedSchemaString,
          caseSensitive, localityPreferred);
    }

    return readTasks;
  }

  @Override
  public PartitionReaderFactory createReaderFactory() {
    int batchSizeValueToDisableColumnarReads = 0;
    return new ReaderFactory(batchSizeValueToDisableColumnarReads);
  }

  @Override
  public Offset initialOffset() {
    if (isInitialOffsetResolved()) {
      return initialOffset;
    }

    if (isStreamResumedFromCheckpoint()) {
      initialOffset = calculateInitialOffsetFromCheckpoint();
      return initialOffset;
    }

    List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
    if (snapshotIds.isEmpty()) {
      initialOffset = StreamingOffset.START_OFFSET;
      Preconditions.checkState(isTableEmpty(),
          "criteria behind isTableEmpty() changed.");
    } else {
      initialOffset = new StreamingOffset(Iterables.getLast(snapshotIds), 0, true);
    }

    return initialOffset;
  }

  @Override
  public Offset deserializeOffset(String json) {
    return StreamingOffset.fromJson(json);
  }

  @Override
  public void commit(Offset end) {
  }

  @Override
  public void stop() {
  }

  private boolean isInitialOffsetResolved() {
    return initialOffset != null;
  }

  private StreamingOffset calculateInitialOffsetFromCheckpoint() {
    Preconditions.checkState(isStreamResumedFromCheckpoint(),
        "Stream is not resumed from checkpoint.");

    return offsetLog.getLatest();
  }

  private boolean isStreamResumedFromCheckpoint() {
    Preconditions.checkState(!isInitialOffsetResolved(),
        "isStreamResumedFromCheckpoint() is invoked without resolving initialOffset");

    return offsetLog.isOffsetLogInitialized();
  }

  private boolean isFirstBatch() {
    return previousEndOffset == null || previousEndOffset.equals(StreamingOffset.START_OFFSET);
  }

  private boolean isTableEmpty() {
    Preconditions.checkState(isInitialOffsetResolved(),
        "isTableEmpty() is invoked without resolving initialOffset");

    return initialOffset.equals(StreamingOffset.START_OFFSET);
  }

  private StreamingOffset getNextAvailableSnapshot(StreamingOffset microBatchStartOffset) {
    if (table.currentSnapshot().snapshotId() == microBatchStartOffset.snapshotId()) {
      return microBatchStartOffset;
    }

    Snapshot previousSnapshot = table.snapshot(microBatchStartOffset.snapshotId());
    Snapshot pointer = table.currentSnapshot();
    while (pointer != null && previousSnapshot.snapshotId() != pointer.parentId()) {
      Preconditions.checkState(pointer.operation().equals(DataOperations.APPEND),
          "Encountered Snapshot DataOperation other than APPEND.");

      pointer = table.snapshot(pointer.parentId());
    }

    Preconditions.checkState(pointer != null,
        "snapshot on which the stream operated has been garbage collected.");

    return new StreamingOffset(pointer.snapshotId(), 0L, false);
  }

  private PlannedEndOffset calculateEndOffset(StreamingOffset microBatchStartOffset) {
    MicroBatch microBatch = MicroBatches.from(table.snapshot(microBatchStartOffset.snapshotId()), table.io())
        .caseSensitive(caseSensitive)
        .specsById(table.specs())
        .generate(microBatchStartOffset.position(), batchSize, microBatchStartOffset.shouldScanAllFiles());

    return new PlannedEndOffset(
        microBatch.snapshotId(),
        microBatch.endFileIndex(),
        microBatchStartOffset.shouldScanAllFiles(),
        microBatch);
  }

  private boolean isEndOfSnapshot(StreamingOffset microBatchStartOffset) {
    MicroBatchBuilder microBatchBuilder = MicroBatches.from(
        table.snapshot(microBatchStartOffset.snapshotId()), table.io())
        .caseSensitive(caseSensitive)
        .specsById(table.specs());

    MicroBatch microBatchStart = microBatchBuilder.generate(
        microBatchStartOffset.position(),
        1,
        microBatchStartOffset.shouldScanAllFiles());

    return microBatchStartOffset.position() == microBatchStart.startFileIndex() &&
        microBatchStartOffset.position() == microBatchStart.endFileIndex() &&
        microBatchStart.lastIndexOfSnapshot();
  }

  private static class PlannedEndOffset extends StreamingOffset {

    private final MicroBatch microBatch;

    PlannedEndOffset(long snapshotId, long position, boolean scanAllFiles, MicroBatch microBatch) {
      super(snapshotId, position, scanAllFiles);
      this.microBatch = microBatch;
    }

    public MicroBatch getMicroBatch() {
      return microBatch;
    }
  }

  interface OffsetLog {
    static OffsetLog getInstance(SparkSession spark, String checkpointLocation) {
      return new OffsetLogImpl(spark, checkpointLocation);
    }

    boolean isOffsetLogInitialized();

    StreamingOffset getLatest();
  }

  private static class OffsetLogImpl implements OffsetLog {
    private final OffsetSeqLog offsetSeqLog;

    OffsetLogImpl(SparkSession spark, String checkpointLocation) {
      this.offsetSeqLog = checkpointLocation != null ?
          new OffsetSeqLog(spark, getOffsetLogLocation(checkpointLocation)) :
          null;
    }

    @Override
    public boolean isOffsetLogInitialized() {
      return offsetSeqLog != null &&
          offsetSeqLog.getLatest() != null &&
          offsetSeqLog.getLatest().isDefined();
    }

    @Override
    public StreamingOffset getLatest() {
      OffsetSeq offsetSeq = offsetSeqLog.getLatest().get()._2;

      List<Option<Offset>> offsetSeqCol = JavaConverters.seqAsJavaList(offsetSeq.offsets());
      Option<Offset> optionalOffset = offsetSeqCol.get(0);

      return StreamingOffset.fromJson(optionalOffset.get().json());
    }

    private String getOffsetLogLocation(String checkpointLocation) {
      return new Path(checkpointLocation.replace("/sources/0", ""), "offsets").toString();
    }
  }
}
```

spark3/src/main/java/org/apache/iceberg/spark/source/SparkTable.java (+1)
```diff
@@ -70,6 +70,7 @@ public class SparkTable implements org.apache.spark.sql.connector.catalog.Table,
   private static final Set<TableCapability> CAPABILITIES = ImmutableSet.of(
       TableCapability.BATCH_READ,
       TableCapability.BATCH_WRITE,
+      TableCapability.MICRO_BATCH_READ,
       TableCapability.STREAMING_WRITE,
       TableCapability.OVERWRITE_BY_FILTER,
       TableCapability.OVERWRITE_DYNAMIC);
```

0 commit comments
