[SPARK-49366][CONNECT] Treat Union node as leaf in dataframe column resolution

### What changes were proposed in this pull request?

Treat the Union node as a leaf in dataframe column resolution.

### Why are the changes needed?
This is a bug fix. The following query:
```
from pyspark.sql.functions import concat, lit, col
df1 = spark.range(10).withColumn("value", lit(1))
df2 = df1.union(df1)
df1.join(df2, df1.id == df2.id, "left").show()
```
fails with `AMBIGUOUS_COLUMN_REFERENCE`. The column-resolution debug output shows why:

```
resolveExpressionByPlanChildren: e = '`==`('id, 'id)
resolveExpressionByPlanChildren: q =
'[id=63]Join LeftOuter, '`==`('id, 'id)
:- [id=61]Project [id#550L, 1 AS value#553]
:  +- Range (0, 10, step=1, splits=Some(12))
+- [id=62]Union false, false
   :- [id=61]Project [id#564L, 1 AS value#565]
   :  +- Range (0, 10, step=1, splits=Some(12))
   +- [id=61]Project [id#566L, 1 AS value#567]
      +- Range (0, 10, step=1, splits=Some(12))

'id with id = 61

[id=61]Project [id#564L, 1 AS value#565]
+- Range (0, 10, step=1, splits=Some(12))

[id=61]Project [id#566L, 1 AS value#567]
+- Range (0, 10, step=1, splits=Some(12))

resolved: Vector((Some((id#564L,1)),true), (Some((id#566L,1)),true))
```

When resolving `'id` with plan id = 61, the search descends into the Union and matches both of its children, since both carry plan id 61. This yields two candidate columns (`id#564L` and `id#566L`) and triggers the ambiguity error, even though the user only ever referenced `df1.id` and `df2.id`.
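Once the ambiguity is resolved, the repro query succeeds and returns 20 rows, which is what the new test below asserts. The arithmetic behind that expectation can be checked with a plain-Python sketch (ordinary lists, not Spark code):

```python
# Plain-Python sketch of the left-join row count from the repro above:
# df2 = df1.union(df1) duplicates every id, so each of df1's 10 rows
# matches exactly 2 rows in df2 -> 10 * 2 = 20 result rows.
df1_ids = list(range(10))
df2_ids = df1_ids + df1_ids  # union duplicates every id

matched = sum(1 for left in df1_ids for right in df2_ids if left == right)
print(matched)  # 20
```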

### Does this PR introduce _any_ user-facing change?
Yes, this is a bug fix: the query above now succeeds instead of failing with `AMBIGUOUS_COLUMN_REFERENCE`.

### How was this patch tested?
Added tests.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#47853 from zhengruifeng/fix_ambgious_union.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
zhengruifeng committed Aug 27, 2024
1 parent f394cd3 commit d64d1f7
Showing 2 changed files with 20 additions and 1 deletion.
14 changes: 14 additions & 0 deletions python/pyspark/sql/tests/test_dataframe.py
```diff
@@ -130,6 +130,20 @@ def test_self_join_II(self):
         self.assertTrue(df3.columns, ["aa", "b", "a", "b"])
         self.assertTrue(df3.count() == 2)
 
+    def test_self_join_III(self):
+        df1 = self.spark.range(10).withColumn("value", lit(1))
+        df2 = df1.union(df1)
+        df3 = df1.join(df2, df1.id == df2.id, "left")
+        self.assertTrue(df3.columns, ["id", "value", "id", "value"])
+        self.assertTrue(df3.count() == 20)
+
+    def test_self_join_IV(self):
+        df1 = self.spark.range(10).withColumn("value", lit(1))
+        df2 = df1.withColumn("value", lit(2)).union(df1.withColumn("value", lit(3)))
+        df3 = df1.join(df2, df1.id == df2.id, "right")
+        self.assertTrue(df3.columns, ["id", "value", "id", "value"])
+        self.assertTrue(df3.count() == 20)
+
     def test_duplicated_column_names(self):
         df = self.spark.createDataFrame([(1, 2)], ["c", "c"])
         row = df.select("*").first()
```
```diff
@@ -585,7 +585,12 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
         }
         (resolved.map(r => (r, currentDepth)), true)
       } else {
-        resolveDataFrameColumnByPlanId(u, id, isMetadataAccess, p.children, currentDepth + 1)
+        val children = p match {
+          // treat Union node as the leaf node
+          case _: Union => Seq.empty[LogicalPlan]
+          case _ => p.children
+        }
+        resolveDataFrameColumnByPlanId(u, id, isMetadataAccess, children, currentDepth + 1)
       }
 
       // In self join case like:
```
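The effect of the Scala change can be illustrated with a small plain-Python sketch (a hypothetical `Node` class, not Spark's actual `LogicalPlan` API): when collecting plan nodes by plan id, the search no longer descends into a Union's children, so the duplicated subtrees inside the Union cannot produce extra candidate matches.

```python
# Hypothetical sketch of plan-id lookup that treats Union as a leaf.
class Node:
    def __init__(self, kind, plan_id, children=()):
        self.kind, self.plan_id, self.children = kind, plan_id, children

def find_by_plan_id(node, plan_id):
    """Collect nodes whose plan_id matches; do not descend into Union."""
    found = [node] if node.plan_id == plan_id else []
    if node.kind == "Union":  # the fix: treat Union as a leaf node
        return found
    for child in node.children:
        found += find_by_plan_id(child, plan_id)
    return found

# Shape of the plan from the bug report: the Union's two children both
# carry plan id 61, the same id as the join's left child.
left = Node("Project", 61)
union = Node("Union", 62, (Node("Project", 61), Node("Project", 61)))
join = Node("Join", 63, (left, union))

matches = find_by_plan_id(join, 61)
print(len(matches))  # 1 -- without the Union cutoff, the two Projects
                     # under the Union would also match, making 3
```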
