Commit 1a4c8f7

sarutak authored and Mridul Muralidharan committed
[SPARK-32119][CORE] ExecutorPlugin doesn't work with Standalone Cluster and Kubernetes with --jars
### What changes were proposed in this pull request?

This PR changes Executor to load jars and files added by --jars and --files on Executor initialization. To avoid downloading those jars/files twice, they are associated with `startTime` as their uploaded timestamp.

### Why are the changes needed?

ExecutorPlugin can't work with Standalone Cluster and Kubernetes when a jar which contains plugins, and files used by the plugins, are added via the --jars and --files options of spark-submit. This is because jars and files added by --jars and --files are not loaded on Executor initialization. I confirmed it works with YARN because jars/files are distributed via the distributed cache.

### Does this PR introduce _any_ user-facing change?

Yes. Jars/files added by --jars and --files are downloaded on each executor on initialization.

### How was this patch tested?

Added a new test case.

Closes apache#28939 from sarutak/fix-plugin-issue.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
1 parent c6be207 commit 1a4c8f7
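
To make the fixed scenario concrete, here is a minimal sketch (not part of the commit) of the kind of plugin this affects: the plugin jar is passed with --jars and a file it reads with --files. The class name `example.MyPlugin` and the file name `settings.txt` are illustrative; the plugin API types are the same ones exercised by the new test below.

```scala
// Hypothetical plugin, submitted roughly as:
//   spark-submit --jars myplugin.jar --files settings.txt \
//     --conf spark.plugins=example.MyPlugin ...
package example

import java.nio.file.{Files, Paths}

import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}

class MyPlugin extends SparkPlugin {
  override def driverPlugin(): DriverPlugin = null

  override def executorPlugin(): ExecutorPlugin = new ExecutorPlugin {
    override def init(ctx: PluginContext, extraConf: java.util.Map[String, String]): Unit = {
      // Before this fix, settings.txt (added with --files) was not yet downloaded when
      // executors on Standalone/Kubernetes initialized plugins, so a read like this failed.
      val settings = new String(Files.readAllBytes(Paths.get("settings.txt")))
      // ... use settings ...
    }
  }
}
```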

File tree

4 files changed: +122 -9 lines

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 21 additions & 6 deletions
@@ -384,6 +384,7 @@ class SparkContext(config: SparkConf) extends Logging {
   try {
     _conf = config.clone()
     _conf.validateSettings()
+    _conf.set("spark.app.startTime", startTime.toString)

     if (!_conf.contains("spark.master")) {
       throw new SparkException("A master URL must be set in your configuration")
@@ -492,11 +493,17 @@

     // Add each JAR given through the constructor
     if (jars != null) {
-      jars.foreach(addJar)
+      jars.foreach(jar => addJar(jar, true))
+      if (addedJars.nonEmpty) {
+        _conf.set("spark.app.initial.jar.urls", addedJars.keys.toSeq.mkString(","))
+      }
     }

     if (files != null) {
-      files.foreach(addFile)
+      files.foreach(file => addFile(file, false, true))
+      if (addedFiles.nonEmpty) {
+        _conf.set("spark.app.initial.file.urls", addedFiles.keys.toSeq.mkString(","))
+      }
     }

     _executorMemory = _conf.getOption(EXECUTOR_MEMORY.key)
@@ -1500,7 +1507,7 @@
    * @note A path can be added only once. Subsequent additions of the same path are ignored.
    */
   def addFile(path: String): Unit = {
-    addFile(path, false)
+    addFile(path, false, false)
   }

   /**
@@ -1522,6 +1529,10 @@
    * @note A path can be added only once. Subsequent additions of the same path are ignored.
    */
   def addFile(path: String, recursive: Boolean): Unit = {
+    addFile(path, recursive, false)
+  }
+
+  private def addFile(path: String, recursive: Boolean, addedOnSubmit: Boolean): Unit = {
     val uri = new Path(path).toUri
     val schemeCorrectedURI = uri.getScheme match {
       case null => new File(path).getCanonicalFile.toURI
@@ -1559,7 +1570,7 @@
         path
       }
     }
-    val timestamp = System.currentTimeMillis
+    val timestamp = if (addedOnSubmit) startTime else System.currentTimeMillis
     if (addedFiles.putIfAbsent(key, timestamp).isEmpty) {
       logInfo(s"Added file $path at $key with timestamp $timestamp")
       // Fetch the file locally so that closures which are run on the driver can still use the
@@ -1569,7 +1580,7 @@
       postEnvironmentUpdate()
     } else {
       logWarning(s"The path $path has been added already. Overwriting of added paths " +
-        "is not supported in the current version.")
+          "is not supported in the current version.")
     }
   }

@@ -1840,6 +1851,10 @@
    * @note A path can be added only once. Subsequent additions of the same path are ignored.
    */
   def addJar(path: String): Unit = {
+    addJar(path, false)
+  }
+
+  private def addJar(path: String, addedOnSubmit: Boolean): Unit = {
     def addLocalJarFile(file: File): String = {
       try {
         if (!file.exists()) {
@@ -1904,7 +1919,7 @@
       }
     }
     if (key != null) {
-      val timestamp = System.currentTimeMillis
+      val timestamp = if (addedOnSubmit) startTime else System.currentTimeMillis
       if (addedJars.putIfAbsent(key, timestamp).isEmpty) {
         logInfo(s"Added JAR $path at $key with timestamp $timestamp")
         postEnvironmentUpdate()
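
The `addedOnSubmit` flag is what avoids the double download mentioned in the commit message: resources passed at submit time are registered under the fixed `startTime` rather than the wall-clock time of the `addJar`/`addFile` call, so the executor, which fetches them at startup with that same timestamp, sees nothing newer when the first task arrives. A rough sketch of the idea, assuming a timestamp-keyed cache like the one the executor's dependency tracking effectively maintains (the URL and numeric timestamp below are invented, and `maybeFetch` is a stand-in, not Spark API):

```scala
// Sketch only: why submit-time resources are stamped with startTime.
import scala.collection.mutable

val currentFiles = mutable.HashMap.empty[String, Long] // url -> timestamp already fetched

def maybeFetch(url: String, timestamp: Long): Unit = {
  // Fetch only if this url was never seen, or was re-added with a newer timestamp.
  if (currentFiles.getOrElse(url, -1L) < timestamp) {
    // download into the executor's working directory ...
    currentFiles(url) = timestamp
  }
}

// On executor startup the initial --jars/--files are fetched with timestamp = appStartTime.
maybeFetch("spark://driver:7077/files/settings.txt", 1594000000000L)
// The first task reports the same url with the same startTime-based timestamp,
// so the executor skips a second download.
maybeFetch("spark://driver:7077/files/settings.txt", 1594000000000L)
```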

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 14 additions & 0 deletions
@@ -220,6 +220,20 @@ private[spark] class Executor(

   heartbeater.start()

+  private val appStartTime = conf.getLong("spark.app.startTime", 0)
+
+  // To allow users to distribute plugins and their required files
+  // specified by --jars and --files on application submission, those jars/files should be
+  // downloaded and added to the class loader via updateDependencies.
+  // This should be done before plugin initialization below
+  // because executors search plugins from the class loader and initialize them.
+  private val Seq(initialUserJars, initialUserFiles) = Seq("jar", "file").map { key =>
+    conf.getOption(s"spark.app.initial.$key.urls").map { urls =>
+      Map(urls.split(",").map(url => (url, appStartTime)): _*)
+    }.getOrElse(Map.empty)
+  }
+  updateDependencies(initialUserFiles, initialUserJars)
+
   // Plugins need to load using a class loader that includes the executor's user classpath.
   // Plugins also needs to be initialized after the heartbeater started
   // to avoid blocking to send heartbeat (see SPARK-32175).
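
For illustration, the two properties read above are the comma-separated lists of URLs that the driver set in the SparkContext change, and the executor turns them into `(url -> appStartTime)` maps before calling `updateDependencies`. A standalone sketch of that parsing, with made-up URLs and port:

```scala
// Example values only; the URLs, port, and timestamp are invented for illustration.
val conf = Map(
  "spark.app.startTime" -> "1594000000000",
  "spark.app.initial.jar.urls" ->
    "spark://driver:41297/jars/testplugin.jar,spark://driver:41297/jars/unused.jar",
  "spark.app.initial.file.urls" -> "spark://driver:41297/files/test.txt"
)

val appStartTime = conf("spark.app.startTime").toLong
val Seq(initialUserJars, initialUserFiles) = Seq("jar", "file").map { key =>
  conf.get(s"spark.app.initial.$key.urls").map { urls =>
    Map(urls.split(",").map(url => (url, appStartTime)): _*)
  }.getOrElse(Map.empty[String, Long])
}
// initialUserJars == Map("spark://driver:41297/jars/testplugin.jar" -> 1594000000000L, ...)
```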

core/src/test/java/test/org/apache/spark/JavaSparkContextSuite.java

Lines changed: 5 additions & 2 deletions
@@ -28,21 +28,24 @@

 import org.apache.spark.api.java.*;
 import org.apache.spark.*;
+import org.apache.spark.util.Utils;

 /**
  * Java apps can use both Java-friendly JavaSparkContext and Scala SparkContext.
  */
 public class JavaSparkContextSuite implements Serializable {

   @Test
-  public void javaSparkContext() {
+  public void javaSparkContext() throws IOException {
+    File tempDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "spark");
+    String dummyJarFile = File.createTempFile(tempDir.toString(), "jarFile").toString();
     String[] jars = new String[] {};
     java.util.Map<String, String> environment = new java.util.HashMap<>();

     new JavaSparkContext(new SparkConf().setMaster("local").setAppName("name")).stop();
     new JavaSparkContext("local", "name", new SparkConf()).stop();
     new JavaSparkContext("local", "name").stop();
-    new JavaSparkContext("local", "name", "sparkHome", "jarFile").stop();
+    new JavaSparkContext("local", "name", "sparkHome", dummyJarFile).stop();
     new JavaSparkContext("local", "name", "sparkHome", jars).stop();
     new JavaSparkContext("local", "name", "sparkHome", jars, environment).stop();
   }

core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala

Lines changed: 82 additions & 1 deletion
@@ -570,7 +570,8 @@ class SparkSubmitSuite
       }
     }

-    val clArgs2 = Seq("--class", "org.SomeClass", "thejar.jar")
+    val dummyJarFile = TestUtils.createJarWithClasses(Seq.empty)
+    val clArgs2 = Seq("--class", "org.SomeClass", dummyJarFile.toString)
     val appArgs2 = new SparkSubmitArguments(clArgs2)
     val (_, _, conf2, _) = submit.prepareSubmitEnvironment(appArgs2)
     assert(!conf2.contains(UI_SHOW_CONSOLE_PROGRESS))
@@ -1220,6 +1221,86 @@ class SparkSubmitSuite
     testRemoteResources(enableHttpFs = true, forceDownloadSchemes = Seq("*"))
   }

+  test("SPARK-32119: Jars and files should be loaded when Executors launch for plugins") {
+    val tempDir = Utils.createTempDir()
+    val tempFileName = "test.txt"
+    val tempFile = new File(tempDir, tempFileName)
+
+    // scalastyle:off println
+    Utils.tryWithResource {
+      new PrintWriter(tempFile)
+    } { writer =>
+      writer.println("SparkPluginTest")
+    }
+    // scalastyle:on println
+
+    val sparkPluginCodeBody =
+      """
+        |@Override
+        |public org.apache.spark.api.plugin.ExecutorPlugin executorPlugin() {
+        |  return new TestExecutorPlugin();
+        |}
+        |
+        |@Override
+        |public org.apache.spark.api.plugin.DriverPlugin driverPlugin() { return null; }
+      """.stripMargin
+    val executorPluginCodeBody =
+      s"""
+        |@Override
+        |public void init(
+        |    org.apache.spark.api.plugin.PluginContext ctx,
+        |    java.util.Map<String, String> extraConf) {
+        |  String str = null;
+        |  try (java.io.BufferedReader reader =
+        |    new java.io.BufferedReader(new java.io.InputStreamReader(
+        |      new java.io.FileInputStream("$tempFileName")))) {
+        |    str = reader.readLine();
+        |  } catch (java.io.IOException e) {
+        |    throw new RuntimeException(e);
+        |  } finally {
+        |    assert str == "SparkPluginTest";
+        |  }
+        |}
+      """.stripMargin
+
+    val compiledExecutorPlugin = TestUtils.createCompiledClass(
+      "TestExecutorPlugin",
+      tempDir,
+      "",
+      null,
+      Seq.empty,
+      Seq("org.apache.spark.api.plugin.ExecutorPlugin"),
+      executorPluginCodeBody)
+
+    val thisClassPath =
+      sys.props("java.class.path").split(File.pathSeparator).map(p => new File(p).toURI.toURL)
+    val compiledSparkPlugin = TestUtils.createCompiledClass(
+      "TestSparkPlugin",
+      tempDir,
+      "",
+      null,
+      Seq(tempDir.toURI.toURL) ++ thisClassPath,
+      Seq("org.apache.spark.api.plugin.SparkPlugin"),
+      sparkPluginCodeBody)
+
+    val jarUrl = TestUtils.createJar(
+      Seq(compiledSparkPlugin, compiledExecutorPlugin),
+      new File(tempDir, "testplugin.jar"))
+
+    val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+    val unusedFile = Files.createTempFile(tempDir.toPath, "unused", null)
+    val args = Seq(
+      "--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"),
+      "--name", "testApp",
+      "--master", "local-cluster[1,1,1024]",
+      "--conf", "spark.plugins=TestSparkPlugin",
+      "--conf", "spark.ui.enabled=false",
+      "--jars", jarUrl.toString + "," + unusedJar.toString,
+      "--files", tempFile.toString + "," + unusedFile.toString,
+      unusedJar.toString)
+    runSparkSubmit(args)
+  }
+
   private def testRemoteResources(
       enableHttpFs: Boolean,
       forceDownloadSchemes: Seq[String] = Nil): Unit = {
