
[SPARK-1824] Remove <master> from Python examples #802


Closed
wants to merge 6 commits into from
11 changes: 7 additions & 4 deletions docs/index.md
@@ -43,12 +43,15 @@ The `--master` option specifies the
locally with one thread, or `local[N]` to run locally with N threads. You should start by using
`local` for testing. For a full list of options, run Spark shell with the `--help` option.

Spark also provides a Python interface. To run an example Spark application written in Python, use
`bin/pyspark <program> [params]`. For example,
Spark also provides a Python interface. To run Spark interactively in a Python interpreter, use
`bin/pyspark`. As in Spark shell, you can also pass in the `--master` option to configure your
master URL.

./bin/pyspark examples/src/main/python/pi.py local[2] 10
./bin/pyspark --master local[2]

or simply `bin/pyspark` without any arguments to run Spark interactively in a python interpreter.
Example applications are also provided in Python. For example,

./bin/spark-submit examples/src/main/python/pi.py 10
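
As with `bin/pyspark`, a `--master` flag can also be passed to `spark-submit` when running these examples; for instance, a small sketch (the `local[4]` value is only an illustration):

./bin/spark-submit --master local[4] examples/src/main/python/pi.py 10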

# Launching on a Cluster

32 changes: 17 additions & 15 deletions docs/python-programming-guide.md
@@ -60,13 +60,9 @@ By default, PySpark requires `python` to be available on the system `PATH` and u

All of PySpark's library dependencies, including [Py4J](http://py4j.sourceforge.net/), are bundled with PySpark and automatically imported.

Standalone PySpark applications should be run using the `bin/spark-submit` script, which automatically
configures the Java and Python environment for running Spark.


# Interactive Use

The `bin/pyspark` script launches a Python interpreter that is configured to run PySpark applications. To use `pyspark` interactively, first build Spark, then launch it directly from the command line without any options:
The `bin/pyspark` script launches a Python interpreter that is configured to run PySpark applications. To use `pyspark` interactively, first build Spark, then launch it directly from the command line:

{% highlight bash %}
$ sbt/sbt assembly
@@ -83,20 +79,24 @@ The Python shell can be used explore data interactively and is a simple way to l
{% endhighlight %}

By default, the `bin/pyspark` shell creates SparkContext that runs applications locally on all of
your machine's logical cores.
To connect to a non-local cluster, or to specify a number of cores, set the `MASTER` environment variable.
For example, to use the `bin/pyspark` shell with a [standalone Spark cluster](spark-standalone.html):
your machine's logical cores. To connect to a non-local cluster, or to specify a number of cores,
set the `--master` flag. For example, to use the `bin/pyspark` shell with a
[standalone Spark cluster](spark-standalone.html):

{% highlight bash %}
$ MASTER=spark://IP:PORT ./bin/pyspark
$ ./bin/pyspark --master spark://1.2.3.4:7077
{% endhighlight %}

Or, to use exactly four cores on the local machine:

{% highlight bash %}
$ MASTER=local[4] ./bin/pyspark
$ ./bin/pyspark --master local[4]
{% endhighlight %}

Under the hood `bin/pyspark` is a wrapper around the
[Spark submit script](cluster-overview.html#launching-applications-with-spark-submit), so these
two scripts share the same list of options. For a complete list of options, run `bin/pyspark` with
the `--help` option.
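
For instance, a quick sketch of this pass-through (`extra_libs.zip` below is only a placeholder dependency archive):

{% highlight bash %}
$ ./bin/pyspark --help
$ ./bin/pyspark --master spark://1.2.3.4:7077 --py-files extra_libs.zip
{% endhighlight %}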

## IPython

@@ -115,13 +115,14 @@ the [IPython Notebook](http://ipython.org/notebook.html) with PyLab graphing sup
$ IPYTHON_OPTS="notebook --pylab inline" ./bin/pyspark
{% endhighlight %}

IPython also works on a cluster or on multiple cores if you set the `MASTER` environment variable.
IPython also works on a cluster or on multiple cores if you set the `--master` flag.
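
For example, one possible combination (a sketch, assuming IPython is installed locally):

{% highlight bash %}
$ IPYTHON=1 ./bin/pyspark --master local[4]
{% endhighlight %}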


# Standalone Programs

PySpark can also be used from standalone Python scripts by creating a SparkContext in your script and running the script using `bin/spark-submit`.
The Quick Start guide includes a [complete example](quick-start.html#standalone-applications) of a standalone Python application.
PySpark can also be used from standalone Python scripts by creating a SparkContext in your script
and running the script using `bin/spark-submit`. The Quick Start guide includes a
[complete example](quick-start.html#standalone-applications) of a standalone Python application.
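
As a rough sketch (the file name and job below are made up for illustration), such a script might look like:

{% highlight python %}
# my_app.py -- submit with: ./bin/spark-submit my_app.py
from pyspark import SparkContext

sc = SparkContext(appName="MyApp")
# Count the even numbers in a small local dataset.
evens = sc.parallelize(range(1000)).filter(lambda x: x % 2 == 0).count()
print "Number of even values: %d" % evens
sc.stop()
{% endhighlight %}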

Code dependencies can be deployed by passing .zip or .egg files in the `--py-files` option of `spark-submit`:
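
For example (a sketch; the file names below are placeholders):

{% highlight bash %}
$ ./bin/spark-submit --py-files deps.zip,helpers.egg my_app.py
{% endhighlight %}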

@@ -138,6 +139,7 @@ You can set [configuration properties](configuration.html#spark-properties) by p
{% highlight python %}
from pyspark import SparkConf, SparkContext
conf = (SparkConf()
.setMaster("local")
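# note: a master hard-coded on SparkConf like this generally takes precedence over a --master flag passed to spark-submit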
.setAppName("My app")
.set("spark.executor.memory", "1g"))
sc = SparkContext(conf = conf)
@@ -164,6 +166,6 @@ some example applications.
PySpark also includes several sample programs in the [`examples/src/main/python` folder](https://github.com/apache/spark/tree/master/examples/src/main/python).
You can run them by passing the files to `pyspark`; e.g.:

./bin/spark-submit examples/src/main/python/wordcount.py local[2] README.md
./bin/spark-submit examples/src/main/python/wordcount.py README.md

Each program prints usage help when run without arguments.
Each program prints usage help when run without sufficient arguments.
18 changes: 9 additions & 9 deletions examples/src/main/python/als.py
@@ -46,15 +46,15 @@ def update(i, vec, mat, ratings):
return np.linalg.solve(XtX, Xty)

if __name__ == "__main__":
if len(sys.argv) < 2:
print >> sys.stderr, "Usage: als <master> <M> <U> <F> <iters> <slices>"
exit(-1)
sc = SparkContext(sys.argv[1], "PythonALS", pyFiles=[realpath(__file__)])
M = int(sys.argv[2]) if len(sys.argv) > 2 else 100
U = int(sys.argv[3]) if len(sys.argv) > 3 else 500
F = int(sys.argv[4]) if len(sys.argv) > 4 else 10
ITERATIONS = int(sys.argv[5]) if len(sys.argv) > 5 else 5
slices = int(sys.argv[6]) if len(sys.argv) > 6 else 2
"""
Usage: als [M] [U] [F] [iterations] [slices]
"""
sc = SparkContext(appName="PythonALS")
M = int(sys.argv[1]) if len(sys.argv) > 1 else 100
U = int(sys.argv[2]) if len(sys.argv) > 2 else 500
F = int(sys.argv[3]) if len(sys.argv) > 3 else 10
ITERATIONS = int(sys.argv[4]) if len(sys.argv) > 4 else 5
slices = int(sys.argv[5]) if len(sys.argv) > 5 else 2

print "Running ALS with M=%d, U=%d, F=%d, iters=%d, slices=%d\n" % \
(M, U, F, ITERATIONS, slices)
12 changes: 6 additions & 6 deletions examples/src/main/python/kmeans.py
@@ -45,14 +45,14 @@ def closestPoint(p, centers):


if __name__ == "__main__":
if len(sys.argv) < 5:
print >> sys.stderr, "Usage: kmeans <master> <file> <k> <convergeDist>"
if len(sys.argv) != 4:
print >> sys.stderr, "Usage: kmeans <file> <k> <convergeDist>"
exit(-1)
sc = SparkContext(sys.argv[1], "PythonKMeans")
lines = sc.textFile(sys.argv[2])
sc = SparkContext(appName="PythonKMeans")
lines = sc.textFile(sys.argv[1])
data = lines.map(parseVector).cache()
K = int(sys.argv[3])
convergeDist = float(sys.argv[4])
K = int(sys.argv[2])
convergeDist = float(sys.argv[3])

kPoints = data.takeSample(False, K, 1)
tempDist = 1.0
10 changes: 5 additions & 5 deletions examples/src/main/python/logistic_regression.py
@@ -47,12 +47,12 @@ def readPointBatch(iterator):
return [matrix]

if __name__ == "__main__":
if len(sys.argv) != 4:
print >> sys.stderr, "Usage: logistic_regression <master> <file> <iters>"
if len(sys.argv) != 3:
print >> sys.stderr, "Usage: logistic_regression <file> <iterations>"
exit(-1)
sc = SparkContext(sys.argv[1], "PythonLR", pyFiles=[realpath(__file__)])
points = sc.textFile(sys.argv[2]).mapPartitions(readPointBatch).cache()
iterations = int(sys.argv[3])
sc = SparkContext(appName="PythonLR")
points = sc.textFile(sys.argv[1]).mapPartitions(readPointBatch).cache()
iterations = int(sys.argv[2])

# Initialize w to a random value
w = 2 * np.random.ranf(size=D) - 1
10 changes: 5 additions & 5 deletions examples/src/main/python/mllib/kmeans.py
@@ -33,12 +33,12 @@ def parseVector(line):


if __name__ == "__main__":
if len(sys.argv) < 4:
print >> sys.stderr, "Usage: kmeans <master> <file> <k>"
if len(sys.argv) != 3:
print >> sys.stderr, "Usage: kmeans <file> <k>"
exit(-1)
sc = SparkContext(sys.argv[1], "KMeans")
lines = sc.textFile(sys.argv[2])
sc = SparkContext(appName="KMeans")
lines = sc.textFile(sys.argv[1])
data = lines.map(parseVector)
k = int(sys.argv[3])
k = int(sys.argv[2])
model = KMeans.train(data, k)
print "Final centers: " + str(model.clusterCenters)
10 changes: 5 additions & 5 deletions examples/src/main/python/mllib/logistic_regression.py
@@ -39,12 +39,12 @@ def parsePoint(line):


if __name__ == "__main__":
if len(sys.argv) != 4:
print >> sys.stderr, "Usage: logistic_regression <master> <file> <iters>"
if len(sys.argv) != 3:
print >> sys.stderr, "Usage: logistic_regression <file> <iterations>"
exit(-1)
sc = SparkContext(sys.argv[1], "PythonLR")
points = sc.textFile(sys.argv[2]).map(parsePoint)
iterations = int(sys.argv[3])
sc = SparkContext(appName="PythonLR")
points = sc.textFile(sys.argv[1]).map(parsePoint)
iterations = int(sys.argv[2])
model = LogisticRegressionWithSGD.train(points, iterations)
print "Final weights: " + str(model.weights)
print "Final intercept: " + str(model.intercept)
10 changes: 5 additions & 5 deletions examples/src/main/python/pagerank.py
@@ -36,19 +36,19 @@ def parseNeighbors(urls):


if __name__ == "__main__":
if len(sys.argv) < 3:
print >> sys.stderr, "Usage: pagerank <master> <file> <number_of_iterations>"
if len(sys.argv) != 3:
print >> sys.stderr, "Usage: pagerank <file> <iterations>"
exit(-1)

# Initialize the spark context.
sc = SparkContext(sys.argv[1], "PythonPageRank")
sc = SparkContext(appName="PythonPageRank")

# Loads in input file. It should be in format of:
# URL neighbor URL
# URL neighbor URL
# URL neighbor URL
# ...
lines = sc.textFile(sys.argv[2], 1)
lines = sc.textFile(sys.argv[1], 1)

# Loads all URLs from input file and initialize their neighbors.
links = lines.map(lambda urls: parseNeighbors(urls)).distinct().groupByKey().cache()
@@ -57,7 +57,7 @@ def parseNeighbors(urls):
ranks = links.map(lambda (url, neighbors): (url, 1.0))

# Calculates and updates URL ranks continuously using PageRank algorithm.
for iteration in xrange(int(sys.argv[3])):
for iteration in xrange(int(sys.argv[2])):
# Calculates URL contributions to the rank of other URLs.
contribs = links.join(ranks).flatMap(lambda (url, (urls, rank)):
computeContribs(urls, rank))
10 changes: 5 additions & 5 deletions examples/src/main/python/pi.py
@@ -23,11 +23,11 @@


if __name__ == "__main__":
if len(sys.argv) == 1:
print >> sys.stderr, "Usage: pi <master> [<slices>]"
exit(-1)
sc = SparkContext(sys.argv[1], "PythonPi")
slices = int(sys.argv[2]) if len(sys.argv) > 2 else 2
"""
Usage: pi [slices]
"""
sc = SparkContext(appName="PythonPi")
slices = int(sys.argv[1]) if len(sys.argv) > 1 else 2
n = 100000 * slices
def f(_):
x = random() * 2 - 1
8 changes: 4 additions & 4 deletions examples/src/main/python/sort.py
@@ -21,11 +21,11 @@


if __name__ == "__main__":
if len(sys.argv) < 3:
print >> sys.stderr, "Usage: sort <master> <file>"
if len(sys.argv) != 2:
print >> sys.stderr, "Usage: sort <file>"
exit(-1)
sc = SparkContext(sys.argv[1], "PythonSort")
lines = sc.textFile(sys.argv[2], 1)
sc = SparkContext(appName="PythonSort")
lines = sc.textFile(sys.argv[1], 1)
sortedCount = lines.flatMap(lambda x: x.split(' ')) \
.map(lambda x: (int(x), 1)) \
.sortByKey(lambda x: x)
10 changes: 5 additions & 5 deletions examples/src/main/python/transitive_closure.py
@@ -36,11 +36,11 @@ def generateGraph():


if __name__ == "__main__":
if len(sys.argv) == 1:
print >> sys.stderr, "Usage: transitive_closure <master> [<slices>]"
exit(-1)
sc = SparkContext(sys.argv[1], "PythonTransitiveClosure")
slices = int(sys.argv[2]) if len(sys.argv) > 2 else 2
"""
Usage: transitive_closure [slices]
"""
sc = SparkContext(appName="PythonTransitiveClosure")
slices = int(sys.argv[1]) if len(sys.argv) > 1 else 2
tc = sc.parallelize(generateGraph(), slices).cache()

# Linear transitive closure: each round grows paths by one edge,
8 changes: 4 additions & 4 deletions examples/src/main/python/wordcount.py
@@ -22,11 +22,11 @@


if __name__ == "__main__":
if len(sys.argv) < 3:
print >> sys.stderr, "Usage: wordcount <master> <file>"
if len(sys.argv) != 2:
print >> sys.stderr, "Usage: wordcount <file>"
exit(-1)
sc = SparkContext(sys.argv[1], "PythonWordCount")
lines = sc.textFile(sys.argv[2], 1)
sc = SparkContext(appName="PythonWordCount")
lines = sc.textFile(sys.argv[1], 1)
counts = lines.flatMap(lambda x: x.split(' ')) \
.map(lambda x: (x, 1)) \
.reduceByKey(add)