
[SPARK-48666][SQL] Do not push down filter if it contains PythonUDFs #47313


Closed
HyukjinKwon wants to merge 7 commits into master from HyukjinKwon/SPARK-48666

Conversation

@HyukjinKwon (Member) commented Jul 12, 2024

What changes were proposed in this pull request?

This PR proposes to prevent pushing down filters that contain Python UDFs. It takes the same approach as #47033 (whose author is therefore added as a co-author) but simplifies the change.

Extracting filters to push down happens first:

```scala
Batch("Early Filter and Projection Push-Down", Once, earlyScanPushDownRules: _*) :+

override def defaultBatches: Seq[Batch] = (preOptimizationBatches ++ super.defaultBatches :+
```

https://github.com/apache/spark/blob/cbe6846c477bc8b6d94385ddd0097c4e97b05d41/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala#L46

https://github.com/apache/spark/blob/cbe6846c477bc8b6d94385ddd0097c4e97b05d41/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L211

https://github.com/apache/spark/blob/cbe6846c477bc8b6d94385ddd0097c4e97b05d41/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala#L51

before the batch that extracts Python UDFs:

https://github.com/apache/spark/blob/cbe6846c477bc8b6d94385ddd0097c4e97b05d41/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala#L80
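To make that ordering concrete, here is a toy sketch in plain Scala (not Spark's actual `RuleExecutor`; the plan encoding and the rewrites are illustrative): batches are folded over the plan in order, exactly as the `foldLeft` in the stacktrace below suggests, so the early pushdown batch captures the UDF filter before the extraction batch ever sees it.

```scala
// Toy model: a plan is just a list of operator names; a Batch applies one rewrite.
object BatchOrderSketch extends App {
  type Plan = List[String]
  final case class Batch(name: String, rule: Plan => Plan)

  // Early pushdown absorbs a Filter over a Scan into the Scan itself.
  val earlyPushdown = Batch("Early Filter and Projection Push-Down", {
    case "Filter(pyUDF)" :: "Scan" :: rest => "Scan(pushedFilter = pyUDF)" :: rest
    case plan => plan
  })

  // UDF extraction would rewrite a UDF filter into a dedicated Python-eval
  // operator -- but by the time it runs, there is no Filter(pyUDF) left.
  val extractPythonUDFs = Batch("Extract Python UDFs", _.flatMap {
    case "Filter(pyUDF)" => List("BatchEvalPython(pyUDF)", "Filter")
    case op => List(op)
  })

  val plan: Plan = List("Filter(pyUDF)", "Scan")
  val optimized = List(earlyPushdown, extractPythonUDFs)
    .foldLeft(plan)((p, batch) => batch.rule(p))

  // Prints List(Scan(pushedFilter = pyUDF)): the UDF ended up inside the scan,
  // where partition pruning will later try (and fail) to eval() it.
  println(optimized)
}
```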

Here is the full stacktrace:

```
[INTERNAL_ERROR] Cannot evaluate expression: pyUDF(cast(input[0, bigint, true] as string)) SQLSTATE: XX000
org.apache.spark.SparkException: [INTERNAL_ERROR] Cannot evaluate expression: pyUDF(cast(input[0, bigint, true] as string)) SQLSTATE: XX000
	at org.apache.spark.SparkException$.internalError(SparkException.scala:92)
	at org.apache.spark.SparkException$.internalError(SparkException.scala:96)
	at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotEvaluateExpressionError(QueryExecutionErrors.scala:65)
	at org.apache.spark.sql.catalyst.expressions.FoldableUnevaluable.eval(Expression.scala:387)
	at org.apache.spark.sql.catalyst.expressions.FoldableUnevaluable.eval$(Expression.scala:386)
	at org.apache.spark.sql.catalyst.expressions.PythonUDF.eval(PythonUDF.scala:72)
	at org.apache.spark.sql.catalyst.expressions.UnaryExpression.eval(Expression.scala:563)
	at org.apache.spark.sql.catalyst.expressions.IsNotNull.eval(nullExpressions.scala:403)
	at org.apache.spark.sql.catalyst.expressions.InterpretedPredicate.eval(predicates.scala:53)
	at org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils$.$anonfun$prunePartitionsByFilter$1(ExternalCatalogUtils.scala:189)
	at org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils$.$anonfun$prunePartitionsByFilter$1$adapted(ExternalCatalogUtils.scala:188)
	at scala.collection.immutable.List.filter(List.scala:516)
	at scala.collection.immutable.List.filter(List.scala:79)
	at org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils$.prunePartitionsByFilter(ExternalCatalogUtils.scala:188)
	at org.apache.spark.sql.catalyst.catalog.InMemoryCatalog.listPartitionsByFilter(InMemoryCatalog.scala:604)
	at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.listPartitionsByFilter(ExternalCatalogWithListener.scala:262)
	at org.apache.spark.sql.catalyst.catalog.SessionCatalog.listPartitionsByFilter(SessionCatalog.scala:1358)
	at org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils$.listPartitionsByFilter(ExternalCatalogUtils.scala:168)
	at org.apache.spark.sql.execution.datasources.CatalogFileIndex.filterPartitions(CatalogFileIndex.scala:74)
	at org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions$$anonfun$apply$1.applyOrElse(PruneFileSourcePartitions.scala:72)
	at org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions$$anonfun$apply$1.applyOrElse(PruneFileSourcePartitions.scala:50)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:470)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:84)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:470)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:37)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:330)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:326)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:37)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:37)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:475)
	at org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren(TreeNode.scala:1251)
	at org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren$(TreeNode.scala:1250)
	at org.apache.spark.sql.catalyst.plans.logical.Join.mapChildren(basicLogicalOperators.scala:552)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:475)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:37)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:330)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:326)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:37)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:37)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:446)
	at org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions$.apply(PruneFileSourcePartitions.scala:50)
	at org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions$.apply(PruneFileSourcePartitions.scala:35)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:226)
	at scala.collection.LinearSeqOps.foldLeft(LinearSeq.scala:183)
	at scala.collection.LinearSeqOps.foldLeft$(LinearSeq.scala:179)
	at scala.collection.immutable.List.foldLeft(List.scala:79)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:223)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:215)
	at scala.collection.immutable.List.foreach(List.scala:334)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:215)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:186)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:186)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:167)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:234)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:608)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:234)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:923)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:233)
	at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:163)
	at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:159)
	at org.apache.spark.sql.execution.python.PythonUDFSuite.$anonfun$new$19(PythonUDFSuite.scala:136)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)
	at org.apache.spark.sql.test.SQLTestUtilsBase.withTable(SQLTestUtils.scala:307)
	at org.apache.spark.sql.test.SQLTestUtilsBase.withTable$(SQLTestUtils.scala:305)
	at org.apache.spark.sql.execution.python.PythonUDFSuite.withTable(PythonUDFSuite.scala:25)
	at org.apache.spark.sql.execution.python.PythonUDFSuite.$anonfun$new$18(PythonUDFSuite.scala:130)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
	at org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
	at org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
	at org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
	at org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:69)
	at org.apache.spark.SparkFunSuite.$anonfun$test$2(SparkFunSuite.scala:155)
	at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
	at org.scalatest.Transformer.apply(Transformer.scala:20)
	at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
	at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:227)
	at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
	at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
	at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
	at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
	at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
	at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:69)
	at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
	at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
	at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:69)
	at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
	at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
	at scala.collection.immutable.List.foreach(List.scala:334)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
	at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
	at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
	at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
	at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
	at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
	at org.scalatest.Suite.run(Suite.scala:1114)
	at org.scalatest.Suite.run$(Suite.scala:1096)
	at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
	at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
	at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
	at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
	at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
	at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:69)
	at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
	at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
	at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
	at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:69)
	at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:47)
	at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1321)
	at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1315)
	at scala.collection.immutable.List.foreach(List.scala:334)
	at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1315)
	at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:992)
	at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:970)
	at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1481)
	at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:970)
	at org.scalatest.tools.Runner$.run(Runner.scala:798)
	at org.scalatest.tools.Runner.run(Runner.scala)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2or3(ScalaTestRunner.java:43)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:26)
```
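The top frames show `prunePartitionsByFilter` evaluating the pushed predicate against each candidate partition on the driver. A self-contained sketch of that step (simplified types, not Spark's code; `PruneSketch` and `udfPredicate` are illustrative names) shows why every pushed predicate must be driver-evaluable:

```scala
object PruneSketch extends App {
  type Partition = Map[String, String]

  // Mirrors the shape of ExternalCatalogUtils.prunePartitionsByFilter: keep only
  // the partitions for which the pushed-down predicate evaluates to true.
  def prunePartitionsByFilter(parts: Seq[Partition], pred: Partition => Boolean): Seq[Partition] =
    parts.filter(pred)

  val parts = Seq(Map("part" -> "1"), Map("part" -> "2"))

  // An ordinary partition predicate prunes fine:
  println(prunePartitionsByFilter(parts, _("part") == "1")) // List(Map(part -> 1))

  // A stand-in for an unevaluable PythonUDF: it cannot run on the driver at all,
  // so pruning with it throws, mirroring the INTERNAL_ERROR above.
  def udfPredicate(p: Partition): Boolean =
    throw new UnsupportedOperationException("Cannot evaluate expression: pyUDF(...)")
  // prunePartitionsByFilter(parts, udfPredicate) // would throw
}
```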

Why are the changes needed?

So that end users can use Python UDFs in filters against partitioned columns.

Does this PR introduce any user-facing change?

Yes, this fixes a bug: this PR allows Python UDFs to be used in filters on partitioned columns.

How was this patch tested?

A unit test was added.

Was this patch authored or co-authored using generative AI tooling?

No.

Closes #47033

weiatwork and others added 2 commits July 12, 2024 12:33
(cherry picked from commit b56dc70)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
@HyukjinKwon (Member, Author)

cc @cloud-fan and @weiatwork

@HyukjinKwon changed the title from "[SPARK-48666][SQL] Do not push down filter if it contains Unevaluable expression" to "[SPARK-48666][SQL] Do not push down filter if it contains PythonUDFs" on Jul 12, 2024
@weiatwork (Contributor)

Looks good, assuming Python UDF is the only expression that needs to be special-cased.

```diff
@@ -161,7 +161,11 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging {
   val filterSet = ExpressionSet(filters)
```
Member Author
I believe I fixed all the places that can use a PythonUDF as a partitioning filter. For DSv1, we don't push down partitioning filters; a PythonUDF could be pushed down, and the downstream data source can still handle it if it wants.

So I fixed:

  • Hive scan with partitioning
  • DSv2 with partitioning
  • FileSource with partitioning

```scala
filters.filter { f =>
  f.deterministic &&
    !SubqueryExpression.hasSubquery(f) &&
    !f.exists(_.isInstanceOf[PythonUDF])
}
```
Contributor
FileSourceStrategy is in the planning phase; I think PythonUDFs should already be collected and put into special operators by then?

```scala
filters.filter { f =>
  f.deterministic &&
    !SubqueryExpression.hasSubquery(f) &&
    !f.exists(_.isInstanceOf[PythonUDF])
}
```
Contributor
Let's add code comments to explain that PythonUDF rewriting happens later, so we have to exclude it here.
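For example, the requested comment might read roughly like this (illustrative wording, not the text that was committed):

```scala
filters.filter { f =>
  f.deterministic &&
    !SubqueryExpression.hasSubquery(f) &&
    // PythonUDFs are rewritten into dedicated plan nodes (e.g. BatchEvalPython)
    // only after the early pushdown batch has run, so a PythonUDF reaching this
    // point is still unevaluable and must not be pushed down.
    !f.exists(_.isInstanceOf[PythonUDF])
}
```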

```scala
filters.filter { f =>
  f.deterministic &&
    !SubqueryExpression.hasSubquery(f) &&
    !f.exists(_.isInstanceOf[PythonUDF])
}
```
Contributor
Ditto; I think at the planning phase we don't need to care about PythonUDF anymore.
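For reference, a minimal self-contained sketch of the guard itself (a toy expression tree standing in for Catalyst's classes; `Attr`, `IsNotNull`, and `PyUdf` are illustrative names): a filter is pushable only if no node in its expression tree is a Python UDF.

```scala
sealed trait Expr {
  def children: Seq[Expr]
  def deterministic: Boolean = true
  // Mirrors Expression.exists: does any node in the tree satisfy p?
  def exists(p: Expr => Boolean): Boolean = p(this) || children.exists(_.exists(p))
}
final case class Attr(name: String) extends Expr { val children: Seq[Expr] = Nil }
final case class IsNotNull(child: Expr) extends Expr { val children: Seq[Expr] = Seq(child) }
final case class PyUdf(child: Expr) extends Expr { val children: Seq[Expr] = Seq(child) }

object GuardSketch extends App {
  val filters: Seq[Expr] = Seq(IsNotNull(Attr("part")), IsNotNull(PyUdf(Attr("part"))))
  val pushable = filters.filter { f =>
    f.deterministic && !f.exists(_.isInstanceOf[PyUdf])
  }
  // Prints List(IsNotNull(Attr(part))): the UDF-bearing filter is kept back and
  // evaluated later, after the UDF has been extracted into its own operator.
  println(pushable)
}
```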

@HyukjinKwon (Member, Author) commented Jul 12, 2024

Merged to master and branch-3.5.

HyukjinKwon added a commit that referenced this pull request Jul 12, 2024
Closes #47313 from HyukjinKwon/SPARK-48666.

Lead-authored-by: Hyukjin Kwon <gurwls223@apache.org>
Co-authored-by: Wei Zheng <weiz@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit d747853)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
jingz-db pushed a commit to jingz-db/spark that referenced this pull request Jul 22, 2024
attilapiros pushed a commit to attilapiros/spark that referenced this pull request Oct 4, 2024
himadripal pushed a commit to himadripal/spark that referenced this pull request Oct 19, 2024
	at scala.collection.LinearSeqOps.foldLeft(LinearSeq.scala:183)
	at scala.collection.LinearSeqOps.foldLeft$(LinearSeq.scala:179)
	at scala.collection.immutable.List.foldLeft(List.scala:79)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:223)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:215)
	at scala.collection.immutable.List.foreach(List.scala:334)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:215)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:186)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:186)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:167)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:234)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:608)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:234)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:923)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:233)
	at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:163)
	at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:159)
	at org.apache.spark.sql.execution.python.PythonUDFSuite.$anonfun$new$19(PythonUDFSuite.scala:136)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)
	at org.apache.spark.sql.test.SQLTestUtilsBase.withTable(SQLTestUtils.scala:307)
	at org.apache.spark.sql.test.SQLTestUtilsBase.withTable$(SQLTestUtils.scala:305)
	at org.apache.spark.sql.execution.python.PythonUDFSuite.withTable(PythonUDFSuite.scala:25)
	at org.apache.spark.sql.execution.python.PythonUDFSuite.$anonfun$new$18(PythonUDFSuite.scala:130)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
	at org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
	at org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
	at org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
	at org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:69)
	at org.apache.spark.SparkFunSuite.$anonfun$test$2(SparkFunSuite.scala:155)
	at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
	at org.scalatest.Transformer.apply(Transformer.scala:20)
	at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
	at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:227)
	at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
	at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
	at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
	at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
	at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
	at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:69)
	at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
	at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
	at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:69)
	at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
	at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
	at scala.collection.immutable.List.foreach(List.scala:334)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
	at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
	at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
	at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
	at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
	at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
	at org.scalatest.Suite.run(Suite.scala:1114)
	at org.scalatest.Suite.run$(Suite.scala:1096)
	at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
	at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
	at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
	at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
	at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
	at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:69)
	at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
	at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
	at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
	at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:69)
	at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:47)
	at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1321)
	at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1315)
	at scala.collection.immutable.List.foreach(List.scala:334)
	at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1315)
	at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:992)
	at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:970)
	at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1481)
	at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:970)
	at org.scalatest.tools.Runner$.run(Runner.scala:798)
	at org.scalatest.tools.Runner.run(Runner.scala)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2or3(ScalaTestRunner.java:43)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:26)
```

### Why are the changes needed?

In order for end users to use Python UDFs against partitioned columns.

### Does this PR introduce _any_ user-facing change?

Yes, this fixes a bug - this PR allows to use Python UDF in partitioned columns.

### How was this patch tested?

Unittest added.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#47033

Closes apache#47313 from HyukjinKwon/SPARK-48666.

Lead-authored-by: Hyukjin Kwon <gurwls223@apache.org>
Co-authored-by: Wei Zheng <weiz@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants