Skip to content

Commit a8550c3

Browse files
committed
Introduce tony.application.x.untracked.timeout to solve partial jobs unfinished
1 parent 7622a6b commit a8550c3

File tree

7 files changed

+178
-12
lines changed

7 files changed

+178
-12
lines changed

tony-core/src/main/java/com/linkedin/tony/ApplicationMaster.java

Lines changed: 63 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,8 @@ public class ApplicationMaster {
192192
private String frameworkType;
193193
private Framework.ApplicationMasterAdapter amRuntimeAdapter;
194194

195+
private long allTrackedTaskFinishedTime;
196+
195197
private ApplicationMaster() {
196198
hdfsConf = new Configuration(false);
197199
yarnConf = new Configuration(false);
@@ -620,6 +622,8 @@ private void reset() {
620622
session = sessionBuilder.build();
621623
applicationRpcServer.reset();
622624
session.sessionId += 1;
625+
626+
allTrackedTaskFinishedTime = 0L;
623627
}
624628

625629
/**
@@ -687,18 +691,10 @@ private boolean monitor() {
687691
break;
688692
}
689693

690-
int numTotalTrackedTasks = session.getTotalTrackedTasks();
691-
if (numTotalTrackedTasks > 0) {
692-
int numCompletedTrackedTasks = session.getNumCompletedTrackedTasks();
693-
if (numCompletedTrackedTasks == numTotalTrackedTasks) {
694-
Utils.printCompletedTrackedTasks(numCompletedTrackedTasks, numTotalTrackedTasks);
695-
break;
696-
}
697-
698-
// Reduce logging frequency to every 100s.
699-
if (counter % 20 == 1) {
700-
Utils.printCompletedTrackedTasks(numCompletedTrackedTasks, numTotalTrackedTasks);
701-
}
694+
// Handle job exit when all tracked tasks finished
695+
if (allTrackedTaskFinished(counter)) {
696+
LOG.info("Application finished due to all tracked executors finished.");
697+
break;
702698
}
703699

704700
// Pause before refresh job status
@@ -718,6 +714,61 @@ private boolean monitor() {
718714
return status == FinalApplicationStatus.SUCCEEDED;
719715
}
720716

717+
private boolean allTrackedTaskFinished(int counter) {
718+
int numTotalTrackedTasks = session.getTotalTrackedTasks();
719+
if (numTotalTrackedTasks > 0) {
720+
int numCompletedTrackedTasks = session.getNumCompletedTrackedTasks();
721+
if (numCompletedTrackedTasks == numTotalTrackedTasks) {
722+
if (allTrackedTaskFinishedTime == 0L) {
723+
allTrackedTaskFinishedTime = System.currentTimeMillis();
724+
}
725+
726+
if (!waitUntrackedTaskWithTimeout()) {
727+
Utils.printCompletedTrackedTasks(numCompletedTrackedTasks, numTotalTrackedTasks);
728+
return true;
729+
}
730+
}
731+
732+
// Reduce logging frequency to every 100s.
733+
if (counter % 20 == 1) {
734+
Utils.printCompletedTrackedTasks(numCompletedTrackedTasks, numTotalTrackedTasks);
735+
}
736+
}
737+
738+
return false;
739+
}
740+
741+
private boolean waitUntrackedTaskWithTimeout() {
742+
Map<String, Long> untrackedTaskTimeoutIndex = Utils.getUntrackedTaskWithTimeouts(tonyConf);
743+
if (untrackedTaskTimeoutIndex == null || untrackedTaskTimeoutIndex.size() <= 0) {
744+
return false;
745+
}
746+
747+
int finishedJobtypeSize = 0;
748+
for (Map.Entry<String, Long> untrackedTask : untrackedTaskTimeoutIndex.entrySet()) {
749+
String jobtype = untrackedTask.getKey();
750+
Long timeout = untrackedTask.getValue();
751+
752+
// When untracked task finished, ignore timeout
753+
if (session.isAllTasksCompletedWithJobtype(jobtype)) {
754+
LOG.info("Untracked job type: [" + jobtype + "] has finished.");
755+
finishedJobtypeSize += 1;
756+
continue;
757+
}
758+
759+
// If waited time > timeout, it will return false directly.
760+
if (System.currentTimeMillis() - allTrackedTaskFinishedTime > timeout) {
761+
LOG.info("Untracked job type: [" + jobtype + "] exceed timeout of " + timeout + " and app will finish.");
762+
return false;
763+
}
764+
}
765+
766+
if (finishedJobtypeSize == untrackedTaskTimeoutIndex.size()) {
767+
return false;
768+
}
769+
return true;
770+
}
771+
721772
/**
722773
* Returns the tasks whose containers have launched but not called {@link ApplicationRpc#registerWorkerSpec} yet.
723774
*/

tony-core/src/main/java/com/linkedin/tony/TonyConfigurationKeys.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,10 @@ public static String getContainerDockerMountKey() {
270270
public static final String UNTRACKED_JOBTYPES = TONY_APPLICATION_PREFIX + "untracked.jobtypes";
271271
public static final String UNTRACKED_JOBTYPES_DEFAULT = "ps";
272272

273+
// Specified untracked job type timeout when all tracked tasks finished
274+
public static final String UNTRACKED_JOBTYPE_TIMEOUT_REGEX = TONY_APPLICATION_PREFIX + "([a-z]+)\\.untracked.timeout";
275+
public static final long UNTRACKED_JOBTYPE_TIMEOUT_DEFAULT = 0;
276+
273277
// Job types that we don't wait to finish and ignore its failure.
274278
public static final String SIDECAR_JOBTYPES = TONY_APPLICATION_PREFIX + "sidecar.jobtypes";
275279
public static final String DEFAULT_SIDECAR_JOBTYPES = SIDECAR_TB_ROLE_NAME;

tony-core/src/main/java/com/linkedin/tony/TonySession.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.Objects;
2121
import java.util.Set;
2222
import java.util.concurrent.ConcurrentHashMap;
23+
2324
import org.apache.commons.logging.Log;
2425
import org.apache.commons.logging.LogFactory;
2526
import org.apache.hadoop.conf.Configuration;
@@ -192,6 +193,13 @@ public int getNumCompletedTrackedTasks() {
192193
.flatMap(entry -> Arrays.stream(entry.getValue())).filter(task -> task != null && task.isCompleted()).count();
193194
}
194195

196+
public boolean isAllTasksCompletedWithJobtype(String jobType) {
197+
long taskSize = jobTasks.entrySet().stream().filter(entry -> entry.getKey().equals(jobType)).count();
198+
long finishedSize = jobTasks.entrySet().stream().filter(entry -> entry.getKey().equals(jobType))
199+
.flatMap(entry -> Arrays.stream(entry.getValue())).filter(task -> task != null && task.isCompleted()).count();
200+
return finishedSize == taskSize;
201+
}
202+
195203
public int getNumFailedTasks() {
196204
return (int) jobTasks.values().stream().flatMap(arr -> Arrays.stream(arr)).filter(task -> task != null && task.isFailed()).count();
197205
}

tony-core/src/main/java/com/linkedin/tony/util/Utils.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -459,6 +459,19 @@ public static int getNumTotalTasks(Configuration conf) {
459459
.sum();
460460
}
461461

462+
public static Map<String, Long> getUntrackedTaskWithTimeouts(Configuration conf) {
463+
Map<String, String> map = conf.getValByRegex(TonyConfigurationKeys.UNTRACKED_JOBTYPE_TIMEOUT_REGEX);
464+
return map.entrySet().stream()
465+
.collect(Collectors.toMap(entry -> {
466+
Matcher matcher = Pattern.compile(TonyConfigurationKeys.UNTRACKED_JOBTYPE_TIMEOUT_REGEX).matcher(entry.getKey());
467+
if (matcher.matches()) {
468+
return matcher.group(1);
469+
} else {
470+
return null;
471+
}
472+
}, entry -> Long.valueOf(entry.getValue())));
473+
}
474+
462475
/**
463476
* Extracts TensorFlow job name from configuration key of the form "tony.*.instances".
464477
* @param confKey Name of the configuration key

tony-core/src/test/java/com/linkedin/tony/TestTonyE2E.java

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -603,6 +603,77 @@ public void testTonyAllocationTimeoutShouldFail() throws ParseException, IOExcep
603603
Assert.assertEquals(exitCode, -1);
604604
}
605605

606+
/**
607+
* Test untracked tasks specified timeout of 60s.
608+
* The untracked task will block all time and it will finished until tracked task finished 60s
609+
*/
610+
@Test
611+
public void testUntrackedTaskTimeout60sShouldPass() throws ParseException, IOException {
612+
client.init(new String[]{
613+
"--src_dir", "tony-core/src/test/resources/scripts",
614+
"--hdfs_classpath", libPath,
615+
"--shell_env", "ENV_CHECK=ENV_CHECK",
616+
"--container_env", Constants.SKIP_HADOOP_PATH + "=true",
617+
"--python_venv", "tony-core/src/test/resources/test.zip",
618+
"--conf", "tony.ps.instances=1",
619+
"--conf", "tony.worker.instances=1",
620+
"--conf", "tony.ps.command=python sleep_all.py",
621+
"--conf", "tony.worker.command=python sleep_30.py",
622+
"--conf", "tony.application.untracked.jobtypes=ps",
623+
"--conf", "tony.application.ps.untracked.timeout=60000"
624+
});
625+
626+
int exitCode = client.start();
627+
Assert.assertEquals(exitCode, 0);
628+
}
629+
630+
631+
/**
632+
* Test untracked tasks specified timeout of 60s.
633+
* But the untracked task will exit 0 quickly
634+
* This case to verify the untracked-timeout-mechanism no block job finish
635+
*/
636+
@Test
637+
public void testUntrackedTaskFinishedShouldPass() throws ParseException, IOException {
638+
client.init(new String[]{
639+
"--src_dir", "tony-core/src/test/resources/scripts",
640+
"--hdfs_classpath", libPath,
641+
"--shell_env", "ENV_CHECK=ENV_CHECK",
642+
"--container_env", Constants.SKIP_HADOOP_PATH + "=true",
643+
"--python_venv", "tony-core/src/test/resources/test.zip",
644+
"--conf", "tony.ps.instances=1",
645+
"--conf", "tony.worker.instances=1",
646+
"--conf", "tony.ps.command=python exit_0.py",
647+
"--conf", "tony.worker.command=python sleep_30.py",
648+
"--conf", "tony.application.untracked.jobtypes=ps",
649+
"--conf", "tony.application.ps.untracked.timeout=60000"
650+
});
651+
652+
int exitCode = client.start();
653+
Assert.assertEquals(exitCode, 0);
654+
}
655+
656+
@Test
657+
public void testUntrackedTaskFailedShouldFail() throws ParseException, IOException {
658+
client.init(new String[]{
659+
"--src_dir", "tony-core/src/test/resources/scripts",
660+
"--hdfs_classpath", libPath,
661+
"--shell_env", "ENV_CHECK=ENV_CHECK",
662+
"--container_env", Constants.SKIP_HADOOP_PATH + "=true",
663+
"--python_venv", "tony-core/src/test/resources/test.zip",
664+
"--conf", "tony.chief.instances=1",
665+
"--conf", "tony.ps.instances=1",
666+
"--conf", "tony.worker.instances=1",
667+
"--conf", "tony.ps.command=python sleep_all.py",
668+
"--conf", "tony.worker.command=python exit_1.py",
669+
"--conf", "tony.chief.command=python sleep_30.py",
670+
"--conf", "tony.application.untracked.jobtypes=ps,worker"
671+
});
672+
673+
int exitCode = client.start();
674+
Assert.assertEquals(exitCode, -1);
675+
}
676+
606677
/**
607678
* When enable the sidecar tensorboard, it will start the sidecar executor(tensorboard role).
608679
*/

tony-core/src/test/java/com/linkedin/tony/util/TestUtils.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,17 @@ public void testGetAllJobTypes() {
259259
new HashSet(Arrays.asList("worker", "evaluator")));
260260
}
261261

262+
@Test
263+
public void testGetUntrackedTaskTimeout() {
264+
Configuration conf = new Configuration();
265+
conf.addResource("tony-default.xml");
266+
conf.set("tony.application.untracked.jobtypes", "ps,evaluator,worker");
267+
conf.setLong("tony.application.worker.untracked.timeout", 1000);
268+
269+
long timeout = Utils.getUntrackedTaskWithTimeouts(conf).get("worker");
270+
assertEquals(timeout, 1000);
271+
}
272+
262273
@Test
263274
public void testGetNumTotalTasks() {
264275
Configuration conf = new Configuration();
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
#
2+
# Copyright 2021 LinkedIn Corporation. All rights reserved. Licensed under the BSD-2 Clause license.
3+
# See LICENSE in the project root for license information.
4+
#
5+
import time
6+
7+
while True:
8+
time.sleep(1)

0 commit comments

Comments
 (0)