Skip to content

[SPARK-2470] PEP8 fixes to PySpark #1505

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 33 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
69da6cf
Merge pull request #1 from apache/master
nchammas Jun 10, 2014
6544b7e
[SPARK-2065] give launched instances names
nchammas Jun 10, 2014
2627247
broke up lines before they hit 100 chars
nchammas Jun 11, 2014
69f6e22
PEP8 fixes
nchammas Jun 11, 2014
89fde08
Merge pull request #2 from apache/master
nchammas Jun 13, 2014
2e4fe00
Merge pull request #3 from apache/master
nchammas Jun 20, 2014
de7292a
Merge pull request #4 from apache/master
nchammas Jul 9, 2014
a36eed0
name ec2 instances and security groups consistently
nchammas Jul 9, 2014
f7e4581
unrelated pep8 fix
nchammas Jul 9, 2014
4dd148f
Merge pull request #5 from apache/master
nchammas Jul 20, 2014
f0a7ebf
[SPARK-2470] PEP8 fixes to rddsampler.py
nchammas Jul 20, 2014
a6d5e4b
[SPARK-2470] PEP8 fixes to cloudpickle.py
nchammas Jul 20, 2014
f4e0039
[SPARK-2470] PEP8 fixes to conf.py
nchammas Jul 20, 2014
ca2d28b
[SPARK-2470] PEP8 fixes to context.py
nchammas Jul 20, 2014
7fc849c
[SPARK-2470] PEP8 fixes to daemon.py
nchammas Jul 20, 2014
1bde265
[SPARK-2470] PEP8 fixes to java_gateway.py
nchammas Jul 20, 2014
81fcb20
[SPARK-2470] PEP8 fixes to resultiterable.py
nchammas Jul 20, 2014
d14f2f1
[SPARK-2470] PEP8 fixes to __init__.py
nchammas Jul 20, 2014
c85e1e5
[SPARK-2470] PEP8 fixes to join.py
nchammas Jul 20, 2014
a0fec2e
[SPARK-2470] PEP8 fixes to mllib
nchammas Jul 20, 2014
95d1d95
[SPARK-2470] PEP8 fixes to serializers.py
nchammas Jul 20, 2014
1916859
[SPARK-2470] PEP8 fixes to shell.py
nchammas Jul 20, 2014
aa3a7b6
[SPARK-2470] PEP8 fixes to sql.py
nchammas Jul 20, 2014
d644477
[SPARK-2470] PEP8 fixes to worker.py
nchammas Jul 20, 2014
b3b96cf
[SPARK-2470] PEP8 fixes to statcounter.py
nchammas Jul 20, 2014
8f8e4c0
[SPARK-2470] PEP8 fixes to storagelevel.py
nchammas Jul 20, 2014
7d557b7
[SPARK-2470] PEP8 fixes to tests.py
nchammas Jul 20, 2014
24639bc
[SPARK-2470] fix whitespace for doctest
nchammas Jul 21, 2014
22132a4
[SPARK-2470] wrap conditionals in parentheses
nchammas Jul 21, 2014
9127d2b
[SPARK-2470] wrap expression lists in parentheses
nchammas Jul 21, 2014
e178dbe
[SPARK-2470] style - change position of line break
nchammas Jul 21, 2014
cba7768
[SPARK-2470] wrap expression list in parentheses
nchammas Jul 21, 2014
98171af
[SPARK-2470] revert PEP 8 fixes to cloudpickle
nchammas Jul 21, 2014
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion python/pyspark/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,5 @@
from pyspark.storagelevel import StorageLevel


