[SPARK-44871][SQL] Fix percentile_disc behaviour #42559

@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.trees.{BinaryLike, TernaryLike, UnaryLike}
import org.apache.spark.sql.catalyst.types.PhysicalDataType
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.TypeCollection.NumericAndAnsiInterval
import org.apache.spark.util.collection.OpenHashMap
@@ -49,6 +50,9 @@ abstract class PercentileBase
@transient
private lazy val returnPercentileArray = percentageExpression.dataType.isInstanceOf[ArrayType]

private val legacyDiscCalculation: Boolean =
SQLConf.get.getConf(SQLConf.LEGACY_PERCENTILE_DISC_CALCULATION)

Contributor Author: Ah ok, thanks. I will move this to class param then.

Contributor Author: I've refactored the PR a bit so that only PercentileDisc requires the new legacy flag.
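A rough sketch of the suggestion in this thread, with hypothetical names (PercentileDiscSketch is not the PR's class, and the real expression has more parameters): the flag is read from SQLConf once, when the expression is constructed on the driver, and carried along as a constructor parameter instead of being looked up lazily at evaluation time.

    import org.apache.spark.sql.internal.SQLConf

    // Hypothetical illustration only; not the PR's implementation.
    case class PercentileDiscSketch(
        percentage: Double,
        // Captured at construction time, so the value is fixed when the plan is built.
        legacyCalculation: Boolean =
          SQLConf.get.getConf(SQLConf.LEGACY_PERCENTILE_DISC_CALCULATION))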


@transient
protected lazy val percentages = percentageExpression.eval() match {
case null => null
@@ -164,10 +168,14 @@ abstract class PercentileBase
val accumulatedCounts = sortedCounts.scanLeft((sortedCounts.head._1, 0L)) {
case ((key1, count1), (key2, count2)) => (key2, count1 + count2)
}.tail
-val maxPosition = accumulatedCounts.last._2 - 1
+val maxPosition = accumulatedCounts.last._2

percentages.map { percentile =>
-  getPercentile(accumulatedCounts, maxPosition * percentile)
+  if (discrete && !legacyDiscCalculation) {
+    getPercentileDisc(accumulatedCounts, maxPosition * percentile)
+  } else {
+    getPercentile(accumulatedCounts, (maxPosition - 1) * percentile)
+  }
}
}

@@ -215,13 +223,29 @@
}

if (discrete) {
// We end up here only if spark.sql.legacy.percentileDiscCalculation=true
toDoubleValue(lowerKey)
} else {
// Linear interpolation to get the exact percentile
(higher - position) * toDoubleValue(lowerKey) + (position - lower) * toDoubleValue(higherKey)
}
}

Contributor: Can we add a comment in the old getPercentile discrete codepath noting that it's only used when the legacy flag is set?


// `percentile_disc(p)` returns the smallest value whose `cume_dist()` is greater than or
// equal to `p`, so `position` here is `p` scaled by `maxPosition`.
private def getPercentileDisc(
accumulatedCounts: Seq[(AnyRef, Long)],
position: Double): Double = {
val higher = position.ceil.toLong

// Use binary search to find the higher position.
val countsArray = accumulatedCounts.map(_._2).toArray[Long]
val higherIndex = binarySearchCount(countsArray, 0, accumulatedCounts.size, higher)
val higherKey = accumulatedCounts(higherIndex)._1

toDoubleValue(higherKey)
}

/**
* use a binary search to find the index of the position closest to the current value.
*/
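To make the new discrete path above concrete, here is a small self-contained Scala sketch (not the PR's code; it uses a linear scan where the real implementation uses binarySearchCount): the percentile position is taken over the total count, rounded up, and the first key whose accumulated count reaches it is returned.

    object GetPercentileDiscSketch {
      // accumulatedCounts pairs each sorted distinct value with its running count,
      // mirroring the `accumulatedCounts` built in PercentileBase above.
      def percentileDisc(accumulatedCounts: Seq[(Double, Long)], percentage: Double): Double = {
        val maxPosition = accumulatedCounts.last._2          // total number of rows
        val higher = math.ceil(maxPosition * percentage).toLong
        // First value whose accumulated count reaches `higher`; for percentage = 0.0
        // this is simply the smallest value.
        accumulatedCounts.find(_._2 >= higher).getOrElse(accumulatedCounts.head)._1
      }

      def main(args: Array[String]): Unit = {
        // VALUES (0), (1), (2), as in the new golden tests below.
        val counts = Seq(0.0 -> 1L, 1.0 -> 2L, 2.0 -> 3L)
        // Prints List(0.0, 1.0, 1.0, 2.0, 2.0), matching p0, p4, p6, p7, p10 in the results.
        println(Seq(0.0, 0.4, 0.6, 0.7, 1.0).map(percentileDisc(counts, _)))
      }
    }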
@@ -4368,6 +4368,13 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val LEGACY_PERCENTILE_DISC_CALCULATION = buildConf("spark.sql.legacy.percentileDiscCalculation")
.internal()
.doc("If true the old bogus percentile_disc calculation is used.")

Contributor: Maybe add a bit more explanation of what the legacy behavior is, so that in the future people do not need to go back to this PR to understand the context?

Contributor Author: Added in 93a018c.

Contributor: Thanks!

.version("4.0.0")
.booleanConf
.createWithDefault(false)

/**
* Holds information about keys that have been deprecated.
*
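For completeness, a minimal usage sketch (assuming an active SparkSession named `spark`): sessions that need the previous behaviour can opt back in via the new flag.

    // Opt back into the legacy percentile_disc behaviour for the current session.
    spark.conf.set("spark.sql.legacy.percentileDiscCalculation", "true")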
@@ -760,3 +760,83 @@ Sort [k#x ASC NULLS FIRST], true
+- Project [k#x, dt#x, ym#x, dt2#x]
+- SubqueryAlias intervals
+- LocalRelation [k#x, dt#x, ym#x, dt2#x]


-- !query
SELECT
percentile_disc(0.0) WITHIN GROUP (ORDER BY a) as p0,
percentile_disc(0.1) WITHIN GROUP (ORDER BY a) as p1,
percentile_disc(0.2) WITHIN GROUP (ORDER BY a) as p2,
percentile_disc(0.3) WITHIN GROUP (ORDER BY a) as p3,
percentile_disc(0.4) WITHIN GROUP (ORDER BY a) as p4,
percentile_disc(0.5) WITHIN GROUP (ORDER BY a) as p5,
percentile_disc(0.6) WITHIN GROUP (ORDER BY a) as p6,
percentile_disc(0.7) WITHIN GROUP (ORDER BY a) as p7,
percentile_disc(0.8) WITHIN GROUP (ORDER BY a) as p8,
percentile_disc(0.9) WITHIN GROUP (ORDER BY a) as p9,
percentile_disc(1.0) WITHIN GROUP (ORDER BY a) as p10
FROM VALUES (0) AS v(a)
-- !query analysis
Aggregate [percentile_disc(a#x, cast(0.0 as double), false, 0, 0) AS p0#x, percentile_disc(a#x, cast(0.1 as double), false, 0, 0) AS p1#x, percentile_disc(a#x, cast(0.2 as double), false, 0, 0) AS p2#x, percentile_disc(a#x, cast(0.3 as double), false, 0, 0) AS p3#x, percentile_disc(a#x, cast(0.4 as double), false, 0, 0) AS p4#x, percentile_disc(a#x, cast(0.5 as double), false, 0, 0) AS p5#x, percentile_disc(a#x, cast(0.6 as double), false, 0, 0) AS p6#x, percentile_disc(a#x, cast(0.7 as double), false, 0, 0) AS p7#x, percentile_disc(a#x, cast(0.8 as double), false, 0, 0) AS p8#x, percentile_disc(a#x, cast(0.9 as double), false, 0, 0) AS p9#x, percentile_disc(a#x, cast(1.0 as double), false, 0, 0) AS p10#x]
+- SubqueryAlias v
+- LocalRelation [a#x]


-- !query
SELECT
percentile_disc(0.0) WITHIN GROUP (ORDER BY a) as p0,
percentile_disc(0.1) WITHIN GROUP (ORDER BY a) as p1,
percentile_disc(0.2) WITHIN GROUP (ORDER BY a) as p2,
percentile_disc(0.3) WITHIN GROUP (ORDER BY a) as p3,
percentile_disc(0.4) WITHIN GROUP (ORDER BY a) as p4,
percentile_disc(0.5) WITHIN GROUP (ORDER BY a) as p5,
percentile_disc(0.6) WITHIN GROUP (ORDER BY a) as p6,
percentile_disc(0.7) WITHIN GROUP (ORDER BY a) as p7,
percentile_disc(0.8) WITHIN GROUP (ORDER BY a) as p8,
percentile_disc(0.9) WITHIN GROUP (ORDER BY a) as p9,
percentile_disc(1.0) WITHIN GROUP (ORDER BY a) as p10
FROM VALUES (0), (1) AS v(a)
-- !query analysis
Aggregate [percentile_disc(a#x, cast(0.0 as double), false, 0, 0) AS p0#x, percentile_disc(a#x, cast(0.1 as double), false, 0, 0) AS p1#x, percentile_disc(a#x, cast(0.2 as double), false, 0, 0) AS p2#x, percentile_disc(a#x, cast(0.3 as double), false, 0, 0) AS p3#x, percentile_disc(a#x, cast(0.4 as double), false, 0, 0) AS p4#x, percentile_disc(a#x, cast(0.5 as double), false, 0, 0) AS p5#x, percentile_disc(a#x, cast(0.6 as double), false, 0, 0) AS p6#x, percentile_disc(a#x, cast(0.7 as double), false, 0, 0) AS p7#x, percentile_disc(a#x, cast(0.8 as double), false, 0, 0) AS p8#x, percentile_disc(a#x, cast(0.9 as double), false, 0, 0) AS p9#x, percentile_disc(a#x, cast(1.0 as double), false, 0, 0) AS p10#x]
+- SubqueryAlias v
+- LocalRelation [a#x]


-- !query
SELECT
percentile_disc(0.0) WITHIN GROUP (ORDER BY a) as p0,
percentile_disc(0.1) WITHIN GROUP (ORDER BY a) as p1,
percentile_disc(0.2) WITHIN GROUP (ORDER BY a) as p2,
percentile_disc(0.3) WITHIN GROUP (ORDER BY a) as p3,
percentile_disc(0.4) WITHIN GROUP (ORDER BY a) as p4,
percentile_disc(0.5) WITHIN GROUP (ORDER BY a) as p5,
percentile_disc(0.6) WITHIN GROUP (ORDER BY a) as p6,
percentile_disc(0.7) WITHIN GROUP (ORDER BY a) as p7,
percentile_disc(0.8) WITHIN GROUP (ORDER BY a) as p8,
percentile_disc(0.9) WITHIN GROUP (ORDER BY a) as p9,
percentile_disc(1.0) WITHIN GROUP (ORDER BY a) as p10
FROM VALUES (0), (1), (2) AS v(a)
-- !query analysis
Aggregate [percentile_disc(a#x, cast(0.0 as double), false, 0, 0) AS p0#x, percentile_disc(a#x, cast(0.1 as double), false, 0, 0) AS p1#x, percentile_disc(a#x, cast(0.2 as double), false, 0, 0) AS p2#x, percentile_disc(a#x, cast(0.3 as double), false, 0, 0) AS p3#x, percentile_disc(a#x, cast(0.4 as double), false, 0, 0) AS p4#x, percentile_disc(a#x, cast(0.5 as double), false, 0, 0) AS p5#x, percentile_disc(a#x, cast(0.6 as double), false, 0, 0) AS p6#x, percentile_disc(a#x, cast(0.7 as double), false, 0, 0) AS p7#x, percentile_disc(a#x, cast(0.8 as double), false, 0, 0) AS p8#x, percentile_disc(a#x, cast(0.9 as double), false, 0, 0) AS p9#x, percentile_disc(a#x, cast(1.0 as double), false, 0, 0) AS p10#x]
+- SubqueryAlias v
+- LocalRelation [a#x]


-- !query
SELECT
percentile_disc(0.0) WITHIN GROUP (ORDER BY a) as p0,
percentile_disc(0.1) WITHIN GROUP (ORDER BY a) as p1,
percentile_disc(0.2) WITHIN GROUP (ORDER BY a) as p2,
percentile_disc(0.3) WITHIN GROUP (ORDER BY a) as p3,
percentile_disc(0.4) WITHIN GROUP (ORDER BY a) as p4,
percentile_disc(0.5) WITHIN GROUP (ORDER BY a) as p5,
percentile_disc(0.6) WITHIN GROUP (ORDER BY a) as p6,
percentile_disc(0.7) WITHIN GROUP (ORDER BY a) as p7,
percentile_disc(0.8) WITHIN GROUP (ORDER BY a) as p8,
percentile_disc(0.9) WITHIN GROUP (ORDER BY a) as p9,
percentile_disc(1.0) WITHIN GROUP (ORDER BY a) as p10
FROM VALUES (0), (1), (2), (3), (4) AS v(a)
-- !query analysis
Aggregate [percentile_disc(a#x, cast(0.0 as double), false, 0, 0) AS p0#x, percentile_disc(a#x, cast(0.1 as double), false, 0, 0) AS p1#x, percentile_disc(a#x, cast(0.2 as double), false, 0, 0) AS p2#x, percentile_disc(a#x, cast(0.3 as double), false, 0, 0) AS p3#x, percentile_disc(a#x, cast(0.4 as double), false, 0, 0) AS p4#x, percentile_disc(a#x, cast(0.5 as double), false, 0, 0) AS p5#x, percentile_disc(a#x, cast(0.6 as double), false, 0, 0) AS p6#x, percentile_disc(a#x, cast(0.7 as double), false, 0, 0) AS p7#x, percentile_disc(a#x, cast(0.8 as double), false, 0, 0) AS p8#x, percentile_disc(a#x, cast(0.9 as double), false, 0, 0) AS p9#x, percentile_disc(a#x, cast(1.0 as double), false, 0, 0) AS p10#x]
+- SubqueryAlias v
+- LocalRelation [a#x]
59 changes: 58 additions & 1 deletion sql/core/src/test/resources/sql-tests/inputs/percentiles.sql
@@ -299,4 +299,61 @@ SELECT
percentile_cont(0.5) WITHIN GROUP (ORDER BY dt2)
FROM intervals
GROUP BY k
-ORDER BY k;
+ORDER BY k;

-- SPARK-44871: Fix percentile_disc behaviour
SELECT
percentile_disc(0.0) WITHIN GROUP (ORDER BY a) as p0,
percentile_disc(0.1) WITHIN GROUP (ORDER BY a) as p1,
percentile_disc(0.2) WITHIN GROUP (ORDER BY a) as p2,
percentile_disc(0.3) WITHIN GROUP (ORDER BY a) as p3,
percentile_disc(0.4) WITHIN GROUP (ORDER BY a) as p4,
percentile_disc(0.5) WITHIN GROUP (ORDER BY a) as p5,
percentile_disc(0.6) WITHIN GROUP (ORDER BY a) as p6,
percentile_disc(0.7) WITHIN GROUP (ORDER BY a) as p7,
percentile_disc(0.8) WITHIN GROUP (ORDER BY a) as p8,
percentile_disc(0.9) WITHIN GROUP (ORDER BY a) as p9,
percentile_disc(1.0) WITHIN GROUP (ORDER BY a) as p10
FROM VALUES (0) AS v(a);

SELECT
percentile_disc(0.0) WITHIN GROUP (ORDER BY a) as p0,
percentile_disc(0.1) WITHIN GROUP (ORDER BY a) as p1,
percentile_disc(0.2) WITHIN GROUP (ORDER BY a) as p2,
percentile_disc(0.3) WITHIN GROUP (ORDER BY a) as p3,
percentile_disc(0.4) WITHIN GROUP (ORDER BY a) as p4,
percentile_disc(0.5) WITHIN GROUP (ORDER BY a) as p5,
percentile_disc(0.6) WITHIN GROUP (ORDER BY a) as p6,
percentile_disc(0.7) WITHIN GROUP (ORDER BY a) as p7,
percentile_disc(0.8) WITHIN GROUP (ORDER BY a) as p8,
percentile_disc(0.9) WITHIN GROUP (ORDER BY a) as p9,
percentile_disc(1.0) WITHIN GROUP (ORDER BY a) as p10
FROM VALUES (0), (1) AS v(a);

SELECT
percentile_disc(0.0) WITHIN GROUP (ORDER BY a) as p0,
percentile_disc(0.1) WITHIN GROUP (ORDER BY a) as p1,
percentile_disc(0.2) WITHIN GROUP (ORDER BY a) as p2,
percentile_disc(0.3) WITHIN GROUP (ORDER BY a) as p3,
percentile_disc(0.4) WITHIN GROUP (ORDER BY a) as p4,
percentile_disc(0.5) WITHIN GROUP (ORDER BY a) as p5,
percentile_disc(0.6) WITHIN GROUP (ORDER BY a) as p6,
percentile_disc(0.7) WITHIN GROUP (ORDER BY a) as p7,
percentile_disc(0.8) WITHIN GROUP (ORDER BY a) as p8,
percentile_disc(0.9) WITHIN GROUP (ORDER BY a) as p9,
percentile_disc(1.0) WITHIN GROUP (ORDER BY a) as p10
FROM VALUES (0), (1), (2) AS v(a);

SELECT
percentile_disc(0.0) WITHIN GROUP (ORDER BY a) as p0,
percentile_disc(0.1) WITHIN GROUP (ORDER BY a) as p1,
percentile_disc(0.2) WITHIN GROUP (ORDER BY a) as p2,
percentile_disc(0.3) WITHIN GROUP (ORDER BY a) as p3,
percentile_disc(0.4) WITHIN GROUP (ORDER BY a) as p4,
percentile_disc(0.5) WITHIN GROUP (ORDER BY a) as p5,
percentile_disc(0.6) WITHIN GROUP (ORDER BY a) as p6,
percentile_disc(0.7) WITHIN GROUP (ORDER BY a) as p7,
percentile_disc(0.8) WITHIN GROUP (ORDER BY a) as p8,
percentile_disc(0.9) WITHIN GROUP (ORDER BY a) as p9,
percentile_disc(1.0) WITHIN GROUP (ORDER BY a) as p10
FROM VALUES (0), (1), (2), (3), (4) AS v(a);
80 changes: 80 additions & 0 deletions sql/core/src/test/resources/sql-tests/results/percentiles.sql.out
@@ -730,3 +730,83 @@ struct<k:int,median(dt2):interval day to second,percentile(dt2, 0.5, 1):interval
2 0 00:22:30.000000000 0 00:22:30.000000000 0 00:22:30.000000000
3 0 01:00:00.000000000 0 01:00:00.000000000 0 01:00:00.000000000
4 NULL NULL NULL


-- !query
SELECT
percentile_disc(0.0) WITHIN GROUP (ORDER BY a) as p0,
percentile_disc(0.1) WITHIN GROUP (ORDER BY a) as p1,
percentile_disc(0.2) WITHIN GROUP (ORDER BY a) as p2,
percentile_disc(0.3) WITHIN GROUP (ORDER BY a) as p3,
percentile_disc(0.4) WITHIN GROUP (ORDER BY a) as p4,
percentile_disc(0.5) WITHIN GROUP (ORDER BY a) as p5,
percentile_disc(0.6) WITHIN GROUP (ORDER BY a) as p6,
percentile_disc(0.7) WITHIN GROUP (ORDER BY a) as p7,
percentile_disc(0.8) WITHIN GROUP (ORDER BY a) as p8,
percentile_disc(0.9) WITHIN GROUP (ORDER BY a) as p9,
percentile_disc(1.0) WITHIN GROUP (ORDER BY a) as p10
FROM VALUES (0) AS v(a)
-- !query schema
struct<p0:double,p1:double,p2:double,p3:double,p4:double,p5:double,p6:double,p7:double,p8:double,p9:double,p10:double>
-- !query output
0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0


-- !query
SELECT
percentile_disc(0.0) WITHIN GROUP (ORDER BY a) as p0,
percentile_disc(0.1) WITHIN GROUP (ORDER BY a) as p1,
percentile_disc(0.2) WITHIN GROUP (ORDER BY a) as p2,
percentile_disc(0.3) WITHIN GROUP (ORDER BY a) as p3,
percentile_disc(0.4) WITHIN GROUP (ORDER BY a) as p4,
percentile_disc(0.5) WITHIN GROUP (ORDER BY a) as p5,
percentile_disc(0.6) WITHIN GROUP (ORDER BY a) as p6,
percentile_disc(0.7) WITHIN GROUP (ORDER BY a) as p7,
percentile_disc(0.8) WITHIN GROUP (ORDER BY a) as p8,
percentile_disc(0.9) WITHIN GROUP (ORDER BY a) as p9,
percentile_disc(1.0) WITHIN GROUP (ORDER BY a) as p10
FROM VALUES (0), (1) AS v(a)
-- !query schema
struct<p0:double,p1:double,p2:double,p3:double,p4:double,p5:double,p6:double,p7:double,p8:double,p9:double,p10:double>
-- !query output
0.0 0.0 0.0 0.0 0.0 0.0 1.0 1.0 1.0 1.0 1.0


-- !query
SELECT
percentile_disc(0.0) WITHIN GROUP (ORDER BY a) as p0,
percentile_disc(0.1) WITHIN GROUP (ORDER BY a) as p1,
percentile_disc(0.2) WITHIN GROUP (ORDER BY a) as p2,
percentile_disc(0.3) WITHIN GROUP (ORDER BY a) as p3,
percentile_disc(0.4) WITHIN GROUP (ORDER BY a) as p4,
percentile_disc(0.5) WITHIN GROUP (ORDER BY a) as p5,
percentile_disc(0.6) WITHIN GROUP (ORDER BY a) as p6,
percentile_disc(0.7) WITHIN GROUP (ORDER BY a) as p7,
percentile_disc(0.8) WITHIN GROUP (ORDER BY a) as p8,
percentile_disc(0.9) WITHIN GROUP (ORDER BY a) as p9,
percentile_disc(1.0) WITHIN GROUP (ORDER BY a) as p10
FROM VALUES (0), (1), (2) AS v(a)
-- !query schema
struct<p0:double,p1:double,p2:double,p3:double,p4:double,p5:double,p6:double,p7:double,p8:double,p9:double,p10:double>
-- !query output
0.0 0.0 0.0 0.0 1.0 1.0 1.0 2.0 2.0 2.0 2.0


-- !query
SELECT
percentile_disc(0.0) WITHIN GROUP (ORDER BY a) as p0,
percentile_disc(0.1) WITHIN GROUP (ORDER BY a) as p1,
percentile_disc(0.2) WITHIN GROUP (ORDER BY a) as p2,
percentile_disc(0.3) WITHIN GROUP (ORDER BY a) as p3,
percentile_disc(0.4) WITHIN GROUP (ORDER BY a) as p4,
percentile_disc(0.5) WITHIN GROUP (ORDER BY a) as p5,
percentile_disc(0.6) WITHIN GROUP (ORDER BY a) as p6,
percentile_disc(0.7) WITHIN GROUP (ORDER BY a) as p7,
percentile_disc(0.8) WITHIN GROUP (ORDER BY a) as p8,
percentile_disc(0.9) WITHIN GROUP (ORDER BY a) as p9,
percentile_disc(1.0) WITHIN GROUP (ORDER BY a) as p10
FROM VALUES (0), (1), (2), (3), (4) AS v(a)
-- !query schema
struct<p0:double,p1:double,p2:double,p3:double,p4:double,p5:double,p6:double,p7:double,p8:double,p9:double,p10:double>
-- !query output
0.0 0.0 0.0 1.0 1.0 2.0 2.0 3.0 3.0 4.0 4.0

Contributor Author: These results are verified with MSSQL and PostgreSQL.
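As a sanity check on the last result block above, a small standalone Scala snippet (no Spark required; the cume_dist formula here assumes distinct values, as in these tests) reproduces the 5-row results from the "smallest value whose cume_dist() >= p" definition.

    object PercentileDiscResultCheck {
      // Smallest value whose cumulative distribution is at least p.
      def percentileDisc(sorted: Seq[Double], p: Double): Double = {
        val n = sorted.size
        sorted.zipWithIndex
          .find { case (_, i) => (i + 1).toDouble / n >= p }
          .map(_._1)
          .getOrElse(sorted.last)
      }

      def main(args: Array[String]): Unit = {
        val values = Seq(0.0, 1.0, 2.0, 3.0, 4.0)   // VALUES (0), (1), (2), (3), (4)
        val ps = (0 to 10).map(_ / 10.0)            // p0 .. p10
        // Prints: 0.0 0.0 0.0 1.0 1.0 2.0 2.0 3.0 3.0 4.0 4.0
        println(ps.map(p => percentileDisc(values, p)).mkString(" "))
      }
    }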