-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-22930][PYTHON][SQL] Improve the description of Vectorized UDFs for non-deterministic cases #20142
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-22930][PYTHON][SQL] Improve the description of Vectorized UDFs for non-deterministic cases #20142
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -386,6 +386,7 @@ def test_udf3(self): | |
self.assertEqual(row[0], 5) | ||
|
||
def test_nondeterministic_udf(self): | ||
# Test that nondeterministic UDFs are evaluated only once in chained UDF evaluations | ||
from pyspark.sql.functions import udf | ||
import random | ||
udf_random_col = udf(lambda: int(100 * random.random()), IntegerType()).asNondeterministic() | ||
|
@@ -413,6 +414,18 @@ def test_nondeterministic_udf2(self): | |
pydoc.render_doc(random_udf) | ||
pydoc.render_doc(random_udf1) | ||
|
||
def test_nondeterministic_udf_in_aggregate(self): | ||
from pyspark.sql.functions import udf, sum | ||
import random | ||
udf_random_col = udf(lambda: int(100 * random.random()), 'int').asNondeterministic() | ||
df = self.spark.range(10) | ||
|
||
with QuietTest(self.sc): | ||
with self.assertRaisesRegexp(AnalysisException, "nondeterministic"): | ||
df.groupby('id').agg(sum(udf_random_col())).collect() | ||
with self.assertRaisesRegexp(AnalysisException, "nondeterministic"): | ||
df.agg(sum(udf_random_col())).collect() | ||
|
||
def test_chained_udf(self): | ||
self.spark.catalog.registerFunction("double", lambda x: x + x, IntegerType()) | ||
[row] = self.spark.sql("SELECT double(1)").collect() | ||
|
@@ -3567,6 +3580,18 @@ def tearDownClass(cls): | |
time.tzset() | ||
ReusedSQLTestCase.tearDownClass() | ||
|
||
@property | ||
def random_udf(self): | ||
from pyspark.sql.functions import pandas_udf | ||
|
||
@pandas_udf('double') | ||
def random_udf(v): | ||
import pandas as pd | ||
import numpy as np | ||
return pd.Series(np.random.random(len(v))) | ||
random_udf = random_udf.asNondeterministic() | ||
return random_udf | ||
|
||
def test_vectorized_udf_basic(self): | ||
from pyspark.sql.functions import pandas_udf, col | ||
df = self.spark.range(10).select( | ||
|
@@ -3950,6 +3975,33 @@ def test_vectorized_udf_timestamps_respect_session_timezone(self): | |
finally: | ||
self.spark.conf.set("spark.sql.session.timeZone", orig_tz) | ||
|
||
def test_nondeterministic_udf(self): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. test_vectorized_nondeterministic_udf There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. test_nondeterministic_vectorized_udf |
||
# Test that nondeterministic UDFs are evaluated only once in chained UDF evaluations | ||
from pyspark.sql.functions import udf, pandas_udf, col | ||
|
||
@pandas_udf('double') | ||
def plus_ten(v): | ||
return v + 10 | ||
random_udf = self.random_udf | ||
|
||
df = self.spark.range(10).withColumn('rand', random_udf(col('id'))) | ||
result1 = df.withColumn('plus_ten(rand)', plus_ten(df['rand'])).toPandas() | ||
|
||
self.assertEqual(random_udf.deterministic, False) | ||
self.assertTrue(result1['plus_ten(rand)'].equals(result1['rand'] + 10)) | ||
|
||
def test_nondeterministic_udf_in_aggregate(self): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. test_vectorized_nondeterministic_udf_in_aggregate There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. test_nondeterministic_vectorized_udf_in_aggregate |
||
from pyspark.sql.functions import pandas_udf, sum | ||
|
||
df = self.spark.range(10) | ||
random_udf = self.random_udf | ||
|
||
with QuietTest(self.sc): | ||
with self.assertRaisesRegexp(AnalysisException, 'nondeterministic'): | ||
df.groupby(df.id).agg(sum(random_udf(df.id))).collect() | ||
with self.assertRaisesRegexp(AnalysisException, 'nondeterministic'): | ||
df.agg(sum(random_udf(df.id))).collect() | ||
|
||
|
||
@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not installed") | ||
class GroupbyApplyTests(ReusedSQLTestCase): | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we add "nondeterministic" in its name somehow?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe
nondeterministic_udf
. So we don't have a duplicate name to random_udf
too.