Skip to content

Commit dd93dc0

Browse files
committed
test for shuffle service w/ NM restarts
1 parent d596969 commit dd93dc0

File tree

5 files changed

+151
-7
lines changed

5 files changed

+151
-7
lines changed

network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@
4848
public class ExternalShuffleBlockHandler extends RpcHandler {
4949
private final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockHandler.class);
5050

51-
private final ExternalShuffleBlockResolver blockManager;
51+
@VisibleForTesting
52+
final ExternalShuffleBlockResolver blockManager;
5253
private final OneForOneStreamManager streamManager;
5354

5455
public ExternalShuffleBlockHandler(TransportConf conf, File registeredExecutorFile)

network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,14 +50,16 @@ public class ExternalShuffleBlockResolver {
5050
private static final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockResolver.class);
5151

5252
// Map containing all registered executors' metadata.
53-
private final ConcurrentMap<AppExecId, ExecutorShuffleInfo> executors;
53+
@VisibleForTesting
54+
final ConcurrentMap<AppExecId, ExecutorShuffleInfo> executors;
5455

5556
// Single-threaded Java executor used to perform expensive recursive directory deletion.
5657
private final Executor directoryCleaner;
5758

5859
private final TransportConf conf;
5960

60-
private final File registeredExecutorFile;
61+
@VisibleForTesting
62+
final File registeredExecutorFile;
6163

6264
public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile)
6365
throws IOException, ClassNotFoundException {
@@ -233,11 +235,12 @@ static File getFile(String[] localDirs, int subDirsPerLocalDir, String filename)
233235
}
234236

235237
/** Simply encodes an executor's full ID, which is appId + execId. */
236-
private static class AppExecId implements Serializable {
238+
@VisibleForTesting
239+
static class AppExecId implements Serializable {
237240
final String appId;
238241
final String execId;
239242

240-
private AppExecId(String appId, String execId) {
243+
AppExecId(String appId, String execId) {
241244
this.appId = appId;
242245
this.execId = execId;
243246
}

network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.nio.ByteBuffer;
2323
import java.util.List;
2424

25+
import com.google.common.annotations.VisibleForTesting;
2526
import com.google.common.collect.Lists;
2627
import org.apache.hadoop.conf.Configuration;
2728
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -79,9 +80,11 @@ public class YarnShuffleService extends AuxiliaryService {
7980
private TransportServer shuffleServer = null;
8081

8182
// Handles registering executors and opening shuffle blocks
82-
private ExternalShuffleBlockHandler blockHandler;
83+
@VisibleForTesting
84+
ExternalShuffleBlockHandler blockHandler;
8385

84-
private File registeredExecutorFile;
86+
@VisibleForTesting
87+
File registeredExecutorFile;
8588

8689
public YarnShuffleService() {
8790
super("spark_shuffle");
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.network.shuffle
18+
19+
import java.io.File
20+
21+
import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.AppExecId
22+
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
23+
24+
/**
 * Test-only accessor for package-visible members of the external shuffle classes.
 *
 * It is declared in the `org.apache.spark.network.shuffle` package so that it can read those
 * members and hand them to callers through ordinary public methods.
 */
object TestUtil {

  /** Returns the `blockManager` field of `handler`. */
  def getBlockResolver(handler: ExternalShuffleBlockHandler): ExternalShuffleBlockResolver =
    handler.blockManager

  /**
   * Looks up the entry stored in `resolver.executors` for the (`appId`, `execId`) pair.
   *
   * @return `Some(info)` when the map holds an entry for the pair, `None` when the lookup
   *         returns `null`
   */
  def getExecutorInfo(
      appId: String,
      execId: String,
      resolver: ExternalShuffleBlockResolver): Option[ExecutorShuffleInfo] = {
    val registered = resolver.executors.get(new AppExecId(appId, execId))
    Option(registered)
  }

  /** Returns the `registeredExecutorFile` field of `resolver`. */
  def registeredExecutorFile(resolver: ExternalShuffleBlockResolver): File =
    resolver.registeredExecutorFile
}
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.network.yarn
18+
19+
import java.io.File
20+
21+
import org.apache.commons.io.FileUtils
22+
import org.apache.hadoop.yarn.api.records.ApplicationId
23+
import org.apache.hadoop.yarn.conf.YarnConfiguration
24+
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext
25+
import org.scalatest.Matchers
26+
27+
import org.apache.spark.SparkFunSuite
28+
import org.apache.spark.network.shuffle.TestUtil
29+
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
30+
31+
/**
 * Checks that executor registrations made through one `YarnShuffleService` instance are still
 * visible through a second instance started with the same configuration after the first one
 * has been stopped.
 */
class YarnShuffleServiceSuite extends SparkFunSuite with Matchers {
  private[yarn] var yarnConfig: YarnConfiguration = new YarnConfiguration

  // Keep the blank line above: without it the brace below could be parsed as an anonymous
  // subclass body of `new YarnConfiguration`.
  // Runs once when the suite is constructed: registers "spark_shuffle" as an aux service and
  // recreates every configured local dir so each run starts from empty directories.
  {
    yarnConfig.set(YarnConfiguration.NM_AUX_SERVICES, "spark_shuffle")
    yarnConfig.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format("spark_shuffle"),
      "org.apache.spark.network.yarn.YarnShuffleService")

    yarnConfig.get("yarn.nodemanager.local-dirs").split(",").foreach { dir =>
      println("making dir " + dir)
      val d = new File(dir)
      if (d.exists()) {
        FileUtils.deleteDirectory(d)
      }
      FileUtils.forceMkdir(d)
    }
  }

  test("executor state kept across NM restart") {
    val service: YarnShuffleService = new YarnShuffleService
    service.init(yarnConfig)
    val appId = ApplicationId.newInstance(0, 0)
    val appData: ApplicationInitializationContext =
      new ApplicationInitializationContext("user", appId, null)
    service.initializeApplication(appData)

    // Nothing has been registered yet, so the state file must be known but not yet written.
    val execStateFile = service.registeredExecutorFile
    execStateFile should not be (null)
    execStateFile.exists() should be (false)

    val blockHandler = service.blockHandler
    val blockResolver = TestUtil.getBlockResolver(blockHandler)
    TestUtil.registeredExecutorFile(blockResolver) should be (execStateFile)

    val shuffleInfo = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, "sort")
    blockResolver.registerExecutor(appId.toString, "exec-1", shuffleInfo)
    val executor = TestUtil.getExecutorInfo(appId.toString, "exec-1", blockResolver)
    executor should be (Some(shuffleInfo))

    execStateFile.exists() should be (true)

    // now we pretend the shuffle service goes down, and comes back up
    service.stop()

    val s2: YarnShuffleService = new YarnShuffleService
    s2.init(yarnConfig)
    // All checks from here on must go through the restarted instance `s2`; reading them from
    // the stopped `service` would only re-check state that was already verified above.
    s2.registeredExecutorFile should be (execStateFile)

    val handler2 = s2.blockHandler
    val resolver2 = TestUtil.getBlockResolver(handler2)

    // until we initialize the application, don't know about any executors
    // TODO: this check is disabled in the original (note that it queries the pre-restart
    // resolver); confirm the intended behavior before enabling it.
    // TestUtil.getExecutorInfo(appId.toString, "exec-1", blockResolver) should be (None)

    s2.initializeApplication(appData)
    val ex2 = TestUtil.getExecutorInfo(appId.toString, "exec-1", resolver2)
    ex2 should be (Some(shuffleInfo))

    // Stop the restarted service as well so it is not left running after the test.
    s2.stop()
  }
}

0 commit comments

Comments
 (0)