Commit ca2d28b

[SPARK-2470] PEP8 fixes to context.py
1 parent f4e0039 commit ca2d28b

File tree

1 file changed: +25 −20 lines changed

python/pyspark/context.py

Lines changed: 25 additions & 20 deletions
@@ -29,7 +29,7 @@
 from pyspark.files import SparkFiles
 from pyspark.java_gateway import launch_gateway
 from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
-        PairDeserializer
+    PairDeserializer
 from pyspark.storagelevel import StorageLevel
 from pyspark import rdd
 from pyspark.rdd import RDD
@@ -50,12 +50,11 @@ class SparkContext(object):
     _next_accum_id = 0
     _active_spark_context = None
     _lock = Lock()
-    _python_includes = None # zip and egg files that need to be added to PYTHONPATH
-
+    _python_includes = None  # zip and egg files that need to be added to PYTHONPATH
 
     def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
-        environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None,
-        gateway=None):
+                 environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None,
+                 gateway=None):
         """
         Create a new SparkContext. At least the master and app name should be set,
         either through the named parameters here or through C{conf}.
@@ -138,8 +137,8 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
         self._accumulatorServer = accumulators._start_update_server()
         (host, port) = self._accumulatorServer.server_address
         self._javaAccumulator = self._jsc.accumulator(
-                self._jvm.java.util.ArrayList(),
-                self._jvm.PythonAccumulatorParam(host, port))
+            self._jvm.java.util.ArrayList(),
+            self._jvm.PythonAccumulatorParam(host, port))
 
         self.pythonExec = os.environ.get("PYSPARK_PYTHON", 'python')
 
@@ -165,7 +164,7 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
                 (dirname, filename) = os.path.split(path)
                 self._python_includes.append(filename)
                 sys.path.append(path)
-                if not dirname in sys.path:
+                if dirname not in sys.path:
                     sys.path.append(dirname)
 
         # Create a temporary directory inside spark.local.dir:
@@ -192,15 +191,19 @@ def _ensure_initialized(cls, instance=None, gateway=None):
                 SparkContext._writeToFile = SparkContext._jvm.PythonRDD.writeToFile
 
             if instance:
-                if SparkContext._active_spark_context and SparkContext._active_spark_context != instance:
+                if SparkContext._active_spark_context and
+                        SparkContext._active_spark_context != instance:
                     currentMaster = SparkContext._active_spark_context.master
                     currentAppName = SparkContext._active_spark_context.appName
                     callsite = SparkContext._active_spark_context._callsite
 
                     # Raise error if there is already a running Spark context
-                    raise ValueError("Cannot run multiple SparkContexts at once; existing SparkContext(app=%s, master=%s)" \
-                        " created by %s at %s:%s " \
-                        % (currentAppName, currentMaster, callsite.function, callsite.file, callsite.linenum))
+                    raise ValueError(
+                        "Cannot run multiple SparkContexts at once; "
+                        "existing SparkContext(app=%s, master=%s)"
+                        " created by %s at %s:%s "
+                        % (currentAppName, currentMaster,
+                           callsite.function, callsite.file, callsite.linenum))
                 else:
                     SparkContext._active_spark_context = instance
 
@@ -290,7 +293,7 @@ def textFile(self, name, minPartitions=None):
         Read a text file from HDFS, a local file system (available on all
         nodes), or any Hadoop-supported file system URI, and return it as an
         RDD of Strings.
-
+
         >>> path = os.path.join(tempdir, "sample-text.txt")
         >>> with open(path, "w") as testFile:
         ...    testFile.write("Hello world!")
@@ -584,11 +587,12 @@ def addPyFile(self, path):
         HTTP, HTTPS or FTP URI.
         """
         self.addFile(path)
-        (dirname, filename) = os.path.split(path) # dirname may be directory or HDFS/S3 prefix
+        (dirname, filename) = os.path.split(path)  # dirname may be directory or HDFS/S3 prefix
 
         if filename.endswith('.zip') or filename.endswith('.ZIP') or filename.endswith('.egg'):
             self._python_includes.append(filename)
-            sys.path.append(os.path.join(SparkFiles.getRootDirectory(), filename)) # for tests in local mode
+            # for tests in local mode
+            sys.path.append(os.path.join(SparkFiles.getRootDirectory(), filename))
 
     def setCheckpointDir(self, dirName):
         """
@@ -649,9 +653,9 @@ def setJobGroup(self, groupId, description, interruptOnCancel=False):
         Cancelled
 
         If interruptOnCancel is set to true for the job group, then job cancellation will result
-        in Thread.interrupt() being called on the job's executor threads. This is useful to help ensure
-        that the tasks are actually stopped in a timely manner, but is off by default due to HDFS-1208,
-        where HDFS may respond to Thread.interrupt() by marking nodes as dead.
+        in Thread.interrupt() being called on the job's executor threads. This is useful to help
+        ensure that the tasks are actually stopped in a timely manner, but is off by default due
+        to HDFS-1208, where HDFS may respond to Thread.interrupt() by marking nodes as dead.
         """
         self._jsc.setJobGroup(groupId, description, interruptOnCancel)
 
@@ -688,7 +692,7 @@ def cancelAllJobs(self):
         """
         self._jsc.sc().cancelAllJobs()
 
-    def runJob(self, rdd, partitionFunc, partitions = None, allowLocal = False):
+    def runJob(self, rdd, partitionFunc, partitions=None, allowLocal=False):
         """
         Executes the given partitionFunc on the specified set of partitions,
         returning the result as an array of elements.
@@ -703,7 +707,7 @@ def runJob(self, rdd, partitionFunc, partitions = None, allowLocal = False):
         >>> sc.runJob(myRDD, lambda part: [x * x for x in part], [0, 2], True)
         [0, 1, 16, 25]
         """
-        if partitions == None:
+        if partitions is None:
             partitions = range(rdd._jrdd.partitions().size())
         javaPartitions = ListConverter().convert(partitions, self._gateway._gateway_client)
 
@@ -714,6 +718,7 @@
         it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
         return list(mappedRDD._collect_iterator_through_file(it))
 
+
 def _test():
     import atexit
     import doctest
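
Taken together, the hunks above apply a handful of common PEP 8 cleanups: compare to None with "is" rather than "==", write membership tests as "x not in y", drop the spaces around "=" in keyword defaults, put at least two spaces before inline comments, and wrap over-long lines. The standalone sketch below only illustrates those idioms; it is not code from this commit, and the helper name, arguments, and paths are made up.

import os
import sys


def add_include(path, includes, label=None):  # E251: no spaces around "=" in keyword defaults
    """Illustrative helper (not part of pyspark.context) showing the idioms above."""
    # E711: compare against None with "is", not "=="
    if label is None:
        label = os.path.basename(path)

    # E713: prefer "x not in y" over "not x in y"
    if path not in sys.path:
        sys.path.append(path)

    includes.append(label)  # E261: at least two spaces before an inline comment

    # E501: split an over-long message across lines with implicit string concatenation
    return ("added %s to PYTHONPATH "
            "(label=%s)" % (path, label))


if __name__ == "__main__":
    includes = []
    print(add_include("/tmp/example.egg", includes))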
