
Commit 10b6466

AngersZhuuuu authored and maropu committed
[SPARK-33084][CORE][SQL] Add jar support ivy path
### What changes were proposed in this pull request?
Support `add jar` with an Ivy path.

### Why are the changes needed?
Since application submission already supports Ivy coordinates, `add jar` can support them as well.

### Does this PR introduce _any_ user-facing change?
Users can add a jar with SQL such as:
```sql
add jar ivy://group:artifact:version?exclude=xxx,xxx&transitive=true
add jar ivy://group:artifact:version?exclude=xxx,xxx&transitive=false
```
and with the core API:
```scala
sparkContext.addJar("ivy://group:artifact:version?exclude=xxx,xxx&transitive=true")
sparkContext.addJar("ivy://group:artifact:version?exclude=xxx,xxx&transitive=false")
```

#### Doc update snapshot
![image](https://user-images.githubusercontent.com/46485123/101227738-de451200-36d3-11eb-813d-78a8b879da4f.png)

### How was this patch tested?
Added UT.

Closes #29966 from AngersZhuuuu/support-add-jar-ivy.

Lead-authored-by: angerszhu <angers.zhu@gmail.com>
Co-authored-by: AngersZhuuuu <angers.zhu@gmail.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
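For concreteness, a minimal usage sketch of the new syntax in a `spark-shell` session. The coordinates below are illustrative, not part of this PR; `sc` and `spark` are the shell's built-in SparkContext and SparkSession.

```scala
// Illustrative coordinates only; any published group:artifact:version works.
// Add a single artifact without its dependencies:
sc.addJar("ivy://org.apache.commons:commons-lang3:3.11?transitive=false")

// Add an artifact plus transitive dependencies, excluding one group:module:
spark.sql("ADD JAR ivy://org.apache.hive:hive-storage-api:2.7.2" +
  "?exclude=org.slf4j:slf4j-api&transitive=true")
```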
1 parent 65a9ac2 commit 10b6466

File tree

15 files changed: +475 −50 lines


core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 28 additions & 17 deletions
```diff
@@ -1929,7 +1929,7 @@ class SparkContext(config: SparkConf) extends Logging {
   }
 
   private def addJar(path: String, addedOnSubmit: Boolean): Unit = {
-    def addLocalJarFile(file: File): String = {
+    def addLocalJarFile(file: File): Seq[String] = {
       try {
         if (!file.exists()) {
           throw new FileNotFoundException(s"Jar ${file.getAbsolutePath} not found")
@@ -1938,15 +1938,15 @@ class SparkContext(config: SparkConf) extends Logging {
           throw new IllegalArgumentException(
             s"Directory ${file.getAbsoluteFile} is not allowed for addJar")
         }
-        env.rpcEnv.fileServer.addJar(file)
+        Seq(env.rpcEnv.fileServer.addJar(file))
       } catch {
         case NonFatal(e) =>
           logError(s"Failed to add $path to Spark environment", e)
-          null
+          Nil
       }
     }
 
-    def checkRemoteJarFile(path: String): String = {
+    def checkRemoteJarFile(path: String): Seq[String] = {
       val hadoopPath = new Path(path)
       val scheme = hadoopPath.toUri.getScheme
       if (!Array("http", "https", "ftp").contains(scheme)) {
@@ -1959,47 +1959,58 @@ class SparkContext(config: SparkConf) extends Logging {
            throw new IllegalArgumentException(
              s"Directory ${path} is not allowed for addJar")
          }
-          path
+          Seq(path)
        } catch {
          case NonFatal(e) =>
            logError(s"Failed to add $path to Spark environment", e)
-            null
+            Nil
        }
      } else {
-        path
+        Seq(path)
      }
    }
 
    if (path == null || path.isEmpty) {
      logWarning("null or empty path specified as parameter to addJar")
    } else {
-      val key = if (path.contains("\\") && Utils.isWindows) {
+      val (keys, scheme) = if (path.contains("\\") && Utils.isWindows) {
        // For local paths with backslashes on Windows, URI throws an exception
-        addLocalJarFile(new File(path))
+        (addLocalJarFile(new File(path)), "local")
      } else {
        val uri = new Path(path).toUri
        // SPARK-17650: Make sure this is a valid URL before adding it to the list of dependencies
        Utils.validateURL(uri)
-        uri.getScheme match {
+        val uriScheme = uri.getScheme
+        val jarPaths = uriScheme match {
          // A JAR file which exists only on the driver node
          case null =>
            // SPARK-22585 path without schema is not url encoded
            addLocalJarFile(new File(uri.getPath))
          // A JAR file which exists only on the driver node
          case "file" => addLocalJarFile(new File(uri.getPath))
          // A JAR file which exists locally on every worker node
-          case "local" => "file:" + uri.getPath
+          case "local" => Seq("file:" + uri.getPath)
+          case "ivy" =>
+            // Since `new Path(path).toUri` will lose query information,
+            // so here we use `URI.create(path)`
+            DependencyUtils.resolveMavenDependencies(URI.create(path))
+              .flatMap(jar => addLocalJarFile(new File(jar)))
          case _ => checkRemoteJarFile(path)
        }
+        (jarPaths, uriScheme)
      }
-      if (key != null) {
+      if (keys.nonEmpty) {
        val timestamp = if (addedOnSubmit) startTime else System.currentTimeMillis
-        if (addedJars.putIfAbsent(key, timestamp).isEmpty) {
-          logInfo(s"Added JAR $path at $key with timestamp $timestamp")
+        val (added, existed) = keys.partition(addedJars.putIfAbsent(_, timestamp).isEmpty)
+        if (added.nonEmpty) {
+          val jarMessage = if (scheme != "ivy") "JAR" else "dependency jars of Ivy URI"
+          logInfo(s"Added $jarMessage $path at ${added.mkString(",")} with timestamp $timestamp")
          postEnvironmentUpdate()
-        } else {
-          logWarning(s"The jar $path has been added already. Overwriting of added jars " +
-            "is not supported in the current version.")
+        }
+        if (existed.nonEmpty) {
+          val jarMessage = if (scheme != "ivy") "JAR" else "dependency jars of Ivy URI"
+          logInfo(s"The $jarMessage $path at ${existed.mkString(",")} has been added already." +
+            " Overwriting of added jar is not supported in the current version.")
        }
      }
    }
```
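The bookkeeping change above reduces to one idiom: `partition` over `putIfAbsent` splits a batch of jar keys into newly registered entries and duplicates, so an Ivy URI that resolves to many jars logs each group once. A minimal standalone sketch, with a `TrieMap` standing in for SparkContext's internal `addedJars` map:

```scala
import scala.collection.concurrent.TrieMap

// Stand-in for SparkContext's internal addedJars map (an assumption made
// for illustration). putIfAbsent returns None only on a fresh insertion.
val addedJars = TrieMap.empty[String, Long]

def register(keys: Seq[String], timestamp: Long): (Seq[String], Seq[String]) =
  keys.partition(addedJars.putIfAbsent(_, timestamp).isEmpty)

val (added1, existed1) = register(Seq("a.jar", "b.jar"), 1L) // both newly added
val (added2, existed2) = register(Seq("b.jar", "c.jar"), 2L) // c.jar added, b.jar a duplicate
```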

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 5 additions & 3 deletions
```diff
@@ -304,8 +304,8 @@ private[spark] class SparkSubmit extends Logging {
     // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
     // too for packages that include Python code
     val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(
-      args.packagesExclusions, args.packages, args.repositories, args.ivyRepoPath,
-      args.ivySettingsPath)
+      packagesTransitive = true, args.packagesExclusions, args.packages,
+      args.repositories, args.ivyRepoPath, args.ivySettingsPath)
 
     if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
       // In K8s client mode, when in the driver, add resolved jars early as we might need
@@ -1360,13 +1360,15 @@ private[spark] object SparkSubmitUtils {
    * Resolves any dependencies that were supplied through maven coordinates
    * @param coordinates Comma-delimited string of maven coordinates
    * @param ivySettings An IvySettings containing resolvers to use
+   * @param transitive Whether resolving transitive dependencies, default is true
    * @param exclusions Exclusions to apply when resolving transitive dependencies
    * @return The comma-delimited path to the jars of the given maven artifacts including their
    *         transitive dependencies
    */
   def resolveMavenCoordinates(
       coordinates: String,
       ivySettings: IvySettings,
+      transitive: Boolean,
       exclusions: Seq[String] = Nil,
       isTest: Boolean = false): String = {
     if (coordinates == null || coordinates.trim.isEmpty) {
@@ -1396,7 +1398,7 @@ private[spark] object SparkSubmitUtils {
     val ivy = Ivy.newInstance(ivySettings)
     // Set resolve options to download transitive dependencies as well
     val resolveOptions = new ResolveOptions
-    resolveOptions.setTransitive(true)
+    resolveOptions.setTransitive(transitive)
     val retrieveOptions = new RetrieveOptions
     // Turn downloading and logging off for testing
     if (isTest) {
```
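The new `transitive` flag ends up in Ivy's `ResolveOptions`, which was previously hard-coded to `true`. A minimal sketch of that final step, assuming only Apache Ivy on the classpath:

```scala
import org.apache.ivy.core.resolve.ResolveOptions

// The caller's flag now decides whether Ivy walks the dependency graph;
// before this change the value was always true.
def mkResolveOptions(transitive: Boolean): ResolveOptions = {
  val resolveOptions = new ResolveOptions
  resolveOptions.setTransitive(transitive)
  resolveOptions
}
```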

core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala

Lines changed: 5 additions & 11 deletions
```diff
@@ -22,7 +22,7 @@ import java.io.File
 import org.apache.commons.lang3.StringUtils
 
 import org.apache.spark.{SecurityManager, SparkConf}
-import org.apache.spark.deploy.{DependencyUtils, SparkHadoopUtil}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.util._
@@ -79,17 +79,11 @@ object DriverWrapper extends Logging {
     val secMgr = new SecurityManager(sparkConf)
     val hadoopConf = SparkHadoopUtil.newConfiguration(sparkConf)
 
-    val Seq(packagesExclusions, packages, repositories, ivyRepoPath, ivySettingsPath) =
-      Seq(
-        "spark.jars.excludes",
-        "spark.jars.packages",
-        "spark.jars.repositories",
-        "spark.jars.ivy",
-        "spark.jars.ivySettings"
-      ).map(sys.props.get(_).orNull)
+    val ivyProperties = DependencyUtils.getIvyProperties()
 
-    val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(packagesExclusions,
-      packages, repositories, ivyRepoPath, Option(ivySettingsPath))
+    val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(true,
+      ivyProperties.packagesExclusions, ivyProperties.packages, ivyProperties.repositories,
+      ivyProperties.ivyRepoPath, Option(ivyProperties.ivySettingsPath))
     val jars = {
       val jarsProp = sys.props.get(config.JARS.key).orNull
       if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
```
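One subtlety in the rewritten call: each `spark.jars.*` property resolves to `null` when unset, and `Option(...)` collapses that `null` to `None`, which is how `ivySettingsPath` stays optional downstream. A two-line illustration:

```scala
// orNull yields null when the property is unset; Option(null) is None,
// so an unset settings path flows through the API as Option.empty.
val ivySettingsPath: String = sys.props.get("spark.jars.ivySettings").orNull
val maybeSettings: Option[String] = Option(ivySettingsPath) // None if unset
```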

core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala renamed to core/src/main/scala/org/apache/spark/util/DependencyUtils.scala

Lines changed: 133 additions & 4 deletions
```diff
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.deploy
+package org.apache.spark.util
 
 import java.io.File
 import java.net.URI
@@ -25,12 +25,140 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.SparkSubmitUtils
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
-private[deploy] object DependencyUtils extends Logging {
+case class IvyProperties(
+    packagesExclusions: String,
+    packages: String,
+    repositories: String,
+    ivyRepoPath: String,
+    ivySettingsPath: String)
+
+private[spark] object DependencyUtils extends Logging {
+
+  def getIvyProperties(): IvyProperties = {
+    val Seq(packagesExclusions, packages, repositories, ivyRepoPath, ivySettingsPath) = Seq(
+      "spark.jars.excludes",
+      "spark.jars.packages",
+      "spark.jars.repositories",
+      "spark.jars.ivy",
+      "spark.jars.ivySettings"
+    ).map(sys.props.get(_).orNull)
+    IvyProperties(packagesExclusions, packages, repositories, ivyRepoPath, ivySettingsPath)
+  }
+
+  private def isInvalidQueryString(tokens: Array[String]): Boolean = {
+    tokens.length != 2 || StringUtils.isBlank(tokens(0)) || StringUtils.isBlank(tokens(1))
+  }
+
+  /**
+   * Parse URI query string's parameter value of `transitive` and `exclude`.
+   * Other invalid parameters will be ignored.
+   *
+   * @param uri Ivy URI need to be downloaded.
+   * @return Tuple value of parameter `transitive` and `exclude` value.
+   *
+   *         1. transitive: whether to download dependency jar of Ivy URI, default value is false
+   *            and this parameter value is case-sensitive. Invalid value will be treat as false.
+   *            Example: Input:  exclude=org.mortbay.jetty:jetty&transitive=true
+   *            Output:  true
+   *
+   *         2. exclude: comma separated exclusions to apply when resolving transitive
+   *            dependencies, consists of `group:module` pairs separated by commas.
+   *            Example: Input:  exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http
+   *            Output:  [org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http]
+   */
+  private def parseQueryParams(uri: URI): (Boolean, String) = {
+    val uriQuery = uri.getQuery
+    if (uriQuery == null) {
+      (false, "")
+    } else {
+      val mapTokens = uriQuery.split("&").map(_.split("="))
+      if (mapTokens.exists(isInvalidQueryString)) {
+        throw new IllegalArgumentException(
+          s"Invalid query string in Ivy URI ${uri.toString}: $uriQuery")
+      }
+      val groupedParams = mapTokens.map(kv => (kv(0), kv(1))).groupBy(_._1)
+
+      // Parse transitive parameters (e.g., transitive=true) in an Ivy URI, default value is false
+      val transitiveParams = groupedParams.get("transitive")
+      if (transitiveParams.map(_.size).getOrElse(0) > 1) {
+        logWarning("It's best to specify `transitive` parameter in ivy URI query only once." +
+          " If there are multiple `transitive` parameter, we will select the last one")
+      }
+      val transitive =
+        transitiveParams.flatMap(_.takeRight(1).map(_._2 == "true").headOption).getOrElse(false)
+
+      // Parse an excluded list (e.g., exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http)
+      // in an Ivy URI. When download Ivy URI jar, Spark won't download transitive jar
+      // in a excluded list.
+      val exclusionList = groupedParams.get("exclude").map { params =>
+        params.map(_._2).flatMap { excludeString =>
+          val excludes = excludeString.split(",")
+          if (excludes.map(_.split(":")).exists(isInvalidQueryString)) {
+            throw new IllegalArgumentException(
+              s"Invalid exclude string in Ivy URI ${uri.toString}:" +
+                " expected 'org:module,org:module,..', found " + excludeString)
+          }
+          excludes
+        }.mkString(",")
+      }.getOrElse("")
+
+      val validParams = Set("transitive", "exclude")
+      val invalidParams = groupedParams.keys.filterNot(validParams.contains).toSeq
+      if (invalidParams.nonEmpty) {
+        logWarning(s"Invalid parameters `${invalidParams.sorted.mkString(",")}` found " +
+          s"in Ivy URI query `$uriQuery`.")
+      }
+
+      (transitive, exclusionList)
+    }
+  }
+
+  /**
+   * Download Ivy URI's dependency jars.
+   *
+   * @param uri Ivy URI need to be downloaded. The URI format should be:
+   *            `ivy://group:module:version[?query]`
+   *            Ivy URI query part format should be:
+   *            `parameter=value&parameter=value...`
+   *            Note that currently Ivy URI query part support two parameters:
+   *            1. transitive: whether to download dependent jars related to your Ivy URI.
+   *               transitive=false or `transitive=true`, if not set, the default value is false.
+   *            2. exclude: exclusion list when download Ivy URI jar and dependency jars.
+   *               The `exclude` parameter content is a ',' separated `group:module` pair string :
+   *               `exclude=group:module,group:module...`
+   * @return Comma separated string list of jars downloaded.
+   */
+  def resolveMavenDependencies(uri: URI): Seq[String] = {
+    val ivyProperties = DependencyUtils.getIvyProperties()
+    val authority = uri.getAuthority
+    if (authority == null) {
+      throw new IllegalArgumentException(
+        s"Invalid Ivy URI authority in uri ${uri.toString}:" +
+          " Expected 'org:module:version', found null.")
+    }
+    if (authority.split(":").length != 3) {
+      throw new IllegalArgumentException(
+        s"Invalid Ivy URI authority in uri ${uri.toString}:" +
+          s" Expected 'org:module:version', found $authority.")
+    }
+
+    val (transitive, exclusionList) = parseQueryParams(uri)
+
+    resolveMavenDependencies(
+      transitive,
+      exclusionList,
+      authority,
+      ivyProperties.repositories,
+      ivyProperties.ivyRepoPath,
+      Option(ivyProperties.ivySettingsPath)
+    ).split(",")
+  }
 
   def resolveMavenDependencies(
+      packagesTransitive: Boolean,
       packagesExclusions: String,
       packages: String,
       repositories: String,
@@ -51,7 +179,8 @@ private[deploy] object DependencyUtils extends Logging {
       SparkSubmitUtils.buildIvySettings(Option(repositories), Option(ivyRepoPath))
     }
 
-    SparkSubmitUtils.resolveMavenCoordinates(packages, ivySettings, exclusions = exclusions)
+    SparkSubmitUtils.resolveMavenCoordinates(packages, ivySettings,
+      transitive = packagesTransitive, exclusions = exclusions)
   }
 
   def resolveAndDownloadJars(
```
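The parsing rules above can be exercised on their own: the last `transitive` value wins and defaults to `false`, repeated `exclude` values merge into one comma-separated list, and `java.net.URI` keeps the `group:module:version` triple intact in the authority component (the reason the SparkContext change uses `URI.create(path)` instead of `new Path(path).toUri`). A simplified sketch of those rules, not the committed implementation:

```scala
import java.net.URI

// Simplified restatement of parseQueryParams' rules (no validation here):
// last `transitive` wins, default false; `exclude` values are concatenated.
def parseIvyQuery(uri: URI): (Boolean, String) = {
  val params = Option(uri.getQuery).getOrElse("").split("&")
    .filter(_.nonEmpty).map(_.split("=", 2))
    .collect { case Array(k, v) => (k, v) }
  val transitive = params.filter(_._1 == "transitive").lastOption.exists(_._2 == "true")
  val excludes = params.filter(_._1 == "exclude").map(_._2).mkString(",")
  (transitive, excludes)
}

val uri = URI.create("ivy://org.foo:bar:1.0?exclude=a:b&transitive=true")
assert(uri.getAuthority == "org.foo:bar:1.0") // coordinate survives in the authority
assert(parseIvyQuery(uri) == ((true, "a:b")))
```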
