[SPARK-25213][PYTHON] Add project to v2 scans before python filters. #22206
Conversation
Test build #95173 has finished for PR 22206 at commit
    ProjectExec(project, withFilter) :: Nil
    if (project.exists(hasScalarPythonUDF)) {
      val references = project.map(_.references).reduce(_ ++ _).toSeq
      ProjectExec(project, ProjectExec(references, withFilter)) :: Nil
Why do we need to add extra Project on top of Filter here?
The v2 data sources return InternalRow, not UnsafeRow. Python UDFs can't handle InternalRow, so this adds a projection that converts rows to UnsafeRow before the projection that contains the Python UDF.
Oh, I see. It is also used to make sure the PythonUDF in the top Project takes UnsafeRow input.
nit: if we already add a Project on top of the Filter, we don't need to add another Project here, right?
That one was only added if there was a filter and that filter ran a UDF. This will add an unnecessary project if both the filter and the project have Python UDFs, but I thought that was probably okay. I can add a boolean to signal whether the filter already caused one to be added, if you think it's worth it.
Ok. Let's leave it as it is for now.
+1 for leaving as is.
    from pyspark.sql.functions import udf

    df = self.spark.read.format("org.apache.spark.sql.sources.v2.SimpleDataSourceV2").load()
    result = df.withColumn('x', udf(lambda x: x, 'int')(df['i']))
This only tests Project with Scalar PythonUDF? Might be better to also test Filter case.
Agreed. I was just verifying that the fix worked before spending more time on it.
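For reference, a filter-case test along the lines discussed might look like the sketch below. It is not part of this PR: the method name is a hypothetical placeholder, and it assumes the same SimpleDataSourceV2 test source and ReusedSQLTestCase context as the test above.

```python
def test_pyspark_udf_filter_SPARK_25213(self):
    # Hypothetical filter-case test: run a scalar Python UDF in the Filter
    # as well as in the Project over a DataSource V2 scan.
    from pyspark.sql.functions import udf

    df = self.spark.read.format(
        "org.apache.spark.sql.sources.v2.SimpleDataSourceV2").load()
    filtered = df.filter(udf(lambda x: x >= 0, 'boolean')(df['i']))
    result = filtered.withColumn('x', udf(lambda x: x, 'int')(filtered['i']))
    # The point is simply that evaluation over the v2 scan succeeds instead
    # of failing on InternalRow input to the UDF.
    result.collect()
```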
Test build #95187 has finished for PR 22206 at commit
    def test_pyspark_udf_SPARK_25213(self):
        from pyspark.sql.functions import udf

        df = self.spark.read.format("org.apache.spark.sql.sources.v2.SimpleDataSourceV2").load()
I think this test will fail if the test classes are not compiled. Can we check whether the test classes are compiled, and skip the test if they are not?
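One possible way to do that is sketched below. It is only an illustration, not from this PR: the helper name is hypothetical, and it assumes probing for the compiled test class through the py4j gateway works against the driver's classpath.

```python
from py4j.protocol import Py4JError


def _java_test_class_available(spark, class_name):
    # Hypothetical helper: try to load the compiled Scala/Java test class via
    # the py4j gateway; Class.forName raises if the test jars were not built.
    try:
        spark.sparkContext._jvm.java.lang.Class.forName(class_name)
        return True
    except Py4JError:
        return False


# Inside the test method, skip when the test classes are missing, e.g.:
#   if not _java_test_class_available(
#           self.spark, "org.apache.spark.sql.sources.v2.SimpleDataSourceV2"):
#       self.skipTest("Spark test classes are not compiled")
```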
    class DataSourceV2Tests(ReusedSQLTestCase):
        def test_pyspark_udf_SPARK_25213(self):
Not a big deal, but I would avoid the SPARK_25213 postfix, just for consistency.
I like that the tests in Scala include this information somewhere. Is there a better place for it in PySpark? I'm not aware of another way to pass extra metadata, but I'm open to it if there's a better way.
Not a big deal but PR title:
@HyukjinKwon and @viirya, thank you for looking at this commit, but I like @cloud-fan's approach to fixing this in #22244 better than this work-around. I'm going to close this in favor of that approach, although if we need a quick fix I can pick this back up.
What changes were proposed in this pull request?
The v2 API always adds a projection when converting to a physical plan to ensure that rows are all UnsafeRow. This projection is added after any filters run by Spark, on the assumption that the filter and projection can handle InternalRow, but this fails if those nodes contain Python UDFs. This PR detects the Python UDFs and adds a projection above the filter to convert to UnsafeRow immediately, before passing data to Python.

How was this patch tested?
This adds a test for the case reported in SPARK-25213 to Python's SQL tests.
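For reference, a standalone reproduction of the reported failure might look like the sketch below. It assumes a local SparkSession and that the SimpleDataSourceV2 test class used in the new test is available on the driver classpath.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

spark = SparkSession.builder.master("local[2]").getOrCreate()

# Scan through the DataSource V2 test source referenced in the test above.
df = spark.read.format("org.apache.spark.sql.sources.v2.SimpleDataSourceV2").load()

# Before this fix, planning a scalar Python UDF directly over the v2 scan
# handed the UDF InternalRow input and the query failed at execution time.
result = df.withColumn('x', udf(lambda x: x, 'int')(df['i']))
result.collect()
```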