Skip to content

Commit 513f199

Browse files
authored
YARN-10334. Close clients in TestDistributedShell (#2571)
1 parent 3b77cf4 commit 513f199

File tree

2 files changed

+434
-399
lines changed
  • hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src

2 files changed

+434
-399
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.Vector;
3333
import java.util.Arrays;
3434
import java.util.Base64;
35+
import java.util.concurrent.atomic.AtomicBoolean;
3536

3637
import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
3738

@@ -253,6 +254,10 @@ public class Client {
253254
// Command line options
254255
private Options opts;
255256

257+
private final AtomicBoolean stopSignalReceived;
258+
private final AtomicBoolean isRunning;
259+
private final Object objectLock = new Object();
260+
256261
private static final String shellCommandPath = "shellCommands";
257262
private static final String shellArgsPath = "shellArgs";
258263
private static final String appMasterJarPath = "AppMaster.jar";
@@ -413,6 +418,8 @@ public Client(Configuration conf) throws Exception {
413418
opts.addOption("application_tags", true, "Application tags.");
414419
opts.addOption("localize_files", true, "List of files, separated by comma"
415420
+ " to be localized for the command");
421+
stopSignalReceived = new AtomicBoolean(false);
422+
isRunning = new AtomicBoolean(false);
416423
}
417424

418425
/**
@@ -670,8 +677,8 @@ public boolean init(String[] args) throws ParseException {
670677
* @throws YarnException
671678
*/
672679
public boolean run() throws IOException, YarnException {
673-
674680
LOG.info("Running Client");
681+
isRunning.set(true);
675682
yarnClient.start();
676683
// set the client start time.
677684
clientStartTime = System.currentTimeMillis();
@@ -1116,15 +1123,22 @@ private boolean monitorApplication(ApplicationId appId)
11161123

11171124
boolean res = false;
11181125
boolean needForceKill = false;
1119-
while (true) {
1126+
while (isRunning.get()) {
11201127
// Check app status every 1 second.
11211128
try {
1122-
Thread.sleep(APP_MONITOR_INTERVAL);
1129+
synchronized (objectLock) {
1130+
objectLock.wait(APP_MONITOR_INTERVAL);
1131+
}
1132+
needForceKill = stopSignalReceived.get();
11231133
} catch (InterruptedException e) {
11241134
LOG.warn("Thread sleep in monitoring loop interrupted");
11251135
// if the application is to be killed when client times out;
11261136
// then set needForceKill to true
11271137
break;
1138+
} finally {
1139+
if (needForceKill) {
1140+
break;
1141+
}
11281142
}
11291143

11301144
// Get application report for the appId we are interested in
@@ -1177,6 +1191,8 @@ private boolean monitorApplication(ApplicationId appId)
11771191
forceKillApplication(appId);
11781192
}
11791193

1194+
isRunning.set(false);
1195+
11801196
return res;
11811197
}
11821198

@@ -1388,4 +1404,31 @@ static Map<String, Long> parseResourcesString(String resourcesStr) {
13881404
}
13891405
return resources;
13901406
}
1407+
1408+
@VisibleForTesting
1409+
protected void sendStopSignal() {
1410+
LOG.info("Sending stop Signal to Client");
1411+
stopSignalReceived.set(true);
1412+
synchronized (objectLock) {
1413+
objectLock.notifyAll();
1414+
}
1415+
int waitCount = 0;
1416+
LOG.info("Waiting for Client to exit loop");
1417+
while (!isRunning.get()) {
1418+
try {
1419+
Thread.sleep(50);
1420+
} catch (InterruptedException ie) {
1421+
// do nothing
1422+
} finally {
1423+
waitCount++;
1424+
if (isRunning.get() || waitCount > 2000) {
1425+
break;
1426+
}
1427+
}
1428+
}
1429+
LOG.info("Stopping yarnClient within the Client");
1430+
yarnClient.stop();
1431+
yarnClient.waitForServiceToStop(clientTimeout);
1432+
LOG.info("done stopping Client");
1433+
}
13911434
}

0 commit comments

Comments
 (0)