Skip to content

Commit 624c53c

Browse files
zhengruifeng
authored and
Steve Vaughan
committed
[SPARK-47129][CONNECT][SQL][3.4] Make ResolveRelations cache connect plan properly
### What changes were proposed in this pull request? Make `ResolveRelations` handle plan id properly cherry-pick bugfix apache#45214 to 3.4 ### Why are the changes needed? bug fix for Spark Connect, it won't affect classic Spark SQL before this PR: ``` from pyspark.sql import functions as sf spark.range(10).withColumn("value_1", sf.lit(1)).write.saveAsTable("test_table_1") spark.range(10).withColumnRenamed("id", "index").withColumn("value_2", sf.lit(2)).write.saveAsTable("test_table_2") df1 = spark.read.table("test_table_1") df2 = spark.read.table("test_table_2") df3 = spark.read.table("test_table_1") join1 = df1.join(df2, on=df1.id==df2.index).select(df2.index, df2.value_2) join2 = df3.join(join1, how="left", on=join1.index==df3.id) join2.schema ``` fails with ``` AnalysisException: [CANNOT_RESOLVE_DATAFRAME_COLUMN] Cannot resolve dataframe column "id". It's probably because of illegal references like `df1.select(df2.col("a"))`. SQLSTATE: 42704 ``` That is due to existing plan caching in `ResolveRelations` doesn't work with Spark Connect ``` === Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations === '[apache#12]Join LeftOuter, '`==`('index, 'id) '[apache#12]Join LeftOuter, '`==`('index, 'id) !:- '[apache#9]UnresolvedRelation [test_table_1], [], false :- '[apache#9]SubqueryAlias spark_catalog.default.test_table_1 !+- '[apache#11]Project ['index, 'value_2] : +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false ! +- '[apache#10]Join Inner, '`==`('id, 'index) +- '[apache#11]Project ['index, 'value_2] ! :- '[apache#7]UnresolvedRelation [test_table_1], [], false +- '[apache#10]Join Inner, '`==`('id, 'index) ! +- '[apache#8]UnresolvedRelation [test_table_2], [], false :- '[apache#9]SubqueryAlias spark_catalog.default.test_table_1 ! : +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false ! +- '[apache#8]SubqueryAlias spark_catalog.default.test_table_2 ! 
+- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_2`, [], false Can not resolve 'id with plan 7 ``` `[apache#7]UnresolvedRelation [test_table_1], [], false` was wrongly resolved to the cached one ``` :- '[apache#9]SubqueryAlias spark_catalog.default.test_table_1 +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false ``` ### Does this PR introduce _any_ user-facing change? yes, bug fix ### How was this patch tested? added ut ### Was this patch authored or co-authored using generative AI tooling? ci Closes apache#46290 from zhengruifeng/connect_fix_read_join_34. Authored-by: Ruifeng Zheng <ruifengz@apache.org> Signed-off-by: Ruifeng Zheng <ruifengz@apache.org> (cherry picked from commit 5f58fa7)
1 parent 625068a commit 624c53c

File tree

2 files changed

+41
-7
lines changed

2 files changed

+41
-7
lines changed

python/pyspark/sql/tests/test_readwriter.py

+21
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,27 @@ def test_insert_into(self):
181181
df.write.mode("overwrite").insertInto("test_table", False)
182182
self.assertEqual(6, self.spark.sql("select * from test_table").count())
183183

184+
def test_cached_table(self):
    # Regression test for SPARK-47129: reading the same table into several
    # DataFrames and joining them must not fail with
    # CANNOT_RESOLVE_DATAFRAME_COLUMN (ResolveRelations plan-id caching).
    # NOTE(review): reconstructed from a diff hunk whose indentation was
    # lost in extraction — nesting of the two `with self.table(...)` scopes
    # assumed; confirm against the upstream commit.
    with self.table("test_cached_table_1"):
        self.spark.range(10).withColumn(
            "value_1",
            lit(1),
        ).write.saveAsTable("test_cached_table_1")

        with self.table("test_cached_table_2"):
            self.spark.range(10).withColumnRenamed("id", "index").withColumn(
                "value_2", lit(2)
            ).write.saveAsTable("test_cached_table_2")

            df1 = self.spark.read.table("test_cached_table_1")
            df2 = self.spark.read.table("test_cached_table_2")
            df3 = self.spark.read.table("test_cached_table_1")

            join1 = df1.join(df2, on=df1.id == df2.index).select(df2.index, df2.value_2)
            join2 = df3.join(join1, how="left", on=join1.index == df3.id)

            self.assertEqual(join2.columns, ["id", "value_1", "index", "value_2"])
204+
184205

185206
class ReadwriterV2TestsMixin:
186207
def test_api(self):

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

+20-7
Original file line numberDiff line numberDiff line change
@@ -1261,16 +1261,29 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
12611261
expandIdentifier(u.multipartIdentifier) match {
12621262
case CatalogAndIdentifier(catalog, ident) =>
12631263
val key = ((catalog.name +: ident.namespace :+ ident.name).toSeq, timeTravelSpec)
1264-
AnalysisContext.get.relationCache.get(key).map(_.transform {
1265-
case multi: MultiInstanceRelation =>
1266-
val newRelation = multi.newInstance()
1267-
newRelation.copyTagsFrom(multi)
1268-
newRelation
1269-
}).orElse {
1264+
AnalysisContext.get.relationCache.get(key).map { cache =>
1265+
val cachedRelation = cache.transform {
1266+
case multi: MultiInstanceRelation =>
1267+
val newRelation = multi.newInstance()
1268+
newRelation.copyTagsFrom(multi)
1269+
newRelation
1270+
}
1271+
u.getTagValue(LogicalPlan.PLAN_ID_TAG).map { planId =>
1272+
val cachedConnectRelation = cachedRelation.clone()
1273+
cachedConnectRelation.setTagValue(LogicalPlan.PLAN_ID_TAG, planId)
1274+
cachedConnectRelation
1275+
}.getOrElse(cachedRelation)
1276+
}.orElse {
12701277
val table = CatalogV2Util.loadTable(catalog, ident, timeTravelSpec)
12711278
val loaded = createRelation(catalog, ident, table, u.options, u.isStreaming)
12721279
loaded.foreach(AnalysisContext.get.relationCache.update(key, _))
1273-
loaded
1280+
u.getTagValue(LogicalPlan.PLAN_ID_TAG).map { planId =>
1281+
loaded.map { loadedRelation =>
1282+
val loadedConnectRelation = loadedRelation.clone()
1283+
loadedConnectRelation.setTagValue(LogicalPlan.PLAN_ID_TAG, planId)
1284+
loadedConnectRelation
1285+
}
1286+
}.getOrElse(loaded)
12741287
}
12751288
case _ => None
12761289
}

0 commit comments

Comments
 (0)