
Commit b45593d

SPARK-28612: Add DataFrameWriterV2 API.
1 parent db9e0fd commit b45593d

12 files changed: +1105 -33 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PartitionTransforms.scala

Lines changed: 77 additions & 0 deletions
@@ -0,0 +1,77 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.types.{DataType, IntegerType}

/**
 * Base class for expressions that are converted to v2 partition transforms.
 *
 * Subclasses represent abstract transform functions with concrete implementations that are
 * determined by data source implementations. Because the concrete implementation is not known,
 * these expressions are [[Unevaluable]].
 *
 * These expressions are used to pass transformations from the DataFrame API:
 *
 * {{{
 *   df.writeTo("catalog.db.table").partitionedBy($"category", days($"timestamp")).create()
 * }}}
 */
abstract class PartitionTransformExpression extends Expression with Unevaluable {
  override def nullable: Boolean = true
}

/**
 * Expression for the v2 partition transform years.
 */
case class Years(child: Expression) extends PartitionTransformExpression {
  override def dataType: DataType = IntegerType
  override def children: Seq[Expression] = Seq(child)
}

/**
 * Expression for the v2 partition transform months.
 */
case class Months(child: Expression) extends PartitionTransformExpression {
  override def dataType: DataType = IntegerType
  override def children: Seq[Expression] = Seq(child)
}

/**
 * Expression for the v2 partition transform days.
 */
case class Days(child: Expression) extends PartitionTransformExpression {
  override def dataType: DataType = IntegerType
  override def children: Seq[Expression] = Seq(child)
}

/**
 * Expression for the v2 partition transform hours.
 */
case class Hours(child: Expression) extends PartitionTransformExpression {
  override def dataType: DataType = IntegerType
  override def children: Seq[Expression] = Seq(child)
}

/**
 * Expression for the v2 partition transform bucket.
 */
case class Bucket(numBuckets: Literal, child: Expression) extends PartitionTransformExpression {
  override def dataType: DataType = IntegerType
  override def children: Seq[Expression] = Seq(numBuckets, child)
}
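A small sketch of constructing these expressions directly, outside the DataFrame API; the column names, types, and bucket count below are illustrative:

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Bucket, Days, Literal}
import org.apache.spark.sql.types.{LongType, TimestampType}

// Placeholder columns; in practice these come from the analyzed plan.
val ts = AttributeReference("ts", TimestampType)()
val byDay = Days(ts)                                  // dataType is IntegerType; Unevaluable
val byBucket = Bucket(Literal(16), AttributeReference("id", LongType)())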

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 3 additions & 3 deletions
@@ -2564,7 +2564,7 @@ class Analyzer(
    */
   object ResolveOutputRelation extends Rule[LogicalPlan] {
     override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
-      case append @ AppendData(table, query, isByName)
+      case append @ AppendData(table, query, _, isByName)
           if table.resolved && query.resolved && !append.outputResolved =>
         val projection =
           TableOutputResolver.resolveOutputColumns(
@@ -2576,7 +2576,7 @@ class Analyzer(
         append
       }

-      case overwrite @ OverwriteByExpression(table, _, query, isByName)
+      case overwrite @ OverwriteByExpression(table, _, query, _, isByName)
           if table.resolved && query.resolved && !overwrite.outputResolved =>
         val projection =
           TableOutputResolver.resolveOutputColumns(
@@ -2588,7 +2588,7 @@ class Analyzer(
         overwrite
       }

-      case overwrite @ OverwritePartitionsDynamic(table, query, isByName)
+      case overwrite @ OverwritePartitionsDynamic(table, query, _, isByName)
           if table.resolved && query.resolved && !overwrite.outputResolved =>
         val projection =
           TableOutputResolver.resolveOutputColumns(
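The extra `_` in each pattern is forced by the new `writeOptions` field added to these plans (see basicLogicalOperators.scala below): a constructor pattern on a case class must match its full arity. A toy stand-in, with a hypothetical `ToyAppend` class:

// Toy model of AppendData after the change: the new writeOptions field
// bumps the extractor arity, so every pattern adds a `_` for it.
case class ToyAppend(
    table: String,
    query: String,
    writeOptions: Map[String, String],
    isByName: Boolean)

def describe(plan: ToyAppend): String = plan match {
  case ToyAppend(table, _, _, isByName) => s"append to $table (byName=$isByName)"
}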

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 34 additions & 13 deletions
@@ -488,7 +488,7 @@ case class ReplaceTableAsSelect(
   override def tableSchema: StructType = query.schema
   override def children: Seq[LogicalPlan] = Seq(query)

-  override lazy val resolved: Boolean = {
+  override lazy val resolved: Boolean = childrenResolved && {
     // the table schema is created from the query schema, so the only resolution needed is to check
     // that the columns referenced by the table's partitioning exist in the query schema
     val references = partitioning.flatMap(_.references).toSet
@@ -506,15 +506,22 @@ case class ReplaceTableAsSelect(
 case class AppendData(
     table: NamedRelation,
     query: LogicalPlan,
+    writeOptions: Map[String, String],
     isByName: Boolean) extends V2WriteCommand

 object AppendData {
-  def byName(table: NamedRelation, df: LogicalPlan): AppendData = {
-    new AppendData(table, df, isByName = true)
+  def byName(
+      table: NamedRelation,
+      df: LogicalPlan,
+      writeOptions: Map[String, String] = Map.empty): AppendData = {
+    new AppendData(table, df, writeOptions, isByName = true)
   }

-  def byPosition(table: NamedRelation, query: LogicalPlan): AppendData = {
-    new AppendData(table, query, isByName = false)
+  def byPosition(
+      table: NamedRelation,
+      query: LogicalPlan,
+      writeOptions: Map[String, String] = Map.empty): AppendData = {
+    new AppendData(table, query, writeOptions, isByName = false)
   }
 }

@@ -525,19 +532,26 @@ case class OverwriteByExpression(
     table: NamedRelation,
     deleteExpr: Expression,
     query: LogicalPlan,
+    writeOptions: Map[String, String],
     isByName: Boolean) extends V2WriteCommand {
   override lazy val resolved: Boolean = outputResolved && deleteExpr.resolved
 }

 object OverwriteByExpression {
   def byName(
-      table: NamedRelation, df: LogicalPlan, deleteExpr: Expression): OverwriteByExpression = {
-    OverwriteByExpression(table, deleteExpr, df, isByName = true)
+      table: NamedRelation,
+      df: LogicalPlan,
+      deleteExpr: Expression,
+      writeOptions: Map[String, String] = Map.empty): OverwriteByExpression = {
+    OverwriteByExpression(table, deleteExpr, df, writeOptions, isByName = true)
   }

   def byPosition(
-      table: NamedRelation, query: LogicalPlan, deleteExpr: Expression): OverwriteByExpression = {
-    OverwriteByExpression(table, deleteExpr, query, isByName = false)
+      table: NamedRelation,
+      query: LogicalPlan,
+      deleteExpr: Expression,
+      writeOptions: Map[String, String] = Map.empty): OverwriteByExpression = {
+    OverwriteByExpression(table, deleteExpr, query, writeOptions, isByName = false)
   }
 }

@@ -547,15 +561,22 @@ object OverwriteByExpression {
 case class OverwritePartitionsDynamic(
     table: NamedRelation,
     query: LogicalPlan,
+    writeOptions: Map[String, String],
     isByName: Boolean) extends V2WriteCommand

 object OverwritePartitionsDynamic {
-  def byName(table: NamedRelation, df: LogicalPlan): OverwritePartitionsDynamic = {
-    OverwritePartitionsDynamic(table, df, isByName = true)
+  def byName(
+      table: NamedRelation,
+      df: LogicalPlan,
+      writeOptions: Map[String, String] = Map.empty): OverwritePartitionsDynamic = {
+    OverwritePartitionsDynamic(table, df, writeOptions, isByName = true)
   }

-  def byPosition(table: NamedRelation, query: LogicalPlan): OverwritePartitionsDynamic = {
-    OverwritePartitionsDynamic(table, query, isByName = false)
+  def byPosition(
+      table: NamedRelation,
+      query: LogicalPlan,
+      writeOptions: Map[String, String] = Map.empty): OverwritePartitionsDynamic = {
+    OverwritePartitionsDynamic(table, query, writeOptions, isByName = false)
   }
 }
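Because `writeOptions` defaults to `Map.empty` in every factory method, existing call sites keep compiling unchanged; a sketch of both call shapes, where `relation` and `df` are placeholders for a resolved NamedRelation and a DataFrame:

// Existing call shape, unchanged thanks to the default argument.
val plain = AppendData.byName(relation, df.logicalPlan)

// New call shape threading user-supplied options into the v2 write
// ("compression" -> "zstd" is an illustrative option, not a required key).
val withOptions = AppendData.byName(relation, df.logicalPlan, Map("compression" -> "zstd"))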

sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala

Lines changed: 9 additions & 0 deletions
@@ -17,8 +17,11 @@

 package org.apache.spark.sql.execution.datasources.v2

+import scala.collection.JavaConverters._
+
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.connector.catalog.{SupportsDelete, SupportsRead, SupportsWrite, Table, TableCapability}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap

 object DataSourceV2Implicits {
   implicit class TableHelper(table: Table) {
@@ -53,4 +56,10 @@ object DataSourceV2Implicits {

     def supportsAny(capabilities: TableCapability*): Boolean = capabilities.exists(supports)
   }
+
+  implicit class OptionsHelper(options: Map[String, String]) {
+    def asOptions: CaseInsensitiveStringMap = {
+      new CaseInsensitiveStringMap(options.asJava)
+    }
+  }
 }
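With `OptionsHelper` in scope, a plain Scala Map converts to the CaseInsensitiveStringMap that v2 sources consume; a minimal sketch (the key and value are illustrative):

import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._

val opts = Map("Compression" -> "zstd").asOptions
assert(opts.get("compression") == "zstd")  // lookup is case-insensitive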

sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala

Lines changed: 4 additions & 1 deletion
@@ -41,8 +41,11 @@ class InMemoryTable(
     override val properties: util.Map[String, String])
   extends Table with SupportsRead with SupportsWrite with SupportsDelete {

+  private val allowUnsupportedTransforms =
+    properties.getOrDefault("allow-unsupported-transforms", "false").toBoolean
+
   partitioning.foreach { t =>
-    if (!t.isInstanceOf[IdentityTransform]) {
+    if (!t.isInstanceOf[IdentityTransform] && !allowUnsupportedTransforms) {
       throw new IllegalArgumentException(s"Transform $t must be IdentityTransform")
     }
   }
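A test that needs a non-identity transform would presumably opt in through the new table property; a sketch of just the property map, with the remaining InMemoryTable constructor arguments elided:

import java.util

// Passed as the `properties` constructor argument so that days/bucket/etc.
// transforms no longer trip the IdentityTransform check above.
val props = new util.HashMap[String, String]()
props.put("allow-unsupported-transforms", "true")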

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

Lines changed: 6 additions & 5 deletions
@@ -271,13 +271,14 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     modeForDSV2 match {
       case SaveMode.Append =>
         runCommand(df.sparkSession, "save") {
-          AppendData.byName(relation, df.logicalPlan)
+          AppendData.byName(relation, df.logicalPlan, extraOptions.toMap)
         }

       case SaveMode.Overwrite if table.supportsAny(TRUNCATE, OVERWRITE_BY_FILTER) =>
         // truncate the table
         runCommand(df.sparkSession, "save") {
-          OverwriteByExpression.byName(relation, df.logicalPlan, Literal(true))
+          OverwriteByExpression.byName(
+            relation, df.logicalPlan, Literal(true), extraOptions.toMap)
         }

       case other =>
@@ -382,17 +383,17 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {

     val command = modeForDSV2 match {
       case SaveMode.Append =>
-        AppendData.byPosition(table, df.logicalPlan)
+        AppendData.byPosition(table, df.logicalPlan, extraOptions.toMap)

       case SaveMode.Overwrite =>
         val conf = df.sparkSession.sessionState.conf
         val dynamicPartitionOverwrite = table.table.partitioning.size > 0 &&
           conf.partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC

         if (dynamicPartitionOverwrite) {
-          OverwritePartitionsDynamic.byPosition(table, df.logicalPlan)
+          OverwritePartitionsDynamic.byPosition(table, df.logicalPlan, extraOptions.toMap)
         } else {
-          OverwriteByExpression.byPosition(table, df.logicalPlan, Literal(true))
+          OverwriteByExpression.byPosition(table, df.logicalPlan, Literal(true), extraOptions.toMap)
         }

       case other =>
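From the user's side the API is unchanged: options set with `.option(...)` accumulate in `extraOptions`, which the code above now forwards into AppendData, OverwriteByExpression, and OverwritePartitionsDynamic. A sketch, where the source name and option key are illustrative:

df.write
  .format("my.v2.source")               // placeholder v2 source
  .option("write.batch-size", "1024")   // illustrative option, carried via extraOptions
  .mode("append")
  .save()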
