Skip to content

Commit 4de7a65

Browse files
committed
Modify the config
1 parent 335a3d2 commit 4de7a65

File tree

5 files changed

+93
-65
lines changed

5 files changed

+93
-65
lines changed

tony-core/src/main/java/com/linkedin/tony/runtime/MLGenericRuntime.java

Lines changed: 49 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,10 @@ class AM implements Framework.ApplicationMasterAdapter {
6161
private long runtimeInitialTime = System.currentTimeMillis();
6262

6363
// Group dependencies policy.
64-
Map<String, List<String>> groupMembers;
65-
Map<String, List<String>> memberInGroups;
64+
Map<String, List<String>> grpWithMembersIndex;
65+
Map<String, List<String>> taskInGrpsIndex;
6666
// todo: Need to support single group dependent multiple other groups
67-
Map<String, Pair<String, Long>> groupDependencies;
67+
Map<String, Pair<String, Long>> taskWithDependentGrpsIndex;
6868

6969
@Override
7070
public String constructClusterSpec(String taskId) throws IOException {
@@ -140,11 +140,11 @@ public boolean isHealthy(Configuration tonyConf) {
140140
* So if we use the configuration as follows, when evaluator is still running after timeout and
141141
* chief/workers are finished, the mechanism of dependency group timeout will make job failed.
142142
*
143-
* dependency group timeout configuration as follows:
143+
* Dependency group timeout configuration as follows:
144144
*
145145
* tony.application.group.A = worker,chief
146-
* tony.application.group.B = evaluator
147-
* tony.application.dependency.B.after.timeout.A = 3600
146+
* tony.application.dependency.evaluator.timeout.after.A = 3600
147+
*
148148
*/
149149
String errorMsg = groupDependencyTimeout(tonyConf);
150150
if (errorMsg != null) {
@@ -156,63 +156,70 @@ public boolean isHealthy(Configuration tonyConf) {
156156

157157
@VisibleForTesting
158158
protected String groupDependencyTimeout(Configuration tonyConf) {
159-
if (groupDependencies == null) {
160-
groupDependencies = Utils.getGroupDependencies(tonyConf);
159+
if (taskWithDependentGrpsIndex == null) {
160+
taskWithDependentGrpsIndex = Utils.getJobTypeDependentGrps(tonyConf);
161161
}
162-
163-
if (groupDependencies == null || groupDependencies.isEmpty()) {
162+
// groupDependencies is map, key: waiting role, value: pre-dependent groups and waiting timeout
163+
if (taskWithDependentGrpsIndex == null || taskWithDependentGrpsIndex.isEmpty()) {
164164
return null;
165165
}
166166

167-
if (groupMembers == null) {
168-
groupMembers = Utils.getAllGroupJobTypes(tonyConf);
167+
// groupMembers is map, key: groupName, value: its members in this group
168+
if (grpWithMembersIndex == null) {
169+
grpWithMembersIndex = Utils.getAllGroupJobTypes(tonyConf);
169170
}
170171

171-
if (memberInGroups == null) {
172-
memberInGroups = getMemberInGroups(groupMembers);
172+
// memberInGroups is map. key: jobtype name, value: in which groups
173+
if (taskInGrpsIndex == null) {
174+
taskInGrpsIndex = getMemberInGroups(grpWithMembersIndex);
173175
}
174176

175-
176177
Map<String, TonySession.TonyTask[]> allTasks = session.getTonyTasks();
177178
List<TonySession.TonyTask> runningTasks = session.getRunningTasks();
178179

179180
// Get the running jobs' type, like the tf roles of ps/worker/chief/evaluator
180181
Set<String> runningJobTypes = runningTasks.stream()
181182
.map(TonySession.TonyTask::getJobName)
182-
.filter(jobname -> memberInGroups.containsKey(jobname))
183+
.filter(jobname -> taskWithDependentGrpsIndex.containsKey(jobname))
183184
.collect(Collectors.toSet());
184185

185186
for (String runningTaskType : runningJobTypes) {
186-
for (String group : memberInGroups.get(runningTaskType)) {
187-
if (!groupDependencies.containsKey(group)) {
188-
continue;
189-
}
187+
Pair<String, Long> dependentGroupPair = taskWithDependentGrpsIndex.get(runningTaskType);
188+
String dependentGroupName = dependentGroupPair.getKey();
189+
long timeout = dependentGroupPair.getValue() * 1000;
190190

191-
Pair<String, Long> dependentGroupPair = groupDependencies.get(group);
192-
String dependentGroupName = dependentGroupPair.getKey();
193-
long timeout = dependentGroupPair.getValue() * 1000;
191+
if (!grpWithMembersIndex.containsKey(dependentGroupName)) {
192+
continue;
193+
}
194194

195-
if (!groupMembers.containsKey(dependentGroupName)) {
196-
continue;
197-
}
195+
boolean allDependentTaskFinished = true;
196+
long latestEndTimeInAllDependentTasks = 0L;
197+
for (String dependentsGroupJobtype : grpWithMembersIndex.get(dependentGroupName)) {
198198

199-
for (String dependentsGroupJobtype : groupMembers.get(dependentGroupName)) {
200-
if (Utils.existRunningTasksWithJobtype(runningTasks, dependentsGroupJobtype)) {
201-
continue;
202-
}
203-
// Find out the latest finished task in this task type, if the specified timeout exceed,
204-
// make the job fail.
205-
long latestFinishedTime =
206-
Arrays.stream(allTasks.get(dependentsGroupJobtype))
207-
.mapToLong(x -> x.getEndTime() == 0L ? System.currentTimeMillis() : x.getEndTime())
208-
.max().getAsLong();
209-
210-
if (System.currentTimeMillis() - latestFinishedTime > timeout) {
211-
return String.format("Jobtype: %s in group: %s runs exceeded timeout due it's "
212-
+ "dependent jobtype: %s in group: %s has been finished.",
213-
runningTaskType, group, dependentsGroupJobtype, dependentGroupName);
214-
}
199+
if (Utils.existRunningTasksWithJobtype(runningTasks, dependentsGroupJobtype)) {
200+
allDependentTaskFinished = false;
201+
break;
215202
}
203+
204+
// Find out the latest finished task in this task type, if the specified timeout exceed,
205+
// make the job fail.
206+
latestEndTimeInAllDependentTasks = Math.max(
207+
Arrays.stream(allTasks.get(dependentsGroupJobtype))
208+
.mapToLong(x -> x.getEndTime())
209+
.max().getAsLong(),
210+
latestEndTimeInAllDependentTasks
211+
);
212+
}
213+
214+
if (!allDependentTaskFinished) {
215+
continue;
216+
}
217+
218+
if (System.currentTimeMillis() - latestEndTimeInAllDependentTasks > timeout) {
219+
return String.format("Jobtype: %s runs exceeded timeout because it's "
220+
+ "dependent group: %s (task set: [%s]) has been finished.",
221+
runningTaskType, dependentGroupName,
222+
StringUtils.join(grpWithMembersIndex.get(dependentGroupName), ","));
216223
}
217224
}
218225

tony-core/src/main/java/com/linkedin/tony/util/Utils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -462,11 +462,11 @@ public static int getNumTotalTasks(Configuration conf) {
462462
.sum();
463463
}
464464

465-
public static Map<String, Pair<String, Long>> getGroupDependencies(Configuration tonyConf) {
465+
public static Map<String, Pair<String, Long>> getJobTypeDependentGrps(Configuration tonyConf) {
466466
return tonyConf.getValByRegex(TonyConfigurationKeys.GROUP_DEPEND_TIMEOUT_REGEX).keySet().stream()
467467
.map(Utils::getDependentGrps)
468468
.map(pair -> Utils.getDependentTimeout(tonyConf, pair))
469-
.collect(Collectors.toMap(Triple::getLeft, x -> Pair.of(x.getMiddle(), x.getRight())));
469+
.collect(Collectors.toMap(Triple::getLeft, x -> Pair.of(x.getMiddle(), x.getRight()), (oldV, newV) -> newV));
470470
}
471471

472472
private static Triple<String, String, Long> getDependentTimeout(Configuration tonyConf, Pair<String, String> pair) {

tony-core/src/test/java/com/linkedin/tony/TestTonyE2E.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -642,10 +642,8 @@ public void testGroupDependencyTimeoutShouldPass() throws ParseException, IOExce
642642
"--conf", "tony.worker.instances=2",
643643
"--conf", "tony.worker.command=python forever_not_exit.py",
644644
"--conf", "tony.application.framework=tensorflow",
645-
"--container_env", Constants.SIDECAR_TB_TEST_KEY + "=true",
646645
"--conf", "tony.application.group.A=chief",
647-
"--conf", "tony.application.group.B=worker",
648-
"--conf", "tony.application.dependency.B.timeout.after.A=10",
646+
"--conf", "tony.application.dependency.worker.timeout.after.A=10",
649647
});
650648
client.addListener(handler);
651649
int exitCode = client.start();

tony-core/src/test/java/com/linkedin/tony/runtime/TestMLGenericRuntime.java

Lines changed: 40 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package com.linkedin.tony.runtime;
1717

1818
import org.apache.hadoop.conf.Configuration;
19+
import org.apache.hadoop.yarn.api.records.CollectorInfo;
1920
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
2021
import org.testng.Assert;
2122
import org.testng.annotations.BeforeTest;
@@ -96,8 +97,8 @@ public void testNeedReserveTBPort() {
9697
public void testGroupDependencyNoConfShouldPass() {
9798
Configuration conf = new Configuration();
9899
conf.addResource("tony-default.xml");
99-
conf.set("tony.application.dependency.A.timeout.after.B", "3600");
100-
conf.set("tony.application.dependency.B.timeout.after.C", "3600");
100+
conf.set("tony.application.dependency.evaluator.timeout.after.B", "3600");
101+
conf.set("tony.application.dependency.chief.timeout.after.C", "3600");
101102

102103
TonySession session = buildMockSession(conf);
103104
MLGenericRuntime.AM am = (MLGenericRuntime.AM) runtime.getAMAdapter();
@@ -112,8 +113,7 @@ public void testGroupDependencyShouldPass() {
112113
Configuration conf = new Configuration();
113114
conf.addResource("tony-default.xml");
114115
conf.set("tony.application.group.A", "worker,chief");
115-
conf.set("tony.application.group.B", "evaluator");
116-
conf.set("tony.application.dependency.B.timeout.after.A", "3600");
116+
conf.set("tony.application.dependency.evaluator.timeout.after.A", "3600");
117117

118118
TonySession session = buildMockSession(conf);
119119
TonySession.TonyTask chiefTask = session.getTask("chief", "0");
@@ -123,8 +123,8 @@ public void testGroupDependencyShouldPass() {
123123
am.setTonySession(session);
124124
Assert.assertEquals(
125125
am.groupDependencyTimeout(conf),
126-
"Jobtype: evaluator in group: B runs exceeded timeout due it's dependent "
127-
+ "jobtype: chief in group: A has been finished."
126+
"Jobtype: evaluator runs exceeded timeout because it's dependent group: A "
127+
+ "(task set: [worker,chief]) has been finished."
128128
);
129129
}
130130

@@ -133,8 +133,7 @@ public void testGroupDependencyWorkerWhenChiefFinished() {
133133
Configuration conf = new Configuration();
134134
conf.addResource("tony-default.xml");
135135
conf.set("tony.application.group.A", "chief");
136-
conf.set("tony.application.group.B", "otherWorker");
137-
conf.set("tony.application.dependency.B.timeout.after.A", "3600");
136+
conf.set("tony.application.dependency.otherWorker.timeout.after.A", "3600");
138137

139138
TonySession session = buildMockSession(conf);
140139
TonySession.TonyTask chiefTask = session.getTask("chief", "0");
@@ -144,7 +143,7 @@ public void testGroupDependencyWorkerWhenChiefFinished() {
144143
am.setTonySession(session);
145144
Assert.assertEquals(
146145
am.groupDependencyTimeout(conf),
147-
"Jobtype: otherWorker in group: B runs exceeded timeout due it's dependent jobtype: chief in group: A has been finished."
146+
"Jobtype: otherWorker runs exceeded timeout because it's dependent group: A (task set: [chief]) has been finished."
148147
);
149148
}
150149

@@ -153,12 +152,10 @@ public void testGroupDependencyWithMultipleGroup() {
153152
Configuration conf = new Configuration();
154153
conf.addResource("tony-default.xml");
155154
conf.set("tony.application.group.A", "chief");
156-
conf.set("tony.application.group.B", "otherWorker");
157-
conf.set("tony.application.dependency.B.timeout.after.A", String.valueOf(60 * 240));
155+
conf.set("tony.application.dependency.otherWorker.timeout.after.A", String.valueOf(60 * 240));
158156

159-
conf.set("tony.application.group.C", "chief");
160-
conf.set("tony.application.group.D", "otherWorker");
161-
conf.set("tony.application.dependency.D.timeout.after.C", "3600");
157+
conf.set("tony.application.group.B", "chief,worker");
158+
conf.set("tony.application.dependency.evaluator.timeout.after.B", "3600");
162159

163160
TonySession session = buildMockSession(conf);
164161
TonySession.TonyTask chiefTask = session.getTask("chief", "0");
@@ -168,17 +165,43 @@ public void testGroupDependencyWithMultipleGroup() {
168165
am.setTonySession(session);
169166
Assert.assertEquals(
170167
am.groupDependencyTimeout(conf),
171-
"Jobtype: otherWorker in group: D runs exceeded timeout due it's dependent jobtype: chief in group: C has been finished."
168+
"Jobtype: evaluator runs exceeded timeout because it's dependent group: B (task set: [chief,worker]) has been finished."
172169
);
173170
}
174171

172+
/**
173+
* Test case as follows:
174+
* the role of chief has been finished, and otherWorker is running and not exceed the timeout. so it should pass
175+
*/
175176
@Test
176177
public void testGroupDependencyWithoutTimeoutMultipleGroup() {
177178
Configuration conf = new Configuration();
178179
conf.addResource("tony-default.xml");
179180
conf.set("tony.application.group.A", "chief");
180-
conf.set("tony.application.group.B", "otherWorker");
181-
conf.set("tony.application.dependency.B.timeout.after.A", String.valueOf(60 * 240));
181+
conf.set("tony.application.dependency.otherWorker.timeout.after.A", String.valueOf(60 * 240));
182+
183+
TonySession session = buildMockSession(conf);
184+
TonySession.TonyTask chiefTask = session.getTask("chief", "0");
185+
chiefTask.setEndTime(System.currentTimeMillis() - 1000 * 60 * 120);
186+
187+
MLGenericRuntime.AM am = (MLGenericRuntime.AM) runtime.getAMAdapter();
188+
am.setTonySession(session);
189+
Assert.assertNull(
190+
am.groupDependencyTimeout(conf)
191+
);
192+
}
193+
194+
/**
195+
* Test case as follows:
196+
* the role of chief has finished, but otherWorker is running.
197+
* And the role of evaluator depends on GroupA including chief and otherWorker, so it will not throw exception.
198+
*/
199+
@Test
200+
public void testGrpDependentWithoutTimeout() {
201+
Configuration conf = new Configuration();
202+
conf.addResource("tony-default.xml");
203+
conf.set("tony.application.group.A", "chief,otherWorker");
204+
conf.set("tony.application.dependency.evaluator.timeout.after.A", String.valueOf(60 * 240));
182205

183206
TonySession session = buildMockSession(conf);
184207
TonySession.TonyTask chiefTask = session.getTask("chief", "0");

tony-core/src/test/java/com/linkedin/tony/util/TestUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,7 @@ public void testGetGroupDependencies() {
344344
conf.set("tony.application.dependency.A.timeout.after.B", "3600");
345345
conf.set("tony.application.dependency.B.timeout.after.C", "3600");
346346

347-
Map<String, Pair<String, Long>> dependenciesIndex = Utils.getGroupDependencies(conf);
347+
Map<String, Pair<String, Long>> dependenciesIndex = Utils.getJobTypeDependentGrps(conf);
348348
assertTrue(dependenciesIndex.containsKey("A"));
349349
assertTrue(dependenciesIndex.containsKey("B"));
350350
assertEquals(dependenciesIndex.get("A").getKey(), "B");

0 commit comments

Comments
 (0)