[SPARK-29197][SQL] Remove saveModeForDSV2 from DataFrameWriter #25876
Changes from all commits:
- 502cd1b
- 7135164
- 8710266
- 792bd3b
- 331ec74
- 3e054c7
```diff
@@ -67,7 +67,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * @since 1.4.0
    */
   def mode(saveMode: SaveMode): DataFrameWriter[T] = {
-    this.mode = Some(saveMode)
+    this.mode = saveMode
     this
   }
```
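The public `mode` setter is unchanged by this refactor; only the internal field stops being wrapped in an `Option`. A minimal usage sketch, assuming a local Spark session and a placeholder output path:

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().master("local[*]").appName("mode-example").getOrCreate()
import spark.implicits._

// Explicitly choosing a SaveMode works exactly as before; the setter now
// stores the value directly instead of wrapping it in Some(...).
Seq((1, "a"), (2, "b")).toDF("id", "value")
  .write
  .mode(SaveMode.Append)
  .parquet("/tmp/mode_example")  // placeholder path
```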
```diff
@@ -267,7 +267,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
             "if partition columns are specified.")
         }
         lazy val relation = DataSourceV2Relation.create(table, dsOptions)
-        modeForDSV2 match {
+        mode match {
           case SaveMode.Append =>
             runCommand(df.sparkSession, "save") {
               AppendData.byName(relation, df.logicalPlan, extraOptions.toMap)
```
```diff
@@ -308,7 +308,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         sparkSession = df.sparkSession,
         className = source,
         partitionColumns = partitioningColumns.getOrElse(Nil),
-        options = extraOptions.toMap).planForWriting(modeForDSV1, df.logicalPlan)
+        options = extraOptions.toMap).planForWriting(mode, df.logicalPlan)
     }
   }
```
```diff
@@ -319,6 +319,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * @note Unlike `saveAsTable`, `insertInto` ignores the column names and just uses position-based
    * resolution. For example:
    *
+   * @note SaveMode.ErrorIfExists and SaveMode.Ignore behave as SaveMode.Append in `insertInto` as
+   * `insertInto` is not a table creating operation.
+   *
    * {{{
    *   scala> Seq((1, 2)).toDF("i", "j").write.mode("overwrite").saveAsTable("t1")
    *   scala> Seq((3, 4)).toDF("j", "i").write.insertInto("t1")
```
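A short sketch of the behavior documented by the new `@note`: since `insertInto` never creates a table, `ErrorIfExists` and `Ignore` degrade to plain appends. It assumes a `SparkSession` named `spark` with `spark.implicits._` imported and the table `t1` from the scaladoc example above.

```scala
import org.apache.spark.sql.SaveMode

// Both calls append to t1 even though the table already exists,
// because insertInto is not a table-creating operation.
Seq((5, 6)).toDF("i", "j").write.mode(SaveMode.ErrorIfExists).insertInto("t1")
Seq((7, 8)).toDF("i", "j").write.mode(SaveMode.Ignore).insertInto("t1")

// Only SaveMode.Overwrite changes the outcome.
spark.table("t1").show()
```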
```diff
@@ -380,8 +383,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         DataSourceV2Relation.create(t)
     }

-    val command = modeForDSV2 match {
-      case SaveMode.Append =>
+    val command = mode match {
+      case SaveMode.Append | SaveMode.ErrorIfExists | SaveMode.Ignore =>
```
Contributor
A note to future readers: this is the old behavior, that non-overwrite mode means append. This is due to the bad design of [...]. Note that, we don't have this problem in the new [...].
Contributor
Looks like the previous version used this: [...] So I agree that this is using the same behavior that v1 did.
```diff
          AppendData.byPosition(table, df.logicalPlan, extraOptions.toMap)

       case SaveMode.Overwrite =>
```
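To make the reviewers' point concrete: after this change the v2 `insertInto` path treats every non-Overwrite mode as an append, mirroring the v1 path later in this diff (`overwrite = mode == SaveMode.Overwrite`). A hedged, self-contained sketch of that mapping, not the actual Spark code:

```scala
import org.apache.spark.sql.SaveMode

// Illustration only: both code paths boil down to "overwrite or append".
def insertIsOverwrite(mode: SaveMode): Boolean = mode match {
  case SaveMode.Overwrite => true
  // Append, ErrorIfExists and Ignore all behave as a plain append,
  // because insertInto never creates or validates the target table.
  case _ => false
}

assert(!insertIsOverwrite(SaveMode.ErrorIfExists))
assert(!insertIsOverwrite(SaveMode.Ignore))
assert(insertIsOverwrite(SaveMode.Overwrite))
```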
```diff
@@ -394,10 +397,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         } else {
           OverwriteByExpression.byPosition(table, df.logicalPlan, Literal(true), extraOptions.toMap)
         }
-
-      case other =>
-        throw new AnalysisException(s"insertInto does not support $other mode, " +
-          s"please use Append or Overwrite mode instead.")
     }

     runCommand(df.sparkSession, "insertInto") {
```
```diff
@@ -411,7 +410,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         table = UnresolvedRelation(tableIdent),
         partitionSpec = Map.empty[String, Option[String]],
         query = df.logicalPlan,
-        overwrite = modeForDSV1 == SaveMode.Overwrite,
+        overwrite = mode == SaveMode.Overwrite,
         ifPartitionNotExists = false)
     }
   }
```
```diff
@@ -490,12 +489,10 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {

     session.sessionState.sqlParser.parseMultipartIdentifier(tableName) match {
       case CatalogObjectIdentifier(Some(catalog), ident) =>
-        saveAsTable(catalog.asTableCatalog, ident, modeForDSV2)
+        saveAsTable(catalog.asTableCatalog, ident)

       case CatalogObjectIdentifier(None, ident) if canUseV2 && ident.namespace().length <= 1 =>
-        // We pass in the modeForDSV1, as using the V2 session catalog should maintain compatibility
-        // for now.
-        saveAsTable(sessionCatalog.asTableCatalog, ident, modeForDSV1)
+        saveAsTable(sessionCatalog.asTableCatalog, ident)

       case AsTableIdentifier(tableIdentifier) =>
         saveAsTable(tableIdentifier)
```
```diff
@@ -507,7 +504,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   }

-  private def saveAsTable(catalog: TableCatalog, ident: Identifier, mode: SaveMode): Unit = {
+  private def saveAsTable(catalog: TableCatalog, ident: Identifier): Unit = {
     val partitioning = partitioningColumns.map { colNames =>
       colNames.map(name => IdentityTransform(FieldReference(name)))
     }.getOrElse(Seq.empty[Transform])
```
```diff
@@ -568,7 +565,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     val tableIdentWithDB = tableIdent.copy(database = Some(db))
     val tableName = tableIdentWithDB.unquotedString

-    (tableExists, modeForDSV1) match {
+    (tableExists, mode) match {
       case (true, SaveMode.Ignore) =>
         // Do nothing
```
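For context on the `(tableExists, mode)` match, only the `Ignore` arm is visible in this hunk. A hedged summary of the usual `saveAsTable` SaveMode contract, as an illustration rather than Spark's actual implementation:

```scala
import org.apache.spark.sql.SaveMode

// Rough decision table for saveAsTable against an existing or missing table.
def describeSaveAsTable(tableExists: Boolean, mode: SaveMode): String =
  (tableExists, mode) match {
    case (true, SaveMode.Ignore)        => "do nothing"
    case (true, SaveMode.ErrorIfExists) => "fail with an AnalysisException"
    case (true, SaveMode.Overwrite)     => "replace the existing table"
    case (true, SaveMode.Append)        => "append to the existing table"
    case (false, _)                     => "create the table"
  }
```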
```diff
@@ -624,7 +621,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       bucketSpec = getBucketSpec)

     runCommand(df.sparkSession, "saveAsTable")(
-      CreateTable(tableDesc, modeForDSV1, Some(df.logicalPlan)))
+      CreateTable(tableDesc, mode, Some(df.logicalPlan)))
   }

   /**
```
```diff
@@ -830,10 +827,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     SQLExecution.withNewExecutionId(session, qe, Some(name))(qe.toRdd)
   }

-  private def modeForDSV1 = mode.getOrElse(SaveMode.ErrorIfExists)
-
-  private def modeForDSV2 = mode.getOrElse(SaveMode.Append)
-
   private def lookupV2Provider(): Option[TableProvider] = {
     DataSource.lookupDataSourceV2(source, df.sparkSession.sessionState.conf) match {
       // TODO(SPARK-28396): File source v2 write path is currently broken.
```
```diff
@@ -848,7 +841,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {

   private var source: String = df.sparkSession.sessionState.conf.defaultDataSourceName

-  private var mode: Option[SaveMode] = None
+  private var mode: SaveMode = SaveMode.ErrorIfExists

   private val extraOptions = new scala.collection.mutable.HashMap[String, String]
```
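The user-visible consequence of collapsing `modeForDSV1`/`modeForDSV2` into this single field: v2 writes no longer default to `Append`; every write defaults to `ErrorIfExists` unless `mode(...)` is called. A hedged sketch, assuming a `SparkSession` with `spark.implicits._` imported and placeholder paths:

```scala
// Default mode is now SaveMode.ErrorIfExists for both v1 and v2 sources.
// Previously a v2 source silently defaulted to Append via modeForDSV2.
val df = Seq((1, "a")).toDF("id", "value")

df.write.parquet("/tmp/already_there")                  // fails if the target already exists
df.write.mode("append").parquet("/tmp/already_there")   // explicit opt-in to append
```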
Do we still need to change this file since we disable kafka v2 by default?