[SPARK-2871] [PySpark] add countApproxDistinct() API #2142
Conversation
Conflicts: python/pyspark/tests.py
@mateiz @JoshRosen this one was separated from #1791, please take a look at it. (@mateiz had already reviewed this part in that PR; sorry for the duplicated reviewing.)
    counters that require more space.
    It must be greater than 0.000017.

    >>> n = sc.parallelize(range(1000)).map(str).countApproxDistinct()
Can you add a test to make sure that if you have 1000 non-distinct elements (i.e., the same element appearing 1000 times), this doesn't return ~1000?
Asking because I'm not sure how PySpark interacts with Java: if it is through byte arrays, then the hashCode could be wrong for byte arrays.
Good point, it helped me find an invalid test case using a tuple, which is unpickled as an Object[] in Java, whose hashCode is not determined by its contents. So I changed it to set([]), which should have similar behavior across Python and Java.
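To illustrate the distinction being discussed, here is a small pure-Python sketch (the Java side is shown only in comments, since it can't run here): Python tuples hash by content, while a Java Object[], which is what Pyrolite unpickles a tuple into, uses an identity-based default hashCode, so equal arrays can hash differently.

```python
# Python tuples hash by *content*, so equal tuples collapse to a single
# distinct value -- exactly what an approximate distinct count needs.
t1 = (1, "x", 2.5)
t2 = (1, "x", 2.5)
assert hash(t1) == hash(t2)  # identical content -> identical hash

# By contrast, a Java array hashes by *identity* (illustrative only):
#   Object[] a = {1, "x", 2.5};
#   Object[] b = {1, "x", 2.5};
#   a.hashCode() != b.hashCode()   // usually true: identity-based
```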
I've changed it to do the hash calculation in Python, so it can support all hashable types in Python.
QA tests have started for PR 2142 at commit
Force-pushed dfd2a2a to 4cba98f.
QA tests have finished for PR 2142 at commit
Force-pushed 2541785 to ded624f.
@@ -72,7 +72,7 @@ def portable_hash(x):
         for i in x:
             h ^= portable_hash(i)
             h *= 1000003
-            h &= 0xffffffff
+            h &= sys.maxint
Why this change?
On 64-bit machines, the hash of a tuple should be 64 bits (sys.maxint is 64 bits).
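For context, here is a runnable sketch of the tuple-hashing scheme in the diff above. It targets Python 3, which has no `sys.maxint`, so a 64-bit `MASK` constant stands in for it; the `0x345678` seed and the final length mix follow CPython's classic tuple hash and are assumptions for illustration, not necessarily the exact PySpark constants.

```python
MASK = (1 << 63) - 1  # stand-in for Python 2's sys.maxint on a 64-bit machine

def portable_hash(x):
    """Content-based hash that is deterministic for ints and tuples of ints."""
    if isinstance(x, tuple):
        h = 0x345678              # seed borrowed from CPython's tuple hash
        for i in x:
            h ^= portable_hash(i)
            h *= 1000003
            h &= MASK             # keep the value inside the chosen hash space
        h ^= len(x)               # mix in the length (assumed final step)
        return h
    # Small nonnegative ints hash to themselves, making results portable.
    return x if isinstance(x, int) else hash(x)

# Equal content gives equal hashes, and element order still matters:
assert portable_hash((1, 2, 3)) == portable_hash((1, 2, 3))
assert portable_hash((1, 2, 3)) != portable_hash((3, 2, 1))
```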
To check my understanding of the error-correction code: I'm a bit confused about how the current correction term works:

    c = - sys.maxint * log(1 - float(c) / sys.maxint)

It looks like this is correcting for overestimates of the number of distinct elements by subtracting a term based on the collision probability. In general, won't collisions cause us to underestimate instead of overestimate? Maybe we should approach this by treating the true number of distinct items (k) as a random variable and figuring out the maximum likelihood estimator of k given an observation c of the result of […].

Before we consider that, though, I wonder whether we even need a correction term. Doesn't the Java implementation of […]

Note: I'm not a statistician; please correct me if I've gotten anything wrong.
After discussing with @JoshRosen offline, we realized that no correction is needed in Python if Python and Java use the same hash space, so I changed the hash space (mapping to 2^32) and removed the correction.

Jenkins, test this please.
Yeah, I don't think we need any special correction, because Java will use the hash codes chosen in Python (since Integer.hashCode is just the integer's value).
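A tiny pure-Python check of that claim: in CPython, the hash of a small nonnegative int is the value itself, which is exactly what Java's Integer.hashCode() returns for the 32-bit range, so the two runtimes agree on these hashes.

```python
# CPython: hash(n) == n for 0 <= n < 2**61 - 1, and Java's
# Integer.hashCode() is simply the int's own value, so any value that
# fits in a 32-bit signed int hashes identically on both sides.
for v in (0, 1, 42, 2**31 - 1):
    assert hash(v) == v  # matches what Integer.hashCode() would return
```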
LGTM, so I've merged this into […]
RDD.countApproxDistinct(relativeSD=0.05):

    :: Experimental ::

    Return the approximate number of distinct elements in the RDD. The
    algorithm used is based on streamlib's implementation of "HyperLogLog
    in Practice: Algorithmic Engineering of a State of The Art Cardinality
    Estimation Algorithm", available at
    http://dx.doi.org/10.1145/2452376.2452456. This supports all types of
    objects supported by Pyrolite, i.e. nearly all built-in types.

    :param relativeSD: Relative accuracy. Smaller values create counters
        that require more space. It must be greater than 0.000017.

    >>> n = sc.parallelize(range(1000)).map(str).countApproxDistinct()
    >>> 950 < n < 1050
    True
    >>> n = sc.parallelize([i % 20 for i in range(1000)]).countApproxDistinct()
    >>> 18 < n < 22
    True

Author: Davies Liu <davies.liu@gmail.com>

Closes apache#2142 from davies/countApproxDistinct and squashes the following commits:

e20da47 [Davies Liu] remove the correction in Python
c38c4e4 [Davies Liu] fix doc tests
2ab157c [Davies Liu] fix doc tests
9d2565f [Davies Liu] add commments and link for hash collision correction
d306492 [Davies Liu] change range of hash of tuple to [0, maxint]
ded624f [Davies Liu] calculate hash in Python
4cba98f [Davies Liu] add more tests
a85a8c6 [Davies Liu] Merge branch 'master' into countApproxDistinct
e97e342 [Davies Liu] add countApproxDistinct()