Commit b9d2ced

incomplete setup for external shuffle service tests

1 parent 85a50a6
2 files changed: +311 -0 lines changed

yarn/pom.xml

Lines changed: 6 additions & 0 deletions
@@ -39,6 +39,12 @@
       <artifactId>spark-core_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-network-yarn_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_${scala.binary.version}</artifactId>
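
Note: the new test-scoped dependency on spark-network-yarn puts org.apache.spark.network.yarn.YarnShuffleService on the test classpath, which the suite below registers as a NodeManager auxiliary service in its MiniYARNCluster.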
yarn/src/test/scala/org/apache/spark/deploy/yarn/ExternalShuffleSuite.scala

Lines changed: 305 additions & 0 deletions

@@ -0,0 +1,305 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy.yarn

import java.io.{File, FileOutputStream, OutputStreamWriter}
import java.net.URL
import java.util.Properties
import java.util.concurrent.TimeUnit

import scala.collection.JavaConversions._
import scala.collection.mutable

import com.google.common.base.Charsets.UTF_8
import com.google.common.io.ByteStreams
import com.google.common.io.Files
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.server.MiniYARNCluster
import org.scalatest.{BeforeAndAfterAll, Matchers}

import org.apache.spark._
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart,
  SparkListenerExecutorAdded}
import org.apache.spark.util.Utils

/**
 * Integration tests for YARN; these tests use a mini Yarn cluster to run Spark-on-YARN
 * applications, and require the Spark assembly to be built before they can be successfully
 * run.
 */
class ExternalShuffleSuite extends SparkFunSuite with BeforeAndAfterAll with Matchers with Logging {

  // log4j configuration for the YARN containers, so that their output is collected
  // by YARN instead of trying to overwrite unit-tests.log.
  private val LOG4J_CONF = """
    |log4j.rootCategory=DEBUG, console
    |log4j.appender.console=org.apache.log4j.ConsoleAppender
    |log4j.appender.console.target=System.err
    |log4j.appender.console.layout=org.apache.log4j.PatternLayout
    |log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
    """.stripMargin

  private val TEST_PYFILE = """
    |import mod1, mod2
    |import sys
    |from operator import add
    |
    |from pyspark import SparkConf, SparkContext
    |if __name__ == "__main__":
    |    if len(sys.argv) != 2:
    |        print >> sys.stderr, "Usage: test.py [result file]"
    |        exit(-1)
    |    sc = SparkContext(conf=SparkConf())
    |    status = open(sys.argv[1],'w')
    |    result = "failure"
    |    rdd = sc.parallelize(range(10)).map(lambda x: x * mod1.func() * mod2.func())
    |    cnt = rdd.count()
    |    if cnt == 10:
    |        result = "success"
    |    status.write(result)
    |    status.close()
    |    sc.stop()
    """.stripMargin

  private val TEST_PYMODULE = """
    |def func():
    |    return 42
    """.stripMargin

  private var yarnCluster: MiniYARNCluster = _
  private var tempDir: File = _
  private var fakeSparkJar: File = _
  private var hadoopConfDir: File = _
  private var logConfDir: File = _

  override def beforeAll() {
    super.beforeAll()

    tempDir = Utils.createTempDir()
    logConfDir = new File(tempDir, "log4j")
    logConfDir.mkdir()
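    // SPARK_YARN_MODE makes SparkHadoopUtil hand out its YARN-aware implementation even
    // though no real cluster is running; it is cleared again in afterAll().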
    System.setProperty("SPARK_YARN_MODE", "true")

    val logConfFile = new File(logConfDir, "log4j.properties")
    Files.write(LOG4J_CONF, logConfFile, UTF_8)

    yarnCluster = new MiniYARNCluster(getClass().getName(), 1, 1, 1)
    val yarnConfig = new YarnConfiguration()
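    // Register Spark's external shuffle service as a NodeManager auxiliary service. On a
    // real cluster the same wiring would live in yarn-site.xml; a sketch of the equivalent
    // (property names taken from the Hadoop constants used below):
    //
    //   <property>
    //     <name>yarn.nodemanager.aux-services</name>
    //     <value>spark_shuffle</value>
    //   </property>
    //   <property>
    //     <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
    //     <value>org.apache.spark.network.yarn.YarnShuffleService</value>
    //   </property>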
    yarnConfig.set(YarnConfiguration.NM_AUX_SERVICES, "spark_shuffle")
    yarnConfig.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format("spark_shuffle"),
      "org.apache.spark.network.yarn.YarnShuffleService")
    yarnCluster.init(yarnConfig)
    yarnCluster.start()

    // There's a race in MiniYARNCluster in which start() may return before the RM has updated
    // its address in the configuration. You can see this in the logs by noticing that when
    // MiniYARNCluster prints the address, it still has port "0" assigned, although later the
    // test works sometimes:
    //
    //    INFO MiniYARNCluster: MiniYARN ResourceManager address: blah:0
    //
    // That log message prints the contents of the RM_ADDRESS config variable. If you check it
    // later on, it looks something like this:
    //
    //    INFO YarnClusterSuite: RM address in configuration is blah:42631
    //
    // This hack loops for a bit waiting for the port to change, and fails the test if it hasn't
    // done so in a timely manner (defined to be 10 seconds).
    val config = yarnCluster.getConfig()
    val deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10)
    while (config.get(YarnConfiguration.RM_ADDRESS).split(":")(1) == "0") {
      if (System.currentTimeMillis() > deadline) {
        throw new IllegalStateException("Timed out waiting for RM to come up.")
      }
      logDebug("RM address still not set in configuration, waiting...")
      TimeUnit.MILLISECONDS.sleep(100)
    }

    logInfo(s"RM address in configuration is ${config.get(YarnConfiguration.RM_ADDRESS)}")

    fakeSparkJar = File.createTempFile("sparkJar", null, tempDir)
    hadoopConfDir = new File(tempDir, Client.LOCALIZED_CONF_DIR)
    assert(hadoopConfDir.mkdir())
    File.createTempFile("token", ".txt", hadoopConfDir)
  }

  override def afterAll() {
    yarnCluster.stop()
    System.clearProperty("SPARK_YARN_MODE")
    super.afterAll()
  }
test("external shuffle service") {
149+
val result = File.createTempFile("result", null, tempDir)
150+
runSpark(
151+
false,
152+
mainClassName(ExternalShuffleDriver.getClass),
153+
appArgs = Seq(result.getAbsolutePath()),
154+
extraConf = Map()
155+
// extraConf = Map("spark.shuffle.service.enabled" -> "true")
156+
)
157+
checkResult(result)
158+
}
159+
160+
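  /**
   * Runs the given main class through bin/spark-submit against the mini cluster. All Spark
   * configuration, including the mini cluster's Hadoop settings (as spark.hadoop.* keys),
   * is written to a temporary properties file passed via --properties-file.
   */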
  private def runSpark(
      clientMode: Boolean,
      klass: String,
      appArgs: Seq[String] = Nil,
      sparkArgs: Seq[String] = Nil,
      extraClassPath: Seq[String] = Nil,
      extraJars: Seq[String] = Nil,
      extraConf: Map[String, String] = Map()): Unit = {
    val master = if (clientMode) "yarn-client" else "yarn-cluster"
    val props = new Properties()

    props.setProperty("spark.yarn.jar", "local:" + fakeSparkJar.getAbsolutePath())

    val childClasspath = logConfDir.getAbsolutePath() +
      File.pathSeparator +
      sys.props("java.class.path") +
      File.pathSeparator +
      extraClassPath.mkString(File.pathSeparator)
    props.setProperty("spark.driver.extraClassPath", childClasspath)
    props.setProperty("spark.executor.extraClassPath", childClasspath)

    // SPARK-4267: make sure java options are propagated correctly.
    props.setProperty("spark.driver.extraJavaOptions", "-Dfoo=\"one two three\"")
    props.setProperty("spark.executor.extraJavaOptions", "-Dfoo=\"one two three\"")

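    // Hand the mini cluster's Hadoop configuration to the application as spark.hadoop.*
    // properties, so the submitted job talks to this RM rather than any local defaults.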
    yarnCluster.getConfig().foreach { e =>
      props.setProperty("spark.hadoop." + e.getKey(), e.getValue())
    }

    sys.props.foreach { case (k, v) =>
      if (k.startsWith("spark.")) {
        props.setProperty(k, v)
      }
    }

    extraConf.foreach { case (k, v) => props.setProperty(k, v) }

    val propsFile = File.createTempFile("spark", ".properties", tempDir)
    val writer = new OutputStreamWriter(new FileOutputStream(propsFile), UTF_8)
    props.store(writer, "Spark properties.")
    writer.close()

    val extraJarArgs = if (extraJars.nonEmpty) Seq("--jars", extraJars.mkString(",")) else Nil
    val mainArgs =
      if (klass.endsWith(".py")) {
        Seq(klass)
      } else {
        Seq("--class", klass, fakeSparkJar.getAbsolutePath())
      }
    val argv =
      Seq(
        new File(sys.props("spark.test.home"), "bin/spark-submit").getAbsolutePath(),
        "--master", master,
        "--num-executors", "1",
        "--properties-file", propsFile.getAbsolutePath()) ++
      extraJarArgs ++
      sparkArgs ++
      mainArgs ++
      appArgs

    Utils.executeAndGetOutput(argv,
      extraEnvironment = Map("YARN_CONF_DIR" -> hadoopConfDir.getAbsolutePath()))
  }

  /**
   * This is a workaround for an issue with yarn-cluster mode: the Client class will not report
   * any error when the job process finishes successfully even though the job itself failed. So
   * the tests enforce that something is written to a file after everything is ok to indicate
   * that the job succeeded.
   */
  private def checkResult(result: File): Unit = {
    checkResult(result, "success")
  }

  private def checkResult(result: File, expected: String): Unit = {
    val resultString = Files.toString(result, UTF_8)
    resultString should be (expected)
  }

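  // A Scala object's class name ends in "$"; strip it to get the name that spark-submit
  // expects for --class.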
  private def mainClassName(klass: Class[_]): String = {
    klass.getName().stripSuffix("$")
  }

}

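/**
 * Driver used by the test above: runs a small job, records "success" or "failure" in the
 * result file, then verifies that executor (and, in yarn-cluster mode, driver) log URLs
 * are present and well formed.
 */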
private object ExternalShuffleDriver extends Logging with Matchers {

  val WAIT_TIMEOUT_MILLIS = 10000

  def main(args: Array[String]): Unit = {
    if (args.length != 1) {
      // scalastyle:off println
      System.err.println(
        s"""
        |Invalid command line: ${args.mkString(" ")}
        |
        |Usage: ExternalShuffleDriver [result file]
        """.stripMargin)
      // scalastyle:on println
      System.exit(1)
    }

    val sc = new SparkContext(new SparkConf()
      .set("spark.extraListeners", classOf[SaveExecutorInfo].getName)
      .setAppName("yarn \"test app\" 'with quotes' and \\back\\slashes and $dollarSigns"))
    val conf = sc.getConf
    val status = new File(args(0))
    var result = "failure"
    try {
      val data = sc.parallelize(1 to 4, 4).collect().toSet
      sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
      data should be (Set(1, 2, 3, 4))
      result = "success"
    } finally {
      sc.stop()
      Files.write(result, status, UTF_8)
    }

    // verify log urls are present
    val listeners = sc.listenerBus.findListenersByClass[SaveExecutorInfo]
    assert(listeners.size === 1)
    val listener = listeners(0)
    val executorInfos = listener.addedExecutorInfos.values
    assert(executorInfos.nonEmpty)
    executorInfos.foreach { info =>
      assert(info.logUrlMap.nonEmpty)
    }

    // If we are running in yarn-cluster mode, verify that driver log links are present and
    // are in the expected format.
    if (conf.get("spark.master") == "yarn-cluster") {
      assert(listener.driverLogs.nonEmpty)
      val driverLogs = listener.driverLogs.get
      assert(driverLogs.size === 2)
      assert(driverLogs.containsKey("stderr"))
      assert(driverLogs.containsKey("stdout"))
      val urlStr = driverLogs("stderr")
      // Ensure that this is a valid URL, else this will throw an exception
      new URL(urlStr)
      val containerId = YarnSparkHadoopUtil.get.getContainerId
      val user = Utils.getCurrentUserName()
      assert(urlStr.endsWith(s"/node/containerlogs/$containerId/$user/stderr?start=-4096"))
    }
  }

}
