
Conversation

@rdblue (Contributor) commented Aug 23, 2018

What changes were proposed in this pull request?

The v2 API always adds a projection when converting to a physical plan, to ensure that all rows are UnsafeRow. This projection is added after any filters run by Spark, on the assumption that the filter and projection can handle InternalRow, but this fails if those nodes contain Python UDFs. This PR detects the Python UDFs and adds a projection above the filter to convert to UnsafeRow immediately, before passing data to Python.
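For context, the failure can be reproduced with a query as small as the one in the test added by this patch. A minimal sketch, assuming an active SparkSession named spark and Spark's test-only SimpleDataSourceV2 class on the classpath:

from pyspark.sql.functions import udf

# Read through the v2 API; the scan produces InternalRow, not UnsafeRow.
df = spark.read.format("org.apache.spark.sql.sources.v2.SimpleDataSourceV2").load()

# Before this fix, evaluating a scalar Python UDF on top of the scan failed,
# because the Python evaluation path expects UnsafeRow input.
result = df.withColumn('x', udf(lambda x: x, 'int')(df['i']))
result.collect()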

How was this patch tested?

This adds a test for the case reported in SPARK-25213 to PySpark's SQL tests.


@SparkQA commented Aug 23, 2018

Test build #95173 has finished for PR 22206 at commit c49157e.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

ProjectExec(project, withFilter) :: Nil
if (project.exists(hasScalarPythonUDF)) {
  val references = project.map(_.references).reduce(_ ++ _).toSeq
  ProjectExec(project, ProjectExec(references, withFilter)) :: Nil
Member

Why do we need to add an extra Project on top of the Filter here?

Contributor Author

The v2 data sources return InternalRow, not UnsafeRow. Python UDFs can't handle InternalRow, so this is intended to add a projection that converts to UnsafeRow before the projection that contains a Python UDF.

Member

Oh, I see. It is also used to make sure the PythonUDF in the top Project takes UnsafeRow input.

Member

Nit: if we already add a Project on top of the Filter, we don't need to add another Project here, right?

Contributor Author

That one was only added if there was a filter and that filter ran a UDF. This will add an unnecessary Project if both the filter and the project have Python UDFs, but I thought that was probably okay. I can add a boolean to signal whether the filter already caused one to be added, if you think it's worth it.

Member

OK. Let's leave it as it is for now.

Member

+1 for leaving as is.

from pyspark.sql.functions import udf

df = self.spark.read.format("org.apache.spark.sql.sources.v2.SimpleDataSourceV2").load()
result = df.withColumn('x', udf(lambda x: x, 'int')(df['i']))
Member

This only tests Project with a scalar PythonUDF? It might be better to also test the Filter case.

Contributor Author

Agreed. I was just verifying that the fix worked before spending more time on it.
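A rough sketch of what that Filter test could look like, assuming the same SimpleDataSourceV2 test source; the UDF and the final call are illustrative only and not part of this patch:

from pyspark.sql.functions import udf

df = self.spark.read.format(
    "org.apache.spark.sql.sources.v2.SimpleDataSourceV2").load()
# Run a Python UDF inside a Filter over the v2 scan; before the fix this
# also hit the InternalRow/UnsafeRow mismatch.
keep = udf(lambda x: x is not None, 'boolean')
# The point is only that planning and execution succeed; the exact rows
# depend on the data SimpleDataSourceV2 produces.
df.filter(keep(df['i'])).collect()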

@SparkQA commented Aug 24, 2018

Test build #95187 has finished for PR 22206 at commit 550368e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

def test_pyspark_udf_SPARK_25213(self):
    from pyspark.sql.functions import udf

    df = self.spark.read.format("org.apache.spark.sql.sources.v2.SimpleDataSourceV2").load()
Member

I think this test will fail if the test classes are not compiled. Can we check whether the test classes are compiled and skip the test if they are not?
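One possible guard, sketched below; the class-loading check via py4j and the skip message are assumptions for illustration, not part of this patch:

from py4j.protocol import Py4JError
from pyspark.sql.tests import ReusedSQLTestCase  # base class used by PySpark's SQL tests

class DataSourceV2Tests(ReusedSQLTestCase):
    def test_pyspark_udf_SPARK_25213(self):
        from pyspark.sql.functions import udf

        # Skip when the JVM test classes that provide SimpleDataSourceV2
        # are not compiled / not on the classpath.
        try:
            self.spark.sparkContext._jvm.java.lang.Class.forName(
                "org.apache.spark.sql.sources.v2.SimpleDataSourceV2")
        except Py4JError:
            self.skipTest("SQL test classes are not compiled")

        df = self.spark.read.format(
            "org.apache.spark.sql.sources.v2.SimpleDataSourceV2").load()
        result = df.withColumn('x', udf(lambda x: x, 'int')(df['i']))
        result.collect()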



class DataSourceV2Tests(ReusedSQLTestCase):
    def test_pyspark_udf_SPARK_25213(self):
Member

Not a big deal, but I would avoid the SPARK_25213 suffix at the end, just for consistency.

Contributor Author

I like that the tests in Scala include this information somewhere. Is there a better place for it in PySpark? I'm not aware of another way to attach this kind of metadata, but I'm open to it if there's a better way.

@HyukjinKwon (Member)

Not a big deal, but the PR title should be [SPARK-25213][PYTHON] ... per the guide.

@rdblue changed the title from "SPARK-25213: Add project to v2 scans before python filters." to "[SPARK-25213][PYTHON] Add project to v2 scans before python filters." on Aug 27, 2018
@rdblue (Contributor Author) commented Aug 27, 2018

@HyukjinKwon and @viirya, thank you for looking at this commit, but I like @cloud-fan's approach to fixing this in #22244 better than this work-around. I'm going to close this in favor of that approach, although if we need a quick fix I can pick this back up.
