61 changes: 60 additions & 1 deletion python/pyspark/rdd.py
@@ -25,6 +25,7 @@
import sys
import shlex
import traceback
from bisect import bisect_right
from subprocess import Popen, PIPE
from tempfile import NamedTemporaryFile
from threading import Thread
@@ -572,7 +573,6 @@ def func(iterator):

# TODO: aggregate


def max(self):
"""
Find the maximum item in this RDD.
@@ -666,6 +666,65 @@ def sampleVariance(self):
"""
return self.stats().sampleVariance()

def _getBuckets(self, bucketCount):
    # use the StatsCounter as a quick way of getting the max and min
    stats = self.stats()
    minv = stats.min()
    maxv = stats.max()

    # integer division: this assumes the RDD contains integers, matching
    # the integer bucket bounds computed in histogram() below
    increment = (maxv - minv) / bucketCount
    # if every value is identical the increment is 0 and there are no
    # usable bucket boundaries
    buckets = []
    if increment != 0:
        buckets = range(minv, maxv, increment)

    return {"min": minv, "max": maxv, "buckets": buckets}

def histogram(self, bucketCount, buckets=None):
    """
    Compute a histogram of the data using bucketCount buckets evenly
    spaced between the min and max of the RDD. If an explicit list of
    bucket boundaries is passed in as buckets, it is used instead and
    bucketCount is ignored.

    >>> sc.parallelize([1, 49, 23, 100, 12, 13, 20, 22, 75, 50]).histogram(3)
    defaultdict(<type 'int'>, {(67, 100): 2, (1, 33): 6, (34, 66): 2})
    """
    minv = float("-inf")
    maxv = float("inf")
    if not buckets:
        b = self._getBuckets(bucketCount)
        buckets = b["buckets"]
        minv = b["min"]
        maxv = b["max"]

    if len(buckets) < 2:
        raise ValueError("histogram requires at least two bucket boundaries")

    def histogramPartition(iterator):
        # count how many elements of this partition fall into each
        # bucket; keys are (inclusive lower bound, inclusive upper bound)
        counters = defaultdict(int)
        for obj in iterator:
            k = bisect_right(buckets, obj)
            if 0 < k < len(buckets):
                key = (buckets[k - 1], buckets[k] - 1)
            elif k == len(buckets):
                # obj is at or above the last boundary
                key = (buckets[k - 1], maxv)
            else:
                # k == 0: obj is below the first boundary
                key = (minv, buckets[k] - 1)
            counters[key] += 1
        yield counters


    def mergeCounters(d1, d2):
        # merge the per-partition counters; keys present only in d2
        # must be carried over or their counts would be dropped
        for k in d2:
            d1[k] = d1.get(k, 0) + d2[k]
        return d1

    return self.mapPartitions(histogramPartition).reduce(mergeCounters)
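
    # Usage sketch with caller-supplied boundaries (hypothetical call,
    # requires a live SparkContext `sc`). With explicit buckets, minv and
    # maxv stay at -inf/+inf, so out-of-range values land in open-ended
    # edge buckets, e.g.:
    #     >>> rdd = sc.parallelize([1, 49, 23, 100, 12, 13, 20, 22, 75, 50])
    #     >>> rdd.histogram(None, buckets=[0, 25, 50, 75, 100])
    #     defaultdict(<type 'int'>, {(0, 24): 6, (25, 49): 1, (50, 74): 1,
    #                                (75, 99): 1, (100, inf): 1})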


def countByValue(self):
"""
Return the count of each unique value in this RDD as a dictionary of