
Commit f8c7c1f

jose-torres authored and zsxwing committed
[SPARK-22732] Add Structured Streaming APIs to DataSourceV2
## What changes were proposed in this pull request?

This PR provides DataSourceV2 API support for structured streaming, including the new pieces needed to support continuous processing [SPARK-20928]. High-level summary:

- DataSourceV2 includes new mixins to support micro-batch and continuous reads and writes. For reads, we accept an optional user-specified schema rather than using the ReadSupportWithSchema model, because doing so would severely complicate the interface.
- DataSourceV2Reader includes new interfaces to read a specific microbatch or read continuously from a given offset. These follow the same setter pattern as the existing Supports* mixins so that they can work with SupportsScanUnsafeRow.
- DataReader (the per-partition reader) has a new subinterface ContinuousDataReader only for continuous processing. This reader has a special method to check progress, and next() blocks for new input rather than returning false.
- Offset, an abstract representation of position in a streaming query, is ported to the public API. (Each type of reader will define its own Offset implementation.)
- DataSourceV2Writer has a new subinterface ContinuousWriter only for continuous processing. Commits to this interface come tagged with an epoch number, as the execution engine will continue to produce new epoch commits as the task continues indefinitely.

Note that this PR does not propose to change the existing DataSourceV2 batch API, or deprecate the existing streaming source/sink internal APIs in spark.sql.execution.streaming.

## How was this patch tested?

Toy implementations of the new interfaces with unit tests.

Author: Jose Torres <jose@databricks.com>

Closes #19925 from joseph-torres/continuous-api.
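As a rough illustration of the mixin design summarized above, here is a minimal sketch (not part of this commit) of a source opting into both new read mixins. `MyMicroBatchReader` and `MyContinuousReader` are hypothetical classes standing in for a source's actual reader implementations.

```java
import java.util.Optional;

import org.apache.spark.sql.sources.v2.ContinuousReadSupport;
import org.apache.spark.sql.sources.v2.DataSourceV2Options;
import org.apache.spark.sql.sources.v2.MicroBatchReadSupport;
import org.apache.spark.sql.sources.v2.reader.ContinuousReader;
import org.apache.spark.sql.sources.v2.reader.MicroBatchReader;
import org.apache.spark.sql.types.StructType;

// Sketch only: MyMicroBatchReader and MyContinuousReader are hypothetical readers.
public class MyStreamingSource implements MicroBatchReadSupport, ContinuousReadSupport {

  @Override
  public MicroBatchReader createMicroBatchReader(
      Optional<StructType> schema,
      String checkpointLocation,
      DataSourceV2Options options) {
    return new MyMicroBatchReader(schema, checkpointLocation, options);
  }

  @Override
  public ContinuousReader createContinuousReader(
      Optional<StructType> schema,
      String checkpointLocation,
      DataSourceV2Options options) {
    return new MyContinuousReader(schema, checkpointLocation, options);
  }
}
```

Both mixins extend DataSourceV2, so a single class can be offered to the engine for either execution mode.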
1 parent 1e44dd0 commit f8c7c1f


45 files changed: +1390 −35 lines

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
Lines changed: 1 addition & 0 deletions

@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.kafka010.KafkaSource._
+import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
Lines changed: 2 additions & 1 deletion

@@ -19,7 +19,8 @@ package org.apache.spark.sql.kafka010
 
 import org.apache.kafka.common.TopicPartition
 
-import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
+import org.apache.spark.sql.execution.streaming.SerializedOffset
+import org.apache.spark.sql.sources.v2.reader.Offset
 
 /**
  * An [[Offset]] for the [[KafkaSource]]. This one tracks all partitions of subscribed topics and

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
Lines changed: 1 addition & 0 deletions

@@ -38,6 +38,7 @@ import org.apache.spark.sql.ForeachWriter
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions.{count, window}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
+import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
 import org.apache.spark.sql.streaming.util.StreamManualClock
 import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
ContinuousReadSupport.java (new file)
Lines changed: 42 additions & 0 deletions

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.sources.v2;

import java.util.Optional;

import org.apache.spark.sql.sources.v2.reader.ContinuousReader;
import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
import org.apache.spark.sql.types.StructType;

/**
 * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
 * provide data reading ability for continuous stream processing.
 */
public interface ContinuousReadSupport extends DataSourceV2 {
  /**
   * Creates a {@link ContinuousReader} to scan the data from this data source.
   *
   * @param schema the user provided schema, or empty() if none was provided
   * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
   *                           recovery. Readers for the same logical source in the same query
   *                           will be given the same checkpointLocation.
   * @param options the options for the returned data source reader, which is an immutable
   *                case-insensitive string-to-string map.
   */
  ContinuousReader createContinuousReader(
      Optional<StructType> schema,
      String checkpointLocation,
      DataSourceV2Options options);
}
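Because the schema argument is an Optional, an implementation typically falls back to its own default when the user does not supply one. A minimal, hypothetical helper (not in this commit) might look like:

```java
import java.util.Optional;

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Hypothetical helper: resolve the user-provided schema (if any) against a
// source-defined default, as a createContinuousReader implementation might do.
final class SchemaDefaults {
  static StructType resolveReadSchema(Optional<StructType> userSchema) {
    StructType defaultSchema = new StructType().add("value", DataTypes.LongType);
    return userSchema.orElse(defaultSchema);
  }
}
```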
ContinuousWriteSupport.java (new file)
Lines changed: 54 additions & 0 deletions

/*
 * Licensed to the Apache Software Foundation (ASF); same Apache License, Version 2.0 header
 * as in ContinuousReadSupport.java above.
 */

package org.apache.spark.sql.sources.v2;

import java.util.Optional;

import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
import org.apache.spark.sql.sources.v2.writer.ContinuousWriter;
import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.types.StructType;

/**
 * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
 * provide data writing ability for continuous stream processing.
 */
@InterfaceStability.Evolving
public interface ContinuousWriteSupport extends BaseStreamingSink {

  /**
   * Creates an optional {@link ContinuousWriter} to save the data to this data source. Data
   * sources can return None if there is no writing needed to be done.
   *
   * @param queryId A unique string for the writing query. It's possible that there are many
   *                writing queries running at the same time, and the returned
   *                {@link DataSourceV2Writer} can use this id to distinguish itself from others.
   * @param schema the schema of the data to be written.
   * @param mode the output mode which determines what successive epoch output means to this
   *             sink, please refer to {@link OutputMode} for more details.
   * @param options the options for the returned data source writer, which is an immutable
   *                case-insensitive string-to-string map.
   */
  Optional<ContinuousWriter> createContinuousWriter(
      String queryId,
      StructType schema,
      OutputMode mode,
      DataSourceV2Options options);
}
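A minimal sketch of a sink opting into continuous writes; `MyContinuousWriter` is a hypothetical ContinuousWriter implementation, and returning an empty Optional for unhandled output modes is just one plausible pattern, not something the commit prescribes:

```java
import java.util.Optional;

import org.apache.spark.sql.sources.v2.ContinuousWriteSupport;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.DataSourceV2Options;
import org.apache.spark.sql.sources.v2.writer.ContinuousWriter;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.types.StructType;

// Sketch only: MyContinuousWriter is a hypothetical ContinuousWriter implementation.
public class MyContinuousSink implements DataSourceV2, ContinuousWriteSupport {

  @Override
  public Optional<ContinuousWriter> createContinuousWriter(
      String queryId,
      StructType schema,
      OutputMode mode,
      DataSourceV2Options options) {
    if (!OutputMode.Append().equals(mode)) {
      // No writing to be done for modes this sink does not handle.
      return Optional.empty();
    }
    return Optional.of(new MyContinuousWriter(queryId, schema, options));
  }
}
```

Note that ContinuousWriteSupport extends BaseStreamingSink rather than DataSourceV2, so the sketch implements DataSourceV2 explicitly as well.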

sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2Options.java
Lines changed: 8 additions & 0 deletions

@@ -36,13 +36,21 @@ private String toLowerCase(String key) {
     return key.toLowerCase(Locale.ROOT);
   }
 
+  public static DataSourceV2Options empty() {
+    return new DataSourceV2Options(new HashMap<>());
+  }
+
   public DataSourceV2Options(Map<String, String> originalMap) {
     keyLowerCasedMap = new HashMap<>(originalMap.size());
     for (Map.Entry<String, String> entry : originalMap.entrySet()) {
       keyLowerCasedMap.put(toLowerCase(entry.getKey()), entry.getValue());
     }
   }
 
+  public Map<String, String> asMap() {
+    return new HashMap<>(keyLowerCasedMap);
+  }
+
   /**
    * Returns the option value to which the specified key is mapped, case-insensitively.
    */
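The two additions are small conveniences. A hedged usage sketch (class name and option values invented for illustration):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.sql.sources.v2.DataSourceV2Options;

public class OptionsExample {
  public static void main(String[] args) {
    Map<String, String> raw = new HashMap<>();
    raw.put("CheckpointLocation", "/tmp/checkpoints/query-1");

    DataSourceV2Options options = new DataSourceV2Options(raw);
    // Keys are stored lower-cased, so the returned copy uses "checkpointlocation".
    Map<String, String> copy = options.asMap();
    System.out.println(copy);

    // empty() avoids building a throwaway HashMap when a reader or writer needs no options.
    DataSourceV2Options none = DataSourceV2Options.empty();
    System.out.println(none.asMap().isEmpty());  // true
  }
}
```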
MicroBatchReadSupport.java (new file)
Lines changed: 52 additions & 0 deletions

/*
 * Licensed to the Apache Software Foundation (ASF); same Apache License, Version 2.0 header
 * as in ContinuousReadSupport.java above.
 */

package org.apache.spark.sql.sources.v2;

import java.util.Optional;

import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.sources.v2.reader.MicroBatchReader;
import org.apache.spark.sql.types.StructType;

/**
 * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
 * provide streaming micro-batch data reading ability.
 */
@InterfaceStability.Evolving
public interface MicroBatchReadSupport extends DataSourceV2 {
  /**
   * Creates a {@link MicroBatchReader} to read batches of data from this data source in a
   * streaming query.
   *
   * The execution engine will create a micro-batch reader at the start of a streaming query,
   * alternate calls to setOffsetRange and createReadTasks for each batch to process, and then
   * call stop() when the execution is complete. Note that a single query may have multiple
   * executions due to restart or failure recovery.
   *
   * @param schema the user provided schema, or empty() if none was provided
   * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
   *                           recovery. Readers for the same logical source in the same query
   *                           will be given the same checkpointLocation.
   * @param options the options for the returned data source reader, which is an immutable
   *                case-insensitive string-to-string map.
   */
  MicroBatchReader createMicroBatchReader(
      Optional<StructType> schema,
      String checkpointLocation,
      DataSourceV2Options options);
}
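The checkpointLocation stays stable across restarts of the same query, so an implementation can keep its own bookkeeping under it. A tiny hypothetical helper (the path name is invented) illustrating the idea:

```java
import org.apache.hadoop.fs.Path;

// Hypothetical: derive a stable metadata path under the engine-provided
// checkpointLocation, e.g. to persist the offsets chosen at the first start.
final class CheckpointPaths {
  static Path initialOffsetsPath(String checkpointLocation) {
    return new Path(checkpointLocation, "initial-offsets");
  }
}
```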
MicroBatchWriteSupport.java (new file)
Lines changed: 58 additions & 0 deletions

/*
 * Licensed to the Apache Software Foundation (ASF); same Apache License, Version 2.0 header
 * as in ContinuousReadSupport.java above.
 */

package org.apache.spark.sql.sources.v2;

import java.util.Optional;

import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.types.StructType;

/**
 * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
 * provide data writing ability and save the data from a microbatch to the data source.
 */
@InterfaceStability.Evolving
public interface MicroBatchWriteSupport extends BaseStreamingSink {

  /**
   * Creates an optional {@link DataSourceV2Writer} to save the data to this data source. Data
   * sources can return None if there is no writing needed to be done.
   *
   * @param queryId A unique string for the writing query. It's possible that there are many
   *                writing queries running at the same time, and the returned
   *                {@link DataSourceV2Writer} can use this id to distinguish itself from others.
   * @param epochId The unique numeric ID of the batch within this writing query. This is an
   *                incrementing counter representing a consistent set of data; the same batch may
   *                be started multiple times in failure recovery scenarios, but it will always
   *                contain the same records.
   * @param schema the schema of the data to be written.
   * @param mode the output mode which determines what successive batch output means to this
   *             sink, please refer to {@link OutputMode} for more details.
   * @param options the options for the returned data source writer, which is an immutable
   *                case-insensitive string-to-string map.
   */
  Optional<DataSourceV2Writer> createMicroBatchWriter(
      String queryId,
      long epochId,
      StructType schema,
      OutputMode mode,
      DataSourceV2Options options);
}
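On the engine side, an empty Optional simply means there is nothing to write for that batch. A hedged, caller-side sketch (the helper class and the choice of Append mode and empty options are illustrative, not from this commit):

```java
import java.util.Optional;

import org.apache.spark.sql.sources.v2.DataSourceV2Options;
import org.apache.spark.sql.sources.v2.MicroBatchWriteSupport;
import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.types.StructType;

// Hypothetical helper: ask the sink for a writer for one micro-batch. The epochId
// identifies a replayable batch, so a sink can deduplicate commits after a restart.
final class MicroBatchWrites {
  static Optional<DataSourceV2Writer> writerForBatch(
      MicroBatchWriteSupport sink,
      String queryId,
      long epochId,
      StructType schema) {
    return sink.createMicroBatchWriter(
        queryId, epochId, schema, OutputMode.Append(), DataSourceV2Options.empty());
  }
}
```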
ContinuousDataReader.java (new file)
Lines changed: 36 additions & 0 deletions

/*
 * Licensed to the Apache Software Foundation (ASF); same Apache License, Version 2.0 header
 * as in ContinuousReadSupport.java above.
 */

package org.apache.spark.sql.sources.v2.reader;

import org.apache.spark.sql.sources.v2.reader.PartitionOffset;

import java.io.IOException;

/**
 * A variation on {@link DataReader} for use with streaming in continuous processing mode.
 */
public interface ContinuousDataReader<T> extends DataReader<T> {
  /**
   * Get the offset of the current record, or the start offset if no records have been read.
   *
   * The execution engine will call this method along with get() to keep track of the current
   * offset. When an epoch ends, the offset of the previous record in each partition will be saved
   * as a restart checkpoint.
   */
  PartitionOffset getOffset();
}
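A minimal sketch of a per-partition continuous reader, assuming the existing DataReader contract (next()/get()/close()); the counter source and the 100 ms poll are invented for illustration:

```java
import java.io.IOException;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.sources.v2.reader.ContinuousDataReader;
import org.apache.spark.sql.sources.v2.reader.PartitionOffset;

// Hypothetical per-partition offset: the index of the record most recently emitted.
class CounterPartitionOffset implements PartitionOffset {
  final long index;
  CounterPartitionOffset(long index) { this.index = index; }
}

// Sketch only: emits one row roughly every 100 ms and reports progress via getOffset().
class CounterContinuousDataReader implements ContinuousDataReader<Row> {
  private long current;

  CounterContinuousDataReader(long startIndex) { this.current = startIndex; }

  @Override
  public boolean next() throws IOException {
    try {
      Thread.sleep(100);  // block until "new input" is available; never return false
    } catch (InterruptedException e) {
      throw new IOException(e);
    }
    current += 1;
    return true;
  }

  @Override
  public Row get() {
    return RowFactory.create(current);
  }

  @Override
  public PartitionOffset getOffset() {
    // Before any call to next(), this is still the start offset, as the contract requires.
    return new CounterPartitionOffset(current);
  }

  @Override
  public void close() { }
}
```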
ContinuousReader.java (new file)
Lines changed: 68 additions & 0 deletions

/*
 * Licensed to the Apache Software Foundation (ASF); same Apache License, Version 2.0 header
 * as in ContinuousReadSupport.java above.
 */

package org.apache.spark.sql.sources.v2.reader;

import org.apache.spark.sql.sources.v2.reader.PartitionOffset;
import org.apache.spark.sql.execution.streaming.BaseStreamingSource;

import java.util.Optional;

/**
 * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this
 * interface to allow reading in a continuous processing mode stream.
 *
 * Implementations must ensure each read task output is a {@link ContinuousDataReader}.
 */
public interface ContinuousReader extends BaseStreamingSource, DataSourceV2Reader {
  /**
   * Merge offsets coming from {@link ContinuousDataReader} instances in each partition to
   * a single global offset.
   */
  Offset mergeOffsets(PartitionOffset[] offsets);

  /**
   * Deserialize a JSON string into an Offset of the implementation-defined offset type.
   * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader
   */
  Offset deserializeOffset(String json);

  /**
   * Set the desired start offset for read tasks created from this reader. The scan will start
   * from the first record after the provided offset, or from an implementation-defined inferred
   * starting point if no offset is provided.
   */
  void setOffset(Optional<Offset> start);

  /**
   * Return the specified or inferred start offset for this reader.
   *
   * @throws IllegalStateException if setOffset has not been called
   */
  Offset getStartOffset();

  /**
   * The execution engine will call this method in every epoch to determine if new read tasks need
   * to be generated, which may be required if for example the underlying source system has had
   * partitions added or removed.
   *
   * If true, the query will be shut down and restarted with a new reader.
   */
  default boolean needsReconfiguration() {
    return false;
  }
}
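Since these methods are engine-facing, a caller-side sketch may be clearer than an implementation. A hypothetical helper (names invented) showing the offset round trip and the per-epoch reconfiguration check, using only the methods declared above:

```java
import java.util.Optional;

import org.apache.spark.sql.sources.v2.reader.ContinuousReader;
import org.apache.spark.sql.sources.v2.reader.Offset;

// Hypothetical engine-side helper, sketching how a ContinuousReader is driven.
final class ContinuousReaderDriver {
  // Restore the reader's position from a JSON offset saved at the last epoch commit.
  static void restoreFromCheckpoint(ContinuousReader reader, String checkpointJson) {
    Offset restored = reader.deserializeOffset(checkpointJson);
    reader.setOffset(Optional.of(restored));
  }

  // Checked once per epoch: true means read tasks must be regenerated, so the query
  // is shut down and restarted with a freshly created reader.
  static boolean shouldRestart(ContinuousReader reader) {
    return reader.needsReconfiguration();
  }
}
```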
