SPARK-6480 [CORE] histogram() bucket function is wrong in some simple edge cases #5148

Status: Closed · wants to merge 2 commits
20 changes: 9 additions & 11 deletions core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
@@ -191,25 +191,23 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
     }
   }
   // Determine the bucket function in constant time. Requires that buckets are evenly spaced
-  def fastBucketFunction(min: Double, increment: Double, count: Int)(e: Double): Option[Int] = {
+  def fastBucketFunction(min: Double, max: Double, count: Int)(e: Double): Option[Int] = {
-    // If our input is not a number unless the increment is also NaN then we fail fast
-    if (e.isNaN()) {
-      return None
-    }
-    val bucketNumber = (e - min)/(increment)
-    // We do this rather than buckets.lengthCompare(bucketNumber)
-    // because Array[Double] fails to override it (for now).
-    if (bucketNumber > count || bucketNumber < 0) {
+    if (e.isNaN || e < min || e > max) {
       None
     } else {
-      Some(bucketNumber.toInt.min(count - 1))
+      // Compute ratio of e's distance along range to total range first, for better precision
+      val bucketNumber = (((e - min) / (max - min)) * count).toInt
Contributor:
max - min should stay constant, so I think we could make this decently faster by precomputing (count / (max - min)) and multiplying by it. Maybe the compiler makes this kind of optimization, but I certainly wouldn't count on it. Would that give us the same problem as before?

Member (Author):
My gut was that it would be more accurate to compute the ratio of two potentially Huge numbers first, then multiply by something Small, rather than compute the ratio of Small-to-Huge then multiply by a Huge number. If you try min = 0, max = 1e20, count = 1000000000 (that's 10^9), e = 1e11, you get 1 from this expression (correct) whereas the alternative says 0.

Contributor:
Agreed
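
The difference is easy to reproduce outside Spark. Below is a minimal, self-contained sketch (hypothetical names, not part of this patch) that plugs the numbers from this thread into both orderings; with IEEE-754 doubles, the ratio-first expression used in the patch yields bucket 1 while the precomputed-factor variant yields bucket 0.

```scala
object BucketPrecisionDemo {
  def main(args: Array[String]): Unit = {
    val min = 0.0
    val max = 1.0e20
    val count = 1000000000 // 10^9 buckets
    val e = 1.0e11
    // Ratio-first, as in this patch: divide the two potentially huge
    // quantities first, then scale by the (comparatively small) bucket count.
    val ratioFirst = (((e - min) / (max - min)) * count).toInt
    // Factor-first, the suggested precomputation: a tiny precomputed factor
    // multiplied by a huge distance rounds just below 1.0 and truncates to 0.
    val factor = count / (max - min)
    val factorFirst = ((e - min) * factor).toInt
    println(s"ratio-first = $ratioFirst, factor-first = $factorFirst") // 1 vs 0
  }
}
```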

+      // should be less than count, but will equal count if e == max, in which case
+      // it's part of the last end-range-inclusive bucket, so return count-1
+      Some(math.min(bucketNumber, count - 1))
     }
   }
   // Decide which bucket function to pass to histogramPartition. We decide here
-  // rather than having a general function so that the decission need only be made
+  // rather than having a general function so that the decision need only be made
   // once rather than once per shard
   val bucketFunction = if (evenBuckets) {
-    fastBucketFunction(buckets(0), buckets(1)-buckets(0), buckets.length-1) _
+    fastBucketFunction(buckets.head, buckets.last, buckets.length - 1) _
   } else {
     basicBucketFunction _
   }
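For readers following along outside the diff, here is the patched logic as a standalone sketch (hypothetical object and method names, not Spark's API), together with the edge cases the fix targets: a value equal to max belongs to the last, end-inclusive bucket, and NaN or out-of-range values map to no bucket.

```scala
object FastBucketSketch {
  // Mirrors the patched fastBucketFunction: count evenly spaced buckets on [min, max].
  def fastBucket(min: Double, max: Double, count: Int)(e: Double): Option[Int] = {
    if (e.isNaN || e < min || e > max) {
      None
    } else {
      // Ratio of e's distance along the range to the total range, computed first.
      val bucketNumber = (((e - min) / (max - min)) * count).toInt
      // bucketNumber == count only when e == max; that value goes in the last bucket.
      Some(math.min(bucketNumber, count - 1))
    }
  }

  def main(args: Array[String]): Unit = {
    val f = fastBucket(1.0, 3.0, 4) _
    println(f(1.0))        // Some(0)
    println(f(3.0))        // Some(3): max falls in the last, end-inclusive bucket
    println(f(Double.NaN)) // None
    println(f(3.5))        // None: out of range
  }
}
```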
24 changes: 20 additions & 4 deletions core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
@@ -235,6 +235,12 @@ class DoubleRDDSuite extends FunSuite with SharedSparkContext {
     assert(histogramBuckets === expectedHistogramBuckets)
   }

+  test("WorksWithDoubleValuesAtMinMax") {
+    val rdd = sc.parallelize(Seq(1, 1, 1, 2, 3, 3))
+    assert(Array(3, 0, 1, 2) === rdd.map(_.toDouble).histogram(4)._2)
+    assert(Array(3, 1, 2) === rdd.map(_.toDouble).histogram(3)._2)
+  }
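
A note on the expected counts: for the histogram(4) call, min = 1.0 and max = 3.0 give bucket boundaries 1.0, 1.5, 2.0, 2.5, 3.0, so the three 1.0 values fall in bucket 0, the 2.0 in bucket 2, and both 3.0 values in bucket 3 because the last bucket includes max, hence Array(3, 0, 1, 2).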

test("WorksWithoutBucketsWithMoreRequestedThanElements") {
// Verify the basic case of one bucket and all elements in that bucket works
val rdd = sc.parallelize(Seq(1, 2))
Expand All @@ -248,7 +254,7 @@ class DoubleRDDSuite extends FunSuite with SharedSparkContext {
}

test("WorksWithoutBucketsForLargerDatasets") {
// Verify the case of slighly larger datasets
// Verify the case of slightly larger datasets
val rdd = sc.parallelize(6 to 99)
val (histogramBuckets, histogramResults) = rdd.histogram(8)
val expectedHistogramResults =
Expand All @@ -259,17 +265,27 @@ class DoubleRDDSuite extends FunSuite with SharedSparkContext {
assert(histogramBuckets === expectedHistogramBuckets)
}

test("WorksWithoutBucketsWithIrrationalBucketEdges") {
// Verify the case of buckets with irrational edges. See #SPARK-2862.
test("WorksWithoutBucketsWithNonIntegralBucketEdges") {
// Verify the case of buckets with nonintegral edges. See #SPARK-2862.
val rdd = sc.parallelize(6 to 99)
val (histogramBuckets, histogramResults) = rdd.histogram(9)
// Buckets are 6.0, 16.333333333333336, 26.666666666666668, 37.0, 47.333333333333336 ...
val expectedHistogramResults =
Array(11, 10, 11, 10, 10, 11, 10, 10, 11)
Array(11, 10, 10, 11, 10, 10, 11, 10, 11)
assert(histogramResults === expectedHistogramResults)
assert(histogramBuckets(0) === 6.0)
assert(histogramBuckets(9) === 99.0)
}

test("WorksWithHugeRange") {
val rdd = sc.parallelize(Array(0, 1.0e24, 1.0e30))
val histogramResults = rdd.histogram(1000000)._2
assert(histogramResults(0) === 1)
assert(histogramResults(1) === 1)
assert(histogramResults.last === 1)
assert((2 to histogramResults.length - 2).forall(i => histogramResults(i) == 0))
}
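
This test mirrors the review thread's scenario at scale: with min = 0, max = 1.0e30, and 1000000 buckets, the value 1.0e24 sits exactly at the boundary of bucket 1 (its range ratio is 1.0e-6), a placement that is sensitive to the order of the floating-point operations; the remaining assertions check that 1.0e30 lands in the last, end-inclusive bucket and that every other bucket stays empty.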

// Test the failure mode with an invalid RDD
test("ThrowsExceptionOnInvalidRDDs") {
// infinity