Skip to content

[SPARK-4477] [PySpark] remove numpy from RDDSampler #3351

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 16 commits into from
Closed
10 changes: 6 additions & 4 deletions python/pyspark/rdd.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,11 @@ def distinct(self, numPartitions=None):

def sample(self, withReplacement, fraction, seed=None):
"""
Return a sampled subset of this RDD (relies on numpy and falls back
on default random generator if numpy is unavailable).
Return a sampled subset of this RDD.

>>> rdd = sc.parallelize(range(100), 4)
>>> rdd.sample(False, 0.1, 81).count()
10
"""
assert fraction >= 0.0, "Negative fraction value: %s" % fraction
return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True)
Expand Down Expand Up @@ -343,8 +346,7 @@ def randomSplit(self, weights, seed=None):
# this is ported from scala/spark/RDD.scala
def takeSample(self, withReplacement, num, seed=None):
"""
Return a fixed-size sampled subset of this RDD (currently requires
numpy).
Return a fixed-size sampled subset of this RDD.

>>> rdd = sc.parallelize(range(0, 10))
>>> len(rdd.takeSample(True, 20, 1))
Expand Down
99 changes: 34 additions & 65 deletions python/pyspark/rddsampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,81 +17,48 @@

import sys
import random
import math


class RDDSamplerBase(object):

def __init__(self, withReplacement, seed=None):
try:
import numpy
self._use_numpy = True
except ImportError:
print >> sys.stderr, (
"NumPy does not appear to be installed. "
"Falling back to default random generator for sampling.")
self._use_numpy = False

self._seed = seed if seed is not None else random.randint(0, 2 ** 32 - 1)
self._seed = seed if seed is not None else random.randint(0, sys.maxint)
self._withReplacement = withReplacement
self._random = None
self._split = None
self._rand_initialized = False

def initRandomGenerator(self, split):
if self._use_numpy:
import numpy
self._random = numpy.random.RandomState(self._seed ^ split)
else:
self._random = random.Random(self._seed ^ split)
self._random = random.Random(self._seed ^ split)

# mixing because the initial seeds are close to each other
for _ in xrange(10):
self._random.randint(0, 1)

self._split = split
self._rand_initialized = True

def getUniformSample(self, split):
if not self._rand_initialized or split != self._split:
self.initRandomGenerator(split)

if self._use_numpy:
return self._random.random_sample()
def getUniformSample(self):
return self._random.random()

def getPoissonSample(self, mean):
# Using Knuth's algorithm described in
# http://en.wikipedia.org/wiki/Poisson_distribution
if mean < 20.0:
# one exp and k+1 random calls
l = math.exp(-mean)
p = self._random.random()
k = 0
while p > l:
k += 1
p *= self._random.random()
else:
return self._random.uniform(0.0, 1.0)

def getPoissonSample(self, split, mean):
if not self._rand_initialized or split != self._split:
self.initRandomGenerator(split)

if self._use_numpy:
return self._random.poisson(mean)
else:
# here we simulate drawing numbers n_i ~ Poisson(lambda = 1/mean) by
# drawing a sequence of numbers delta_j ~ Exp(mean)
num_arrivals = 1
cur_time = 0.0

cur_time += self._random.expovariate(mean)
# switch to the log domain, k+1 expovariate (random + log) calls
p = self._random.expovariate(mean)
k = 0
while p < 1.0:
k += 1
p += self._random.expovariate(mean)
return k

if cur_time > 1.0:
return 0

while(cur_time <= 1.0):
cur_time += self._random.expovariate(mean)
num_arrivals += 1

return (num_arrivals - 1)

def shuffle(self, vals):
if self._random is None:
self.initRandomGenerator(0) # this should only ever called on the master so
# the split does not matter

if self._use_numpy:
self._random.shuffle(vals)
else:
self._random.shuffle(vals, self._random.random)
def func(self, split, iterator):
raise NotImplementedError


class RDDSampler(RDDSamplerBase):
Expand All @@ -101,31 +68,32 @@ def __init__(self, withReplacement, fraction, seed=None):
self._fraction = fraction

def func(self, split, iterator):
self.initRandomGenerator(split)
if self._withReplacement:
for obj in iterator:
# For large datasets, the expected number of occurrences of each element in
# a sample with replacement is Poisson(frac). We use that to get a count for
# each element.
count = self.getPoissonSample(split, mean=self._fraction)
count = self.getPoissonSample(self._fraction)
for _ in range(0, count):
yield obj
else:
for obj in iterator:
if self.getUniformSample(split) <= self._fraction:
if self.getUniformSample() < self._fraction:
yield obj


class RDDRangeSampler(RDDSamplerBase):

def __init__(self, lowerBound, upperBound, seed=None):
RDDSamplerBase.__init__(self, False, seed)
self._use_numpy = False # no performance gain from numpy
self._lowerBound = lowerBound
self._upperBound = upperBound

def func(self, split, iterator):
self.initRandomGenerator(split)
for obj in iterator:
if self._lowerBound <= self.getUniformSample(split) < self._upperBound:
if self._lowerBound <= self.getUniformSample() < self._upperBound:
yield obj


Expand All @@ -136,15 +104,16 @@ def __init__(self, withReplacement, fractions, seed=None):
self._fractions = fractions

def func(self, split, iterator):
self.initRandomGenerator(split)
if self._withReplacement:
for key, val in iterator:
# For large datasets, the expected number of occurrences of each element in
# a sample with replacement is Poisson(frac). We use that to get a count for
# each element.
count = self.getPoissonSample(split, mean=self._fractions[key])
count = self.getPoissonSample(self._fractions[key])
for _ in range(0, count):
yield key, val
else:
for key, val in iterator:
if self.getUniformSample(split) <= self._fractions[key]:
if self.getUniformSample() < self._fractions[key]:
yield key, val