
[SPARK-26785][SQL] data source v2 API refactor: streaming write #23702


Closed · wants to merge 1 commit
@@ -33,7 +33,8 @@ import org.apache.spark.sql.sources._
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream}
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
import org.apache.spark.sql.sources.v2.writer.WriteBuilder
import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingWrite, SupportsOutputMode}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType

@@ -47,7 +48,6 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
with StreamSinkProvider
with RelationProvider
with CreatableRelationProvider
with StreamingWriteSupportProvider
with TableProvider
with Logging {
import KafkaSourceProvider._
@@ -180,20 +180,6 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
}
}

override def createStreamingWriteSupport(
queryId: String,
schema: StructType,
mode: OutputMode,
options: DataSourceOptions): StreamingWriteSupport = {
import scala.collection.JavaConverters._

val topic = Option(options.get(TOPIC_OPTION_KEY).orElse(null)).map(_.trim)
// We convert the options argument from V2 -> Java map -> scala mutable -> scala immutable.
val producerParams = kafkaParamsForProducer(options.asMap.asScala.toMap)

new KafkaStreamingWriteSupport(topic, producerParams, schema)
}

private def strategy(caseInsensitiveParams: Map[String, String]) =
caseInsensitiveParams.find(x => STRATEGY_OPTION_KEYS.contains(x._1)).get match {
case ("assign", value) =>
@@ -365,7 +351,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
}

class KafkaTable(strategy: => ConsumerStrategy) extends Table
with SupportsMicroBatchRead with SupportsContinuousRead {
with SupportsMicroBatchRead with SupportsContinuousRead with SupportsStreamingWrite {

override def name(): String = s"Kafka $strategy"

@@ -374,6 +360,28 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
override def newScanBuilder(options: DataSourceOptions): ScanBuilder = new ScanBuilder {
override def build(): Scan = new KafkaScan(options)
}

override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
new WriteBuilder with SupportsOutputMode {
private var inputSchema: StructType = _

override def withInputDataSchema(schema: StructType): WriteBuilder = {
this.inputSchema = schema
this
}

override def outputMode(mode: OutputMode): WriteBuilder = this

override def buildForStreaming(): StreamingWrite = {
import scala.collection.JavaConverters._

assert(inputSchema != null)
val topic = Option(options.get(TOPIC_OPTION_KEY).orElse(null)).map(_.trim)
val producerParams = kafkaParamsForProducer(options.asMap.asScala.toMap)
new KafkaStreamingWrite(topic, producerParams, inputSchema)
}
}
}
}

class KafkaScan(options: DataSourceOptions) extends Scan {
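For context on how these pieces are meant to be driven, here is an illustrative sketch (not code from this PR) of the engine-side call sequence for a streaming sink; the helper object and its name are hypothetical, and only the interface names come from this diff.

// Illustrative only: roughly how the streaming write path would use the refactored API.
import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsStreamingWrite}
import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingWrite, SupportsOutputMode}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType

object StreamingWritePathSketch {
  def createStreamingWrite(
      table: SupportsStreamingWrite,
      options: DataSourceOptions,
      inputSchema: StructType,
      mode: OutputMode): StreamingWrite = {
    // The table hands out a builder, the engine feeds it the query's schema and output mode,
    // and the builder finally produces the StreamingWrite used for the whole query.
    val builder = table.newWriteBuilder(options).withInputDataSchema(inputSchema)
    val configured = builder match {
      case b: SupportsOutputMode => b.outputMode(mode) // temporary hook, see SupportsOutputMode below
      case other => other
    }
    configured.buildForStreaming()
  }
}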
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.kafka010.KafkaWriter.validateQuery
import org.apache.spark.sql.sources.v2.writer._
import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteSupport}
import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
import org.apache.spark.sql.types.StructType

/**
@@ -33,18 +33,18 @@ import org.apache.spark.sql.types.StructType
case object KafkaWriterCommitMessage extends WriterCommitMessage

/**
* A [[StreamingWriteSupport]] for Kafka writing. Responsible for generating the writer factory.
* A [[StreamingWrite]] for Kafka writing. Responsible for generating the writer factory.
*
* @param topic The topic this writer is responsible for. If None, topic will be inferred from
* a `topic` field in the incoming data.
* @param producerParams Parameters for Kafka producers in each task.
* @param schema The schema of the input data.
*/
class KafkaStreamingWriteSupport(
class KafkaStreamingWrite(
topic: Option[String],
producerParams: ju.Map[String, Object],
schema: StructType)
extends StreamingWriteSupport {
extends StreamingWrite {

validateQuery(schema.toAttributes, producerParams, topic)

@@ -20,12 +20,12 @@
import org.apache.spark.annotation.Evolving;

/**
* A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
* A mix-in interface for {@link TableProvider}. Data sources can implement this interface to
* propagate session configs with the specified key-prefix to all data source operations in this
* session.
*/
@Evolving
public interface SessionConfigSupport extends DataSourceV2 {
public interface SessionConfigSupport extends TableProvider {

/**
* Key prefix of the session configs to propagate, which is usually the data source name. Spark
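As a rough illustration of the retargeted mix-in (the keyPrefix method and the single-argument getTable signature are assumptions based on the existing API, and all class names are made up):

// Sketch: a TableProvider that opts into session config propagation. With keyPrefix
// "myformat", a session config such as "spark.datasource.myformat.host" should reach
// the source through its options as "host".
import org.apache.spark.sql.sources.v2.{DataSourceOptions, SessionConfigSupport, Table, TableProvider}

class MyFormatProvider extends TableProvider with SessionConfigSupport {
  override def keyPrefix(): String = "myformat"

  // A real provider returns its Table implementation here.
  override def getTable(options: DataSourceOptions): Table = ???
}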

This file was deleted.

@@ -24,7 +24,7 @@
* An empty mix-in interface for {@link Table}, to indicate this table supports batch write.
* <p>
* If a {@link Table} implements this interface, the
* {@link SupportsWrite#newWriteBuilder(DataSourceOptions)} must return a {@link WriteBuilder}
* with {@link WriteBuilder#buildForBatch()} implemented.
* </p>
*/
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
import org.apache.spark.sql.sources.v2.writer.WriteBuilder;

/**
* An empty mix-in interface for {@link Table}, to indicate this table supports streaming write.
* <p>
* If a {@link Table} implements this interface, the
* {@link SupportsWrite#newWriteBuilder(DataSourceOptions)} must return a {@link WriteBuilder}
* with {@link WriteBuilder#buildForStreaming()} implemented.
* </p>
*/
@Evolving
public interface SupportsStreamingWrite extends SupportsWrite, BaseStreamingSink { }
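For a sense of the minimum a sink has to provide, a hypothetical no-op table might look like the following; the class and method bodies are made up, only the interface names come from this diff, and the writer factory is deferred to the sketches further down.

// Sketch: the smallest Table that advertises streaming write capability.
import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsStreamingWrite}
import org.apache.spark.sql.sources.v2.writer.{WriteBuilder, WriterCommitMessage}
import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
import org.apache.spark.sql.types.StructType

class NoOpTable extends SupportsStreamingWrite {
  override def name(): String = "no-op"
  override def schema(): StructType = new StructType()

  // Per the contract above: newWriteBuilder must return a builder with buildForStreaming implemented.
  override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = new WriteBuilder {
    override def buildForStreaming(): StreamingWrite = new StreamingWrite {
      // The factory is left out of this sketch (see the StreamingDataWriterFactory and
      // StreamingWrite sketches further down); commit and abort simply do nothing.
      override def createStreamingWriterFactory(): StreamingDataWriterFactory = ???
      override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
      override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
    }
  }
}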
@@ -29,8 +29,7 @@
* </p>
*/
@Evolving
// TODO: do not extend `DataSourceV2`, after we finish the API refactor completely.
public interface TableProvider extends DataSourceV2 {
public interface TableProvider {

/**
* Return a {@link Table} instance to do read/write with user-specified options.
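The matching provider is then just a thin factory. This sketch assumes the single-argument getTable(DataSourceOptions) entry point of TableProvider at this stage of the refactor; NoOpTable is the hypothetical table from the sketch above.

// Sketch: a provider exposing the NoOpTable sketched earlier.
import org.apache.spark.sql.sources.v2.{DataSourceOptions, Table, TableProvider}

class NoOpProvider extends TableProvider {
  override def getTable(options: DataSourceOptions): Table = new NoOpTable
}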
@@ -20,6 +20,7 @@
import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.sources.v2.SupportsBatchWrite;
import org.apache.spark.sql.sources.v2.Table;
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite;
import org.apache.spark.sql.types.StructType;

/**
@@ -64,6 +65,12 @@ default WriteBuilder withInputDataSchema(StructType schema) {
* {@link SupportsSaveMode}.
*/
default BatchWrite buildForBatch() {
throw new UnsupportedOperationException("Batch scans are not supported");
throw new UnsupportedOperationException(getClass().getName() +
" does not support batch write");
}

default StreamingWrite buildForStreaming() {
throw new UnsupportedOperationException(getClass().getName() +
" does not support streaming write");
}
}
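With both build methods carrying throwing defaults, a sink-only builder overrides just the variant it supports; a rough sketch under that assumption:

// Sketch: a builder that only supports streaming. buildForBatch() is inherited, so calling
// it now fails with "<class name> does not support batch write" instead of the old
// "Batch scans are not supported" message.
import org.apache.spark.sql.sources.v2.writer.WriteBuilder
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite

class StreamingOnlyBuilder(write: StreamingWrite) extends WriteBuilder {
  override def buildForStreaming(): StreamingWrite = write
}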
@@ -20,12 +20,12 @@
import java.io.Serializable;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport;
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite;

/**
* A commit message returned by {@link DataWriter#commit()} and will be sent back to the driver side
* as the input parameter of {@link BatchWrite#commit(WriterCommitMessage[])} or
* {@link StreamingWriteSupport#commit(long, WriterCommitMessage[])}.
* {@link StreamingWrite#commit(long, WriterCommitMessage[])}.
*
* This is an empty interface; data sources should define their own message class and use it when
* generating messages at executor side and handling the messages at driver side.
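Since the interface itself is empty, a source typically defines a small serializable message of its own; a hypothetical example:

// Sketch: a source-specific commit message carrying per-task statistics back to the driver.
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage

case class RowCountCommitMessage(partitionId: Int, rowsWritten: Long) extends WriterCommitMessage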
@@ -26,7 +26,7 @@

/**
* A factory of {@link DataWriter} returned by
* {@link StreamingWriteSupport#createStreamingWriterFactory()}, which is responsible for creating
* {@link StreamingWrite#createStreamingWriterFactory()}, which is responsible for creating
* and initializing the actual data writer at executor side.
*
* Note that, the writer factory will be serialized and sent to executors, then the data writer
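A hypothetical factory matching this description might look as follows; the createWriter(partitionId, taskId, epochId) signature is assumed from the surrounding streaming writer API rather than shown in this diff, and RowCountCommitMessage comes from the sketch above.

// Sketch: a serializable factory producing one DataWriter per partition and epoch.
// The writer only counts rows and reports the count on commit.
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriterCommitMessage}
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingDataWriterFactory

class CountingWriterFactory extends StreamingDataWriterFactory {
  override def createWriter(
      partitionId: Int,
      taskId: Long,
      epochId: Long): DataWriter[InternalRow] = new DataWriter[InternalRow] {
    private var rows = 0L
    override def write(record: InternalRow): Unit = rows += 1
    override def commit(): WriterCommitMessage = RowCountCommitMessage(partitionId, rows)
    override def abort(): Unit = { rows = 0L } // nothing external to clean up in this sketch
  }
}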
@@ -22,13 +22,26 @@
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;

/**
* An interface that defines how to write the data to data source for streaming processing.
* An interface that defines how to write the data to data source in streaming queries.
*
* Streaming queries are divided into intervals of data called epochs, with a monotonically
* increasing numeric ID. This writer handles commits and aborts for each successive epoch.
* The writing procedure is:
* 1. Create a writer factory by {@link #createStreamingWriterFactory()}, serialize and send it to
* all the partitions of the input data (RDD).
* 2. For each epoch in each partition, create the data writer, and write the data of the epoch in
* the partition with this writer. If all the data are written successfully, call
* {@link DataWriter#commit()}. If an exception happens during writing, call
* {@link DataWriter#abort()}.
* 3. If writers in all partitions of one epoch are successfully committed, call
* {@link #commit(long, WriterCommitMessage[])}. If some writers are aborted, or the job failed
* with an unknown reason, call {@link #abort(long, WriterCommitMessage[])}.
*
* While Spark will retry failed writing tasks, Spark won't retry failed writing jobs. Users should
* do it manually in their Spark applications if they want to retry.
*
* Please refer to the documentation of commit/abort methods for detailed specifications.
*/
@Evolving
public interface StreamingWriteSupport {
public interface StreamingWrite {

/**
* Creates a writer factory which will be serialized and sent to executors.
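Putting the epoch lifecycle together, a hypothetical StreamingWrite built on the factory and commit message sketched above could look like this; everything here is illustrative, not part of the diff.

// Sketch: epoch-level commit just totals the row counts; abort drops them.
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage
import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}

class CountingStreamingWrite extends StreamingWrite {
  override def createStreamingWriterFactory(): StreamingDataWriterFactory =
    new CountingWriterFactory

  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
    // Only called once every partition of the epoch has committed successfully.
    val total = messages.collect { case RowCountCommitMessage(_, n) => n }.sum
    println(s"epoch $epochId committed, $total rows written")
  }

  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
    // Some tasks failed or the epoch could not be committed; nothing to roll back in this sketch.
    println(s"epoch $epochId aborted")
  }
}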
@@ -15,12 +15,15 @@
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2;
package org.apache.spark.sql.sources.v2.writer.streaming;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.annotation.Unstable;
import org.apache.spark.sql.sources.v2.writer.WriteBuilder;
import org.apache.spark.sql.streaming.OutputMode;

/**
* TODO: remove it when we finish the API refactor for streaming write side.
*/
@Evolving
public interface DataSourceV2 {}
// TODO: remove it when we have `SupportsTruncate`
@Unstable
public interface SupportsOutputMode extends WriteBuilder {

WriteBuilder outputMode(OutputMode mode);
}
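Until SupportsTruncate exists, a builder can mix this in to learn the query's output mode. A hypothetical append-only sink might use it as sketched below; CountingStreamingWrite is from the sketch above, and the rejection logic is an assumption about how a sink would react, not behavior defined by this interface.

// Sketch: a streaming-only builder that rejects output modes it cannot honor.
import org.apache.spark.sql.sources.v2.writer.WriteBuilder
import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingWrite, SupportsOutputMode}
import org.apache.spark.sql.streaming.OutputMode

class AppendOnlyWriteBuilder extends WriteBuilder with SupportsOutputMode {
  override def outputMode(mode: OutputMode): WriteBuilder = {
    require(mode == OutputMode.Append(), s"$mode is not supported by this sink")
    this
  }

  override def buildForStreaming(): StreamingWrite = new CountingStreamingWrite
}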
@@ -205,7 +205,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
if (classOf[TableProvider].isAssignableFrom(cls)) {
val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider]
val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
ds = provider, conf = sparkSession.sessionState.conf)
source = provider, conf = sparkSession.sessionState.conf)
val pathsOption = {
val objectMapper = new ObjectMapper()
DataSourceOptions.PATHS_KEY -> objectMapper.writeValueAsString(paths.toArray)