__all__ = ["SparkConf", "SparkContext", "SQLContext", "RDD", "SchemaRDD", "SparkFiles", "StorageLevel", "Row"]
__all__ = ["SparkConf", "SparkContext", "SQLContext", "RDD", "SchemaRDD",
"SparkFiles", "StorageLevel", "Row"]
9 changes: 5 additions & 4 deletions python/pyspark/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@
spark.executorEnv.VAR4=value4
spark.home=/path
>>> sorted(conf.getAll(), key=lambda p: p[0])
[(u'spark.executorEnv.VAR1', u'value1'), (u'spark.executorEnv.VAR3', u'value3'), (u'spark.executorEnv.VAR4', u'value4'), (u'spark.home', u'/path')]
[(u'spark.executorEnv.VAR1', u'value1'), (u'spark.executorEnv.VAR3', u'value3'), \
(u'spark.executorEnv.VAR4', u'value4'), (u'spark.home', u'/path')]
"""


Expand Down Expand Up @@ -118,9 +119,9 @@ def setExecutorEnv(self, key=None, value=None, pairs=None):
"""Set an environment variable to be passed to executors."""
if (key is not None and pairs is not None) or (key is None and pairs is None):
raise Exception("Either pass one key-value pair or a list of pairs")
elif key != None:
elif key is not None:
self._jconf.setExecutorEnv(key, value)
elif pairs != None:
elif pairs is not None:
for (k, v) in pairs:
self._jconf.setExecutorEnv(k, v)
return self
Expand All @@ -137,7 +138,7 @@ def setAll(self, pairs):

def get(self, key, defaultValue=None):
"""Get the configured value for some key, or return a default otherwise."""
if defaultValue == None: # Py4J doesn't call the right get() if we pass None
if defaultValue is None: # Py4J doesn't call the right get() if we pass None
if not self._jconf.contains(key):
return None
return self._jconf.get(key)
Expand Down
45 changes: 25 additions & 20 deletions python/pyspark/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from pyspark.files import SparkFiles
from pyspark.java_gateway import launch_gateway
from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
PairDeserializer
PairDeserializer
from pyspark.storagelevel import StorageLevel
from pyspark import rdd
from pyspark.rdd import RDD
Expand All @@ -50,12 +50,11 @@ class SparkContext(object):
_next_accum_id = 0
_active_spark_context = None
_lock = Lock()
_python_includes = None # zip and egg files that need to be added to PYTHONPATH

_python_includes = None # zip and egg files that need to be added to PYTHONPATH

def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None,
gateway=None):
environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None,
gateway=None):
"""
Create a new SparkContext. At least the master and app name should be set,
either through the named parameters here or through C{conf}.
Expand Down Expand Up @@ -138,8 +137,8 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
self._accumulatorServer = accumulators._start_update_server()
(host, port) = self._accumulatorServer.server_address
self._javaAccumulator = self._jsc.accumulator(
self._jvm.java.util.ArrayList(),
self._jvm.PythonAccumulatorParam(host, port))
self._jvm.java.util.ArrayList(),
self._jvm.PythonAccumulatorParam(host, port))

self.pythonExec = os.environ.get("PYSPARK_PYTHON", 'python')

Expand All @@ -165,7 +164,7 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
(dirname, filename) = os.path.split(path)
self._python_includes.append(filename)
sys.path.append(path)
if not dirname in sys.path:
if dirname not in sys.path:
sys.path.append(dirname)

# Create a temporary directory inside spark.local.dir:
Expand All @@ -192,15 +191,19 @@ def _ensure_initialized(cls, instance=None, gateway=None):
SparkContext._writeToFile = SparkContext._jvm.PythonRDD.writeToFile

if instance:
if SparkContext._active_spark_context and SparkContext._active_spark_context != instance:
if (SparkContext._active_spark_context and
SparkContext._active_spark_context != instance):
currentMaster = SparkContext._active_spark_context.master
currentAppName = SparkContext._active_spark_context.appName
callsite = SparkContext._active_spark_context._callsite

# Raise error if there is already a running Spark context
raise ValueError("Cannot run multiple SparkContexts at once; existing SparkContext(app=%s, master=%s)" \
" created by %s at %s:%s " \
% (currentAppName, currentMaster, callsite.function, callsite.file, callsite.linenum))
raise ValueError(
"Cannot run multiple SparkContexts at once; "
"existing SparkContext(app=%s, master=%s)"
" created by %s at %s:%s "
% (currentAppName, currentMaster,
callsite.function, callsite.file, callsite.linenum))
else:
SparkContext._active_spark_context = instance

Expand Down Expand Up @@ -290,7 +293,7 @@ def textFile(self, name, minPartitions=None):
Read a text file from HDFS, a local file system (available on all
nodes), or any Hadoop-supported file system URI, and return it as an
RDD of Strings.

