-
Notifications
You must be signed in to change notification settings - Fork 28.7k
[SPARK-2871] [PySpark] add countApproxDistinct() API #2142
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
e97e342
a85a8c6
4cba98f
ded624f
d306492
9d2565f
2ab157c
c38c4e4
e20da47
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -62,7 +62,7 @@ def portable_hash(x): | |
|
||
>>> portable_hash(None) | ||
0 | ||
>>> portable_hash((None, 1)) | ||
>>> portable_hash((None, 1)) & 0xffffffff | ||
219750521 | ||
""" | ||
if x is None: | ||
|
@@ -72,7 +72,7 @@ def portable_hash(x): | |
for i in x: | ||
h ^= portable_hash(i) | ||
h *= 1000003 | ||
h &= 0xffffffff | ||
h &= sys.maxint | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why this change? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In 64-bit machines, the hash of tuple() should be 64 bits (sys.maxint is 64bits). |
||
h ^= len(x) | ||
if h == -1: | ||
h = -2 | ||
|
@@ -1942,7 +1942,7 @@ def _is_pickled(self): | |
return True | ||
return False | ||
|
||
def _to_jrdd(self): | ||
def _to_java_object_rdd(self): | ||
""" Return a JavaRDD of Object by unpickling | ||
|
||
It will convert each Python object into Java object by Pyrolite, whenever the | ||
|
@@ -1977,7 +1977,7 @@ def sumApprox(self, timeout, confidence=0.95): | |
>>> (rdd.sumApprox(1000) - r) / r < 0.05 | ||
True | ||
""" | ||
jrdd = self.mapPartitions(lambda it: [float(sum(it))])._to_jrdd() | ||
jrdd = self.mapPartitions(lambda it: [float(sum(it))])._to_java_object_rdd() | ||
jdrdd = self.ctx._jvm.JavaDoubleRDD.fromRDD(jrdd.rdd()) | ||
r = jdrdd.sumApprox(timeout, confidence).getFinalValue() | ||
return BoundedFloat(r.mean(), r.confidence(), r.low(), r.high()) | ||
|
@@ -1993,11 +1993,40 @@ def meanApprox(self, timeout, confidence=0.95): | |
>>> (rdd.meanApprox(1000) - r) / r < 0.05 | ||
True | ||
""" | ||
jrdd = self.map(float)._to_jrdd() | ||
jrdd = self.map(float)._to_java_object_rdd() | ||
jdrdd = self.ctx._jvm.JavaDoubleRDD.fromRDD(jrdd.rdd()) | ||
r = jdrdd.meanApprox(timeout, confidence).getFinalValue() | ||
return BoundedFloat(r.mean(), r.confidence(), r.low(), r.high()) | ||
|
||
def countApproxDistinct(self, relativeSD=0.05): | ||
""" | ||
:: Experimental :: | ||
Return approximate number of distinct elements in the RDD. | ||
|
||
The algorithm used is based on streamlib's implementation of | ||
"HyperLogLog in Practice: Algorithmic Engineering of a State | ||
of The Art Cardinality Estimation Algorithm", available | ||
<a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>. | ||
|
||
@param relativeSD Relative accuracy. Smaller values create | ||
counters that require more space. | ||
It must be greater than 0.000017. | ||
|
||
>>> n = sc.parallelize(range(1000)).map(str).countApproxDistinct() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you add a test to make sure that if you have 1000 non-distinct elements (i.e. the same element appearing 1000 times), this doesn't return ~ 1000? Asking because I'm not sure how PySpark interacts with Java - if it is through a byte array, then the hashcode could be wrong for byte arrays. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point; it helped me find an invalid test case with a tuple, which is unpickled as an Object[] in Java, whose hashCode is not determined by its contents, so I changed it to set([]), which should behave similarly across Python and Java. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have changed it to do the hash calculation in Python, so it can support all hashable types in Python. |
||
>>> 950 < n < 1050 | ||
True | ||
>>> n = sc.parallelize([i % 20 for i in range(1000)]).countApproxDistinct() | ||
>>> 18 < n < 22 | ||
True | ||
""" | ||
if relativeSD < 0.000017: | ||
raise ValueError("relativeSD should be greater than 0.000017") | ||
if relativeSD > 0.37: | ||
raise ValueError("relativeSD should be smaller than 0.37") | ||
# the hash space in Java is 2^32 | ||
hashRDD = self.map(lambda x: portable_hash(x) & 0xFFFFFFFF) | ||
return hashRDD._to_java_object_rdd().countApproxDistinct(relativeSD) | ||
|
||
|
||
class PipelinedRDD(RDD): | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch!