[SPARK-26356][SQL] remove SaveMode from data source v2 #24233

Closed
wants to merge 10 commits
@@ -26,6 +26,10 @@
* The base interface for v2 data sources which don't have a real catalog. Implementations must
* have a public, 0-arg constructor.
* <p>
* Note that, TableProvider can only apply data operations to existing tables, like read, append,
* delete, and overwrite. It does not support the operations that require metadata changes, like
* create/drop tables.
* <p>
* The major responsibility of this interface is to return a {@link Table} for read/write.
* </p>
*/
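For illustration, here is a minimal sketch of a catalog-less v2 source that fits this contract. It is not part of the diff; the class `DummySource` and its schema are hypothetical, and the method shapes (`getTable`, `capabilities`, `newWriteBuilder`, `buildForBatch`) follow the interfaces used elsewhere in this PR:

```scala
import scala.collection.JavaConverters._

import org.apache.spark.sql.sources.v2.{SupportsWrite, Table, TableCapability, TableProvider}
import org.apache.spark.sql.sources.v2.writer.{BatchWrite, WriteBuilder}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical provider: it only performs data operations (here, batch writes) against
// the single table it exposes, and has no way to create or drop tables.
class DummySource extends TableProvider {
  override def getTable(options: CaseInsensitiveStringMap): Table =
    new Table with SupportsWrite {
      override def name(): String = "dummy"
      override def schema(): StructType = new StructType().add("value", "string")
      override def capabilities(): java.util.Set[TableCapability] =
        Set(TableCapability.BATCH_WRITE).asJava
      override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder =
        new WriteBuilder {
          // Must be overridden because BATCH_WRITE is advertised above; a real source
          // would return its BatchWrite implementation here.
          override def buildForBatch(): BatchWrite = ???
        }
    }
}
```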

This file was deleted.

@@ -60,10 +60,6 @@ default WriteBuilder withInputDataSchema(StructType schema) {
* exception, data sources must overwrite this method to provide an implementation, if the
* {@link Table} that creates this write returns {@link TableCapability#BATCH_WRITE} support in
* its {@link Table#capabilities()}.
*
* Note that, the returned {@link BatchWrite} can be null if the implementation supports SaveMode,
* to indicate that no writing is needed. We can clean it up after removing
* {@link SupportsSaveMode}.
*/
default BatchWrite buildForBatch() {
throw new UnsupportedOperationException(getClass().getName() +
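A caller-side sketch of that contract (an assumption-laden illustration, not code from this PR): a batch write should only be built when the table advertises `BATCH_WRITE`, mirroring the `newWriteBuilder`/`withQueryId`/`withInputDataSchema`/`buildForBatch` chain used in `DataFrameWriter` below.

```scala
import java.util.UUID

import org.apache.spark.sql.sources.v2.{SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.sources.v2.writer.BatchWrite
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

object BatchWriteSketch {
  // Returns None when the table does not support batch writes, so buildForBatch()
  // is only invoked after the capability check passes.
  def buildBatchWrite(
      table: Table,
      options: CaseInsensitiveStringMap,
      schema: StructType): Option[BatchWrite] = table match {
    case t: SupportsWrite if t.capabilities().contains(TableCapability.BATCH_WRITE) =>
      Some(t.newWriteBuilder(options)
        .withQueryId(UUID.randomUUID().toString)
        .withInputDataSchema(schema)
        .buildForBatch())
    case _ => None
  }
}
```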
77 changes: 42 additions & 35 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -30,12 +30,11 @@ import org.apache.spark.sql.catalyst.plans.logical.{AppendData, InsertIntoTable,
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, DataSourceUtils, LogicalRelation}
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Utils, FileDataSourceV2, WriteToDataSourceV2}
import org.apache.spark.sql.execution.datasources.v2._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.TableCapability._
import org.apache.spark.sql.sources.v2.writer.SupportsSaveMode
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

@@ -56,13 +55,16 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* <li>`SaveMode.Overwrite`: overwrite the existing data.</li>
* <li>`SaveMode.Append`: append the data.</li>
* <li>`SaveMode.Ignore`: ignore the operation (i.e. no-op).</li>
* <li>`SaveMode.ErrorIfExists`: default option, throw an exception at runtime.</li>
* <li>`SaveMode.ErrorIfExists`: throw an exception at runtime.</li>
* </ul>
* <p>
* When writing to data source v1, the default option is `ErrorIfExists`. When writing to data
* source v2, the default option is `Append`.

Contributor:
I think @rxin objected to changing the default mode for DSv2. Has that changed?

I think that this is a good idea because the most sensible default for v2 is to append if no mode is set, instead of failing because the mode is not supported. This doesn't change v1 behavior, so I think it is okay.

Contributor Author:
I think @rxin's concern was about breaking existing ETL pipelines. Now that we only change the default mode for DS v2 (excluding file sources), I think there is no compatibility issue.
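To make the new defaults concrete, a short usage sketch of what the code below ends up doing (assuming an existing SparkSession `spark`; the format names `com.example.V1Source` and `com.example.V2Source` are hypothetical stand-ins for a v1 relation provider and a non-file v2 TableProvider):

```scala
val df = spark.range(10).toDF("id")

// v1 source, no mode set: falls back to modeForDSV1 = ErrorIfExists.
df.write.format("com.example.V1Source").save()                    // fails if output already exists

// v2 source, no mode set: falls back to modeForDSV2 = Append.
df.write.format("com.example.V2Source").save()                    // appends

// Explicit modes against a v2 source: only Append and Overwrite are planned as
// AppendData / OverwriteByExpression; other modes hit the new AnalysisException.
df.write.format("com.example.V2Source").mode("overwrite").save()  // overwrites the whole table
df.write.format("com.example.V2Source").mode("ignore").save()     // AnalysisException

// mode("default") leaves the Option[SaveMode] unset, so the per-source default applies.
df.write.format("com.example.V2Source").mode("default").save()    // appends
```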

*
* @since 1.4.0
*/
def mode(saveMode: SaveMode): DataFrameWriter[T] = {
this.mode = saveMode
this.mode = Some(saveMode)
this
}

@@ -78,15 +80,15 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* @since 1.4.0
*/
def mode(saveMode: String): DataFrameWriter[T] = {
this.mode = saveMode.toLowerCase(Locale.ROOT) match {
case "overwrite" => SaveMode.Overwrite
case "append" => SaveMode.Append
case "ignore" => SaveMode.Ignore
case "error" | "errorifexists" | "default" => SaveMode.ErrorIfExists
saveMode.toLowerCase(Locale.ROOT) match {
case "overwrite" => mode(SaveMode.Overwrite)
case "append" => mode(SaveMode.Append)
case "ignore" => mode(SaveMode.Ignore)
case "error" | "errorifexists" => mode(SaveMode.ErrorIfExists)
case "default" => this

Member:
+1

case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " +
"Accepted save modes are 'overwrite', 'append', 'ignore', 'error', 'errorifexists'.")
}
this
}

/**
@@ -268,9 +270,24 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {

import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
provider.getTable(dsOptions) match {
// TODO (SPARK-27815): To not break existing tests, here we treat file source as a special
// case, and pass the save mode to file source directly. This hack should be removed.
case table: FileTable =>

Contributor:
Why is this hack necessary? Why not put off v2 support for path-based tables?

Contributor Author:
Because we want to migrate the file sources to v2, to validate the APIs of Table, ScanBuilder, WriteBuilder, etc.

This hack just works around the fact that we do not yet have a proper entry API for path-based data sources, which I believe we will add later on.

I think this is not a bad idea: we can unblock the file source migration while keeping DS v2 clean (FileTable is an internal class). Besides, this hack will be removed once we have a path-based API in DS v2.

Contributor:
File sources do not use v2 by default, so this is not a necessary change for this commit. I think it should be in a separate PR. We can discuss whether it is a good idea to add this hack in the next DSv2 sync.

Please remove it from this PR.

Contributor:
I'm adding this to my agenda for the sync-up.

Contributor:
I believe that we agreed in the sync-up to remove this hack. Is there a reason why it is still included?

Contributor Author:
I explained in the comment: #24233 (comment)

Removing the hack breaks several tests, and I'd like to do that in another PR. Since SupportsSaveMode is itself a hack, we already have a hack for file source v2 in DataFrameWriter.save. To keep this PR small, I think it's better to keep that hack (we are not creating a new one) and open a new PR to remove it and update the tests.

val write = table.newWriteBuilder(dsOptions).asInstanceOf[FileWriteBuilder]
.mode(modeForDSV1) // should not change default mode for file source.
.withQueryId(UUID.randomUUID().toString)
.withInputDataSchema(df.logicalPlan.schema)
.buildForBatch()
// The returned `Write` can be null, which indicates that we can skip writing.
if (write != null) {
runCommand(df.sparkSession, "save") {
WriteToDataSourceV2(write, df.logicalPlan)
}
}

case table: SupportsWrite if table.supports(BATCH_WRITE) =>
lazy val relation = DataSourceV2Relation.create(table, dsOptions)
mode match {
modeForDSV2 match {
case SaveMode.Append =>
runCommand(df.sparkSession, "save") {
AppendData.byName(relation, df.logicalPlan)
@@ -282,25 +299,10 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
OverwriteByExpression.byName(relation, df.logicalPlan, Literal(true))
}

case _ =>
table.newWriteBuilder(dsOptions) match {
case writeBuilder: SupportsSaveMode =>
val write = writeBuilder.mode(mode)
.withQueryId(UUID.randomUUID().toString)
.withInputDataSchema(df.logicalPlan.schema)
.buildForBatch()
// It can only return null with `SupportsSaveMode`. We can clean it up after
// removing `SupportsSaveMode`.
if (write != null) {
runCommand(df.sparkSession, "save") {
WriteToDataSourceV2(write, df.logicalPlan)
}
}

case _ =>
throw new AnalysisException(
s"data source ${table.name} does not support SaveMode $mode")
}
case other =>
throw new AnalysisException(s"TableProvider implementation $source cannot be " +
s"written with $other mode, please use Append or Overwrite " +
"modes instead.")
}

// Streaming also uses the data source V2 API. So it may be that the data source implements
@@ -328,7 +330,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
sparkSession = df.sparkSession,
className = source,
partitionColumns = partitioningColumns.getOrElse(Nil),
options = extraOptions.toMap).planForWriting(mode, df.logicalPlan)
options = extraOptions.toMap).planForWriting(modeForDSV1, df.logicalPlan)
}
}

@@ -377,7 +379,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
table = UnresolvedRelation(tableIdent),
partition = Map.empty[String, Option[String]],
query = df.logicalPlan,
overwrite = mode == SaveMode.Overwrite,
overwrite = modeForDSV1 == SaveMode.Overwrite,
ifPartitionNotExists = false)
}
}
@@ -457,7 +459,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
val tableIdentWithDB = tableIdent.copy(database = Some(db))
val tableName = tableIdentWithDB.unquotedString

(tableExists, mode) match {
(tableExists, modeForDSV1) match {
case (true, SaveMode.Ignore) =>
// Do nothing

@@ -512,7 +514,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
partitionColumnNames = partitioningColumns.getOrElse(Nil),
bucketSpec = getBucketSpec)

runCommand(df.sparkSession, "saveAsTable")(CreateTable(tableDesc, mode, Some(df.logicalPlan)))
runCommand(df.sparkSession, "saveAsTable")(
CreateTable(tableDesc, modeForDSV1, Some(df.logicalPlan)))
}

/**
@@ -718,13 +721,17 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
SQLExecution.withNewExecutionId(session, qe, Some(name))(qe.toRdd)
}

private def modeForDSV1 = mode.getOrElse(SaveMode.ErrorIfExists)

private def modeForDSV2 = mode.getOrElse(SaveMode.Append)

///////////////////////////////////////////////////////////////////////////////////////
// Builder pattern config options
///////////////////////////////////////////////////////////////////////////////////////

private var source: String = df.sparkSession.sessionState.conf.defaultDataSourceName

private var mode: SaveMode = SaveMode.ErrorIfExists
private var mode: Option[SaveMode] = None

private val extraOptions = new scala.collection.mutable.HashMap[String, String]

@@ -21,7 +21,6 @@ import java.util

import scala.collection.JavaConverters._

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.v2._
@@ -47,13 +46,12 @@ private[noop] object NoopTable extends Table with SupportsWrite {
Set(
TableCapability.BATCH_WRITE,
TableCapability.STREAMING_WRITE,
TableCapability.TRUNCATE,
TableCapability.ACCEPT_ANY_SCHEMA).asJava
}
}

private[noop] object NoopWriteBuilder extends WriteBuilder
with SupportsSaveMode with SupportsTruncate {
override def mode(mode: SaveMode): WriteBuilder = this
private[noop] object NoopWriteBuilder extends WriteBuilder with SupportsTruncate {

Member:
Since DSv2 (except file sources) can now only write to an existing table, the write path of NoopDataSource will still fail here (in the analyzer rule ResolveOutputRelation).

override def truncate(): WriteBuilder = this
override def buildForBatch(): BatchWrite = NoopBatchWrite
override def buildForStreaming(): StreamingWrite = NoopStreamingWrite
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, DataSource, OutputWriterFactory, WriteJobDescription}
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.v2.writer.{BatchWrite, SupportsSaveMode, WriteBuilder}
import org.apache.spark.sql.sources.v2.writer.{BatchWrite, WriteBuilder}
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.sql.util.SchemaUtils
@@ -43,8 +43,7 @@ abstract class FileWriteBuilder(
options: CaseInsensitiveStringMap,
paths: Seq[String],
formatName: String,
supportsDataType: DataType => Boolean)
extends WriteBuilder with SupportsSaveMode {
supportsDataType: DataType => Boolean) extends WriteBuilder {
private var schema: StructType = _
private var queryId: String = _
private var mode: SaveMode = _
@@ -59,7 +58,7 @@
this
}

override def mode(mode: SaveMode): WriteBuilder = {
def mode(mode: SaveMode): WriteBuilder = {
this.mode = mode
this
}
@@ -26,7 +26,6 @@ import org.apache.spark.{SparkEnv, SparkException, TaskContext}
import org.apache.spark.executor.CommitDeniedException
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalog.v2.{Identifier, TableCatalog}
import org.apache.spark.sql.catalog.v2.expressions.Transform
import org.apache.spark.sql.catalyst.InternalRow
@@ -36,7 +35,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.sources.{AlwaysTrue, Filter}
import org.apache.spark.sql.sources.v2.SupportsWrite
import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriterFactory, SupportsDynamicOverwrite, SupportsOverwrite, SupportsSaveMode, SupportsTruncate, WriteBuilder, WriterCommitMessage}
import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriterFactory, SupportsDynamicOverwrite, SupportsOverwrite, SupportsTruncate, WriteBuilder, WriterCommitMessage}

Member:
Nit: import org.apache.spark.sql.sources.v2.writer._

Contributor:
I don't agree with this.

Wildcard imports make it difficult to cherry-pick commits and increase conflicts. They also make it harder to see where symbols come from, and they pollute the namespace with everything in a package instead of just the required names.

For example, I recently hit problems adding a logical package for expressions because of places that imported expressions._ along with plans._.

import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.{LongAccumulator, Utils}

@@ -81,16 +80,10 @@ case class CreateTableAsSelectExec(
Utils.tryWithSafeFinallyAndFailureCallbacks({
catalog.createTable(ident, query.schema, partitioning.toArray, properties.asJava) match {
case table: SupportsWrite =>
val builder = table.newWriteBuilder(writeOptions)
.withInputDataSchema(query.schema)
.withQueryId(UUID.randomUUID().toString)
val batchWrite = builder match {
case supportsSaveMode: SupportsSaveMode =>
supportsSaveMode.mode(SaveMode.Append).buildForBatch()

case _ =>
builder.buildForBatch()
}
val batchWrite = table.newWriteBuilder(writeOptions)
.withInputDataSchema(query.schema)
.withQueryId(UUID.randomUUID().toString)
.buildForBatch()

doWrite(batchWrite)

@@ -116,13 +109,7 @@ case class AppendDataExec(
query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper {

override protected def doExecute(): RDD[InternalRow] = {
val batchWrite = newWriteBuilder() match {
case builder: SupportsSaveMode =>
builder.mode(SaveMode.Append).buildForBatch()

case builder =>
builder.buildForBatch()
}
val batchWrite = newWriteBuilder().buildForBatch()
doWrite(batchWrite)
}
}
@@ -152,9 +139,6 @@ case class OverwriteByExpressionExec(
case builder: SupportsTruncate if isTruncate(deleteWhere) =>
builder.truncate().buildForBatch()

case builder: SupportsSaveMode if isTruncate(deleteWhere) =>
builder.mode(SaveMode.Overwrite).buildForBatch()

case builder: SupportsOverwrite =>
builder.overwrite(deleteWhere).buildForBatch()

@@ -185,9 +169,6 @@ case class OverwritePartitionsDynamicExec(
case builder: SupportsDynamicOverwrite =>
builder.overwriteDynamicPartitions().buildForBatch()

case builder: SupportsSaveMode =>
builder.mode(SaveMode.Overwrite).buildForBatch()

case _ =>
throw new SparkException(s"Table does not support dynamic partition overwrite: $table")
}
@@ -350,8 +331,8 @@ object DataWritingSparkTask extends Logging {
}

private[v2] case class DataWritingSparkTaskResult(
numRows: Long,
writerCommitMessage: WriterCommitMessage)
numRows: Long,
writerCommitMessage: WriterCommitMessage)

/**
* Sink progress information collected after commit.