>>> path = os.path.join(tempdir, "sample-text.txt")
>>> with open(path, "w") as testFile:
... testFile.write("Hello world!")
Expand Down Expand Up @@ -584,11 +587,12 @@ def addPyFile(self, path):
HTTP, HTTPS or FTP URI.
"""
self.addFile(path)
(dirname, filename) = os.path.split(path) # dirname may be directory or HDFS/S3 prefix
(dirname, filename) = os.path.split(path) # dirname may be directory or HDFS/S3 prefix

if filename.endswith('.zip') or filename.endswith('.ZIP') or filename.endswith('.egg'):
self._python_includes.append(filename)
sys.path.append(os.path.join(SparkFiles.getRootDirectory(), filename)) # for tests in local mode
# for tests in local mode
sys.path.append(os.path.join(SparkFiles.getRootDirectory(), filename))

def setCheckpointDir(self, dirName):
"""
Expand Down Expand Up @@ -649,9 +653,9 @@ def setJobGroup(self, groupId, description, interruptOnCancel=False):
Cancelled

If interruptOnCancel is set to true for the job group, then job cancellation will result
in Thread.interrupt() being called on the job's executor threads. This is useful to help ensure
that the tasks are actually stopped in a timely manner, but is off by default due to HDFS-1208,
where HDFS may respond to Thread.interrupt() by marking nodes as dead.
in Thread.interrupt() being called on the job's executor threads. This is useful to help
ensure that the tasks are actually stopped in a timely manner, but is off by default due
to HDFS-1208, where HDFS may respond to Thread.interrupt() by marking nodes as dead.
"""
self._jsc.setJobGroup(groupId, description, interruptOnCancel)

Expand Down Expand Up @@ -688,7 +692,7 @@ def cancelAllJobs(self):
"""
self._jsc.sc().cancelAllJobs()

def runJob(self, rdd, partitionFunc, partitions = None, allowLocal = False):
def runJob(self, rdd, partitionFunc, partitions=None, allowLocal=False):
"""
Executes the given partitionFunc on the specified set of partitions,
returning the result as an array of elements.
Expand All @@ -703,7 +707,7 @@ def runJob(self, rdd, partitionFunc, partitions = None, allowLocal = False):
>>> sc.runJob(myRDD, lambda part: [x * x for x in part], [0, 2], True)
[0, 1, 16, 25]
"""
if partitions == None:
if partitions is None:
partitions = range(rdd._jrdd.partitions().size())
javaPartitions = ListConverter().convert(partitions, self._gateway._gateway_client)

Expand All @@ -714,6 +718,7 @@ def runJob(self, rdd, partitionFunc, partitions = None, allowLocal = False):
it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
return list(mappedRDD._collect_iterator_through_file(it))


def _test():
import atexit
import doctest
Expand Down
12 changes: 6 additions & 6 deletions python/pyspark/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ def should_exit():


def compute_real_exit_code(exit_code):
# SystemExit's code can be integer or string, but os._exit only accepts integers
import numbers
if isinstance(exit_code, numbers.Integral):
return exit_code
else:
return 1
# SystemExit's code can be integer or string, but os._exit only accepts integers
import numbers
if isinstance(exit_code, numbers.Integral):
return exit_code
else:
return 1


def worker(listen_sock):
Expand Down
1 change: 1 addition & 0 deletions python/pyspark/java_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from threading import Thread
from py4j.java_gateway import java_import, JavaGateway, GatewayClient


def launch_gateway():
SPARK_HOME = os.environ["SPARK_HOME"]

Expand Down
4 changes: 3 additions & 1 deletion python/pyspark/join.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@

from pyspark.resultiterable import ResultIterable


def _do_python_join(rdd, other, numPartitions, dispatch):
vs = rdd.map(lambda (k, v): (k, (1, v)))
ws = other.map(lambda (k, v): (k, (2, v)))
return vs.union(ws).groupByKey(numPartitions).flatMapValues(lambda x : dispatch(x.__iter__()))
return vs.union(ws).groupByKey(numPartitions).flatMapValues(lambda x: dispatch(x.__iter__()))


def python_join(rdd, other, numPartitions):
Expand Down Expand Up @@ -85,6 +86,7 @@ def make_mapper(i):
vrdds = [rdd.map(make_mapper(i)) for i, rdd in enumerate(rdds)]
union_vrdds = reduce(lambda acc, other: acc.union(other), vrdds)
rdd_len = len(vrdds)

