-
Notifications
You must be signed in to change notification settings - Fork 28.7k
[SPARK-47547] BloomFilter fpp degradation #50933
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
[SPARK-47547] BloomFilter fpp degradation #50933
Conversation
…errors in scala suite
…of the combined hash
} | ||
} | ||
|
||
long mightContainEven = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please rename these 2 in this test case to clarify that these are actually indices of numbers in a randomly generated stream.
optimalNumOfBits / Byte.SIZE / 1024 / 1024 | ||
); | ||
Assumptions.assumeTrue( | ||
2 * optimalNumOfBits / Byte.SIZE < 4 * ONE_GB, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess 4 * ONE_GB
is a reasoable limit, can we extract it to a constant and add some comment to it.
…eckstyle errors, renaming test vars
…ward compatible with previously serialized streams
"mightContainLong must return true for all inserted numbers" | ||
); | ||
|
||
double actualFpp = (double) mightContainOddIndexed / numItems; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/ numItems
doesn't seem correct here as you don't test numItems
number of numbers that were surely not added into the filter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indeed, it should probably be very close to the proper value, but this calculation doesn't account for the odd indexes ignored based on the secondary's result.
let me try to address that somehow.
8edf4dd
to
57298f0
Compare
… in random test + test formatting
57298f0
to
f589e2c
Compare
Can you please post the output of the new |
the tests with the 4GB limit are still running, I'll post a summary from the results tomorrow, and start a new run that can cover all of the 5G element count cases. |
The filter-from-hex-constant test started to make me worry about compatibility with serialized instances created with the older logic. Even if we can deserialize the buffer and the seed properly, the actual bits will be set in completely different positions. That is, there's no point in trying to use an old (serialized) buffer with the new logic. Should we create a dedicated BloomFilterImplV2 class for the fixed logic, just so we can keep the old V1 implementation for deserializing old byte streams? |
I don't think we need to keep the old implementation just to support old serialized versions. It seems we use our bloom filter implementation only in cc @cloud-fan |
I ran into some trouble with generating the test results (running on a single thread, the whole batch takes ~10h on my machine). I'll try to make an update on Monday. |
…t output capture - 2nd take
…t output capture - 3rd take
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, actualFpp%
seems to be much better when the number of inserted items (n
) is huge (~1B).
I'm not sure that the bug actually caused any issues in the injected runtime filters due to the much lower default values of spark.sql.optimizer.runtime.bloomFilter.max...
configs, but it is also possible to build a bloom filter manually so it is better to fix it.
BTW, this issue seems to have been observed in Spark: https://stackoverflow.com/questions/78162973/why-is-observed-false-positive-rate-in-spark-bloom-filter-higher-than-expected and was tried to fix with #46370 before.
That old PR was similar to how the issue was fixed in Guava with adding a new strategy / Murmur implementation while this PR fixes the root cause in the current Bloom filter implementation.
@cloud-fan, as you added the original bloom filter implementation to Spark, could you please take a look at this PR? |
the only relevant difference between the
|
import java.util.concurrent.ConcurrentHashMap; | ||
|
||
|
||
@Disabled |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Submitting a test case and directly disabling it is not an ideal approach. Why can't it undergo regular validation through GitHub Actions?
Additionally, I suggest first creating a micro-benchmark relevant to this scenario and recording the results without this pr. Then, update the code and the new benchmark results in this pr to demonstrate the optimization effect.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or can the scenarios in org.apache.spark.sql.execution.benchmark.BloomFilterBenchmark
reflect the optimizations brought about by the current pr?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Submitting a test case and directly disabling it is not an ideal approach. Why can't it undergo regular validation through GitHub Actions?
I agree, in spirit, this test code I submitted is much more close to a benchmark (measurement rather than validation) than to an actual test case, with the emphasis on expectations and assertions.
The reason I disabled it by default, because on a single thread, it takes 10+ hours to run the all the cases, and I didn't want to interfere with running time of the regular test suites.
I wasn't aware of the benchmark workflow, I will have a look whether I can fit this logic in there. Not sure, if it will be a snap in fit, because the code focuses on obtaining a Bloom filter specific measure, the false pos rate, not some more usual or generic measures like running time or resource consumption.
Moreover, the performance gains won't be directly apparent on the sketch level. If anything, it will have a slightly worse running time (but shouldn't consume more mem than the previous logic). The gains should only be measurable in client code (like sql) that uses the sketch implementation. E.g. with a reasonably low error rate in the implementation won't force almost any queried element (in a client) on the slow path when the filter is saturated.
Or can the scenarios in
org.apache.spark.sql.execution.benchmark.BloomFilterBenchmark
reflect the optimizations brought about by the current pr?
This may or may not be measurable with the current benchmarks, I haven't looked into that yet. As a rule of thumb, in the current implementation, after a few hundred million elements the false pos rate gets noticeably (a few percents) higher than expected, around about a billion (2^30) it diverges significantly (a few tens of percents), and above 2G (2^31) items, it gets prohibitively high (90%+). With the proposed new logic the error rate remains within a few percents off of the expected on all scales.
If the current benchmarks already use Bloom filters with more than a few hundred million items inserted, then the performance improvements should be visible there.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll try to adapt the current tests into the benchmark workflow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, unfortunately Spark benchmarks can measure only time, but can't measure qualities like the false positive rate of a bloom filter.
I wonder shall we remove TestSparkBloomFilter
from this PR or add some comments to it to explain why it is disabled?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If there are other test cases that already cover the changes in the current pull request? If so, I agree to remove TestSparkBloomFilter
(as per Spark's coding conventions, it should actually be named SparkBloomFilterSuite
). There's no point in adding a test case that cannot be continuously verified by GitHub Actions, as it's likely that no one will remember to execute it later on.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add some comments to it to explain why it is disabled?
@peter-toth I suspect the reason it was marked with @Disabled
is that the execution time was too long. I tried running it using GitHub Actions, but it was eventually terminated because it exceeded the two-hour execution limit...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah. Another option is to reduce the test cases to still validate the improvement of the PR, but with reasoable runtimes.
It seems like the degradation of false positive rate is not yet visible at n = 1M
. But when n = 1000M
the actual FPP is much higher than the expected. (Actuals are 50.920521%, 59.888499% and 76.025548% when expecteds are 5%, 3% and 1%). Unfortunately it seems those test cases took 30-40 mins to complete each.
So how about testing only 1 n
between those 2 where the improvement of the PR is visible but the test completes in let's say 5-10 mins. It shoule be enough to test the 3% default FPP case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be excellent if the verification could be carried out within a relatively concise case.
Moreover, it would be even better if the test case could be rewritten in Scala.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for the late reply, I got a bit lost in the test configuration of the project, it took a while until I could come up with something reasonable to address the concerns.
Submitting a test case and directly disabling it is not an ideal approach. Why can't it undergo regular validation through GitHub Actions?
I think I have already mentioned why I disabled the test in the first place, just for the sake of completeness repeating it here, indeed, the main reason is the impractical running time. If not parallelized properly, running all the slower testcases one after the other, the total running time could easily end up at dozens of hours.
The intention wasn't removing it from regular runs altogether, but to err on the safe side, and not to add an extra 10+ hours of runtime accidentally to e.g. pre-merge runs (supposedly on a fast path).
Fortunately, the individual cases can be run concurrently, so if there are enough threads to run the suite, even the slowest cases can complete in ~2.5h
Or can the scenarios in org.apache.spark.sql.execution.benchmark.BloomFilterBenchmark reflect the optimizations brought about by the current pr?
possibly, yes, but I haven't managed to run the sql benchmarks, and we would still have to solve the problem of capturing a custom measure (error rate) in the benchmarks, instead of the currently supported (e.g. running time).
[...] (as per Spark's coding conventions, it should actually be named SparkBloomFilterSuite). [...]
I have renamed the testclass, Idea now complains about:
Test class name 'SparkBloomFilterSuite' doesn't match regex '[A-Z][A-Za-z\d]*Test(s|Case)?|Test[A-Z][A-Za-z\d]*|IT(.*)|(.*) IT(Case)?'
other than that, everything seems functional.
I would rather not remove the new tests, in the end, at the moment these are the only piece of logic that can demonstrate the error with the current implementation. Rewriting the tests in scala may be an option, but I'm not that comfortable with my scala skills, to confidently jump into that.
<dependency> | ||
<groupId>org.junit-pioneer</groupId> | ||
<artifactId>junit-pioneer</artifactId> | ||
<version>2.3.0</version> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For the management of dependency versions, they should be placed in the parent pom.xml
. However, if TestSparkBloomFilter
can be removed from the current pr, then it seems that this dependency is no longer needed either.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll defer addressing this, until we decide what should happen with TestSparkBloomFilter
.
(remove & move the versions under managed dependencies)
70f7c5e
to
d9d6980
Compare
Now I have added a new tag
The problem is, I don't quite know where those inclusion exclusions should happen exactly.
but I'm not quite sure, how to add the new configuration to them for the |
@LuciferYang |
... so, I expect the running time of the pre merge build to blow up on the next run. |
Those tags are only used for grouping the tests. It doesn't imply that tests labeled as |
@TagAnnotation | ||
@Retention(RetentionPolicy.RUNTIME) | ||
@Target({ElementType.METHOD, ElementType.TYPE}) | ||
public @interface SlowTest { } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need to add a new TagAnnotation
here.
I totally get that. The workflows still have to be adjusted to consider the new tag, but (I think) this can be the cleanest way to isolate this very specialized and very slow test from the rest of the regular tests. Perhaps we can configure the
Although I do have some performance improvement ideas already in mind, I don't think the slowest testcase (5G elements, 0.1% fpp) can be completed under an hour. If we could somehow guarantee (code or documentation) that the impl class wouldn't be instantiated with a lower error rate than 3%, then we could get rid of the 1% and the 0.1% cases, which constitute the bulk of the runtime. IIRC, one round of the 3% case takes around half an hour, so with the other parameter combinations, we can bring down the total runtime to around 3h. If we can run the suite on multiple cores (and we can configure maven to use them), we should be able to fit into the 2h GH actions execution limit conveniently. Do we know anything about the number of cores in the runners? |
Github-hosted runners generally have 4 vcores. Currently, the submit pipeline uses sbt for testing. Additionally, I would like to reiterate my viewpoint: we should strive to have this test completed within a few minutes(5~10 mins), rather than taking 2 hours. Otherwise, we ought to optimize it or temporarily remove this test case. |
… fpp problems by default
cutting down the testcases to the bare minimum (3%fpp and 1G items), the test now completes in little over 10minutes on my machine. would this be an acceptable running time? if the testing concerns are adequately addressed, can we please have a look on the serialization/compatibility questions that came up earlier? in hindsight, it feels really sketchy to deserialize old bytestreams into the updated implementation without any errors or warnings (query results from the inconsistently deserialized object won't make any sense). adding a new version enum feels like the clean solution, but I'm not sure if it is not an overkill. (e.g. if a serialized bloomfilter never gets shared between different application runs) |
Yeah, I believe 10 minutes runtime is acceptable, but if I were you I would test with 100M, maybe the improvement is visible there as well and 1 minute is just enough.
Bloom filter functions ( |
ok, I'll try to provide an update by EOW. |
What changes were proposed in this pull request?
This change fixes a performance degradation issue in the current BloomFilter implementation.
The current bit index calculation logic does not use any part of the indexable space above the first 31bits, so when the inserted item count approaches (or exceeds) Integer.MAX_VALUE, it will produce significantly worse collision rates than an (ideal) uniformly distributing hash function.
Why are the changes needed?
This should qualify as a bug.
The upper bound on the bit capacity of the current BloomFilter implementation in spark is approx 137G bits (64 bit longs in an Integer.MAX_VALUE sized array). The current indexing scheme can only address about 2G bits of these.
On the other hand, due to the way the BloomFilters are used, the bug won't cause any logical errors, it will gradually render the BloomFilter instance useless by forcing more-and-more queries on the slow path.
Does this PR introduce any user-facing change?
No
How was this patch tested?
new test
One new java testclass was added to
sketch
to test different combinations of item counts and expected fpp rates.testAccuracyEvenOdd
in N number of iterations inserts N even numbers (2*i), and leaves out N odd numbers (2*i+1) from the BloomFilter.
The test checks the 100% accuracy of
mightContain=true
on all of the even items, and measures themightContain=true
(false positive) rate on the not-inserted odd numbers.testAccuracyRandom
in 2N number of iterations inserts N pseudorandomly generated numbers in two differently seeded (theoretically independent) BloomFilter instances. All the random numbers generated in an even-iteration will be inserted into both filters, all the random numbers generated in an odd-iteration will be left out from both.
The test checks the 100% accuracy of
mightContain=true
for all of the items inserted in an even-loop. It counts the false positives as the number of odd-loop items for which the primary filter reportsmightContain=true
but secondary reportsmightContain=false
. Since we inserted the same elements into both instances, and the secondary reports non-insertion, themightContain=true
from the primary can only be a false positive.patched
One minor (test) issue was fixed in
where the potential repetitions in the randomly generated stream of insertable items resulted in slightly worse fpp measurements than the actual. The problem affected the those testcases more where the cardinality of the tested type is low (the chance of repetition is high), e.g. Byte and Short.
removed from the default runs
Running these test as part of the default build process was turned off with adding
@Disabled
annotation to the new testclass.Was this patch authored or co-authored using generative AI tooling?
No