Skip to content

Commit 3424ec7

Browse files
cloud-fansunchao
authored andcommitted
[SPARK-34720][SQL] MERGE ... UPDATE/INSERT * should do by-name resolution
In Spark, we have an extension in the MERGE syntax: INSERT/UPDATE *. This is not from ANSI standard or any other mainstream databases, so we need to define the behaviors by our own. The behavior today is very weird: assume the source table has `n1` columns, target table has `n2` columns. We generate the assignments by taking the first `min(n1, n2)` columns from source & target tables and pairing them by ordinal. This PR proposes a more reasonable behavior: take all the columns from target table as keys, and find the corresponding columns from source table by name as values. Fix the MEREG INSERT/UPDATE * to be more user-friendly and easy to do schema evolution. Yes, but MERGE is only supported by very few data sources. new tests Closes apache#32192 from cloud-fan/merge. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent abf48b1 commit 3424ec7

File tree

2 files changed

+86
-36
lines changed

2 files changed

+86
-36
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 40 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1646,14 +1646,18 @@ class Analyzer(override val catalogManager: CatalogManager)
16461646
case UpdateAction(updateCondition, assignments) =>
16471647
val resolvedUpdateCondition = updateCondition.map(
16481648
resolveExpressionByPlanChildren(_, m))
1649-
// The update value can access columns from both target and source tables.
16501649
UpdateAction(
16511650
resolvedUpdateCondition,
1652-
resolveAssignments(Some(assignments), m, resolveValuesWithSourceOnly = false))
1651+
// The update value can access columns from both target and source tables.
1652+
resolveAssignments(assignments, m, resolveValuesWithSourceOnly = false))
16531653
case UpdateStarAction(updateCondition) =>
1654+
val assignments = targetTable.output.map { attr =>
1655+
Assignment(attr, UnresolvedAttribute(Seq(attr.name)))
1656+
}
16541657
UpdateAction(
16551658
updateCondition.map(resolveExpressionByPlanChildren(_, m)),
1656-
resolveAssignments(assignments = None, m, resolveValuesWithSourceOnly = false))
1659+
// For UPDATE *, the value must from source table.
1660+
resolveAssignments(assignments, m, resolveValuesWithSourceOnly = true))
16571661
case o => o
16581662
}
16591663
val newNotMatchedActions = m.notMatchedActions.map {
@@ -1664,15 +1668,18 @@ class Analyzer(override val catalogManager: CatalogManager)
16641668
resolveExpressionByPlanChildren(_, Project(Nil, m.sourceTable)))
16651669
InsertAction(
16661670
resolvedInsertCondition,
1667-
resolveAssignments(Some(assignments), m, resolveValuesWithSourceOnly = true))
1671+
resolveAssignments(assignments, m, resolveValuesWithSourceOnly = true))
16681672
case InsertStarAction(insertCondition) =>
16691673
// The insert action is used when not matched, so its condition and value can only
16701674
// access columns from the source table.
16711675
val resolvedInsertCondition = insertCondition.map(
16721676
resolveExpressionByPlanChildren(_, Project(Nil, m.sourceTable)))
1677+
val assignments = targetTable.output.map { attr =>
1678+
Assignment(attr, UnresolvedAttribute(Seq(attr.name)))
1679+
}
16731680
InsertAction(
16741681
resolvedInsertCondition,
1675-
resolveAssignments(assignments = None, m, resolveValuesWithSourceOnly = true))
1682+
resolveAssignments(assignments, m, resolveValuesWithSourceOnly = true))
16761683
case o => o
16771684
}
16781685
val resolvedMergeCondition = resolveExpressionByPlanChildren(m.mergeCondition, m)
@@ -1690,33 +1697,38 @@ class Analyzer(override val catalogManager: CatalogManager)
16901697
}
16911698

16921699
def resolveAssignments(
1693-
assignments: Option[Seq[Assignment]],
1700+
assignments: Seq[Assignment],
16941701
mergeInto: MergeIntoTable,
16951702
resolveValuesWithSourceOnly: Boolean): Seq[Assignment] = {
1696-
if (assignments.isEmpty) {
1697-
val expandedColumns = mergeInto.targetTable.output
1698-
val expandedValues = mergeInto.sourceTable.output
1699-
expandedColumns.zip(expandedValues).map(kv => Assignment(kv._1, kv._2))
1700-
} else {
1701-
assignments.get.map { assign =>
1702-
val resolvedKey = assign.key match {
1703-
case c if !c.resolved =>
1704-
resolveExpressionByPlanChildren(c, Project(Nil, mergeInto.targetTable))
1705-
case o => o
1706-
}
1707-
val resolvedValue = assign.value match {
1708-
// The update values may contain target and/or source references.
1709-
case c if !c.resolved =>
1710-
if (resolveValuesWithSourceOnly) {
1711-
resolveExpressionByPlanChildren(c, Project(Nil, mergeInto.sourceTable))
1712-
} else {
1713-
resolveExpressionByPlanChildren(c, mergeInto)
1714-
}
1715-
case o => o
1716-
}
1717-
Assignment(resolvedKey, resolvedValue)
1703+
assignments.map { assign =>
1704+
val resolvedKey = assign.key match {
1705+
case c if !c.resolved =>
1706+
resolveMergeExprOrFail(c, Project(Nil, mergeInto.targetTable))
1707+
case o => o
17181708
}
1709+
val resolvedValue = assign.value match {
1710+
// The update values may contain target and/or source references.
1711+
case c if !c.resolved =>
1712+
if (resolveValuesWithSourceOnly) {
1713+
resolveMergeExprOrFail(c, Project(Nil, mergeInto.sourceTable))
1714+
} else {
1715+
resolveMergeExprOrFail(c, mergeInto)
1716+
}
1717+
case o => o
1718+
}
1719+
Assignment(resolvedKey, resolvedValue)
1720+
}
1721+
}
1722+
1723+
private def resolveMergeExprOrFail(e: Expression, p: LogicalPlan): Expression = {
1724+
val resolved = resolveExpressionByPlanChildren(e, p)
1725+
resolved.references.filter(!_.resolved).foreach { a =>
1726+
// Note: This will throw error only on unresolved attribute issues,
1727+
// not other resolution errors like mismatched data types.
1728+
val cols = p.inputSet.toSeq.map(_.sql).mkString(", ")
1729+
a.failAnalysis(s"cannot resolve ${a.sql} in MERGE command given columns [$cols]")
17191730
}
1731+
resolved
17201732
}
17211733

17221734
def newAliases(expressions: Seq[NamedExpression]): Seq[NamedExpression] = {

sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala

Lines changed: 46 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,20 @@ class PlanResolutionSuite extends AnalysisTest {
5858
t
5959
}
6060

61+
private val table1: Table = {
62+
val t = mock(classOf[Table])
63+
when(t.schema()).thenReturn(new StructType().add("s", "string").add("i", "int"))
64+
when(t.partitioning()).thenReturn(Array.empty[Transform])
65+
t
66+
}
67+
68+
private val table2: Table = {
69+
val t = mock(classOf[Table])
70+
when(t.schema()).thenReturn(new StructType().add("i", "int").add("x", "string"))
71+
when(t.partitioning()).thenReturn(Array.empty[Transform])
72+
t
73+
}
74+
6175
private val tableWithAcceptAnySchemaCapability: Table = {
6276
val t = mock(classOf[Table])
6377
when(t.schema()).thenReturn(new StructType().add("i", "int"))
@@ -86,7 +100,8 @@ class PlanResolutionSuite extends AnalysisTest {
86100
when(newCatalog.loadTable(any())).thenAnswer((invocation: InvocationOnMock) => {
87101
invocation.getArgument[Identifier](0).name match {
88102
case "tab" => table
89-
case "tab1" => table
103+
case "tab1" => table1
104+
case "tab2" => table2
90105
case name => throw new NoSuchTableException(name)
91106
}
92107
})
@@ -102,7 +117,7 @@ class PlanResolutionSuite extends AnalysisTest {
102117
case "v1Table1" => v1Table
103118
case "v1HiveTable" => v1HiveTable
104119
case "v2Table" => table
105-
case "v2Table1" => table
120+
case "v2Table1" => table1
106121
case "v2TableWithAcceptAnySchemaCapability" => tableWithAcceptAnySchemaCapability
107122
case name => throw new NoSuchTableException(name)
108123
}
@@ -1369,7 +1384,7 @@ class PlanResolutionSuite extends AnalysisTest {
13691384
// cte
13701385
val sql5 =
13711386
s"""
1372-
|WITH source(i, s) AS
1387+
|WITH source(s, i) AS
13731388
| (SELECT * FROM $source)
13741389
|MERGE INTO $target AS target
13751390
|USING source
@@ -1389,7 +1404,7 @@ class PlanResolutionSuite extends AnalysisTest {
13891404
updateAssigns)),
13901405
Seq(InsertAction(Some(EqualTo(il: AttributeReference, StringLiteral("insert"))),
13911406
insertAssigns))) =>
1392-
assert(source.output.map(_.name) == Seq("i", "s"))
1407+
assert(source.output.map(_.name) == Seq("s", "i"))
13931408
checkResolution(target, source, mergeCondition, Some(dl), Some(ul), Some(il),
13941409
updateAssigns, insertAssigns)
13951410

@@ -1398,8 +1413,7 @@ class PlanResolutionSuite extends AnalysisTest {
13981413
}
13991414

14001415
// no aliases
1401-
Seq(("v2Table", "v2Table1"),
1402-
("testcat.tab", "testcat.tab1")).foreach { pair =>
1416+
Seq(("v2Table", "v2Table1"), ("testcat.tab", "testcat.tab1")).foreach { pair =>
14031417

14041418
val target = pair._1
14051419
val source = pair._2
@@ -1491,7 +1505,7 @@ class PlanResolutionSuite extends AnalysisTest {
14911505
assert(e5.message.contains("Reference 's' is ambiguous"))
14921506
}
14931507

1494-
val sql6 =
1508+
val sql1 =
14951509
s"""
14961510
|MERGE INTO non_exist_target
14971511
|USING non_exist_source
@@ -1500,13 +1514,37 @@ class PlanResolutionSuite extends AnalysisTest {
15001514
|WHEN MATCHED THEN UPDATE SET *
15011515
|WHEN NOT MATCHED THEN INSERT *
15021516
""".stripMargin
1503-
val parsed = parseAndResolve(sql6)
1517+
val parsed = parseAndResolve(sql1)
15041518
parsed match {
15051519
case u: MergeIntoTable =>
15061520
assert(u.targetTable.isInstanceOf[UnresolvedRelation])
15071521
assert(u.sourceTable.isInstanceOf[UnresolvedRelation])
15081522
case _ => fail("Expect MergeIntoTable, but got:\n" + parsed.treeString)
15091523
}
1524+
1525+
// UPDATE * with incompatible schema between source and target tables.
1526+
val sql2 =
1527+
"""
1528+
|MERGE INTO testcat.tab
1529+
|USING testcat.tab2
1530+
|ON 1 = 1
1531+
|WHEN MATCHED THEN UPDATE SET *
1532+
|""".stripMargin
1533+
val e2 = intercept[AnalysisException](parseAndResolve(sql2))
1534+
assert(e2.message.contains(
1535+
"cannot resolve `s` in MERGE command given columns [testcat.tab2.`i`, testcat.tab2.`x`]"))
1536+
1537+
// INSERT * with incompatible schema between source and target tables.
1538+
val sql3 =
1539+
"""
1540+
|MERGE INTO testcat.tab
1541+
|USING testcat.tab2
1542+
|ON 1 = 1
1543+
|WHEN NOT MATCHED THEN INSERT *
1544+
|""".stripMargin
1545+
val e3 = intercept[AnalysisException](parseAndResolve(sql3))
1546+
assert(e3.message.contains(
1547+
"cannot resolve `s` in MERGE command given columns [testcat.tab2.`i`, testcat.tab2.`x`]"))
15101548
}
15111549

15121550
test("MERGE INTO TABLE - skip resolution on v2 tables that accept any schema") {

0 commit comments

Comments
 (0)