def dispatch(seq):
bufs = [[] for i in range(rdd_len)]
for (n, v) in seq:
Expand Down
4 changes: 3 additions & 1 deletion python/pyspark/mllib/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ def _deserialize_double_vector(ba, offset=0):
nb = len(ba) - offset
if nb < 5:
raise TypeError("_deserialize_double_vector called on a %d-byte array, "
"which is too short" % nb)
"which is too short" % nb)
if ba[offset] == DENSE_VECTOR_MAGIC:
return _deserialize_dense_vector(ba, offset)
elif ba[offset] == SPARSE_VECTOR_MAGIC:
Expand Down Expand Up @@ -272,6 +272,7 @@ def _serialize_labeled_point(p):
header_float[0] = p.label
return header + serialized_features


def _deserialize_labeled_point(ba, offset=0):
"""Deserialize a LabeledPoint from a mutually understood format."""
from pyspark.mllib.regression import LabeledPoint
Expand All @@ -283,6 +284,7 @@ def _deserialize_labeled_point(ba, offset=0):
features = _deserialize_double_vector(ba, offset + 9)
return LabeledPoint(label, features)


def _copyto(array, buffer, offset, shape, dtype):
"""
Copy the contents of a vector to a destination bytearray at the
Expand Down
1 change: 1 addition & 0 deletions python/pyspark/mllib/linalg.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ def stringify(vector):
else:
return "[" + ",".join([str(v) for v in vector]) + "]"


def _test():
import doctest
(failure_count, test_count) = doctest.testmod(optionflags=doctest.ELLIPSIS)
Expand Down
2 changes: 0 additions & 2 deletions python/pyspark/mllib/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
from pyspark.serializers import NoOpSerializer



class MLUtils:
"""
Helper methods to load, save and pre-process data used in MLlib.
Expand Down Expand Up @@ -154,7 +153,6 @@ def saveAsLibSVMFile(data, dir):
lines = data.map(lambda p: MLUtils._convert_labeled_point_to_libsvm(p))
lines.saveAsTextFile(dir)


@staticmethod
def loadLabeledPoints(sc, path, minPartitions=None):
"""
Expand Down
24 changes: 12 additions & 12 deletions python/pyspark/rddsampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@
import sys
import random


class RDDSampler(object):
def __init__(self, withReplacement, fraction, seed=None):
try:
import numpy
self._use_numpy = True
except ImportError:
print >> sys.stderr, "NumPy does not appear to be installed. Falling back to default random generator for sampling."
print >> sys.stderr, (
"NumPy does not appear to be installed. "
"Falling back to default random generator for sampling.")
self._use_numpy = False

self._seed = seed if seed is not None else random.randint(0, sys.maxint)
Expand Down Expand Up @@ -61,7 +64,7 @@ def getUniformSample(self, split):
def getPoissonSample(self, split, mean):
if not self._rand_initialized or split != self._split:
self.initRandomGenerator(split)

if self._use_numpy:
return self._random.poisson(mean)
else:
Expand All @@ -80,30 +83,27 @@ def getPoissonSample(self, split, mean):
num_arrivals += 1

return (num_arrivals - 1)

def shuffle(self, vals):
if self._random is None:
self.initRandomGenerator(0) # this should only ever called on the master so
# the split does not matter

if self._use_numpy:
self._random.shuffle(vals)
else:
self._random.shuffle(vals, self._random.random)

def func(self, split, iterator):
if self._withReplacement:
if self._withReplacement:
for obj in iterator:
# For large datasets, the expected number of occurrences of each element in a sample with
# replacement is Poisson(frac). We use that to get a count for each element.
count = self.getPoissonSample(split, mean = self._fraction)
# For large datasets, the expected number of occurrences of each element in
# a sample with replacement is Poisson(frac). We use that to get a count for
# each element.
count = self.getPoissonSample(split, mean=self._fraction)
for _ in range(0, count):
yield obj
else:
for obj in iterator:
if self.getUniformSample(split) <= self._fraction:
yield obj




3 changes: 3 additions & 0 deletions python/pyspark/resultiterable.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import collections


class ResultIterable(collections.Iterable):
"""
A special result iterable. This is used because the standard iterator can not be pickled
Expand All @@ -27,7 +28,9 @@ def __init__(self, data):
self.data = data
self.index = 0
self.maxindex = len(data)

def __iter__(self):
return iter(self.data)

def __len__(self):
return len(self.data)
Loading