[SPARK-5811] Added documentation for maven coordinates and added Spark Packages support #4662

Closed · wants to merge 11 commits

52 changes: 37 additions & 15 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -252,6 +252,26 @@ object SparkSubmit {

val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER

// Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
// too for Spark Package compatibility.
val resolvedMavenCoordinates =
SparkSubmitUtils.resolveMavenCoordinates(
args.packages, Option(args.repositories), Option(args.ivyRepoPath))
if (!resolvedMavenCoordinates.trim.isEmpty) {
if (args.jars == null || args.jars.trim.isEmpty) {
args.jars = resolvedMavenCoordinates
} else {
args.jars += s",$resolvedMavenCoordinates"
}
if (args.isPython) {
if (args.pyFiles == null || args.pyFiles.trim.isEmpty) {
args.pyFiles = resolvedMavenCoordinates
} else {
args.pyFiles += s",$resolvedMavenCoordinates"
}
}
}
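
In effect, a Python application submitted with `--packages` can import Python modules bundled inside the resolved jars, since the jars are appended to both `args.jars` and `args.pyFiles`. A minimal sketch with a hypothetical coordinate and script name:

{% highlight bash %}
# Hypothetical coordinate and script; the resolved jar lands on both the classpath and the Python path.
$ ./bin/spark-submit --packages "com.example:mylib:0.1" my_app.py
{% endhighlight %}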

// Require all python files to be local, so we can add them to the PYTHONPATH
// In YARN cluster mode, python files are distributed as regular files, which can be non-local
if (args.isPython && !isYarnCluster) {
@@ -307,18 +327,6 @@ object SparkSubmit {
// Special flag to avoid deprecation warnings at the client
sysProps("SPARK_SUBMIT") = "true"

// Resolve maven dependencies if there are any and add classpath to jars
val resolvedMavenCoordinates =
SparkSubmitUtils.resolveMavenCoordinates(
args.packages, Option(args.repositories), Option(args.ivyRepoPath))
if (!resolvedMavenCoordinates.trim.isEmpty) {
if (args.jars == null || args.jars.trim.isEmpty) {
args.jars = resolvedMavenCoordinates
} else {
args.jars += s",$resolvedMavenCoordinates"
}
}

// A list of rules to map each argument to system properties or command-line options in
// each deploy mode; we iterate through these below
val options = List[OptionAssigner](
@@ -646,13 +654,15 @@ private[spark] object SparkSubmitUtils {
private[spark] case class MavenCoordinate(groupId: String, artifactId: String, version: String)

/**
* Extracts maven coordinates from a comma-delimited string
* Extracts maven coordinates from a comma-delimited string. Coordinates should be provided
* in the format `groupId:artifactId:version` or `groupId/artifactId:version`. The latter provides
* simplicity for Spark Package users.
* @param coordinates Comma-delimited string of maven coordinates
* @return Sequence of Maven coordinates
*/
private[spark] def extractMavenCoordinates(coordinates: String): Seq[MavenCoordinate] = {
coordinates.split(",").map { p =>
val splits = p.split(":")
val splits = p.replace("/", ":").split(":")
require(splits.length == 3, s"Provided Maven Coordinates must be in the form " +
s"'groupId:artifactId:version'. The coordinate provided is: $p")
require(splits(0) != null && splits(0).trim.nonEmpty, s"The groupId cannot be null or " +
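
Since `/` is normalized to `:` before the split, both notations name the same artifact on the command line. A quick sketch with a placeholder coordinate:

{% highlight bash %}
# Placeholder coordinate; both forms resolve to groupId=com.example, artifactId=mylib, version=0.1.
$ ./bin/spark-shell --packages "com.example:mylib:0.1"
$ ./bin/spark-shell --packages "com.example/mylib:0.1"
{% endhighlight %}
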
@@ -682,6 +692,13 @@ private[spark] object SparkSubmitUtils {
br.setName("central")
cr.add(br)

val sp: IBiblioResolver = new IBiblioResolver
sp.setM2compatible(true)
sp.setUsepoms(true)
sp.setRoot("http://dl.bintray.com/spark-packages/maven")
sp.setName("spark-packages")
cr.add(sp)

val repositoryList = remoteRepos.getOrElse("")
// add any other remote repositories other than maven central
if (repositoryList.trim.nonEmpty) {
@@ -794,14 +811,19 @@ private[spark] object SparkSubmitUtils {
val md = getModuleDescriptor
md.setDefaultConf(ivyConfName)

// Add an exclusion rule for Spark
// Add an exclusion rule for Spark and Scala Library
val sparkArtifacts = new ArtifactId(new ModuleId("org.apache.spark", "*"), "*", "*", "*")
val sparkDependencyExcludeRule =
new DefaultExcludeRule(sparkArtifacts, ivySettings.getMatcher("glob"), null)
sparkDependencyExcludeRule.addConfiguration(ivyConfName)
val scalaArtifacts = new ArtifactId(new ModuleId("*", "scala-library"), "*", "*", "*")
val scalaDependencyExcludeRule =
new DefaultExcludeRule(scalaArtifacts, ivySettings.getMatcher("glob"), null)
scalaDependencyExcludeRule.addConfiguration(ivyConfName)

// Exclude any Spark dependencies, and add all supplied maven artifacts as dependencies
md.addExcludeRule(sparkDependencyExcludeRule)
md.addExcludeRule(scalaDependencyExcludeRule)
addDependenciesToIvy(md, artifacts, ivyConfName)

// resolve dependencies
@@ -57,20 +57,23 @@ class SparkSubmitUtilsSuite extends FunSuite with BeforeAndAfterAll {

test("create repo resolvers") {
val resolver1 = SparkSubmitUtils.createRepoResolvers(None)
// should have central by default
assert(resolver1.getResolvers.size() === 1)
// should have central and spark-packages by default
assert(resolver1.getResolvers.size() === 2)
assert(resolver1.getResolvers.get(0).asInstanceOf[IBiblioResolver].getName === "central")
assert(resolver1.getResolvers.get(1).asInstanceOf[IBiblioResolver].getName === "spark-packages")

val repos = "a/1,b/2,c/3"
val resolver2 = SparkSubmitUtils.createRepoResolvers(Option(repos))
assert(resolver2.getResolvers.size() === 4)
assert(resolver2.getResolvers.size() === 5)
val expected = repos.split(",").map(r => s"$r/")
resolver2.getResolvers.toArray.zipWithIndex.foreach { case (resolver: IBiblioResolver, i) =>
if (i == 0) {
assert(resolver.getName === "central")
} else if (i == 1) {
assert(resolver.getName === "spark-packages")
} else {
assert(resolver.getName === s"repo-$i")
assert(resolver.getRoot === expected(i - 1))
assert(resolver.getName === s"repo-${i - 1}")
assert(resolver.getRoot === expected(i - 2))
}
}
}
19 changes: 16 additions & 3 deletions docs/programming-guide.md
@@ -173,8 +173,11 @@ in-process.
In the Spark shell, a special interpreter-aware SparkContext is already created for you, in the
variable called `sc`. Making your own SparkContext will not work. You can set which master the
context connects to using the `--master` argument, and you can add JARs to the classpath
by passing a comma-separated list to the `--jars` argument.
For example, to run `bin/spark-shell` on exactly four cores, use:
by passing a comma-separated list to the `--jars` argument. You can also add dependencies
(e.g. Spark Packages) to your shell session by supplying a comma-separated list of maven coordinates
to the `--packages` argument. Any additional repositories where dependencies might exist (e.g. Sonatype)
can be passed to the `--repositories` argument. For example, to run `bin/spark-shell` on exactly
four cores, use:

{% highlight bash %}
$ ./bin/spark-shell --master local[4]
@@ -186,6 +189,12 @@ Or, to also add `code.jar` to its classpath, use:
$ ./bin/spark-shell --master local[4] --jars code.jar
{% endhighlight %}

To include a dependency using maven coordinates:

{% highlight bash %}
$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"
{% endhighlight %}
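
If an artifact is hosted outside the default resolvers, a repository can be supplied alongside the coordinates. The coordinate and repository URL below are placeholders, shown only to illustrate how `--packages` and `--repositories` combine:

{% highlight bash %}
$ ./bin/spark-shell --master local[4] \
    --packages "org.example:example:0.1" \
    --repositories "https://repo.example.com/releases"
{% endhighlight %}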

For a complete list of options, run `spark-shell --help`. Behind the scenes,
`spark-shell` invokes the more general [`spark-submit` script](submitting-applications.html).

@@ -196,7 +205,11 @@ For a complete list of options, run `spark-shell --help`. Behind the scenes,
In the PySpark shell, a special interpreter-aware SparkContext is already created for you, in the
variable called `sc`. Making your own SparkContext will not work. You can set which master the
context connects to using the `--master` argument, and you can add Python .zip, .egg or .py files
to the runtime path by passing a comma-separated list to `--py-files`.
to the runtime path by passing a comma-separated list to `--py-files`. You can also add dependencies
(e.g. Spark Packages) to your shell session by supplying a comma-separated list of maven coordinates
to the `--packages` argument. Any additional repositories where dependencies might exist (e.g. Sonatype)
can be passed to the `--repositories` argument. Any Python dependencies a Spark Package has (listed in
the requirements.txt of that package) must be manually installed using pip when necessary.
For example, to run `bin/pyspark` on exactly four cores, use:

{% highlight bash %}
5 changes: 5 additions & 0 deletions docs/submitting-applications.md
@@ -174,6 +174,11 @@ This can use up a significant amount of space over time and will need to be clea
is handled automatically, and with Spark standalone, automatic cleanup can be configured with the
`spark.worker.cleanup.appDataTtl` property.

Users may also include any other dependencies by supplying a comma-delimited list of maven coordinates
with `--packages`. All transitive dependencies will be handled when using this flag. Additional
repositories (or resolvers in SBT) can be added in a comma-delimited fashion with the flag `--repositories`.
These flags can be used with `pyspark`, `spark-shell`, and `spark-submit` to include Spark Packages.
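
As a sketch (the coordinate, repository URL, main class, and jar name below are placeholders), a `spark-submit` invocation that pulls in extra packages might look like:

{% highlight bash %}
./bin/spark-submit \
  --class org.example.MyApp \
  --packages "org.example:example:0.1" \
  --repositories "https://repo.example.com/releases" \
  my-app.jar
{% endhighlight %}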

For Python, the equivalent `--py-files` option can be used to distribute `.egg`, `.zip` and `.py` libraries
to executors.

6 changes: 4 additions & 2 deletions python/pyspark/context.py
@@ -64,6 +64,8 @@ class SparkContext(object):
_lock = Lock()
_python_includes = None # zip and egg files that need to be added to PYTHONPATH

PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar')

def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
environment=None, batchSize=0, serializer=PickleSerializer(), conf=None,
gateway=None, jsc=None, profiler_cls=BasicProfiler):
@@ -185,7 +187,7 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
for path in self._conf.get("spark.submit.pyFiles", "").split(","):
if path != "":
(dirname, filename) = os.path.split(path)
if filename.lower().endswith("zip") or filename.lower().endswith("egg"):
if filename[-4:].lower() in self.PACKAGE_EXTENSIONS:
self._python_includes.append(filename)
sys.path.insert(1, os.path.join(SparkFiles.getRootDirectory(), filename))

@@ -705,7 +707,7 @@ def addPyFile(self, path):
self.addFile(path)
(dirname, filename) = os.path.split(path) # dirname may be directory or HDFS/S3 prefix

if filename.endswith('.zip') or filename.endswith('.ZIP') or filename.endswith('.egg'):
if filename[-4:].lower() in self.PACKAGE_EXTENSIONS:
self._python_includes.append(filename)
# for tests in local mode
sys.path.insert(1, os.path.join(SparkFiles.getRootDirectory(), filename))
69 changes: 65 additions & 4 deletions python/pyspark/tests.py
Expand Up @@ -1404,31 +1404,59 @@ def setUp(self):
def tearDown(self):
shutil.rmtree(self.programDir)

def createTempFile(self, name, content):
def createTempFile(self, name, content, dir=None):
"""
Create a temp file with the given name and content and return its path.
Strips leading spaces from content up to the first '|' in each line.
"""
pattern = re.compile(r'^ *\|', re.MULTILINE)
content = re.sub(pattern, '', content.strip())
path = os.path.join(self.programDir, name)
if dir is None:
path = os.path.join(self.programDir, name)
else:
os.makedirs(os.path.join(self.programDir, dir))
path = os.path.join(self.programDir, dir, name)
with open(path, "w") as f:
f.write(content)
return path

def createFileInZip(self, name, content):
def createFileInZip(self, name, content, ext=".zip", dir=None, zip_name=None):
"""
Create a zip archive containing a file with the given content and return its path.
Strips leading spaces from content up to the first '|' in each line.
"""
pattern = re.compile(r'^ *\|', re.MULTILINE)
content = re.sub(pattern, '', content.strip())
path = os.path.join(self.programDir, name + ".zip")
if dir is None:
path = os.path.join(self.programDir, name + ext)
else:
path = os.path.join(self.programDir, dir, zip_name + ext)
zip = zipfile.ZipFile(path, 'w')
zip.writestr(name, content)
zip.close()
return path

def create_spark_package(self, artifact_name):
group_id, artifact_id, version = artifact_name.split(":")
self.createTempFile("%s-%s.pom" % (artifact_id, version), ("""
|<?xml version="1.0" encoding="UTF-8"?>
|<project xmlns="http://maven.apache.org/POM/4.0.0"
| xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
| xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
| http://maven.apache.org/xsd/maven-4.0.0.xsd">
| <modelVersion>4.0.0</modelVersion>
| <groupId>%s</groupId>
| <artifactId>%s</artifactId>
| <version>%s</version>
|</project>
""" % (group_id, artifact_id, version)).lstrip(),
os.path.join(group_id, artifact_id, version))
self.createFileInZip("%s.py" % artifact_id, """
|def myfunc(x):
| return x + 1
""", ".jar", os.path.join(group_id, artifact_id, version),
"%s-%s" % (artifact_id, version))

def test_single_script(self):
"""Submit and test a single script file"""
script = self.createTempFile("test.py", """
@@ -1497,6 +1525,39 @@ def test_module_dependency_on_cluster(self):
self.assertEqual(0, proc.returncode)
self.assertIn("[2, 3, 4]", out)

def test_package_dependency(self):
"""Submit and test a script with a dependency on a Spark Package"""
script = self.createTempFile("test.py", """
|from pyspark import SparkContext
|from mylib import myfunc
|
|sc = SparkContext()
|print sc.parallelize([1, 2, 3]).map(myfunc).collect()
""")
self.create_spark_package("a:mylib:0.1")
proc = subprocess.Popen([self.sparkSubmit, "--packages", "a:mylib:0.1", "--repositories",
"file:" + self.programDir, script], stdout=subprocess.PIPE)
out, err = proc.communicate()
self.assertEqual(0, proc.returncode)
self.assertIn("[2, 3, 4]", out)

def test_package_dependency_on_cluster(self):
"""Submit and test a script with a dependency on a Spark Package on a cluster"""
script = self.createTempFile("test.py", """
|from pyspark import SparkContext
|from mylib import myfunc
|
|sc = SparkContext()
|print sc.parallelize([1, 2, 3]).map(myfunc).collect()
""")
self.create_spark_package("a:mylib:0.1")
proc = subprocess.Popen([self.sparkSubmit, "--packages", "a:mylib:0.1", "--repositories",
"file:" + self.programDir, "--master",
"local-cluster[1,1,512]", script], stdout=subprocess.PIPE)
out, err = proc.communicate()
self.assertEqual(0, proc.returncode)
self.assertIn("[2, 3, 4]", out)

def test_single_script_on_cluster(self):
"""Submit and test a single script on a cluster"""
script = self.createTempFile("test.py", """