-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-44871][SQL] Fix percentile_disc behaviour #42559
Changes from 1 commit
ac423c0
1d9f1a0
5ab5084
da83c65
93a018c
18639c6
ca1d971
82bbdc9
50aead0
6cd697c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.trees.{BinaryLike, TernaryLike, UnaryLike} | |
import org.apache.spark.sql.catalyst.types.PhysicalDataType | ||
import org.apache.spark.sql.catalyst.util._ | ||
import org.apache.spark.sql.errors.QueryExecutionErrors | ||
import org.apache.spark.sql.internal.SQLConf | ||
import org.apache.spark.sql.types._ | ||
import org.apache.spark.sql.types.TypeCollection.NumericAndAnsiInterval | ||
import org.apache.spark.util.collection.OpenHashMap | ||
|
@@ -49,6 +50,9 @@ abstract class PercentileBase | |
@transient | ||
private lazy val returnPercentileArray = percentageExpression.dataType.isInstanceOf[ArrayType] | ||
|
||
private val legacyDiscCalculation: Boolean = | ||
SQLConf.get.getConf(SQLConf.LEGACY_PERCENTILE_DISC_CALCULATION) | ||
|
||
@transient | ||
protected lazy val percentages = percentageExpression.eval() match { | ||
case null => null | ||
|
@@ -164,10 +168,14 @@ abstract class PercentileBase | |
val accumulatedCounts = sortedCounts.scanLeft((sortedCounts.head._1, 0L)) { | ||
case ((key1, count1), (key2, count2)) => (key2, count1 + count2) | ||
}.tail | ||
val maxPosition = accumulatedCounts.last._2 - 1 | ||
val maxPosition = accumulatedCounts.last._2 | ||
|
||
percentages.map { percentile => | ||
getPercentile(accumulatedCounts, maxPosition * percentile) | ||
if (discrete && !legacyDiscCalculation) { | ||
getPercentileDisc(accumulatedCounts, maxPosition * percentile) | ||
} else { | ||
getPercentile(accumulatedCounts, (maxPosition - 1) * percentile) | ||
} | ||
} | ||
} | ||
|
||
|
@@ -222,6 +230,30 @@ abstract class PercentileBase | |
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can we add a comment in the old getPercentile discrete code path noting that it's only used when the legacy flag is set? |
||
|
||
private def getPercentileDisc( | ||
accumulatedCounts: Seq[(AnyRef, Long)], | ||
position: Double): Double = { | ||
val lower = position.floor.toLong | ||
val higher = position.ceil.toLong | ||
|
||
// Use binary search to find the lower and the higher position. | ||
val countsArray = accumulatedCounts.map(_._2).toArray[Long] | ||
|
||
val lowerIndex = binarySearchCount(countsArray, 0, accumulatedCounts.size, lower) | ||
val lowerKey = accumulatedCounts(lowerIndex)._1 | ||
if (higher == lower) { | ||
return toDoubleValue(lowerKey) | ||
} | ||
|
||
val higherIndex = binarySearchCount(countsArray, 0, accumulatedCounts.size, higher) | ||
val higherKey = accumulatedCounts(higherIndex)._1 | ||
if (higherKey == lowerKey) { | ||
return toDoubleValue(lowerKey) | ||
} | ||
|
||
toDoubleValue(higherKey) | ||
} | ||
|
||
/** | ||
* use a binary search to find the index of the position closest to the current value. | ||
*/ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4368,6 +4368,13 @@ object SQLConf { | |
.booleanConf | ||
.createWithDefault(true) | ||
|
||
val LEGACY_PERCENTILE_DISC_CALCULATION = buildConf("spark.sql.legacy.percentileDiscCalculation") | ||
.internal() | ||
.doc("If true the old bogus percentile_disc calculation is used.") | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Maybe add a bit more explanation of what the legacy behavior is, so that in the future people do not need to go back to this PR to understand the context? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Added in 93a018c. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Thanks! |
||
.version("4.0.0") | ||
.booleanConf | ||
.createWithDefault(false) | ||
|
||
/** | ||
* Holds information about keys that have been deprecated. | ||
* | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -730,3 +730,103 @@ struct<k:int,median(dt2):interval day to second,percentile(dt2, 0.5, 1):interval | |
2 0 00:22:30.000000000 0 00:22:30.000000000 0 00:22:30.000000000 | ||
3 0 01:00:00.000000000 0 01:00:00.000000000 0 01:00:00.000000000 | ||
4 NULL NULL NULL | ||
|
||
|
||
-- !query | ||
SELECT | ||
percentile_disc(0.0) WITHIN GROUP (ORDER BY a) as p0, | ||
percentile_disc(0.1) WITHIN GROUP (ORDER BY a) as p1, | ||
percentile_disc(0.2) WITHIN GROUP (ORDER BY a) as p2, | ||
percentile_disc(0.3) WITHIN GROUP (ORDER BY a) as p3, | ||
percentile_disc(0.4) WITHIN GROUP (ORDER BY a) as p4, | ||
percentile_disc(0.5) WITHIN GROUP (ORDER BY a) as p5, | ||
percentile_disc(0.6) WITHIN GROUP (ORDER BY a) as p6, | ||
percentile_disc(0.7) WITHIN GROUP (ORDER BY a) as p7, | ||
percentile_disc(0.8) WITHIN GROUP (ORDER BY a) as p8, | ||
percentile_disc(0.9) WITHIN GROUP (ORDER BY a) as p9, | ||
percentile_disc(1.0) WITHIN GROUP (ORDER BY a) as p10 | ||
FROM VALUES (0) AS v(a) | ||
-- !query schema | ||
struct<p0:double,p1:double,p2:double,p3:double,p4:double,p5:double,p6:double,p7:double,p8:double,p9:double,p10:double> | ||
-- !query output | ||
0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 | ||
|
||
|
||
-- !query | ||
SELECT | ||
percentile_disc(0.0) WITHIN GROUP (ORDER BY a) as p0, | ||
percentile_disc(0.1) WITHIN GROUP (ORDER BY a) as p1, | ||
percentile_disc(0.2) WITHIN GROUP (ORDER BY a) as p2, | ||
percentile_disc(0.3) WITHIN GROUP (ORDER BY a) as p3, | ||
percentile_disc(0.4) WITHIN GROUP (ORDER BY a) as p4, | ||
percentile_disc(0.5) WITHIN GROUP (ORDER BY a) as p5, | ||
percentile_disc(0.6) WITHIN GROUP (ORDER BY a) as p6, | ||
percentile_disc(0.7) WITHIN GROUP (ORDER BY a) as p7, | ||
percentile_disc(0.8) WITHIN GROUP (ORDER BY a) as p8, | ||
percentile_disc(0.9) WITHIN GROUP (ORDER BY a) as p9, | ||
percentile_disc(1.0) WITHIN GROUP (ORDER BY a) as p10 | ||
FROM VALUES (0), (1) AS v(a) | ||
-- !query schema | ||
struct<p0:double,p1:double,p2:double,p3:double,p4:double,p5:double,p6:double,p7:double,p8:double,p9:double,p10:double> | ||
-- !query output | ||
0.0 0.0 0.0 0.0 0.0 0.0 1.0 1.0 1.0 1.0 1.0 | ||
|
||
|
||
-- !query | ||
SELECT | ||
percentile_disc(0.0) WITHIN GROUP (ORDER BY a) as p0, | ||
percentile_disc(0.1) WITHIN GROUP (ORDER BY a) as p1, | ||
percentile_disc(0.2) WITHIN GROUP (ORDER BY a) as p2, | ||
percentile_disc(0.3) WITHIN GROUP (ORDER BY a) as p3, | ||
percentile_disc(0.4) WITHIN GROUP (ORDER BY a) as p4, | ||
percentile_disc(0.5) WITHIN GROUP (ORDER BY a) as p5, | ||
percentile_disc(0.6) WITHIN GROUP (ORDER BY a) as p6, | ||
percentile_disc(0.7) WITHIN GROUP (ORDER BY a) as p7, | ||
percentile_disc(0.8) WITHIN GROUP (ORDER BY a) as p8, | ||
percentile_disc(0.9) WITHIN GROUP (ORDER BY a) as p9, | ||
percentile_disc(1.0) WITHIN GROUP (ORDER BY a) as p10 | ||
FROM VALUES (0), (0), (1) AS v(a) | ||
-- !query schema | ||
struct<p0:double,p1:double,p2:double,p3:double,p4:double,p5:double,p6:double,p7:double,p8:double,p9:double,p10:double> | ||
-- !query output | ||
0.0 0.0 0.0 0.0 0.0 0.0 0.0 1.0 1.0 1.0 1.0 | ||
|
||
|
||
-- !query | ||
SELECT | ||
percentile_disc(0.0) WITHIN GROUP (ORDER BY a) as p0, | ||
percentile_disc(0.1) WITHIN GROUP (ORDER BY a) as p1, | ||
percentile_disc(0.2) WITHIN GROUP (ORDER BY a) as p2, | ||
percentile_disc(0.3) WITHIN GROUP (ORDER BY a) as p3, | ||
percentile_disc(0.4) WITHIN GROUP (ORDER BY a) as p4, | ||
percentile_disc(0.5) WITHIN GROUP (ORDER BY a) as p5, | ||
percentile_disc(0.6) WITHIN GROUP (ORDER BY a) as p6, | ||
percentile_disc(0.7) WITHIN GROUP (ORDER BY a) as p7, | ||
percentile_disc(0.8) WITHIN GROUP (ORDER BY a) as p8, | ||
percentile_disc(0.9) WITHIN GROUP (ORDER BY a) as p9, | ||
percentile_disc(1.0) WITHIN GROUP (ORDER BY a) as p10 | ||
FROM VALUES (0), (1), (1) AS v(a) | ||
-- !query schema | ||
struct<p0:double,p1:double,p2:double,p3:double,p4:double,p5:double,p6:double,p7:double,p8:double,p9:double,p10:double> | ||
-- !query output | ||
0.0 0.0 0.0 0.0 1.0 1.0 1.0 1.0 1.0 1.0 1.0 | ||
|
||
|
||
-- !query | ||
SELECT | ||
percentile_disc(0.0) WITHIN GROUP (ORDER BY a) as p0, | ||
percentile_disc(0.1) WITHIN GROUP (ORDER BY a) as p1, | ||
percentile_disc(0.2) WITHIN GROUP (ORDER BY a) as p2, | ||
percentile_disc(0.3) WITHIN GROUP (ORDER BY a) as p3, | ||
percentile_disc(0.4) WITHIN GROUP (ORDER BY a) as p4, | ||
percentile_disc(0.5) WITHIN GROUP (ORDER BY a) as p5, | ||
percentile_disc(0.6) WITHIN GROUP (ORDER BY a) as p6, | ||
percentile_disc(0.7) WITHIN GROUP (ORDER BY a) as p7, | ||
percentile_disc(0.8) WITHIN GROUP (ORDER BY a) as p8, | ||
percentile_disc(0.9) WITHIN GROUP (ORDER BY a) as p9, | ||
percentile_disc(1.0) WITHIN GROUP (ORDER BY a) as p10 | ||
FROM VALUES (0), (1), (2), (3), (4) AS v(a) | ||
-- !query schema | ||
struct<p0:double,p1:double,p2:double,p3:double,p4:double,p5:double,p6:double,p7:double,p8:double,p9:double,p10:double> | ||
-- !query output | ||
0.0 0.0 0.0 1.0 1.0 2.0 2.0 3.0 3.0 4.0 4.0 | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. These results are verified with MSSQL and PostgreSQL. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This might not work well, see:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah ok, thanks. I will move this to class param then.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've refactored the PR a bit so that only
PercentileDisc
requires the new legacy flag.