[SPARK-28287][SQL][PYTHON][TESTS] Convert and port 'udaf.sql' into UDF test base #25113
Conversation
```diff
 -- !query 2
-SELECT default.myDoubleAvg(int_col1) as my_avg from t1
+SELECT default.myDoubleAvg(int_col1), default.myDoubleAvg(udf(int_col1)) as my_avg from t1
 -- !query 2 schema
-struct<my_avg:double>
+struct<mydoubleavg(CAST(int_col1 AS DOUBLE)):double,my_avg:double>
 -- !query 2 output
-102.5
+102.5 102.5
```
This one looks a bit weird compared to the original file.
Test build #107528 has finished for PR 25113 at commit
…n udf-aggregates_part1.sql to avoid Python float limitation

## What changes were proposed in this pull request?

The tests added at apache#25069 seem flaky in some environments. See apache#25069 (comment)

Python's string representation of floats can make the tests flaky. See https://docs.python.org/3/tutorial/floatingpoint.html. I think it's just better to explicitly cast everywhere udf returns a float (or a double) to stay safe. (note that we're not targeting the Python <> Scala value conversions - there are inevitable differences between Python and Scala; therefore, other languages' UDFs cannot guarantee the same results between Python and Scala).

This PR proposes to cast cases to long, integer and decimal explicitly to make the test cases robust.

<details><summary>Diff comparing to 'pgSQL/aggregates_part1.sql'</summary>
<p>

```diff
diff --git a/sql/core/src/test/resources/sql-tests/results/pgSQL/aggregates_part1.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-aggregates_part1.sql.out
index 51ca1d5..734634b7388 100644
--- a/sql/core/src/test/resources/sql-tests/results/pgSQL/aggregates_part1.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-aggregates_part1.sql.out
@@ -3,23 +3,23 @@
 -- !query 0
-SELECT avg(four) AS avg_1 FROM onek
+SELECT CAST(avg(udf(four)) AS decimal(10,3)) AS avg_1 FROM onek
 -- !query 0 schema
-struct<avg_1:double>
+struct<avg_1:decimal(10,3)>
 -- !query 0 output
 1.5

 -- !query 1
-SELECT avg(a) AS avg_32 FROM aggtest WHERE a < 100
+SELECT CAST(udf(avg(a)) AS decimal(10,3)) AS avg_32 FROM aggtest WHERE a < 100
 -- !query 1 schema
-struct<avg_32:double>
+struct<avg_32:decimal(10,3)>
 -- !query 1 output
-32.666666666666664
+32.667

 -- !query 2
-select CAST(avg(b) AS Decimal(10,3)) AS avg_107_943 FROM aggtest
+select CAST(avg(udf(b)) AS Decimal(10,3)) AS avg_107_943 FROM aggtest
 -- !query 2 schema
 struct<avg_107_943:decimal(10,3)>
 -- !query 2 output
@@ -27,39 +27,39 @@ struct<avg_107_943:decimal(10,3)>
 -- !query 3
-SELECT sum(four) AS sum_1500 FROM onek
+SELECT CAST(sum(udf(four)) AS int) AS sum_1500 FROM onek
 -- !query 3 schema
-struct<sum_1500:bigint>
+struct<sum_1500:int>
 -- !query 3 output
 1500

 -- !query 4
-SELECT sum(a) AS sum_198 FROM aggtest
+SELECT udf(sum(a)) AS sum_198 FROM aggtest
 -- !query 4 schema
-struct<sum_198:bigint>
+struct<sum_198:string>
 -- !query 4 output
 198

 -- !query 5
-SELECT sum(b) AS avg_431_773 FROM aggtest
+SELECT CAST(udf(udf(sum(b))) AS decimal(10,3)) AS avg_431_773 FROM aggtest
 -- !query 5 schema
-struct<avg_431_773:double>
+struct<avg_431_773:decimal(10,3)>
 -- !query 5 output
-431.77260909229517
+431.773

 -- !query 6
-SELECT max(four) AS max_3 FROM onek
+SELECT udf(max(four)) AS max_3 FROM onek
 -- !query 6 schema
-struct<max_3:int>
+struct<max_3:string>
 -- !query 6 output
 3

 -- !query 7
-SELECT max(a) AS max_100 FROM aggtest
+SELECT max(CAST(udf(a) AS int)) AS max_100 FROM aggtest
 -- !query 7 schema
 struct<max_100:int>
 -- !query 7 output
@@ -67,245 +67,246 @@ struct<max_100:int>
 -- !query 8
-SELECT max(aggtest.b) AS max_324_78 FROM aggtest
+SELECT CAST(udf(udf(max(aggtest.b))) AS decimal(10,3)) AS max_324_78 FROM aggtest
 -- !query 8 schema
-struct<max_324_78:float>
+struct<max_324_78:decimal(10,3)>
 -- !query 8 output
 324.78

 -- !query 9
-SELECT stddev_pop(b) FROM aggtest
+SELECT CAST(stddev_pop(udf(b)) AS decimal(10,3)) FROM aggtest
 -- !query 9 schema
-struct<stddev_pop(CAST(b AS DOUBLE)):double>
+struct<CAST(stddev_pop(CAST(udf(b) AS DOUBLE)) AS DECIMAL(10,3)):decimal(10,3)>
 -- !query 9 output
-131.10703231895047
+131.107

 -- !query 10
-SELECT stddev_samp(b) FROM aggtest
+SELECT CAST(udf(stddev_samp(b)) AS decimal(10,3)) FROM aggtest
 -- !query 10 schema
-struct<stddev_samp(CAST(b AS DOUBLE)):double>
+struct<CAST(udf(stddev_samp(cast(b as double))) AS DECIMAL(10,3)):decimal(10,3)>
 -- !query 10 output
-151.38936080399804
+151.389

 -- !query 11
-SELECT var_pop(b) FROM aggtest
+SELECT CAST(var_pop(udf(b)) AS decimal(10,3)) FROM aggtest
 -- !query 11 schema
-struct<var_pop(CAST(b AS DOUBLE)):double>
+struct<CAST(var_pop(CAST(udf(b) AS DOUBLE)) AS DECIMAL(10,3)):decimal(10,3)>
 -- !query 11 output
-17189.053923482323
+17189.054

 -- !query 12
-SELECT var_samp(b) FROM aggtest
+SELECT CAST(udf(var_samp(b)) AS decimal(10,3)) FROM aggtest
 -- !query 12 schema
-struct<var_samp(CAST(b AS DOUBLE)):double>
+struct<CAST(udf(var_samp(cast(b as double))) AS DECIMAL(10,3)):decimal(10,3)>
 -- !query 12 output
-22918.738564643096
+22918.739

 -- !query 13
-SELECT stddev_pop(CAST(b AS Decimal(38,0))) FROM aggtest
+SELECT CAST(udf(stddev_pop(CAST(b AS Decimal(38,0)))) AS decimal(10,3)) FROM aggtest
 -- !query 13 schema
-struct<stddev_pop(CAST(CAST(b AS DECIMAL(38,0)) AS DOUBLE)):double>
+struct<CAST(udf(stddev_pop(cast(cast(b as decimal(38,0)) as double))) AS DECIMAL(10,3)):decimal(10,3)>
 -- !query 13 output
-131.18117242958306
+131.181

 -- !query 14
-SELECT stddev_samp(CAST(b AS Decimal(38,0))) FROM aggtest
+SELECT CAST(stddev_samp(CAST(udf(b) AS Decimal(38,0))) AS decimal(10,3)) FROM aggtest
 -- !query 14 schema
-struct<stddev_samp(CAST(CAST(b AS DECIMAL(38,0)) AS DOUBLE)):double>
+struct<CAST(stddev_samp(CAST(CAST(udf(b) AS DECIMAL(38,0)) AS DOUBLE)) AS DECIMAL(10,3)):decimal(10,3)>
 -- !query 14 output
-151.47497042966097
+151.475

 -- !query 15
-SELECT var_pop(CAST(b AS Decimal(38,0))) FROM aggtest
+SELECT CAST(udf(var_pop(CAST(b AS Decimal(38,0)))) AS decimal(10,3)) FROM aggtest
 -- !query 15 schema
-struct<var_pop(CAST(CAST(b AS DECIMAL(38,0)) AS DOUBLE)):double>
+struct<CAST(udf(var_pop(cast(cast(b as decimal(38,0)) as double))) AS DECIMAL(10,3)):decimal(10,3)>
 -- !query 15 output
 17208.5

 -- !query 16
-SELECT var_samp(CAST(b AS Decimal(38,0))) FROM aggtest
+SELECT CAST(var_samp(udf(CAST(b AS Decimal(38,0)))) AS decimal(10,3)) FROM aggtest
 -- !query 16 schema
-struct<var_samp(CAST(CAST(b AS DECIMAL(38,0)) AS DOUBLE)):double>
+struct<CAST(var_samp(CAST(udf(cast(b as decimal(38,0))) AS DOUBLE)) AS DECIMAL(10,3)):decimal(10,3)>
 -- !query 16 output
-22944.666666666668
+22944.667

 -- !query 17
-SELECT var_pop(1.0), var_samp(2.0)
+SELECT CAST(udf(var_pop(1.0)) AS int), var_samp(udf(2.0))
 -- !query 17 schema
-struct<var_pop(CAST(1.0 AS DOUBLE)):double,var_samp(CAST(2.0 AS DOUBLE)):double>
+struct<CAST(udf(var_pop(cast(1.0 as double))) AS INT):int,var_samp(CAST(udf(2.0) AS DOUBLE)):double>
 -- !query 17 output
-0.0 NaN
+0 NaN

 -- !query 18
-SELECT stddev_pop(CAST(3.0 AS Decimal(38,0))), stddev_samp(CAST(4.0 AS Decimal(38,0)))
+SELECT CAST(stddev_pop(udf(CAST(3.0 AS Decimal(38,0)))) AS int), stddev_samp(CAST(udf(4.0) AS Decimal(38,0)))
 -- !query 18 schema
-struct<stddev_pop(CAST(CAST(3.0 AS DECIMAL(38,0)) AS DOUBLE)):double,stddev_samp(CAST(CAST(4.0 AS DECIMAL(38,0)) AS DOUBLE)):double>
+struct<CAST(stddev_pop(CAST(udf(cast(3.0 as decimal(38,0))) AS DOUBLE)) AS INT):int,stddev_samp(CAST(CAST(udf(4.0) AS DECIMAL(38,0)) AS DOUBLE)):double>
 -- !query 18 output
-0.0 NaN
+0 NaN

 -- !query 19
-select sum(CAST(null AS int)) from range(1,4)
+select sum(udf(CAST(null AS int))) from range(1,4)
 -- !query 19 schema
-struct<sum(CAST(NULL AS INT)):bigint>
+struct<sum(CAST(udf(cast(null as int)) AS DOUBLE)):double>
 -- !query 19 output
 NULL

 -- !query 20
-select sum(CAST(null AS long)) from range(1,4)
+select sum(udf(CAST(null AS long))) from range(1,4)
 -- !query 20 schema
-struct<sum(CAST(NULL AS BIGINT)):bigint>
+struct<sum(CAST(udf(cast(null as bigint)) AS DOUBLE)):double>
 -- !query 20 output
 NULL

 -- !query 21
-select sum(CAST(null AS Decimal(38,0))) from range(1,4)
+select sum(udf(CAST(null AS Decimal(38,0)))) from range(1,4)
 -- !query 21 schema
-struct<sum(CAST(NULL AS DECIMAL(38,0))):decimal(38,0)>
+struct<sum(CAST(udf(cast(null as decimal(38,0))) AS DOUBLE)):double>
 -- !query 21 output
 NULL

 -- !query 22
-select sum(CAST(null AS DOUBLE)) from range(1,4)
+select sum(udf(CAST(null AS DOUBLE))) from range(1,4)
 -- !query 22 schema
-struct<sum(CAST(NULL AS DOUBLE)):double>
+struct<sum(CAST(udf(cast(null as double)) AS DOUBLE)):double>
 -- !query 22 output
 NULL

 -- !query 23
-select avg(CAST(null AS int)) from range(1,4)
+select avg(udf(CAST(null AS int))) from range(1,4)
 -- !query 23 schema
-struct<avg(CAST(NULL AS INT)):double>
+struct<avg(CAST(udf(cast(null as int)) AS DOUBLE)):double>
 -- !query 23 output
 NULL

 -- !query 24
-select avg(CAST(null AS long)) from range(1,4)
+select avg(udf(CAST(null AS long))) from range(1,4)
 -- !query 24 schema
-struct<avg(CAST(NULL AS BIGINT)):double>
+struct<avg(CAST(udf(cast(null as bigint)) AS DOUBLE)):double>
 -- !query 24 output
 NULL

 -- !query 25
-select avg(CAST(null AS Decimal(38,0))) from range(1,4)
+select avg(udf(CAST(null AS Decimal(38,0)))) from range(1,4)
 -- !query 25 schema
-struct<avg(CAST(NULL AS DECIMAL(38,0))):decimal(38,4)>
+struct<avg(CAST(udf(cast(null as decimal(38,0))) AS DOUBLE)):double>
 -- !query 25 output
 NULL

 -- !query 26
-select avg(CAST(null AS DOUBLE)) from range(1,4)
+select avg(udf(CAST(null AS DOUBLE))) from range(1,4)
 -- !query 26 schema
-struct<avg(CAST(NULL AS DOUBLE)):double>
+struct<avg(CAST(udf(cast(null as double)) AS DOUBLE)):double>
 -- !query 26 output
 NULL

 -- !query 27
-select sum(CAST('NaN' AS DOUBLE)) from range(1,4)
+select sum(CAST(udf('NaN') AS DOUBLE)) from range(1,4)
 -- !query 27 schema
-struct<sum(CAST(NaN AS DOUBLE)):double>
+struct<sum(CAST(udf(NaN) AS DOUBLE)):double>
 -- !query 27 output
 NaN

 -- !query 28
-select avg(CAST('NaN' AS DOUBLE)) from range(1,4)
+select avg(CAST(udf('NaN') AS DOUBLE)) from range(1,4)
 -- !query 28 schema
-struct<avg(CAST(NaN AS DOUBLE)):double>
+struct<avg(CAST(udf(NaN) AS DOUBLE)):double>
 -- !query 28 output
 NaN

 -- !query 30
-SELECT avg(CAST(x AS DOUBLE)), var_pop(CAST(x AS DOUBLE))
+SELECT avg(CAST(udf(x) AS DOUBLE)), var_pop(CAST(udf(x) AS DOUBLE))
 FROM (VALUES ('Infinity'), ('1')) v(x)
 -- !query 30 schema
-struct<avg(CAST(x AS DOUBLE)):double,var_pop(CAST(x AS DOUBLE)):double>
+struct<avg(CAST(udf(x) AS DOUBLE)):double,var_pop(CAST(udf(x) AS DOUBLE)):double>
 -- !query 30 output
 Infinity NaN

 -- !query 31
-SELECT avg(CAST(x AS DOUBLE)), var_pop(CAST(x AS DOUBLE))
+SELECT avg(CAST(udf(x) AS DOUBLE)), var_pop(CAST(udf(x) AS DOUBLE))
 FROM (VALUES ('Infinity'), ('Infinity')) v(x)
 -- !query 31 schema
-struct<avg(CAST(x AS DOUBLE)):double,var_pop(CAST(x AS DOUBLE)):double>
+struct<avg(CAST(udf(x) AS DOUBLE)):double,var_pop(CAST(udf(x) AS DOUBLE)):double>
 -- !query 31 output
 Infinity NaN

 -- !query 32
-SELECT avg(CAST(x AS DOUBLE)), var_pop(CAST(x AS DOUBLE))
+SELECT avg(CAST(udf(x) AS DOUBLE)), var_pop(CAST(udf(x) AS DOUBLE))
 FROM (VALUES ('-Infinity'), ('Infinity')) v(x)
 -- !query 32 schema
-struct<avg(CAST(x AS DOUBLE)):double,var_pop(CAST(x AS DOUBLE)):double>
+struct<avg(CAST(udf(x) AS DOUBLE)):double,var_pop(CAST(udf(x) AS DOUBLE)):double>
 -- !query 32 output
 NaN NaN

 -- !query 33
-SELECT avg(CAST(x AS DOUBLE)), var_pop(CAST(x AS DOUBLE))
+SELECT CAST(avg(udf(CAST(x AS DOUBLE))) AS int), CAST(udf(var_pop(CAST(x AS DOUBLE))) AS decimal(10,3))
 FROM (VALUES (100000003), (100000004), (100000006), (100000007)) v(x)
 -- !query 33 schema
-struct<avg(CAST(x AS DOUBLE)):double,var_pop(CAST(x AS DOUBLE)):double>
+struct<CAST(avg(CAST(udf(cast(x as double)) AS DOUBLE)) AS INT):int,CAST(udf(var_pop(cast(x as double))) AS DECIMAL(10,3)):decimal(10,3)>
 -- !query 33 output
-1.00000005E8 2.5
+100000005 2.5

 -- !query 34
-SELECT avg(CAST(x AS DOUBLE)), var_pop(CAST(x AS DOUBLE))
+SELECT CAST(avg(udf(CAST(x AS DOUBLE))) AS long), CAST(udf(var_pop(CAST(x AS DOUBLE))) AS decimal(10,3))
 FROM (VALUES (7000000000005), (7000000000007)) v(x)
 -- !query 34 schema
-struct<avg(CAST(x AS DOUBLE)):double,var_pop(CAST(x AS DOUBLE)):double>
+struct<CAST(avg(CAST(udf(cast(x as double)) AS DOUBLE)) AS BIGINT):bigint,CAST(udf(var_pop(cast(x as double))) AS DECIMAL(10,3)):decimal(10,3)>
 -- !query 34 output
-7.000000000006E12 1.0
+7000000000006 1

 -- !query 35
-SELECT covar_pop(b, a), covar_samp(b, a) FROM aggtest
+SELECT CAST(udf(covar_pop(b, udf(a))) AS decimal(10,3)), CAST(covar_samp(udf(b), a) as decimal(10,3)) FROM aggtest
 -- !query 35 schema
-struct<covar_pop(CAST(b AS DOUBLE), CAST(a AS DOUBLE)):double,covar_samp(CAST(b AS DOUBLE), CAST(a AS DOUBLE)):double>
+struct<CAST(udf(covar_pop(cast(b as double), cast(udf(a) as double))) AS DECIMAL(10,3)):decimal(10,3),CAST(covar_samp(CAST(udf(b) AS DOUBLE), CAST(a AS DOUBLE)) AS DECIMAL(10,3)):decimal(10,3)>
 -- !query 35 output
-653.6289553875104 871.5052738500139
+653.629 871.505

 -- !query 36
-SELECT corr(b, a) FROM aggtest
+SELECT CAST(corr(b, udf(a)) AS decimal(10,3)) FROM aggtest
 -- !query 36 schema
-struct<corr(CAST(b AS DOUBLE), CAST(a AS DOUBLE)):double>
+struct<CAST(corr(CAST(b AS DOUBLE), CAST(udf(a) AS DOUBLE)) AS DECIMAL(10,3)):decimal(10,3)>
 -- !query 36 output
-0.1396345165178734
+0.14

 -- !query 37
-SELECT count(four) AS cnt_1000 FROM onek
+SELECT count(udf(four)) AS cnt_1000 FROM onek
 -- !query 37 schema
 struct<cnt_1000:bigint>
 -- !query 37 output
@@ -313,18 +314,18 @@ struct<cnt_1000:bigint>
 -- !query 38
-SELECT count(DISTINCT four) AS cnt_4 FROM onek
+SELECT udf(count(DISTINCT four)) AS cnt_4 FROM onek
 -- !query 38 schema
-struct<cnt_4:bigint>
+struct<cnt_4:string>
 -- !query 38 output
 4

 -- !query 39
-select ten, count(*), sum(four) from onek
+select ten, udf(count(*)), CAST(sum(udf(four)) AS int) from onek
 group by ten order by ten
 -- !query 39 schema
-struct<ten:int,count(1):bigint,sum(four):bigint>
+struct<ten:int,udf(count(1)):string,CAST(sum(CAST(udf(four) AS DOUBLE)) AS INT):int>
 -- !query 39 output
 0 100 100
 1 100 200
@@ -339,10 +340,10 @@ struct<ten:int,count(1):bigint,sum(four):bigint>
 -- !query 40
-select ten, count(four), sum(DISTINCT four) from onek
+select ten, count(udf(four)), udf(sum(DISTINCT four)) from onek
 group by ten order by ten
 -- !query 40 schema
-struct<ten:int,count(four):bigint,sum(DISTINCT four):bigint>
+struct<ten:int,count(udf(four)):bigint,udf(sum(distinct cast(four as bigint))):string>
 -- !query 40 output
 0 100 2
 1 100 4
@@ -357,11 +358,11 @@ struct<ten:int,count(four):bigint,sum(DISTINCT four):bigint>
 -- !query 41
-select ten, sum(distinct four) from onek a
+select ten, udf(sum(distinct four)) from onek a
 group by ten
-having exists (select 1 from onek b where sum(distinct a.four) = b.four)
+having exists (select 1 from onek b where udf(sum(distinct a.four)) = b.four)
 -- !query 41 schema
-struct<ten:int,sum(DISTINCT four):bigint>
+struct<ten:int,udf(sum(distinct cast(four as bigint))):string>
 -- !query 41 output
 0 2
 2 2
@@ -374,23 +375,23 @@ struct<ten:int,sum(DISTINCT four):bigint>
 select ten, sum(distinct four) from onek a
 group by ten
 having exists (select 1 from onek b
-               where sum(distinct a.four + b.four) = b.four)
+               where sum(distinct a.four + b.four) = udf(b.four))
 -- !query 42 schema
 struct<>
 -- !query 42 output
 org.apache.spark.sql.AnalysisException
 Aggregate/Window/Generate expressions are not valid in where clause of the query.
-Expression in where clause: [(sum(DISTINCT CAST((outer() + b.`four`) AS BIGINT)) = CAST(b.`four` AS BIGINT))]
+Expression in where clause: [(sum(DISTINCT CAST((outer() + b.`four`) AS BIGINT)) = CAST(udf(four) AS BIGINT))]
 Invalid expressions: [sum(DISTINCT CAST((outer() + b.`four`) AS BIGINT))];

 -- !query 43
 select
-  (select max((select i.unique2 from tenk1 i where i.unique1 = o.unique1)))
+  (select udf(max((select i.unique2 from tenk1 i where i.unique1 = o.unique1))))
 from tenk1 o
 -- !query 43 schema
 struct<>
 -- !query 43 output
 org.apache.spark.sql.AnalysisException
-cannot resolve '`o.unique1`' given input columns: [i.even, i.fivethous, i.four, i.hundred, i.odd, i.string4, i.stringu1, i.stringu2, i.ten, i.tenthous, i.thousand, i.twenty, i.two, i.twothousand, i.unique1, i.unique2]; line 2 pos 63
+cannot resolve '`o.unique1`' given input columns: [i.even, i.fivethous, i.four, i.hundred, i.odd, i.string4, i.stringu1, i.stringu2, i.ten, i.tenthous, i.thousand, i.twenty, i.two, i.twothousand, i.unique1, i.unique2]; line 2 pos 67
```

</p>
</details>

## How was this patch tested?

Manually tested in local. Also, with JDK 11:

```
Using /.../jdk-11.0.3.jdk/Contents/Home as default JAVA_HOME.
Note, this will be overridden by -java-home if it is set.
[info] Loading project definition from /.../spark/project
[info] Updating {file:/.../spark/project/}spark-build...
...
[info] SQLQueryTestSuite:
...
[info] - udf/pgSQL/udf-aggregates_part1.sql - Scala UDF (17 seconds, 228 milliseconds)
[info] - udf/pgSQL/udf-aggregates_part1.sql - Regular Python UDF (36 seconds, 170 milliseconds)
[info] - udf/pgSQL/udf-aggregates_part1.sql - Scalar Pandas UDF (41 seconds, 132 milliseconds)
...
```

Closes apache#25110 from HyukjinKwon/SPARK-28270-1.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
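For context, the float-representation issue the commit above blames is easy to reproduce in plain Python; this is just an illustration, not code from the PR:

```python
# Python prints a double with up to 17 significant digits, so golden-file
# comparisons against a JVM-rendered double are sensitive to the Python/OS
# build. Pinning the value with an explicit decimal cast (as the PR does)
# sidesteps the problem.
x = 98 / 3
print(repr(x))      # '32.666666666666664' -- full precision, platform-sensitive in tests
print(round(x, 3))  # 32.667 -- what CAST(... AS decimal(10,3)) effectively pins down
```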
…umnar

## What changes were proposed in this pull request?

This is the second part of https://issues.apache.org/jira/browse/SPARK-27396 and a follow-on to apache#24795.

## How was this patch tested?

I did some manual tests and ran/updated the automated tests. I did some simple performance tests on a single node to try to verify that there is no performance impact, and I was not able to measure anything beyond noise.

Closes apache#25008 from revans2/columnar-remove-batch-scan.

Authored-by: Robert (Bobby) Evans <bobby@apache.org>
Signed-off-by: Thomas Graves <tgraves@apache.org>
## What changes were proposed in this pull request?

Cleaned up (removed) code duplication in `ObjectProducerExec` operators so they use the trait's methods.

## How was this patch tested?

Local build. Waiting for Jenkins.

Closes apache#25065 from jaceklaskowski/ObjectProducerExec-operators-cleanup.

Authored-by: Jacek Laskowski <jacek@japila.pl>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
Fixed the review comment, thanks.
…le expressions

## What changes were proposed in this pull request?

Reverted initialization of date-time constants in `DateTimeUtils` introduced by apache#23878. As a comment in the [Delta repo](https://github.com/delta-io/delta) states, the compiler can do additional optimizations if values can be calculated at compile time: https://github.com/delta-io/delta/blob/master/src/main/scala/org/apache/spark/sql/delta/util/DateTimeUtils.scala#L63-L75

## How was this patch tested?

This was tested by existing test suites.

Closes apache#25116 from MaxGekk/datetime-consts-init.

Authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Signed-off-by: herman <herman@databricks.com>
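A minimal sketch of the distinction the commit relies on; the names below are illustrative, not the exact `DateTimeUtils` fields:

```scala
import java.util.concurrent.TimeUnit

object Constants {
  // A literal `final val` is a Scala compile-time constant: the compiler can
  // inline it at call sites and fold expressions that use it.
  final val MICROS_PER_SECOND: Long = 1000000L

  // A value produced by a method call is only computed at class initialization,
  // so it stays an ordinary field read at runtime and cannot be constant-folded.
  val microsPerSecondComputed: Long = TimeUnit.SECONDS.toMicros(1)
}
```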
…onfigurations.

## What changes were proposed in this pull request?

At the moment Kafka delegation tokens are fetched through `AdminClient`, but there is no way to add custom configuration parameters. In [options](https://spark.apache.org/docs/2.4.3/structured-streaming-kafka-integration.html#kafka-specific-configurations) there is already a possibility to add custom configurations. In this PR I've added a similar possibility to `AdminClient`.

## How was this patch tested?

Existing + added unit tests.

```
cd docs/
SKIP_API=1 jekyll build
```

Manual webpage check.

Closes apache#24875 from gaborgsomogyi/SPARK-28055.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?

This PR adds two new config properties: `spark.driver.defaultJavaOptions` and `spark.executor.defaultJavaOptions`. These are intended to be set by administrators in a file of defaults for options like JVM garbage collection algorithm. Users will still set `extraJavaOptions` properties, and both sets of JVM options will be added to start a JVM (default options are prepended to extra options).

## How was this patch tested?

Existing + additional unit tests.

```
cd docs/
SKIP_API=1 jekyll build
```

Manual webpage check.

Closes apache#24804 from gaborgsomogyi/SPARK-23472.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
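For illustration, a plausible split between admin defaults and user extras using the two new properties; the option values below are made up, only the property names come from the description:

```
# spark-defaults.conf, maintained by an administrator
spark.driver.defaultJavaOptions    -XX:+UseG1GC -XX:MaxGCPauseMillis=200
spark.executor.defaultJavaOptions  -XX:+UseG1GC

# set per job by a user; the default options above are prepended to these
spark.executor.extraJavaOptions    -verbose:gc
```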
Test build #107540 has finished for PR 25113 at commit
## What changes were proposed in this pull request?

This PR is to port select.sql from PostgreSQL regression tests: https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/select.sql

The expected results can be found in the link: https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/expected/select.out

When porting the test cases, found four PostgreSQL specific features that do not exist in Spark SQL:
- [SPARK-28010](https://issues.apache.org/jira/browse/SPARK-28010): Support ORDER BY ... USING syntax
- [SPARK-28329](https://issues.apache.org/jira/browse/SPARK-28329): Support SELECT INTO syntax
- [SPARK-28330](https://issues.apache.org/jira/browse/SPARK-28330): Enhance query limit
- [SPARK-28296](https://issues.apache.org/jira/browse/SPARK-28296): Improved VALUES support

Also, found one inconsistent behavior:
- [SPARK-28333](https://issues.apache.org/jira/browse/SPARK-28333): `NULLS FIRST` for `DESC` and `NULLS LAST` for `ASC`

## How was this patch tested?

N/A

Closes apache#25096 from wangyum/SPARK-28334.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?

Implement `ALTER TABLE` for v2 tables:
* Add `AlterTable` logical plan and `AlterTableExec` physical plan
* Convert `ALTER TABLE` parsed plans to `AlterTable` when a v2 catalog is responsible for an identifier
* Validate that columns to alter exist in analyzer checks
* Fix nested type handling in `CatalogV2Util`

## How was this patch tested?

* Add extensive tests in `DataSourceV2SQLSuite`

Closes apache#24937 from rdblue/SPARK-28139-add-v2-alter-table.

Lead-authored-by: Ryan Blue <blue@apache.org>
Co-authored-by: Ryan Blue <rdblue@users.noreply.github.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?

A 0-arg Java UDF alone calls the function even before making it an expression. This causes the function to always return the same value, and the function is called on the driver side. Seems like a mistake.

## How was this patch tested?

Unit test was added.

Closes apache#25108 from HyukjinKwon/SPARK-28321.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
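A sketch of the reported behavior using the public Java UDF API; the `math.random` body is illustrative, and a SparkSession named `spark` is assumed:

```scala
import org.apache.spark.sql.api.java.UDF0
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.DataTypes

// A 0-arg Java UDF. Per the bug description, `call()` used to be invoked once
// while building the expression (on the driver), so every row got the same value.
val random = udf(new UDF0[Double] { override def call(): Double = math.random }, DataTypes.DoubleType)

spark.range(3).select(random()).show()
// Fixed behavior: the UDF is evaluated per row, so the values should differ.
```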
…nput of UDF as double in the failed test in udf-aggregate_part1.sql

## What changes were proposed in this pull request?

It can still be flaky in certain environments due to the float limitation described at apache#25110. See apache#25110 (comment)

- https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-2.7/6584/testReport/org.apache.spark.sql/SQLQueryTestSuite/udf_pgSQL_udf_aggregates_part1_sql___Regular_Python_UDF/

```
Expected "700000000000[6] 1", but got "700000000000[5] 1"
Result did not match for query #33
SELECT CAST(avg(udf(CAST(x AS DOUBLE))) AS long), CAST(udf(var_pop(CAST(x AS DOUBLE))) AS decimal(10,3))
FROM (VALUES (7000000000005), (7000000000007)) v(x)
```

Here's what's going on: apache#25110 (comment)

```scala
scala> Seq("7000000000004.999", "7000000000006.999").toDF().selectExpr("CAST(avg(value) AS long)").show()
+--------------------------+
|CAST(avg(value) AS BIGINT)|
+--------------------------+
|             7000000000005|
+--------------------------+
```

Therefore, this PR just avoids the cast in this specific test. This is a temp fix. We need a more robust way to avoid such cases.

## How was this patch tested?

It passes with Maven locally before/after this PR. I believe the problem is, similarly, the Python or OS installed on the machine. I should test this against the PR builder with `test-maven` to be sure.

Closes apache#25128 from HyukjinKwon/SPARK-28270-2.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?

There are some hardcoded configs; this uses config entries to replace them.

## How was this patch tested?

Existing UT.

Closes apache#25059 from WangGuangxin/ConfigEntry.

Authored-by: wangguangxin.cn <wangguangxin.cn@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
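The pattern the commit moves toward looks roughly like this; the entry name below is hypothetical, and `ConfigBuilder` is Spark-internal, so this sketch only makes sense inside the Spark code base:

```scala
import org.apache.spark.internal.config.ConfigBuilder

object ExampleConfig {
  // A typed ConfigEntry: key, type, doc and default live in one place,
  // instead of a raw string plus an ad-hoc default at every call site.
  val EXAMPLE_MAX_RETRIES = ConfigBuilder("spark.example.maxRetries")
    .doc("How many times to retry before giving up.")
    .intConf
    .createWithDefault(3)
}

// Call sites then read it through SparkConf: conf.get(ExampleConfig.EXAMPLE_MAX_RETRIES)
```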
## What changes were proposed in this pull request?

This PR is to port with.sql from PostgreSQL regression tests: https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/with.sql

The expected results can be found in the link: https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/expected/with.out

When porting the test cases, found 7 PostgreSQL specific features that do not exist in Spark SQL:
- [SPARK-19799](https://issues.apache.org/jira/browse/SPARK-19799) Support WITH clause in subqueries
- [SPARK-24497](https://issues.apache.org/jira/browse/SPARK-24497) Support recursive SQL query
- [SPARK-28297](https://issues.apache.org/jira/browse/SPARK-28297) Handling outer links in CTE subquery expressions
- [SPARK-28296](https://issues.apache.org/jira/browse/SPARK-28296) Improved VALUES support
- [SPARK-28146](https://issues.apache.org/jira/browse/SPARK-28146) Support IS OF type predicate
- [SPARK-28147](https://issues.apache.org/jira/browse/SPARK-28147) Support RETURNING clause
- [SPARK-27878](https://issues.apache.org/jira/browse/SPARK-27878) Support ARRAY(sub-SELECT) expressions

Also, found one inconsistent behavior:
- [SPARK-28299](https://issues.apache.org/jira/browse/SPARK-28299) Evaluation of multiple CTE uses

Also, added the following notes:
- Spark SQL doesn't support DELETE statement
- Spark SQL doesn't support UPDATE statement
- Spark SQL doesn't support RULEs
- Spark SQL doesn't support UNIQUE constraints
- Spark SQL doesn't support ON CONFLICT clause
- Spark SQL doesn't support TRIGGERs
- Spark SQL doesn't support INHERITS clause

## How was this patch tested?

N/A

Closes apache#24860 from peter-toth/SPARK-28034.

Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
…ile appender - size-based rolling compressed

## What changes were proposed in this pull request?

`SizeBasedRollingPolicy.shouldRollover` returns false when the size is equal to `rolloverSizeBytes`.

```scala
/** Should rollover if the next set of bytes is going to exceed the size limit */
def shouldRollover(bytesToBeWritten: Long): Boolean = {
  logDebug(s"$bytesToBeWritten + $bytesWrittenSinceRollover > $rolloverSizeBytes")
  bytesToBeWritten + bytesWrittenSinceRollover > rolloverSizeBytes
}
```

- https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/107553/testReport/org.apache.spark.util/FileAppenderSuite/rolling_file_appender___size_based_rolling__compressed_/

```
org.scalatest.exceptions.TestFailedException: 1000 was not less than 1000
```

## How was this patch tested?

Pass the Jenkins with the updated test.

Closes apache#25125 from dongjoon-hyun/SPARK-28357.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?

The `dev/merge_spark_pr.py` script always fails for some users because they have different `name` and `key`.

- https://issues.apache.org/jira/rest/api/2/user?username=yumwang

The JIRA client expects `name`, but we are using `key`. This PR fixes it.

```python
# This is JIRA client code `/usr/local/lib/python2.7/site-packages/jira/client.py`
def assign_issue(self, issue, assignee):
    """Assign an issue to a user. None will set it to unassigned. -1 will set it to Automatic.

    :param issue: the issue ID or key to assign
    :param assignee: the user to assign the issue to

    :type issue: int or str
    :type assignee: str

    :rtype: bool
    """
    url = self._options['server'] + \
        '/rest/api/latest/issue/' + str(issue) + '/assignee'
    payload = {'name': assignee}
    r = self._session.put(
        url, data=json.dumps(payload))
    raise_on_error(r)
    return True
```

## How was this patch tested?

Manual test with the committer ID/password.

```python
import jira.client
asf_jira = jira.client.JIRA({'server': 'https://issues.apache.org/jira'}, basic_auth=('yourid', 'passwd'))
asf_jira.assign_issue("SPARK-28354", "q79969786")  # This will raise an exception.
asf_jira.assign_issue("SPARK-28354", "yumwang")    # This works.
```

Closes apache#25120 from dongjoon-hyun/SPARK-28354.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?

This PR adds compatibility for handling a `WITH` clause within another `WITH` clause. Before this PR these queries returned `1`, while after this PR they return `2`, as PostgreSQL does:

```sql
WITH t AS (SELECT 1),
t2 AS (
  WITH t AS (SELECT 2)
  SELECT * FROM t
)
SELECT * FROM t2
```

```sql
WITH t AS (SELECT 1)
SELECT (
  WITH t AS (SELECT 2)
  SELECT * FROM t
)
```

As this is an incompatible change, the PR introduces the `spark.sql.legacy.cte.substitution.enabled` flag as an option to restore the old behaviour.

## How was this patch tested?

Added new UTs.

Closes apache#25029 from peter-toth/SPARK-28228.

Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
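Opting back into the old substitution order is then a one-liner with the flag named above:

```sql
SET spark.sql.legacy.cte.substitution.enabled=true;
-- with the legacy substitution, the nested WITH queries above return 1 again
```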
## What changes were proposed in this pull request?

The `ThriftServerTab` displays a FINISHED state when the operation finishes execution, but quite often it still takes a lot of time to fetch the results. `OperationState` has a CLOSED state for after the iterator is closed. This PR adds a CLOSED state to `ExecutionState`, and overrides `close()` in `SparkExecuteStatementOperation`, `SparkGetColumnsOperation`, `SparkGetSchemasOperation` and `SparkGetTablesOperation`.

## How was this patch tested?

Manual tests:
1. Add `Thread.sleep(10000)` before [SparkExecuteStatementOperation.scala#L112](https://github.com/apache/spark/blob/b2e7677f4d3d8f47f5f148680af39d38f2b558f0/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala#L112)
2. Switch to `ThriftServerTab`: (screenshot)
3. After a while: (screenshot)

Closes apache#25062 from wangyum/SPARK-28260.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
…t.range(0, v.size)

## What changes were proposed in this pull request?

Fix typo in SPARK-28159: `transfromWithMean` -> `transformWithMean`

## How was this patch tested?

Existing test.

Closes apache#25129 from zhengruifeng/to_ml_vec_cleanup.

Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
…lass name

A code gen test in `WholeStageCodeGenSuite` was flaky because it used the codegen metrics class to test if the generated code for equivalent plans was identical under a particular flag. This patch switches the test to compare the generated code directly.

N/A

Closes apache#25131 from gatorsmile/WholeStageCodegenSuite.

Authored-by: gatorsmile <gatorsmile@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
…mand is compressed by broadcast

## What changes were proposed in this pull request?

The `_prepare_for_python_RDD` method currently broadcasts a pickled command if its length is greater than the hardcoded value `1 << 20` (1M). This change sets this value as a Spark conf instead.

## How was this patch tested?

Unit tests, manual tests.

Closes apache#25123 from jessecai/SPARK-28355.

Authored-by: Jesse Cai <jesse.cai@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
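A self-contained sketch of the decision being made configurable; the conf key below is hypothetical, since the PR's point is only that the hardcoded `1 << 20` becomes a setting:

```python
import pickle

def prepare_command(conf, command):
    """Mirrors the check described above: ship large pickled commands through
    a broadcast variable instead of inlining them in the RDD/UDF payload."""
    pickled = pickle.dumps(command)
    threshold = int(conf.get("spark.example.command.broadcastThreshold", 1 << 20))
    if len(pickled) > threshold:
        return ("broadcast", len(pickled))  # would be sc.broadcast(pickled) in PySpark
    return ("inline", len(pickled))

print(prepare_command({}, list(range(10))))             # small payload stays inline
print(prepare_command({}, bytes(2 * 1024 * 1024)))      # ~2MB payload would be broadcast
```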
…inuousSuite

## What changes were proposed in this pull request?

This patch fixes the flaky test "query without test harness" in ContinuousSuite, via adding some more gaps on waiting for the query to commit the epoch which writes output rows. The observation of this issue is below (injected some debug logs to get them):

```
reader creation time                               1562225320210
epoch 1 launched                                   1562225320593 (+380ms from reader creation time)
epoch 13 launched                                  1562225321702 (+1.5s from reader creation time)
partition reader creation time                     1562225321715 (+1.5s from reader creation time)
next read time for first next call                 1562225321210 (+1s from reader creation time)
first next called in partition reader              1562225321746 (immediately after creation of partition reader)
wait finished in next called in partition reader   1562225321746 (no wait)
second next called in partition reader             1562225321747 (immediately after first next())
epoch 0 commit started                             1562225321861
writing rows (0, 1) (belong to epoch 13)           1562225321866 (+100ms after first next())
wait start in waitForRateSourceTriggers(2)         1562225322059
next read time for second next call                1562225322210 (+1s from previous "next read time")
wait finished in next called in partition reader   1562225322211 (+450ms wait)
writing rows (2, 3) (belong to epoch 13)           1562225322211 (immediately after next())
epoch 14 launched                                  1562225322246
desired wait time in waitForRateSourceTriggers(2)  1562225322510 (+2.3s from reader creation time)
epoch 12 committed                                 1562225323034
```

These rows were written within the desired wait time, but epoch 13 couldn't be committed within it. Interestingly, epoch 12 was lucky to be committed within the gap between the finished wait in waitForRateSourceTriggers and query.stop() - but even supposing the rows were written in epoch 12, that would just be luck, and the epoch should be committed within the desired wait time.

This patch modifies the Rate continuous stream to track the highest committed value, so that the test can wait until the desired value is reported to the stream as committed. This patch also modifies the Rate continuous stream to track the timestamp at which the stream gets the first committed offset, and lets `waitForRateSourceTriggers` use that timestamp. This still relies on waiting for a specific period, but is a safer approach compared to the current one, based on the observation above. Based on the change, this patch saves a couple of seconds in test time.

## How was this patch tested?

10 sequential test runs succeeded locally.

Closes apache#25048 from HeartSaVioR/SPARK-28247.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
…Type and CalendarIntervalType

## What changes were proposed in this pull request?

Existing random generators in tests produce wide ranges of values that can be out of supported ranges for:
- `DateType`, the valid range is `[0001-01-01, 9999-12-31]`
- `TimestampType` supports values in `[0001-01-01T00:00:00.000000Z, 9999-12-31T23:59:59.999999Z]`
- `CalendarIntervalType` should define intervals for the ranges above.

Dates and timestamps produced by random literal generators are usually out of valid ranges for those types, and tests just check invalid values or values caused by arithmetic overflow. In the PR, I propose to restrict tested pseudo-random values to the valid ranges of `DateType`, `TimestampType` and `CalendarIntervalType`. This should allow checking valid values in tests, and avoid wasting time on a priori invalid inputs.

## How was this patch tested?

The changes were checked by `DateExpressionsSuite` and modified `DateTimeUtils.dateAddMonths`:

```scala
def dateAddMonths(days: SQLDate, months: Int): SQLDate = {
  localDateToDays(LocalDate.ofEpochDay(days).plusMonths(months))
}
```

Closes apache#25166 from MaxGekk/datetime-lit-random-gen.

Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
…down to join

## What changes were proposed in this pull request?

A `Filter` predicate using `PythonUDF` can't be pushed down into a join condition, currently. A predicate like that should be able to be pushed down to the join condition. For `PythonUDF`s that can't be evaluated in the join condition, `PullOutPythonUDFInJoinCondition` will pull them out later.

An example:

```scala
val pythonTestUDF = TestPythonUDF(name = "udf")

val left = Seq((1, 2), (2, 3)).toDF("a", "b")
val right = Seq((1, 2), (3, 4)).toDF("c", "d")
val df = left.crossJoin(right).where(pythonTestUDF($"a") === pythonTestUDF($"c"))
```

Query plan before the PR:
```
== Physical Plan ==
*(3) Project [a#2121, b#2122, c#2132, d#2133]
+- *(3) Filter (pythonUDF0#2142 = pythonUDF1#2143)
   +- BatchEvalPython [udf(a#2121), udf(c#2132)], [pythonUDF0#2142, pythonUDF1#2143]
      +- BroadcastNestedLoopJoin BuildRight, Cross
         :- *(1) Project [_1#2116 AS a#2121, _2#2117 AS b#2122]
         :  +- LocalTableScan [_1#2116, _2#2117]
         +- BroadcastExchange IdentityBroadcastMode
            +- *(2) Project [_1#2127 AS c#2132, _2#2128 AS d#2133]
               +- LocalTableScan [_1#2127, _2#2128]
```

Query plan after the PR:
```
== Physical Plan ==
*(3) Project [a#2121, b#2122, c#2132, d#2133]
+- *(3) BroadcastHashJoin [pythonUDF0#2142], [pythonUDF0#2143], Cross, BuildRight
   :- BatchEvalPython [udf(a#2121)], [pythonUDF0#2142]
   :  +- *(1) Project [_1#2116 AS a#2121, _2#2117 AS b#2122]
   :     +- LocalTableScan [_1#2116, _2#2117]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[2, string, true]))
      +- BatchEvalPython [udf(c#2132)], [pythonUDF0#2143]
         +- *(2) Project [_1#2127 AS c#2132, _2#2128 AS d#2133]
            +- LocalTableScan [_1#2127, _2#2128]
```

After this PR, the join can use `BroadcastHashJoin` instead of `BroadcastNestedLoopJoin`.

## How was this patch tested?

Added tests.

Closes apache#25106 from viirya/pythonudf-join-condition.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
…Context, check jar path exists first.

## What changes were proposed in this pull request?

ISSUE: https://issues.apache.org/jira/browse/SPARK-28106

When we use ADD JAR in SQL, it goes through three steps:
- add the jar to HiveClient's classloader
- HiveClientImpl.runHiveSQL("ADD JAR" + PATH)
- SessionStateBuilder.addJar

The second step seems to have no impact on the whole process, since even if it fails, we can still execute. The first step adds the jar path to HiveClient's classloader, so we can use the jar in HiveClientImpl. The third step adds the jar path to SparkContext. For a local file path, it calls RpcServer's FileServer to add it to the env, so if you pass a wrong path it causes an error; but if you pass an HDFS or VIEWFS path, it won't be checked and is just added to the jar path map. Then when the next TaskSetManager sends out a task, this path is carried by the TaskDescription, and the executor calls updateDependencies, which checks all jar and file paths in the TaskDescription. Then an error happens like below: (screenshot)

## How was this patch tested?

Existing unit tests. Environment test.

Closes apache#24909 from AngersZhuuuu/SPARK-28106.

Lead-authored-by: Angers <angers.zhu@gamil.com>
Co-authored-by: 朱夷 <zhuyi01@corp.netease.com>
Signed-off-by: jerryshao <jerryshao@tencent.com>
## What changes were proposed in this pull request?

PostgreSQL, Teradata, SQL Server, DB2 and Presto perform integral division with the `/` operator. But Oracle, Vertica, Hive, MySQL and MariaDB perform fractional division with the `/` operator. This PR adds a flag (`spark.sql.function.preferIntegralDivision`) to control whether to use integral division with the `/` operator.

Examples:

**PostgreSQL**:
```sql
postgres=# select substr(version(), 0, 16), cast(10 as int) / cast(3 as int), cast(10.1 as float8) / cast(3 as int), cast(10 as int) / cast(3.1 as float8), cast(10.1 as float8)/cast(3.1 as float8);
     substr      | ?column? |     ?column?     |    ?column?     |     ?column?
-----------------+----------+------------------+-----------------+------------------
 PostgreSQL 11.3 |        3 | 3.36666666666667 | 3.2258064516129 | 3.25806451612903
(1 row)
```

**SQL Server**:
```sql
1> select cast(10 as int) / cast(3 as int), cast(10.1 as float) / cast(3 as int), cast(10 as int) / cast(3.1 as float), cast(10.1 as float)/cast(3.1 as float);
2> go

----------- ------------------------ ------------------------ ------------------------
          3       3.3666666666666667        3.225806451612903        3.258064516129032

(1 rows affected)
```

**DB2**:
```sql
[db2inst12f3c821d36b7 ~]$ db2 "select cast(10 as int) / cast(3 as int), cast(10.1 as double) / cast(3 as int), cast(10 as int) / cast(3.1 as double), cast(10.1 as double)/cast(3.1 as double) from table (sysproc.env_get_inst_info())"

1           2                        3                        4
----------- ------------------------ ------------------------ ------------------------
          3   +3.36666666666667E+000   +3.22580645161290E+000   +3.25806451612903E+000

  1 record(s) selected.
```

**Presto**:
```sql
presto> select cast(10 as int) / cast(3 as int), cast(10.1 as double) / cast(3 as int), cast(10 as int) / cast(3.1 as double), cast(10.1 as double)/cast(3.1 as double);
 _col0 |       _col1        |       _col2       |       _col3
-------+--------------------+-------------------+-------------------
     3 | 3.3666666666666667 | 3.225806451612903 | 3.258064516129032
(1 row)
```

**Teradata**: (screenshot)

**Oracle**:
```sql
SQL> select 10 / 3 from dual;

      10/3
----------
3.33333333
```

**Vertica**:
```sql
dbadmin=> select version(), cast(10 as int) / cast(3 as int), cast(10.1 as float8) / cast(3 as int), cast(10 as int) / cast(3.1 as float8), cast(10.1 as float8)/cast(3.1 as float8);
              version               |       ?column?       |     ?column?     |    ?column?     |     ?column?
------------------------------------+----------------------+------------------+-----------------+------------------
 Vertica Analytic Database v9.1.1-0 | 3.333333333333333333 | 3.36666666666667 | 3.2258064516129 | 3.25806451612903
(1 row)
```

**Hive**:
```sql
hive> select cast(10 as int) / cast(3 as int), cast(10.1 as double) / cast(3 as int), cast(10 as int) / cast(3.1 as double), cast(10.1 as double)/cast(3.1 as double);
OK
3.3333333333333335 3.3666666666666667 3.225806451612903 3.258064516129032
Time taken: 0.143 seconds, Fetched: 1 row(s)
```

**MariaDB**:
```sql
MariaDB [(none)]> select version(), cast(10 as int) / cast(3 as int), cast(10.1 as double) / cast(3 as int), cast(10 as int) / cast(3.1 as double), cast(10.1 as double)/cast(3.1 as double);
+--------------------------------------+----------------------------------+---------------------------------------+---------------------------------------+------------------------------------------+
| version()                            | cast(10 as int) / cast(3 as int) | cast(10.1 as double) / cast(3 as int) | cast(10 as int) / cast(3.1 as double) | cast(10.1 as double)/cast(3.1 as double) |
+--------------------------------------+----------------------------------+---------------------------------------+---------------------------------------+------------------------------------------+
| 10.4.6-MariaDB-1:10.4.6+maria~bionic |                           3.3333 |                    3.3666666666666667 |                     3.225806451612903 |                        3.258064516129032 |
+--------------------------------------+----------------------------------+---------------------------------------+---------------------------------------+------------------------------------------+
1 row in set (0.000 sec)
```

**MySQL**:
```sql
mysql> select version(), 10 / 3, 10 / 3.1, 10.1 / 3, 10.1 / 3.1;
+-----------+--------+----------+----------+------------+
| version() | 10 / 3 | 10 / 3.1 | 10.1 / 3 | 10.1 / 3.1 |
+-----------+--------+----------+----------+------------+
| 8.0.16    | 3.3333 |   3.2258 |  3.36667 |    3.25806 |
+-----------+--------+----------+----------+------------+
1 row in set (0.00 sec)
```

## How was this patch tested?

unit tests

Closes apache#25158 from wangyum/SPARK-28395.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
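In Spark SQL terms, the flag described above would be exercised like this (behavior per the semantics stated in the description):

```sql
SET spark.sql.function.preferIntegralDivision=true;
SELECT CAST(10 AS INT) / CAST(3 AS INT);  -- 3, PostgreSQL-style integral division
-- with the flag off (the default), the same query performs fractional division
```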
…ng to the new exception message

## What changes were proposed in this pull request?

apache#25010 breaks the integration test suite due to changing the user-facing exception like the following. This PR fixes the integration test suite.

```scala
-    require(
-      decimalVal.precision <= precision,
-      s"Decimal precision ${decimalVal.precision} exceeds max precision $precision")
+    if (decimalVal.precision > precision) {
+      throw new ArithmeticException(
+        s"Decimal precision ${decimalVal.precision} exceeds max precision $precision")
+    }
```

## How was this patch tested?

Manual test.

```
$ build/mvn install -DskipTests
$ build/mvn -Pdocker-integration-tests -pl :spark-docker-integration-tests_2.12 test
```

Closes apache#25165 from dongjoon-hyun/SPARK-28201.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…essions gracefully

## What changes were proposed in this pull request?

When reordering joins, `EnsureRequirements` only checks if all the join keys are present in the partitioning expression seq. This is problematic when the join keys and partitioning expressions both contain duplicates, but not the same number of duplicates for each expression, e.g. `Seq(a, a, b)` vs `Seq(a, b, b)`. This fails with an index lookup failure in the `reorder` function.

This PR fixes this by removing the equality checking logic from the `reorderJoinKeys` function, and by doing the multiset equality in the `reorder` function while building the reordered key sequences.

## How was this patch tested?

Added a unit test to the `PlannerSuite` and added an integration test to `JoinSuite`.

Closes apache#25167 from hvanhovell/SPARK-27485.

Authored-by: herman <herman@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
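The heart of the fix is a multiset (not set) comparison; a small standalone sketch of the check, not code from the PR:

```scala
// Seq(a, a, b) and Seq(a, b, b) contain the same distinct elements, so a
// simple "every key is present" containment test accepts them -- but their
// element counts differ, which is what broke the reorder() index lookup.
def sameMultiset[T](xs: Seq[T], ys: Seq[T]): Boolean =
  xs.groupBy(identity).map { case (k, v) => (k, v.size) } ==
    ys.groupBy(identity).map { case (k, v) => (k, v.size) }

println(sameMultiset(Seq("a", "a", "b"), Seq("a", "b", "b")))  // false
println(sameMultiset(Seq("b", "a"), Seq("a", "b")))            // true
```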
…tion in adaptive execution

## What changes were proposed in this pull request?

Adaptive execution reduces the number of post-shuffle partitions at runtime, even for shuffles caused by repartition. However, the user likely wants to get the desired number of partitions when calling repartition, even in adaptive execution. This PR adds an internal config to control this; by default, adaptive execution will not change the number of post-shuffle partitions for repartition.

## How was this patch tested?

New tests added.

Closes apache#25121 from carsonwang/AE_repartition.

Authored-by: Carson Wang <carson.wang@intel.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?

This PR is to port float8.sql from PostgreSQL regression tests: https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/float8.sql

The expected results can be found in the link: https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/expected/float8.out

When porting the test cases, found six PostgreSQL specific features that do not exist in Spark SQL:
- [SPARK-28060](https://issues.apache.org/jira/browse/SPARK-28060): Double type can not accept some special inputs
- [SPARK-28027](https://issues.apache.org/jira/browse/SPARK-28027): Spark SQL does not support prefix operator `` and `|/`
- [SPARK-28061](https://issues.apache.org/jira/browse/SPARK-28061): Support for converting float to binary format
- [SPARK-23906](https://issues.apache.org/jira/browse/SPARK-23906): Support Truncate number
- [SPARK-28134](https://issues.apache.org/jira/browse/SPARK-28134): Missing Trigonometric Functions

Also, found two bugs:
- [SPARK-28024](https://issues.apache.org/jira/browse/SPARK-28024): Incorrect value when out of range
- [SPARK-28135](https://issues.apache.org/jira/browse/SPARK-28135): ceil/ceiling/floor/power returns incorrect values

Also, found four inconsistent behaviors:
- [SPARK-27923](https://issues.apache.org/jira/browse/SPARK-27923): Spark SQL inserts bad inputs as NULL
- [SPARK-28028](https://issues.apache.org/jira/browse/SPARK-28028): Cast numeric to integral type need round
- [SPARK-27923](https://issues.apache.org/jira/browse/SPARK-27923): Spark SQL returns NULL when dividing by zero
- [SPARK-28007](https://issues.apache.org/jira/browse/SPARK-28007): Caret operator (^) means bitwise XOR in Spark/Hive and exponentiation in Postgres

## How was this patch tested?

N/A

Closes apache#24931 from wangyum/SPARK-28129.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
…partition in adaptive execution - fix compilation

## What changes were proposed in this pull request?

PR builder failed with the following error:

```
[error] /home/jenkins/workspace/SparkPullRequestBuilder/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala:714: wrong number of arguments for pattern org.apache.spark.sql.execution.exchange.ShuffleExchangeExec(outputPartitioning: org.apache.spark.sql.catalyst.plans.physical.Partitioning,child: org.apache.spark.sql.execution.SparkPlan,canChangeNumPartitions: Boolean)
[error]             ShuffleExchangeExec(HashPartitioning(leftPartitioningExpressions, _), _), _),
[error]             ^
[error] /home/jenkins/workspace/SparkPullRequestBuilder/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala:716: wrong number of arguments for pattern org.apache.spark.sql.execution.exchange.ShuffleExchangeExec(outputPartitioning: org.apache.spark.sql.catalyst.plans.physical.Partitioning,child: org.apache.spark.sql.execution.SparkPlan,canChangeNumPartitions: Boolean)
[error]             ShuffleExchangeExec(HashPartitioning(rightPartitioningExpressions, _), _), _)) =>
[error]             ^
```

## How was this patch tested?

Existing unit test.

Closes apache#25171 from gaborgsomogyi/SPARK-27485.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: herman <herman@databricks.com>
…names

## What changes were proposed in this pull request?

In regression/clustering/ovr/als, if an output column name is empty, ignore it. And if all names are empty, log a warning message, then do nothing.

## How was this patch tested?

Existing tests.

Closes apache#24793 from zhengruifeng/aft_iso_check_empty_outputCol.

Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
…ntegralDivision for PostgreSQL testing

## What changes were proposed in this pull request?

This PR enables `spark.sql.function.preferIntegralDivision` for PostgreSQL testing.

## How was this patch tested?

N/A

Closes apache#25170 from wangyum/SPARK-28343-2.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?

We are adding generic resource support into Spark, where the configs have a suffix for the amount of the resource, so that we can support other configs. Spark on YARN already added configs to request resources via `spark.yarn.{executor/driver/am}.resource=<some amount>`, where `<some amount>` is value and unit together. We should change those configs to have a `.amount` suffix on them to match the Spark configs and to allow future configs to be added more easily. YARN itself already supports tags and attributes, so if we want the user to be able to pass those from Spark at some point, having a suffix makes sense: it would allow for a `spark.yarn.{executor/driver/am}.resource.{resource}.tag=` type config.

## How was this patch tested?

Tested via unit tests and manually on a YARN 3.x cluster with GPU resources configured.

Closes apache#24989 from tgravescs/SPARK-27959-yarn-resourceconfigs.

Authored-by: Thomas Graves <tgraves@nvidia.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
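Concretely, as I read the description, a GPU request before and after the rename would look like this (exact resource names depend on the YARN setup):

```
# old style: amount baked directly into the value of the resource key
spark.yarn.executor.resource.gpu         2

# new style: explicit .amount suffix, leaving room for future sub-configs
# such as a hypothetical spark.yarn.executor.resource.gpu.tag
spark.yarn.executor.resource.gpu.amount  2
```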
## What changes were proposed in this pull request?

Add 4 additional `agg` methods to `KeyValueGroupedDataset`.

## How was this patch tested?

New test in `DatasetSuite` for typed aggregation.

Closes apache#24993 from nooberfsh/sqlagg.

Authored-by: nooberfsh <nooberfsh@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
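A hedged usage sketch of typed aggregation on `KeyValueGroupedDataset` (the PR extends `agg` to more simultaneous typed columns; the exact new arities are in the diff). A SparkSession named `spark` is assumed:

```scala
import org.apache.spark.sql.expressions.scalalang.typed
import spark.implicits._

case class Rec(key: String, v: Double)
val ds = Seq(Rec("a", 1.0), Rec("a", 2.0), Rec("b", 3.0)).toDS()

// Each typed.* aggregator is one TypedColumn; agg(...) takes several at once.
ds.groupByKey(_.key)
  .agg(typed.count[Rec](_.v), typed.sum[Rec](_.v), typed.avg[Rec](_.v))
  .show()
```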
This change adds a new option that enables dynamic allocation without the need for a shuffle service. This mode works by tracking which stages generate shuffle files, and keeping executors that generate data for those shuffles alive while the jobs that use them are active.

A separate timeout is also added for shuffle data, so that executors that hold shuffle data can use a separate timeout before being removed because of being idle. This allows the shuffle data to be kept around in case it is needed by some new job, or allows users to be more aggressive in timing out executors that don't have shuffle data in active use.

The code also hooks up to the context cleaner, so that shuffles that are garbage collected are detected, and the respective executors are not held unnecessarily.

Testing done with added unit tests, and also with TPC-DS workloads on YARN without a shuffle service.

Closes apache#24817 from vanzin/SPARK-27963.

Authored-by: Marcelo Vanzin <vanzin@cloudera.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
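To the best of my reading of this change, enabling the mode looks like the following; the property names are my assumption, so check the docs of your Spark version:

```
spark.dynamicAllocation.enabled                  true
# track shuffle outputs instead of relying on an external shuffle service
spark.dynamicAllocation.shuffleTracking.enabled  true
# separate idle timeout for executors that still hold shuffle data
spark.dynamicAllocation.shuffleTracking.timeout  30min
```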
…ution_listener_on_collect'

## What changes were proposed in this pull request?

It fixes a flaky test:

```
ERROR [0.164s]: test_query_execution_listener_on_collect (pyspark.sql.tests.test_dataframe.QueryExecutionListenerTests)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/jenkins/python/pyspark/sql/tests/test_dataframe.py", line 758, in test_query_execution_listener_on_collect
    "The callback from the query execution listener should be called after 'collect'")
AssertionError: The callback from the query execution listener should be called after 'collect'
```

It seems it can fail because the event was somehow delayed but checked first.

## How was this patch tested?

Manually.

Closes apache#25177 from HyukjinKwon/SPARK-28418.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
… making UDFs (virtually) no-op

## What changes were proposed in this pull request?

The current UDFs available in `IntegratedUDFTestUtils` are not exactly no-op: they convert the input column to strings and output strings. This causes some issues when we convert and port the tests at SPARK-27921. Integrated UDF test cases share one output file, and it should output the same. However,

1. Special values are converted into strings differently:

| Scala | Python |
| ---------- | ------ |
| `null` | `None` |
| `Infinity` | `inf` |
| `-Infinity`| `-inf` |
| `NaN` | `nan` |

2. Due to the float limitation in Python (see https://docs.python.org/3/tutorial/floatingpoint.html), if a float is passed into Python and sent back to the JVM, the values are potentially not exactly correct. See apache#25128 and apache#25110

To work around this, this PR changes the current UDFs to be wrapped by casts: the input column is cast to string, the UDF returns strings as they are, and then the output column is cast back to the input column's type. Roughly:

**Before:**

```
JVM (col1) -> (cast to string within Python) Python (string) -> (string) JVM
```

**After:**

```
JVM (cast col1 to string) -> (string) Python (string) -> (cast back to col1's type) JVM
```

In this way, the UDF is virtually a no-op, although there might be some subtleties due to the roundtrip through string casts. I believe this is good enough. Python native functions and Scala native functions will take strings and output strings as they are. So, there will be no potential test failures due to differences of conversion between Python and Scala.

After this fix, for instance, `udf-aggregates_part1.sql` outputs exactly the same as `aggregates_part1.sql`:

<details><summary>Diff comparing to 'pgSQL/aggregates_part1.sql'</summary>
<p>

```diff
diff --git a/sql/core/src/test/resources/sql-tests/results/pgSQL/aggregates_part1.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-aggregates_part1.sql.out
index 51ca1d5..801735781c7 100644
--- a/sql/core/src/test/resources/sql-tests/results/pgSQL/aggregates_part1.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-aggregates_part1.sql.out
@@ -3,7 +3,7 @@
 -- !query 0
-SELECT avg(four) AS avg_1 FROM onek
+SELECT avg(udf(four)) AS avg_1 FROM onek
 -- !query 0 schema
 struct<avg_1:double>
 -- !query 0 output
@@ -11,7 +11,7 @@ struct<avg_1:double>
 -- !query 1
-SELECT avg(a) AS avg_32 FROM aggtest WHERE a < 100
+SELECT udf(avg(a)) AS avg_32 FROM aggtest WHERE a < 100
 -- !query 1 schema
 struct<avg_32:double>
 -- !query 1 output
@@ -19,7 +19,7 @@ struct<avg_32:double>
 -- !query 2
-select CAST(avg(b) AS Decimal(10,3)) AS avg_107_943 FROM aggtest
+select CAST(avg(udf(b)) AS Decimal(10,3)) AS avg_107_943 FROM aggtest
 -- !query 2 schema
 struct<avg_107_943:decimal(10,3)>
 -- !query 2 output
@@ -27,7 +27,7 @@ struct<avg_107_943:decimal(10,3)>
 -- !query 3
-SELECT sum(four) AS sum_1500 FROM onek
+SELECT sum(udf(four)) AS sum_1500 FROM onek
 -- !query 3 schema
 struct<sum_1500:bigint>
 -- !query 3 output
@@ -35,7 +35,7 @@ struct<sum_1500:bigint>
 -- !query 4
-SELECT sum(a) AS sum_198 FROM aggtest
+SELECT udf(sum(a)) AS sum_198 FROM aggtest
 -- !query 4 schema
 struct<sum_198:bigint>
 -- !query 4 output
@@ -43,7 +43,7 @@ struct<sum_198:bigint>
 -- !query 5
-SELECT sum(b) AS avg_431_773 FROM aggtest
+SELECT udf(udf(sum(b))) AS avg_431_773 FROM aggtest
 -- !query 5 schema
 struct<avg_431_773:double>
 -- !query 5 output
@@ -51,7 +51,7 @@ struct<avg_431_773:double>
 -- !query 6
-SELECT max(four) AS max_3 FROM onek
+SELECT udf(max(four)) AS max_3 FROM onek
 -- !query 6 schema
 struct<max_3:int>
 -- !query 6 output
@@ -59,7 +59,7 @@ struct<max_3:int>
 -- !query 7
-SELECT max(a) AS max_100 FROM aggtest
+SELECT max(udf(a)) AS max_100 FROM aggtest
 -- !query 7 schema
 struct<max_100:int>
 -- !query 7 output
@@ -67,7 +67,7 @@ struct<max_100:int>
 -- !query 8
-SELECT max(aggtest.b) AS max_324_78 FROM aggtest
+SELECT udf(udf(max(aggtest.b))) AS max_324_78 FROM aggtest
 -- !query 8 schema
 struct<max_324_78:float>
 -- !query 8 output
@@ -75,237 +75,238 @@ struct<max_324_78:float>
 -- !query 9
-SELECT stddev_pop(b) FROM aggtest
+SELECT stddev_pop(udf(b)) FROM aggtest
 -- !query 9 schema
-struct<stddev_pop(CAST(b AS DOUBLE)):double>
+struct<stddev_pop(CAST(CAST(udf(cast(b as string)) AS FLOAT) AS DOUBLE)):double>
 -- !query 9 output
 131.10703231895047

 -- !query 10
-SELECT stddev_samp(b) FROM aggtest
+SELECT udf(stddev_samp(b)) FROM aggtest
 -- !query 10 schema
-struct<stddev_samp(CAST(b AS DOUBLE)):double>
+struct<CAST(udf(cast(stddev_samp(cast(b as double)) as string)) AS DOUBLE):double>
 -- !query 10 output
 151.38936080399804

 -- !query 11
-SELECT var_pop(b) FROM aggtest
+SELECT var_pop(udf(b)) FROM aggtest
 -- !query 11 schema
-struct<var_pop(CAST(b AS DOUBLE)):double>
+struct<var_pop(CAST(CAST(udf(cast(b as string)) AS FLOAT) AS DOUBLE)):double>
 -- !query 11 output
 17189.053923482323

 -- !query 12
-SELECT var_samp(b) FROM aggtest
+SELECT udf(var_samp(b)) FROM aggtest
 -- !query 12 schema
-struct<var_samp(CAST(b AS DOUBLE)):double>
+struct<CAST(udf(cast(var_samp(cast(b as double)) as string)) AS DOUBLE):double>
 -- !query 12 output
 22918.738564643096

 -- !query 13
-SELECT stddev_pop(CAST(b AS Decimal(38,0))) FROM aggtest
+SELECT udf(stddev_pop(CAST(b AS Decimal(38,0)))) FROM aggtest
 -- !query 13 schema
-struct<stddev_pop(CAST(CAST(b AS DECIMAL(38,0)) AS DOUBLE)):double>
+struct<CAST(udf(cast(stddev_pop(cast(cast(b as decimal(38,0)) as double)) as string)) AS DOUBLE):double>
 -- !query 13 output
 131.18117242958306

 -- !query 14
-SELECT stddev_samp(CAST(b AS Decimal(38,0))) FROM aggtest
+SELECT stddev_samp(CAST(udf(b) AS Decimal(38,0))) FROM aggtest
 -- !query 14 schema
-struct<stddev_samp(CAST(CAST(b AS DECIMAL(38,0)) AS DOUBLE)):double>
+struct<stddev_samp(CAST(CAST(CAST(udf(cast(b as string)) AS FLOAT) AS DECIMAL(38,0)) AS DOUBLE)):double>
 -- !query 14 output
 151.47497042966097

 -- !query 15
-SELECT var_pop(CAST(b AS Decimal(38,0))) FROM aggtest
+SELECT udf(var_pop(CAST(b AS Decimal(38,0)))) FROM aggtest
 -- !query 15 schema
-struct<var_pop(CAST(CAST(b AS DECIMAL(38,0)) AS DOUBLE)):double>
+struct<CAST(udf(cast(var_pop(cast(cast(b as decimal(38,0)) as double)) as string)) AS DOUBLE):double>
 -- !query 15 output
 17208.5

 -- !query 16
-SELECT var_samp(CAST(b AS Decimal(38,0))) FROM aggtest
+SELECT var_samp(udf(CAST(b AS Decimal(38,0)))) FROM aggtest
 -- !query 16 schema
-struct<var_samp(CAST(CAST(b AS DECIMAL(38,0)) AS DOUBLE)):double>
+struct<var_samp(CAST(CAST(udf(cast(cast(b as decimal(38,0)) as string)) AS DECIMAL(38,0)) AS DOUBLE)):double>
 -- !query 16 output
 22944.666666666668

 -- !query 17
-SELECT var_pop(1.0), var_samp(2.0)
+SELECT udf(var_pop(1.0)), var_samp(udf(2.0))
 -- !query 17 schema
-struct<var_pop(CAST(1.0 AS DOUBLE)):double,var_samp(CAST(2.0 AS DOUBLE)):double>
+struct<CAST(udf(cast(var_pop(cast(1.0 as double)) as string)) AS DOUBLE):double,var_samp(CAST(CAST(udf(cast(2.0 as string)) AS DECIMAL(2,1)) AS DOUBLE)):double>
 -- !query 17 output
 0.0 NaN

 -- !query 18
-SELECT stddev_pop(CAST(3.0 AS Decimal(38,0))), stddev_samp(CAST(4.0 AS Decimal(38,0)))
+SELECT stddev_pop(udf(CAST(3.0 AS Decimal(38,0)))), stddev_samp(CAST(udf(4.0) AS Decimal(38,0)))
 -- !query 18 schema
-struct<stddev_pop(CAST(CAST(3.0 AS DECIMAL(38,0)) AS DOUBLE)):double,stddev_samp(CAST(CAST(4.0 AS DECIMAL(38,0)) AS DOUBLE)):double>
+struct<stddev_pop(CAST(CAST(udf(cast(cast(3.0 as decimal(38,0)) as string)) AS DECIMAL(38,0)) AS DOUBLE)):double,stddev_samp(CAST(CAST(CAST(udf(cast(4.0 as string)) AS DECIMAL(2,1)) AS DECIMAL(38,0)) AS DOUBLE)):double>
 -- !query 18 output
 0.0 NaN

 -- !query 19
-select sum(CAST(null AS int)) from range(1,4)
+select sum(udf(CAST(null AS int))) from range(1,4)
 -- !query 19 schema
-struct<sum(CAST(NULL AS INT)):bigint>
+struct<sum(CAST(udf(cast(cast(null as int) as string)) AS INT)):bigint>
 -- !query 19 output
 NULL

 -- !query 20
-select sum(CAST(null AS long)) from range(1,4)
+select sum(udf(CAST(null AS long))) from range(1,4)
 -- !query 20 schema
-struct<sum(CAST(NULL AS BIGINT)):bigint>
+struct<sum(CAST(udf(cast(cast(null as bigint) as string)) AS BIGINT)):bigint>
 -- !query 20 output
 NULL

 -- !query 21
-select sum(CAST(null AS Decimal(38,0))) from range(1,4)
+select sum(udf(CAST(null AS Decimal(38,0)))) from range(1,4)
 -- !query 21 schema
-struct<sum(CAST(NULL AS DECIMAL(38,0))):decimal(38,0)>
+struct<sum(CAST(udf(cast(cast(null as decimal(38,0)) as string)) AS DECIMAL(38,0))):decimal(38,0)>
 -- !query 21 output
 NULL

 -- !query 22
-select sum(CAST(null AS DOUBLE)) from range(1,4)
+select sum(udf(CAST(null AS DOUBLE))) from range(1,4)
 -- !query 22 schema
-struct<sum(CAST(NULL AS DOUBLE)):double>
+struct<sum(CAST(udf(cast(cast(null as double) as string)) AS DOUBLE)):double>
 -- !query 22 output
 NULL

 -- !query 23
-select avg(CAST(null AS int)) from range(1,4)
+select avg(udf(CAST(null AS int))) from range(1,4)
 -- !query 23 schema
-struct<avg(CAST(NULL AS INT)):double>
+struct<avg(CAST(udf(cast(cast(null as int) as string)) AS INT)):double>
 -- !query 23 output
 NULL

 -- !query 24
-select avg(CAST(null AS long)) from range(1,4)
+select avg(udf(CAST(null AS long))) from range(1,4)
 -- !query 24 schema
-struct<avg(CAST(NULL AS BIGINT)):double>
+struct<avg(CAST(udf(cast(cast(null as bigint) as string)) AS BIGINT)):double>
 -- !query 24 output
 NULL

 -- !query 25
-select avg(CAST(null AS Decimal(38,0))) from range(1,4)
+select avg(udf(CAST(null AS Decimal(38,0)))) from range(1,4)
 -- !query 25 schema
-struct<avg(CAST(NULL AS DECIMAL(38,0))):decimal(38,4)>
+struct<avg(CAST(udf(cast(cast(null as decimal(38,0)) as string)) AS DECIMAL(38,0))):decimal(38,4)>
 -- !query 25 output
 NULL

 -- !query 26
-select avg(CAST(null AS DOUBLE)) from range(1,4)
+select avg(udf(CAST(null AS DOUBLE))) from range(1,4)
 -- !query 26 schema
-struct<avg(CAST(NULL AS DOUBLE)):double>
+struct<avg(CAST(udf(cast(cast(null as double) as string)) AS DOUBLE)):double>
 -- !query 26 output
 NULL

 -- !query 27
-select sum(CAST('NaN' AS DOUBLE)) from range(1,4)
+select sum(CAST(udf('NaN') AS DOUBLE)) from range(1,4)
 -- !query 27 schema
-struct<sum(CAST(NaN AS DOUBLE)):double>
+struct<sum(CAST(CAST(udf(cast(NaN as string)) AS STRING) AS DOUBLE)):double>
 -- !query 27 output
 NaN

 -- !query 28
-select avg(CAST('NaN' AS DOUBLE)) from range(1,4)
+select avg(CAST(udf('NaN') AS DOUBLE)) from range(1,4)
 -- !query 28 schema
-struct<avg(CAST(NaN AS DOUBLE)):double>
+struct<avg(CAST(CAST(udf(cast(NaN as string)) AS STRING) AS DOUBLE)):double>
 -- !query 28 output
 NaN

 -- !query 30
-SELECT avg(CAST(x AS DOUBLE)), var_pop(CAST(x AS DOUBLE))
+SELECT avg(CAST(udf(x) AS DOUBLE)), var_pop(CAST(udf(x) AS DOUBLE))
 FROM (VALUES ('Infinity'), ('1')) v(x)
 -- !query 30 schema
-struct<avg(CAST(x AS
DOUBLE)):double,var_pop(CAST(x AS DOUBLE)):double> +struct<avg(CAST(CAST(udf(cast(x as string)) AS STRING) AS DOUBLE)):double,var_pop(CAST(CAST(udf(cast(x as string)) AS STRING) AS DOUBLE)):double> -- !query 30 output Infinity NaN -- !query 31 -SELECT avg(CAST(x AS DOUBLE)), var_pop(CAST(x AS DOUBLE)) +SELECT avg(CAST(udf(x) AS DOUBLE)), var_pop(CAST(udf(x) AS DOUBLE)) FROM (VALUES ('Infinity'), ('Infinity')) v(x) -- !query 31 schema -struct<avg(CAST(x AS DOUBLE)):double,var_pop(CAST(x AS DOUBLE)):double> +struct<avg(CAST(CAST(udf(cast(x as string)) AS STRING) AS DOUBLE)):double,var_pop(CAST(CAST(udf(cast(x as string)) AS STRING) AS DOUBLE)):double> -- !query 31 output Infinity NaN -- !query 32 -SELECT avg(CAST(x AS DOUBLE)), var_pop(CAST(x AS DOUBLE)) +SELECT avg(CAST(udf(x) AS DOUBLE)), var_pop(CAST(udf(x) AS DOUBLE)) FROM (VALUES ('-Infinity'), ('Infinity')) v(x) -- !query 32 schema -struct<avg(CAST(x AS DOUBLE)):double,var_pop(CAST(x AS DOUBLE)):double> +struct<avg(CAST(CAST(udf(cast(x as string)) AS STRING) AS DOUBLE)):double,var_pop(CAST(CAST(udf(cast(x as string)) AS STRING) AS DOUBLE)):double> -- !query 32 output NaN NaN -- !query 33 -SELECT avg(CAST(x AS DOUBLE)), var_pop(CAST(x AS DOUBLE)) +SELECT avg(udf(CAST(x AS DOUBLE))), udf(var_pop(CAST(x AS DOUBLE))) FROM (VALUES (100000003), (100000004), (100000006), (100000007)) v(x) -- !query 33 schema -struct<avg(CAST(x AS DOUBLE)):double,var_pop(CAST(x AS DOUBLE)):double> +struct<avg(CAST(udf(cast(cast(x as double) as string)) AS DOUBLE)):double,CAST(udf(cast(var_pop(cast(x as double)) as string)) AS DOUBLE):double> -- !query 33 output 1.00000005E8 2.5 -- !query 34 -SELECT avg(CAST(x AS DOUBLE)), var_pop(CAST(x AS DOUBLE)) +SELECT avg(udf(CAST(x AS DOUBLE))), udf(var_pop(CAST(x AS DOUBLE))) FROM (VALUES (7000000000005), (7000000000007)) v(x) -- !query 34 schema -struct<avg(CAST(x AS DOUBLE)):double,var_pop(CAST(x AS DOUBLE)):double> +struct<avg(CAST(udf(cast(cast(x as double) as string)) AS DOUBLE)):double,CAST(udf(cast(var_pop(cast(x as double)) as string)) AS DOUBLE):double> -- !query 34 output 7.000000000006E12 1.0 -- !query 35 -SELECT covar_pop(b, a), covar_samp(b, a) FROM aggtest +SELECT udf(covar_pop(b, udf(a))), covar_samp(udf(b), a) FROM aggtest -- !query 35 schema -struct<covar_pop(CAST(b AS DOUBLE), CAST(a AS DOUBLE)):double,covar_samp(CAST(b AS DOUBLE), CAST(a AS DOUBLE)):double> +struct<CAST(udf(cast(covar_pop(cast(b as double), cast(cast(udf(cast(a as string)) as int) as double)) as string)) AS DOUBLE):double,covar_samp(CAST(CAST(udf(cast(b as string)) AS FLOAT) AS DOUBLE), CAST(a AS DOUBLE)):double> -- !query 35 output 653.6289553875104 871.5052738500139 -- !query 36 -SELECT corr(b, a) FROM aggtest +SELECT corr(b, udf(a)) FROM aggtest -- !query 36 schema -struct<corr(CAST(b AS DOUBLE), CAST(a AS DOUBLE)):double> +struct<corr(CAST(b AS DOUBLE), CAST(CAST(udf(cast(a as string)) AS INT) AS DOUBLE)):double> -- !query 36 output 0.1396345165178734 -- !query 37 -SELECT count(four) AS cnt_1000 FROM onek +SELECT count(udf(four)) AS cnt_1000 FROM onek -- !query 37 schema struct<cnt_1000:bigint> -- !query 37 output -313,7 +314,7 struct<cnt_1000:bigint> -- !query 38 -SELECT count(DISTINCT four) AS cnt_4 FROM onek +SELECT udf(count(DISTINCT four)) AS cnt_4 FROM onek -- !query 38 schema struct<cnt_4:bigint> -- !query 38 output -321,10 +322,10 struct<cnt_4:bigint> -- !query 39 -select ten, count(*), sum(four) from onek +select ten, udf(count(*)), sum(udf(four)) from onek group by ten order by ten -- !query 39 schema 
-struct<ten:int,count(1):bigint,sum(four):bigint> +struct<ten:int,CAST(udf(cast(count(1) as string)) AS BIGINT):bigint,sum(CAST(udf(cast(four as string)) AS INT)):bigint> -- !query 39 output 0 100 100 1 100 200 -339,10 +340,10 struct<ten:int,count(1):bigint,sum(four):bigint> -- !query 40 -select ten, count(four), sum(DISTINCT four) from onek +select ten, count(udf(four)), udf(sum(DISTINCT four)) from onek group by ten order by ten -- !query 40 schema -struct<ten:int,count(four):bigint,sum(DISTINCT four):bigint> +struct<ten:int,count(CAST(udf(cast(four as string)) AS INT)):bigint,CAST(udf(cast(sum(distinct cast(four as bigint)) as string)) AS BIGINT):bigint> -- !query 40 output 0 100 2 1 100 4 -357,11 +358,11 struct<ten:int,count(four):bigint,sum(DISTINCT four):bigint> -- !query 41 -select ten, sum(distinct four) from onek a +select ten, udf(sum(distinct four)) from onek a group by ten -having exists (select 1 from onek b where sum(distinct a.four) = b.four) +having exists (select 1 from onek b where udf(sum(distinct a.four)) = b.four) -- !query 41 schema -struct<ten:int,sum(DISTINCT four):bigint> +struct<ten:int,CAST(udf(cast(sum(distinct cast(four as bigint)) as string)) AS BIGINT):bigint> -- !query 41 output 0 2 2 2 -374,23 +375,23 struct<ten:int,sum(DISTINCT four):bigint> select ten, sum(distinct four) from onek a group by ten having exists (select 1 from onek b - where sum(distinct a.four + b.four) = b.four) + where sum(distinct a.four + b.four) = udf(b.four)) -- !query 42 schema struct<> -- !query 42 output org.apache.spark.sql.AnalysisException Aggregate/Window/Generate expressions are not valid in where clause of the query. -Expression in where clause: [(sum(DISTINCT CAST((outer() + b.`four`) AS BIGINT)) = CAST(b.`four` AS BIGINT))] +Expression in where clause: [(sum(DISTINCT CAST((outer() + b.`four`) AS BIGINT)) = CAST(CAST(udf(cast(four as string)) AS INT) AS BIGINT))] Invalid expressions: [sum(DISTINCT CAST((outer() + b.`four`) AS BIGINT))]; -- !query 43 select - (select max((select i.unique2 from tenk1 i where i.unique1 = o.unique1))) + (select udf(max((select i.unique2 from tenk1 i where i.unique1 = o.unique1)))) from tenk1 o -- !query 43 schema struct<> -- !query 43 output org.apache.spark.sql.AnalysisException -cannot resolve '`o.unique1`' given input columns: [i.even, i.fivethous, i.four, i.hundred, i.odd, i.string4, i.stringu1, i.stringu2, i.ten, i.tenthous, i.thousand, i.twenty, i.two, i.twothousand, i.unique1, i.unique2]; line 2 pos 63 +cannot resolve '`o.unique1`' given input columns: [i.even, i.fivethous, i.four, i.hundred, i.odd, i.string4, i.stringu1, i.stringu2, i.ten, i.tenthous, i.thousand, i.twenty, i.two, i.twothousand, i.unique1, i.unique2]; line 2 pos 67 ``` </p> </details> ## How was this patch tested? Manually tested. Closes apache#25130 from HyukjinKwon/SPARK-28359. Authored-by: HyukjinKwon <gurwls223@apache.org> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
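For illustration only, here is a minimal PySpark sketch of the cast-wrapping pattern described in the commit above. The names are made up; this is not the actual `IntegratedUDFTestUtils` code:

```python
from pyspark.sql.functions import udf

# String-identity UDF: receives a string and returns it unchanged.
identity_udf = udf(lambda x: x, "string")

def noop_udf(col, col_type):
    # Cast the input column to string on the JVM side, run the
    # string-identity UDF, then cast the result back to the original type.
    return identity_udf(col.cast("string")).cast(col_type)

# Usage sketch: noop_udf(df.four, "int") should behave like df.four,
# modulo the subtleties of the string-cast roundtrip noted above.
```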
## What changes were proposed in this pull request?

PostgreSQL doesn't have `TINYINT`, which would map directly, but `SMALLINT`s are sufficient for uni-directional translation.

A side-effect of this fix is that `AggregatedDialect` is now usable with multiple dialects targeting `jdbc:postgresql`, as `PostgresDialect.getJDBCType` no longer throws (for which reason backporting this fix would be lovely):

https://github.com/apache/spark/blob/1217996f1574f758d8cccc1c4e3846452d24b35b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala#L42

`dialects.flatMap` currently throws on the first attempt to get a JDBC type, preventing subsequent dialects in the chain from providing an alternative.

## How was this patch tested?

Unit tests.

Closes apache#24845 from mojodna/postgres-byte-type-mapping.

Authored-by: Seth Fitzsimmons <seth@mojodna.net>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
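As a user-visible illustration of the fix above, a hedged PySpark sketch (the connection URL, table name, and credentials are placeholders): writing a `ByteType` column to PostgreSQL now resolves to `SMALLINT` instead of failing to find a JDBC type.

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, ByteType

spark = SparkSession.builder.getOrCreate()

# A single ByteType column; PostgreSQL has no TINYINT, so the dialect
# maps it to SMALLINT rather than PostgresDialect.getJDBCType throwing.
df = spark.createDataFrame([(1,), (2,)],
                           StructType([StructField("b", ByteType())]))

# Hypothetical connection details, for illustration only.
df.write.jdbc(url="jdbc:postgresql://localhost:5432/testdb",
              table="byte_test",
              mode="append",
              properties={"user": "spark", "password": "secret"})
```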
## What changes were proposed in this pull request?

In the PR, I propose to convert options values to strings by using `to_str()` for the following functions: `from_csv()`, `to_csv()`, `from_json()`, `to_json()`, `schema_of_csv()` and `schema_of_json()`. This makes the handling of function options consistent with option handling in `DataFrameReader`/`DataFrameWriter`. For example:

```python
from pyspark.sql.functions import from_csv

df.select(from_csv(df.value, "s string", {'ignoreLeadingWhiteSpace': True}))
```

## How was this patch tested?

Added an example for `from_csv()` which was tested by:

```shell
./python/run-tests --testnames pyspark.sql.functions
```

Closes apache#25182 from MaxGekk/options_to_str.

Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
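A fuller, self-contained sketch of the example above (the sample data is illustrative, not from the PR):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_csv

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("  1,2.0",)], ["value"])

# The boolean option value is stringified via to_str(), just as
# DataFrameReader does, instead of requiring the string "true".
df.select(from_csv(df.value, "s string, d double",
                   {'ignoreLeadingWhiteSpace': True})).show(truncate=False)
```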
## What changes were proposed in this pull request?

In the following Python code

```python
df.write.mode("overwrite").insertInto("table")
```

`insertInto` ignores `mode("overwrite")` and appends by default. This PR makes the write respect the specified save mode.

## How was this patch tested?

Added a unit test.

Closes apache#25175 from huaxingao/spark-28411.

Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
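A minimal sketch of the behavior after the fix above (the table name is a placeholder and must already exist with a compatible schema); both forms should now overwrite rather than append:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()
df = spark.range(5).toDF("id")

# Assumes a pre-existing catalog table named "table".
df.write.mode("overwrite").insertInto("table")  # mode("overwrite") is now honored
df.write.insertInto("table", overwrite=True)    # equivalent explicit form
```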
Test build #107815 has finished for PR 25113 at commit
Closing this PR; syncing the code will raise a new PR.
Test build #107831 has finished for PR 25113 at commit
Test build #107833 has finished for PR 25113 at commit
What changes were proposed in this pull request?
This PR adds some tests converted from 'udaf.sql' to test UDFs.
Diff comparing to 'udaf.sql'
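As a rough sketch of the conversion pattern (the registration, table, and column below are hypothetical stand-ins for what `IntegratedUDFTestUtils` does), each input column in the ported statements is wrapped in `udf(...)`:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical stand-in for the test harness: a string-identity function
# registered under the name "udf".
spark.udf.register("udf", lambda x: x, "string")

# An aggregate with its input wrapped in udf(...), mirroring how the
# converted test statements are written.
spark.sql("SELECT avg(udf(col)) FROM VALUES (1), (2) AS t(col)").show()
```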
How was this patch tested?
Tested as guided in SPARK-27921.