
[SPARK-29331][SQL] create DS v2 Write at physical plan #26001

Closed
wants to merge 2 commits

@@ -50,7 +50,7 @@ private[kafka010] object KafkaWriter extends Logging {
topic: Option[String] = None): Unit = {
schema.find(_.name == TOPIC_ATTRIBUTE_NAME).getOrElse(
if (topic.isEmpty) {
throw new AnalysisException(s"topic option required when no " +
throw new IllegalArgumentException(s"topic option required when no " +

Contributor Author:

Now we check the options at the physical plan phase, so this should no longer be an AnalysisException.

Contributor Author:

If this is unacceptable, then we may need separate analysis-time write info and runtime write info: Table.newWriteBuilder would take the analysis-time write info, and WriteBuilder.build would take the runtime write info.

I'm not sure if it's worth this complexity.
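
For illustration, that split might look roughly like the sketch below. The names `AnalysisWriteInfo`, `RuntimeWriteInfo`, `SketchWriteBuilder`, `SketchTable` and their fields are assumptions made up for this example; they are not part of this PR or of the existing API.

```scala
// Hypothetical sketch only: splits the write info into an analysis-time part and a
// runtime part, as described above. Names and fields are illustrative assumptions.
import org.apache.spark.sql.connector.write.BatchWrite
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Everything that is already known while the query is being analyzed.
case class AnalysisWriteInfo(
    queryId: String,
    schema: StructType,
    options: CaseInsensitiveStringMap)

// Things that are only known once the physical plan actually runs.
case class RuntimeWriteInfo(numInputPartitions: Int)

trait SketchWriteBuilder {
  // The runtime info arrives late, so it is an argument of build(...).
  def build(info: RuntimeWriteInfo): BatchWrite
}

trait SketchTable {
  // The analysis-time info is available when the builder is created, so option and
  // schema validation could still fail with AnalysisException during analysis.
  def newWriteBuilder(info: AnalysisWriteInfo): SketchWriteBuilder
}
```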

Contributor:

Should it be SparkException? I think the last time we discussed these, it wasn't clear what type of exception to use after analysis. Maybe we need new exception types?

Contributor Author:

We can use SparkException as well. IllegalArgumentException is a standard Java exception that indicates invalid input, so I think it's OK to use it even after analysis.

Contributor:

I think we typically want to always raise SparkException because all exception types inherit from it. Unless we are throwing an exception from a method where there is an illegal argument, but that's not what is happening here.
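
(As a generic illustration of the convention being discussed, not code from this PR: a method rejecting a bad argument throws IllegalArgumentException, while a failure during execution is wrapped in SparkException. The method names below are made up.)

```scala
// Generic illustration of the IllegalArgumentException vs SparkException convention.
import org.apache.spark.SparkException

object ExceptionConvention {
  // IllegalArgumentException: the caller passed this method a bad argument.
  def parseNumPartitions(value: String): Int =
    scala.util.Try(value.toInt).toOption.filter(_ > 0).getOrElse(
      throw new IllegalArgumentException(s"Invalid partition count: $value"))

  // SparkException: something failed while Spark was running the work itself.
  def runWrite(write: () => Unit): Unit =
    try write() catch {
      case e: Exception => throw new SparkException("Writing job aborted.", e)
    }
}
```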

Contributor Author:

> I think we typically want to always raise SparkException because all exception types inherit from it.

In Spark SQL, no exceptions inherit from it. In fact, SparkException was rarely used in Spark SQL before we added the v2 commands. SparkException is defined in Spark core and is usually used when Spark fails to run a task.

In Spark SQL, AnalysisException and standard Java exceptions are more widely used.
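
(For reference, the two declarations have roughly the shape below; the real constructors take more parameters, but the point is the inheritance.)

```scala
// Simplified shape of the two classes (real constructors have more parameters);
// the point is that AnalysisException extends Exception, not SparkException.

// Lives in the core module, package org.apache.spark:
class SparkException(message: String, cause: Throwable)
  extends Exception(message, cause)

// Lives in the catalyst module, package org.apache.spark.sql:
class AnalysisException(val message: String, val cause: Option[Throwable] = None)
  extends Exception(message) with Serializable
```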

Contributor:

Sorry, I thought that AnalysisException inherited from SparkException. Looks like I was wrong.

s"'$TOPIC_ATTRIBUTE_NAME' attribute is present. Use the " +
s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a topic.")
} else {
@@ -59,22 +59,22 @@ private[kafka010] object KafkaWriter extends Logging {
).dataType match {
case StringType => // good
case _ =>
throw new AnalysisException(s"Topic type must be a ${StringType.catalogString}")
throw new IllegalArgumentException(s"Topic type must be a ${StringType.catalogString}")
}
schema.find(_.name == KEY_ATTRIBUTE_NAME).getOrElse(
Literal(null, StringType)
).dataType match {
case StringType | BinaryType => // good
case _ =>
throw new AnalysisException(s"$KEY_ATTRIBUTE_NAME attribute type " +
throw new IllegalArgumentException(s"$KEY_ATTRIBUTE_NAME attribute type " +
s"must be a ${StringType.catalogString} or ${BinaryType.catalogString}")
}
schema.find(_.name == VALUE_ATTRIBUTE_NAME).getOrElse(
throw new AnalysisException(s"Required attribute '$VALUE_ATTRIBUTE_NAME' not found")
throw new IllegalArgumentException(s"Required attribute '$VALUE_ATTRIBUTE_NAME' not found")
).dataType match {
case StringType | BinaryType => // good
case _ =>
throw new AnalysisException(s"$VALUE_ATTRIBUTE_NAME attribute type " +
throw new IllegalArgumentException(s"$VALUE_ATTRIBUTE_NAME attribute type " +
s"must be a ${StringType.catalogString} or ${BinaryType.catalogString}")
}
schema.find(_.name == HEADERS_ATTRIBUTE_NAME).getOrElse(
@@ -83,7 +83,7 @@ private[kafka010] object KafkaWriter extends Logging {
).dataType match {
case KafkaRecordToRowConverter.headersType => // good
case _ =>
throw new AnalysisException(s"$HEADERS_ATTRIBUTE_NAME attribute type " +
throw new IllegalArgumentException(s"$HEADERS_ATTRIBUTE_NAME attribute type " +
s"must be a ${KafkaRecordToRowConverter.headersType.catalogString}")
}
}
@@ -25,6 +25,8 @@ import org.scalatest.time.SpanSugar._

import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SpecificInternalRow, UnsafeProjection}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryStream
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.types.{BinaryType, DataType}
import org.apache.spark.util.Utils
@@ -215,6 +217,7 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
test("streaming - write data with bad schema") {
val inputTopic = newTopic()
testUtils.createTopic(inputTopic, partitions = 1)
testUtils.sendMessages(inputTopic, Array("0"))

val input = spark
.readStream
@@ -226,21 +229,21 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
val topic = newTopic()
testUtils.createTopic(topic)

val ex = intercept[AnalysisException] {
val ex = intercept[StreamingQueryException] {
/* No topic field or topic option */
createKafkaWriter(input.toDF())(
withSelectExpr = "value as key", "value"
)
).processAllAvailable()
}
assert(ex.getMessage
.toLowerCase(Locale.ROOT)
.contains("topic option required when no 'topic' attribute is present"))

val ex2 = intercept[AnalysisException] {
val ex2 = intercept[StreamingQueryException] {
/* No value field */
createKafkaWriter(input.toDF())(
withSelectExpr = s"'$topic' as topic", "value as key"
)
).processAllAvailable()
}
assert(ex2.getMessage.toLowerCase(Locale.ROOT).contains(
"required attribute 'value' not found"))
@@ -249,6 +252,7 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
test("streaming - write data with valid schema but wrong types") {
val inputTopic = newTopic()
testUtils.createTopic(inputTopic, partitions = 1)
testUtils.sendMessages(inputTopic, Array("0"))

val input = spark
.readStream
@@ -261,28 +265,28 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
val topic = newTopic()
testUtils.createTopic(topic)

val ex = intercept[AnalysisException] {
val ex = intercept[StreamingQueryException] {
/* topic field wrong type */
createKafkaWriter(input.toDF())(
withSelectExpr = s"CAST('1' as INT) as topic", "value"
)
).processAllAvailable()
}
assert(ex.getMessage.toLowerCase(Locale.ROOT).contains("topic type must be a string"))

val ex2 = intercept[AnalysisException] {
val ex2 = intercept[StreamingQueryException] {
/* value field wrong type */
createKafkaWriter(input.toDF())(
withSelectExpr = s"'$topic' as topic", "CAST(value as INT) as value"
)
).processAllAvailable()
}
assert(ex2.getMessage.toLowerCase(Locale.ROOT).contains(
"value attribute type must be a string or binary"))

val ex3 = intercept[AnalysisException] {
val ex3 = intercept[StreamingQueryException] {
/* key field wrong type */
createKafkaWriter(input.toDF())(
withSelectExpr = s"'$topic' as topic", "CAST(value as INT) as key", "value"
)
).processAllAvailable()
}
assert(ex3.getMessage.toLowerCase(Locale.ROOT).contains(
"key attribute type must be a string or binary"))
@@ -330,18 +334,18 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
.option("subscribe", inputTopic)
.load()

val ex = intercept[IllegalArgumentException] {
val ex = intercept[StreamingQueryException] {
createKafkaWriter(
input.toDF(),
withOptions = Map("kafka.key.serializer" -> "foo"))()
withOptions = Map("kafka.key.serializer" -> "foo"))().processAllAvailable()
}
assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
"kafka option 'key.serializer' is not supported"))

val ex2 = intercept[IllegalArgumentException] {
val ex2 = intercept[StreamingQueryException] {
createKafkaWriter(
input.toDF(),
withOptions = Map("kafka.value.serializer" -> "foo"))()
withOptions = Map("kafka.value.serializer" -> "foo"))().processAllAvailable()
}
assert(ex2.getMessage.toLowerCase(Locale.ROOT).contains(
"kafka option 'value.serializer' is not supported"))
@@ -29,6 +29,7 @@ import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownF
import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.execution.streaming.{WriteMicroBatch, WriteMicroBatchExec}
import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
import org.apache.spark.sql.sources
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -176,9 +177,6 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {

withProjection :: Nil

case WriteToDataSourceV2(writer, query) =>
WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil

case CreateV2Table(catalog, ident, schema, parts, props, ifNotExists) =>
CreateTableExec(catalog, ident, schema, parts, props, ifNotExists) :: Nil

@@ -265,8 +263,13 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
}).toArray
DeleteFromTableExec(r.table.asDeletable, filters) :: Nil

case WriteToContinuousDataSource(writer, query) =>
WriteToContinuousDataSourceExec(writer, planLater(query)) :: Nil
case WriteMicroBatch(table, query, queryId, querySchema, outputMode, options, epochId) =>
WriteMicroBatchExec(
table, planLater(query), queryId, querySchema, outputMode, options, epochId) :: Nil

case WriteToContinuousDataSource(table, query, queryId, querySchema, outputMode, options) =>
WriteToContinuousDataSourceExec(
table, planLater(query), queryId, querySchema, outputMode, options) :: Nil

case Repartition(1, false, child) =>
val isContinuous = child.find {
@@ -38,17 +38,6 @@ import org.apache.spark.sql.sources.{AlwaysTrue, Filter}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.{LongAccumulator, Utils}

/**
* Deprecated logical plan for writing data into data source v2. This is being replaced by more
* specific logical plans, like [[org.apache.spark.sql.catalyst.plans.logical.AppendData]].
*/
@deprecated("Use specific logical plans like AppendData instead", "2.4.0")
case class WriteToDataSourceV2(batchWrite: BatchWrite, query: LogicalPlan)
extends LogicalPlan {
override def children: Seq[LogicalPlan] = Seq(query)
override def output: Seq[Attribute] = Nil
}

/**
* Physical plan node for v2 create table as select when the catalog does not support staging
* the table creation.
@@ -315,17 +304,6 @@ case class OverwritePartitionsDynamicExec(
}
}

case class WriteToDataSourceV2Exec(
batchWrite: BatchWrite,
query: SparkPlan) extends V2TableWriteExec {

def writeOptions: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty()

override protected def doExecute(): RDD[InternalRow] = {
writeWithV2(batchWrite)
}
}

/**
* Helper for physical plans that build batch writes.
*/
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.streaming

import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Append, Complete, Update}
import org.apache.spark.sql.connector.catalog.SupportsWrite
import org.apache.spark.sql.connector.write.SupportsTruncate
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.Utils

trait BaseStreamingWriteExec extends UnaryExecNode {
def table: SupportsWrite
def query: SparkPlan
def queryId: String
def querySchema: StructType
def outputMode: OutputMode
def options: Map[String, String]

override def child: SparkPlan = query
override def output: Seq[Attribute] = Nil

protected lazy val inputRDD = query.execute()
lazy val streamWrite = {
val writeBuilder = table.newWriteBuilder(new CaseInsensitiveStringMap(options.asJava))
.withQueryId(queryId)
.withInputDataSchema(querySchema)
outputMode match {
case Append =>
writeBuilder.buildForStreaming()

case Complete =>
// TODO: we should do this check earlier when we have capability API.
require(writeBuilder.isInstanceOf[SupportsTruncate],
table.name + " does not support Complete mode.")
writeBuilder.asInstanceOf[SupportsTruncate].truncate().buildForStreaming()

case Update =>
// No v2 sinks really support Update mode yet, but during tests we do want them
// to pretend to support Update mode, treating it the same as Append mode.
if (Utils.isTesting) {
writeBuilder.buildForStreaming()
} else {
throw new IllegalArgumentException(
"Data source v2 streaming sinks does not support Update mode.")
}
}
}
}
@@ -27,8 +27,8 @@ import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset => OffsetV2, SparkDataStream}
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, StreamWriterCommitProgress, WriteToDataSourceV2Exec}
import org.apache.spark.sql.execution.streaming.sources.{RateControlMicroBatchStream, WriteToMicroBatchDataSource}
import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, StreamWriterCommitProgress}
import org.apache.spark.sql.execution.streaming.sources.RateControlMicroBatchStream
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.util.Clock
@@ -127,8 +127,8 @@ class MicroBatchExecution(
// TODO (SPARK-27484): we should add the writing node before the plan is analyzed.
sink match {
case s: SupportsWrite =>
val streamingWrite = createStreamingWrite(s, extraOptions, _logicalPlan)
WriteToMicroBatchDataSource(streamingWrite, _logicalPlan)
WriteToMicroBatchDataSource(
s, _logicalPlan, id.toString, _logicalPlan.schema, outputMode, extraOptions)

case _ => _logicalPlan
}
@@ -557,7 +557,7 @@ class MicroBatchExecution(
nextBatch.collect()
}
lastExecution.executedPlan match {
case w: WriteToDataSourceV2Exec => w.commitProgress
case w: WriteMicroBatchExec => w.commitProgress
case _ => None
}
}
@@ -584,35 +584,6 @@ abstract class StreamExecution(
|batch = $batchDescription""".stripMargin
}

protected def createStreamingWrite(
table: SupportsWrite,
options: Map[String, String],
inputPlan: LogicalPlan): StreamingWrite = {
val writeBuilder = table.newWriteBuilder(new CaseInsensitiveStringMap(options.asJava))
.withQueryId(id.toString)
.withInputDataSchema(inputPlan.schema)
outputMode match {
case Append =>
writeBuilder.buildForStreaming()

case Complete =>
// TODO: we should do this check earlier when we have capability API.
require(writeBuilder.isInstanceOf[SupportsTruncate],
table.name + " does not support Complete mode.")
writeBuilder.asInstanceOf[SupportsTruncate].truncate().buildForStreaming()

case Update =>
// Although no v2 sinks really support Update mode now, but during tests we do want them
// to pretend to support Update mode, and treat Update mode same as Append mode.
if (Utils.isTesting) {
writeBuilder.buildForStreaming()
} else {
throw new IllegalArgumentException(
"Data source v2 streaming sinks does not support Update mode.")
}
}
}

protected def purge(threshold: Long): Unit = {
logDebug(s"Purging metadata at threshold=$threshold")
offsetLog.purge(threshold)