[SPARK-26356][SQL] remove SaveMode from data source v2 #24233
Changes from all commits
3f38569
4bd5aad
83d199d
66263c1
30f95b7
bf48a24
6753c06
34246d6
22ba355
15d2071
@@ -30,12 +30,11 @@ import org.apache.spark.sql.catalyst.plans.logical.{AppendData, InsertIntoTable, | |
import org.apache.spark.sql.execution.SQLExecution | ||
import org.apache.spark.sql.execution.command.DDLUtils | ||
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, DataSourceUtils, LogicalRelation} | ||
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Utils, FileDataSourceV2, WriteToDataSourceV2} | ||
import org.apache.spark.sql.execution.datasources.v2._ | ||
import org.apache.spark.sql.internal.SQLConf | ||
import org.apache.spark.sql.sources.BaseRelation | ||
import org.apache.spark.sql.sources.v2._ | ||
import org.apache.spark.sql.sources.v2.TableCapability._ | ||
import org.apache.spark.sql.sources.v2.writer.SupportsSaveMode | ||
import org.apache.spark.sql.types.StructType | ||
import org.apache.spark.sql.util.CaseInsensitiveStringMap | ||
|
||
|
@@ -56,13 +55,16 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { | |
* <li>`SaveMode.Overwrite`: overwrite the existing data.</li> | ||
* <li>`SaveMode.Append`: append the data.</li> | ||
* <li>`SaveMode.Ignore`: ignore the operation (i.e. no-op).</li> | ||
* <li>`SaveMode.ErrorIfExists`: default option, throw an exception at runtime.</li> | ||
* <li>`SaveMode.ErrorIfExists`: throw an exception at runtime.</li> | ||
* </ul> | ||
* <p> | ||
* When writing to data source v1, the default option is `ErrorIfExists`. When writing to data | ||
* source v2, the default option is `Append`. | ||
* | ||
* @since 1.4.0 | ||
*/ | ||
def mode(saveMode: SaveMode): DataFrameWriter[T] = { | ||
this.mode = saveMode | ||
this.mode = Some(saveMode) | ||
this | ||
} | ||
|
||
|
@@ -78,15 +80,15 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { | |
* @since 1.4.0 | ||
*/ | ||
def mode(saveMode: String): DataFrameWriter[T] = { | ||
this.mode = saveMode.toLowerCase(Locale.ROOT) match { | ||
case "overwrite" => SaveMode.Overwrite | ||
case "append" => SaveMode.Append | ||
case "ignore" => SaveMode.Ignore | ||
case "error" | "errorifexists" | "default" => SaveMode.ErrorIfExists | ||
saveMode.toLowerCase(Locale.ROOT) match { | ||
case "overwrite" => mode(SaveMode.Overwrite) | ||
case "append" => mode(SaveMode.Append) | ||
case "ignore" => mode(SaveMode.Ignore) | ||
case "error" | "errorifexists" => mode(SaveMode.ErrorIfExists) | ||
case "default" => this | ||
Review comment: +1
||
case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " + | ||
"Accepted save modes are 'overwrite', 'append', 'ignore', 'error', 'errorifexists'.") | ||
} | ||
this | ||
} | ||
|
||
/** | ||
|
@@ -268,9 +270,24 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { | |
|
||
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._ | ||
provider.getTable(dsOptions) match { | ||
// TODO (SPARK-27815): To not break existing tests, here we treat file source as a special | ||
// case, and pass the save mode to file source directly. This hack should be removed. | ||
case table: FileTable => | ||
Review comment: Why is this hack necessary? Why not put off v2 support for path-based tables?

Review comment: Because we want to migrate file sources to v2, to validate the API of … This hack is just to work around the issue that we do not have a proper entry API for path-based data sources, which I believe we will have later on. I think this is not a bad idea. We can unblock the file source migration, and we can keep the DS v2 clean (…

Review comment: File sources do not use v2 by default, so this is not a necessary change for this commit. I think it should be in a separate PR. We can discuss whether it is a good idea to add this hack in the next DSv2 sync. Please remove it from this PR.

Review comment: I'm adding this to my agenda for the sync-up.

Review comment: I believe that we agreed in the sync-up to remove this hack. Is there a reason why it is still included?

Review comment: I explained in the comment: #24233 (comment). Removing the hack breaks several tests and I'd like to do it in another PR. Since …
||
val write = table.newWriteBuilder(dsOptions).asInstanceOf[FileWriteBuilder] | ||
.mode(modeForDSV1) // should not change default mode for file source. | ||
.withQueryId(UUID.randomUUID().toString) | ||
.withInputDataSchema(df.logicalPlan.schema) | ||
.buildForBatch() | ||
// The returned `Write` can be null, which indicates that we can skip writing. | ||
if (write != null) { | ||
runCommand(df.sparkSession, "save") { | ||
WriteToDataSourceV2(write, df.logicalPlan) | ||
} | ||
} | ||
|
||
case table: SupportsWrite if table.supports(BATCH_WRITE) => | ||
lazy val relation = DataSourceV2Relation.create(table, dsOptions) | ||
mode match { | ||
modeForDSV2 match { | ||
case SaveMode.Append => | ||
runCommand(df.sparkSession, "save") { | ||
AppendData.byName(relation, df.logicalPlan) | ||
|
@@ -282,25 +299,10 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { | |
OverwriteByExpression.byName(relation, df.logicalPlan, Literal(true)) | ||
} | ||
|
||
case _ => | ||
table.newWriteBuilder(dsOptions) match { | ||
case writeBuilder: SupportsSaveMode => | ||
val write = writeBuilder.mode(mode) | ||
.withQueryId(UUID.randomUUID().toString) | ||
.withInputDataSchema(df.logicalPlan.schema) | ||
.buildForBatch() | ||
// It can only return null with `SupportsSaveMode`. We can clean it up after | ||
// removing `SupportsSaveMode`. | ||
if (write != null) { | ||
runCommand(df.sparkSession, "save") { | ||
WriteToDataSourceV2(write, df.logicalPlan) | ||
} | ||
} | ||
|
||
case _ => | ||
throw new AnalysisException( | ||
s"data source ${table.name} does not support SaveMode $mode") | ||
} | ||
case other => | ||
throw new AnalysisException(s"TableProvider implementation $source cannot be " + | ||
s"written with $other mode, please use Append or Overwrite " + | ||
"modes instead.") | ||
} | ||
|
||
// Streaming also uses the data source V2 API. So it may be that the data source implements | ||
|
@@ -328,7 +330,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { | |
sparkSession = df.sparkSession, | ||
className = source, | ||
partitionColumns = partitioningColumns.getOrElse(Nil), | ||
options = extraOptions.toMap).planForWriting(mode, df.logicalPlan) | ||
options = extraOptions.toMap).planForWriting(modeForDSV1, df.logicalPlan) | ||
} | ||
} | ||
|
||
|
@@ -377,7 +379,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { | |
table = UnresolvedRelation(tableIdent), | ||
partition = Map.empty[String, Option[String]], | ||
query = df.logicalPlan, | ||
overwrite = mode == SaveMode.Overwrite, | ||
overwrite = modeForDSV1 == SaveMode.Overwrite, | ||
ifPartitionNotExists = false) | ||
} | ||
} | ||
|
@@ -457,7 +459,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { | |
val tableIdentWithDB = tableIdent.copy(database = Some(db)) | ||
val tableName = tableIdentWithDB.unquotedString | ||
|
||
(tableExists, mode) match { | ||
(tableExists, modeForDSV1) match { | ||
case (true, SaveMode.Ignore) => | ||
// Do nothing | ||
|
||
|
@@ -512,7 +514,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { | |
partitionColumnNames = partitioningColumns.getOrElse(Nil), | ||
bucketSpec = getBucketSpec) | ||
|
||
runCommand(df.sparkSession, "saveAsTable")(CreateTable(tableDesc, mode, Some(df.logicalPlan))) | ||
runCommand(df.sparkSession, "saveAsTable")( | ||
CreateTable(tableDesc, modeForDSV1, Some(df.logicalPlan))) | ||
} | ||
|
||
/** | ||
|
@@ -718,13 +721,17 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { | |
SQLExecution.withNewExecutionId(session, qe, Some(name))(qe.toRdd) | ||
} | ||
|
||
private def modeForDSV1 = mode.getOrElse(SaveMode.ErrorIfExists) | ||
|
||
private def modeForDSV2 = mode.getOrElse(SaveMode.Append) | ||
|
||
/////////////////////////////////////////////////////////////////////////////////////// | ||
// Builder pattern config options | ||
/////////////////////////////////////////////////////////////////////////////////////// | ||
|
||
private var source: String = df.sparkSession.sessionState.conf.defaultDataSourceName | ||
|
||
private var mode: SaveMode = SaveMode.ErrorIfExists | ||
private var mode: Option[SaveMode] = None | ||
|
||
private val extraOptions = new scala.collection.mutable.HashMap[String, String] | ||
|
||
|
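For readers skimming the DataFrameWriter diff above, here is a minimal standalone sketch (not the actual Spark code) of the new mode bookkeeping: the user-supplied SaveMode becomes an Option, and each write path resolves its own default via the modeForDSV1 / modeForDSV2 helpers. The class and method names below are illustrative only.

```scala
import org.apache.spark.sql.SaveMode

class ModeBookkeeping {
  // None until the caller sets a mode, mirroring `private var mode: Option[SaveMode] = None`.
  private var mode: Option[SaveMode] = None

  def setMode(saveMode: SaveMode): this.type = {
    mode = Some(saveMode)
    this
  }

  // DS v1 keeps the historical default; DS v2 (non-file) sources default to appending.
  def modeForDSV1: SaveMode = mode.getOrElse(SaveMode.ErrorIfExists)
  def modeForDSV2: SaveMode = mode.getOrElse(SaveMode.Append)
}
```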
@@ -21,7 +21,6 @@ import java.util | |
|
||
import scala.collection.JavaConverters._ | ||
|
||
import org.apache.spark.sql.SaveMode | ||
import org.apache.spark.sql.catalyst.InternalRow | ||
import org.apache.spark.sql.sources.DataSourceRegister | ||
import org.apache.spark.sql.sources.v2._ | ||
|
@@ -47,13 +46,12 @@ private[noop] object NoopTable extends Table with SupportsWrite { | |
Set( | ||
TableCapability.BATCH_WRITE, | ||
TableCapability.STREAMING_WRITE, | ||
TableCapability.TRUNCATE, | ||
TableCapability.ACCEPT_ANY_SCHEMA).asJava | ||
} | ||
} | ||
|
||
private[noop] object NoopWriteBuilder extends WriteBuilder | ||
with SupportsSaveMode with SupportsTruncate { | ||
override def mode(mode: SaveMode): WriteBuilder = this | ||
private[noop] object NoopWriteBuilder extends WriteBuilder with SupportsTruncate { | ||
Review comment: Since now DSV2 (except file sources) can only write to an existing table, here the write path of NoopDataSource will still fail (analyzer rule …
||
override def truncate(): WriteBuilder = this | ||
override def buildForBatch(): BatchWrite = NoopBatchWrite | ||
override def buildForStreaming(): StreamingWrite = NoopStreamingWrite | ||
|
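A condensed sketch, modeled on the NoopDataSource change above, of what a WriteBuilder looks like once SupportsSaveMode is gone: a full overwrite is requested through SupportsTruncate rather than mode(SaveMode.Overwrite). The object name is made up and buildForBatch is left as a stub, since the real noop source returns its own NoopBatchWrite.

```scala
import org.apache.spark.sql.sources.v2.writer.{BatchWrite, SupportsTruncate, WriteBuilder}

object SketchWriteBuilder extends WriteBuilder with SupportsTruncate {
  // Truncate-then-write replaces the old mode(SaveMode.Overwrite) call.
  override def truncate(): WriteBuilder = this

  // A real source would return its BatchWrite implementation here (e.g. NoopBatchWrite).
  override def buildForBatch(): BatchWrite = ???
}
```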
@@ -26,7 +26,6 @@ import org.apache.spark.{SparkEnv, SparkException, TaskContext} | |
import org.apache.spark.executor.CommitDeniedException | ||
import org.apache.spark.internal.Logging | ||
import org.apache.spark.rdd.RDD | ||
import org.apache.spark.sql.SaveMode | ||
import org.apache.spark.sql.catalog.v2.{Identifier, TableCatalog} | ||
import org.apache.spark.sql.catalog.v2.expressions.Transform | ||
import org.apache.spark.sql.catalyst.InternalRow | ||
|
@@ -36,7 +35,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan | |
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} | ||
import org.apache.spark.sql.sources.{AlwaysTrue, Filter} | ||
import org.apache.spark.sql.sources.v2.SupportsWrite | ||
import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriterFactory, SupportsDynamicOverwrite, SupportsOverwrite, SupportsSaveMode, SupportsTruncate, WriteBuilder, WriterCommitMessage} | ||
import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriterFactory, SupportsDynamicOverwrite, SupportsOverwrite, SupportsTruncate, WriteBuilder, WriterCommitMessage} | ||
Review comment: Nit: import org.apache.spark.sql.sources.v2.writer._

Review comment: I don't agree with this. Wildcard imports make it difficult to cherry-pick commits and increase conflicts. It is also difficult to see where symbols are coming from, and it pollutes the namespace with everything in a package instead of just the required names. For example, I recently hit problems adding a …
||
import org.apache.spark.sql.util.CaseInsensitiveStringMap | ||
import org.apache.spark.util.{LongAccumulator, Utils} | ||
|
||
|
@@ -81,16 +80,10 @@ case class CreateTableAsSelectExec( | |
Utils.tryWithSafeFinallyAndFailureCallbacks({ | ||
catalog.createTable(ident, query.schema, partitioning.toArray, properties.asJava) match { | ||
case table: SupportsWrite => | ||
val builder = table.newWriteBuilder(writeOptions) | ||
.withInputDataSchema(query.schema) | ||
.withQueryId(UUID.randomUUID().toString) | ||
val batchWrite = builder match { | ||
case supportsSaveMode: SupportsSaveMode => | ||
supportsSaveMode.mode(SaveMode.Append).buildForBatch() | ||
|
||
case _ => | ||
builder.buildForBatch() | ||
} | ||
val batchWrite = table.newWriteBuilder(writeOptions) | ||
.withInputDataSchema(query.schema) | ||
.withQueryId(UUID.randomUUID().toString) | ||
.buildForBatch() | ||
|
||
doWrite(batchWrite) | ||
|
||
|
@@ -116,13 +109,7 @@ case class AppendDataExec( | |
query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper { | ||
|
||
override protected def doExecute(): RDD[InternalRow] = { | ||
val batchWrite = newWriteBuilder() match { | ||
case builder: SupportsSaveMode => | ||
builder.mode(SaveMode.Append).buildForBatch() | ||
|
||
case builder => | ||
builder.buildForBatch() | ||
} | ||
val batchWrite = newWriteBuilder().buildForBatch() | ||
doWrite(batchWrite) | ||
} | ||
} | ||
|
@@ -152,9 +139,6 @@ case class OverwriteByExpressionExec( | |
case builder: SupportsTruncate if isTruncate(deleteWhere) => | ||
builder.truncate().buildForBatch() | ||
|
||
case builder: SupportsSaveMode if isTruncate(deleteWhere) => | ||
builder.mode(SaveMode.Overwrite).buildForBatch() | ||
|
||
case builder: SupportsOverwrite => | ||
builder.overwrite(deleteWhere).buildForBatch() | ||
|
||
|
@@ -185,9 +169,6 @@ case class OverwritePartitionsDynamicExec( | |
case builder: SupportsDynamicOverwrite => | ||
builder.overwriteDynamicPartitions().buildForBatch() | ||
|
||
case builder: SupportsSaveMode => | ||
builder.mode(SaveMode.Overwrite).buildForBatch() | ||
|
||
case _ => | ||
throw new SparkException(s"Table does not support dynamic partition overwrite: $table") | ||
} | ||
|
@@ -350,8 +331,8 @@ object DataWritingSparkTask extends Logging { | |
} | ||
|
||
private[v2] case class DataWritingSparkTaskResult( | ||
numRows: Long, | ||
writerCommitMessage: WriterCommitMessage) | ||
numRows: Long, | ||
writerCommitMessage: WriterCommitMessage) | ||
|
||
/** | ||
* Sink progress information collected after commit. | ||
|
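To summarize the exec-node changes above, here is a trimmed sketch of the capability-based dispatch that replaces the removed SupportsSaveMode branches: overwrite intent is expressed through SupportsTruncate / SupportsOverwrite instead of a SaveMode. The object name, method signature, and error message below are illustrative, not the actual exec-node code.

```scala
import org.apache.spark.SparkException
import org.apache.spark.sql.sources.{AlwaysTrue, Filter}
import org.apache.spark.sql.sources.v2.writer.{BatchWrite, SupportsOverwrite, SupportsTruncate, WriteBuilder}

object OverwriteDispatchSketch {
  // A single AlwaysTrue filter means "replace everything".
  private def isTruncate(filters: Array[Filter]): Boolean =
    filters.length == 1 && filters.head.isInstanceOf[AlwaysTrue]

  def buildOverwrite(builder: WriteBuilder, deleteWhere: Array[Filter]): BatchWrite = builder match {
    case b: SupportsTruncate if isTruncate(deleteWhere) => b.truncate().buildForBatch()
    case b: SupportsOverwrite => b.overwrite(deleteWhere).buildForBatch()
    case _ => throw new SparkException("Table does not support overwrite by expression")
  }
}
```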
Review comment: I think @rxin objected to changing the default mode for DSv2. Has that changed?
I think that this is a good idea because the most sensible default for v2 is to append if no mode is set, instead of failing because the mode is not supported. This doesn't change v1 behavior, so I think it is okay.
Review comment: I think @rxin has a concern about breaking existing ETL pipelines. Now we only change the default mode for DS v2 (excluding file sources), so I think there is no compatibility issue.
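As a hypothetical end-user illustration of the default-mode discussion above: with no explicit mode, a v1 file source still uses ErrorIfExists, while a non-file v2 source would default to Append. The output path and the commented-out class name com.example.V2Source are placeholders, not real values.

```scala
import org.apache.spark.sql.SparkSession

object DefaultModeExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("default-mode").getOrCreate()
    val df = spark.range(10).toDF("id")

    // v1 file source: no mode given, so the v1 default ErrorIfExists applies;
    // running this twice against the same path fails on the second run.
    df.write.format("parquet").save("/tmp/default_mode_example")

    // A (non-file) v2 source would default to Append for the same call:
    // df.write.format("com.example.V2Source").save()

    spark.stop()
  }
}
```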