Skip to content

Commit 7fa73fa

Browse files
committed
YARN-9439. Support asynchronized scheduling mode and multi-node lookup mechanism for app activities. Contributed by Tao Yang.
1 parent c4c16ca commit 7fa73fa

File tree

6 files changed

+179
-48
lines changed

6 files changed

+179
-48
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -63,10 +63,10 @@ public static void recordRejectedAppActivityFromLeafQueue(
6363
ActivitiesManager activitiesManager, SchedulerNode node,
6464
SchedulerApplicationAttempt application, Priority priority,
6565
String diagnostic) {
66-
NodeId nodeId = getRecordingNodeId(activitiesManager, node);
67-
if (nodeId == null) {
66+
if (activitiesManager == null) {
6867
return;
6968
}
69+
NodeId nodeId = getRecordingNodeId(activitiesManager, node);
7070
if (activitiesManager.shouldRecordThisNode(nodeId)) {
7171
recordActivity(activitiesManager, nodeId, application.getQueueName(),
7272
application.getApplicationId().toString(), priority,
@@ -85,10 +85,10 @@ public static void recordAppActivityWithoutAllocation(
8585
ActivitiesManager activitiesManager, SchedulerNode node,
8686
SchedulerApplicationAttempt application, Priority priority,
8787
String diagnostic, ActivityState appState) {
88-
NodeId nodeId = getRecordingNodeId(activitiesManager, node);
89-
if (nodeId == null) {
88+
if (activitiesManager == null) {
9089
return;
9190
}
91+
NodeId nodeId = getRecordingNodeId(activitiesManager, node);
9292
if (activitiesManager.shouldRecordThisNode(nodeId)) {
9393
String type = "container";
9494
// Add application-container activity into specific node allocation.
@@ -123,10 +123,10 @@ public static void recordAppActivityWithAllocation(
123123
ActivitiesManager activitiesManager, SchedulerNode node,
124124
SchedulerApplicationAttempt application, RMContainer updatedContainer,
125125
ActivityState activityState) {
126-
NodeId nodeId = getRecordingNodeId(activitiesManager, node);
127-
if (nodeId == null) {
126+
if (activitiesManager == null) {
128127
return;
129128
}
129+
NodeId nodeId = getRecordingNodeId(activitiesManager, node);
130130
if (activitiesManager.shouldRecordThisNode(nodeId)) {
131131
String type = "container";
132132
// Add application-container activity into specific node allocation.
@@ -163,10 +163,10 @@ public static void startAppAllocationRecording(
163163
ActivitiesManager activitiesManager, FiCaSchedulerNode node,
164164
long currentTime,
165165
SchedulerApplicationAttempt application) {
166-
NodeId nodeId = getRecordingNodeId(activitiesManager, node);
167-
if (nodeId == null) {
166+
if (activitiesManager == null) {
168167
return;
169168
}
169+
NodeId nodeId = getRecordingNodeId(activitiesManager, node);
170170
activitiesManager
171171
.startAppAllocationRecording(nodeId, currentTime,
172172
application);
@@ -214,10 +214,10 @@ public static class QUEUE {
214214
public static void recordQueueActivity(ActivitiesManager activitiesManager,
215215
SchedulerNode node, String parentQueueName, String queueName,
216216
ActivityState state, String diagnostic) {
217-
NodeId nodeId = getRecordingNodeId(activitiesManager, node);
218-
if (nodeId == null) {
217+
if (activitiesManager == null) {
219218
return;
220219
}
220+
NodeId nodeId = getRecordingNodeId(activitiesManager, node);
221221
if (activitiesManager.shouldRecordThisNode(nodeId)) {
222222
recordActivity(activitiesManager, nodeId, parentQueueName, queueName,
223223
null, state, diagnostic, null);

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java

Lines changed: 49 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,15 @@
2727
import org.apache.hadoop.yarn.api.records.NodeId;
2828
import org.apache.hadoop.yarn.api.records.ContainerId;
2929
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
30+
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
3031
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
3132
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
3233
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
3334
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
3435
import org.apache.hadoop.yarn.util.SystemClock;
3536

3637
import java.util.concurrent.ConcurrentHashMap;
38+
import java.util.concurrent.ConcurrentLinkedQueue;
3739
import java.util.concurrent.ConcurrentMap;
3840
import java.util.List;
3941
import java.util.Set;
@@ -57,9 +59,10 @@ public class ActivitiesManager extends AbstractService {
5759
private Set<NodeId> activeRecordedNodes;
5860
private ConcurrentMap<ApplicationId, Long>
5961
recordingAppActivitiesUntilSpecifiedTime;
60-
private ConcurrentMap<ApplicationId, AppAllocation> appsAllocation;
61-
private ConcurrentMap<ApplicationId, List<AppAllocation>>
62-
completedAppAllocations;
62+
private ThreadLocal<Map<ApplicationId, AppAllocation>>
63+
appsAllocation;
64+
@VisibleForTesting
65+
ConcurrentMap<ApplicationId, Queue<AppAllocation>> completedAppAllocations;
6366
private boolean recordNextAvailableNode = false;
6467
private List<NodeAllocation> lastAvailableNodeActivities = null;
6568
private Thread cleanUpThread;
@@ -71,19 +74,23 @@ public ActivitiesManager(RMContext rmContext) {
7174
super(ActivitiesManager.class.getName());
7275
recordingNodesAllocation = ThreadLocal.withInitial(() -> new HashMap());
7376
completedNodeAllocations = new ConcurrentHashMap<>();
74-
appsAllocation = new ConcurrentHashMap<>();
77+
appsAllocation = ThreadLocal.withInitial(() -> new HashMap());
7578
completedAppAllocations = new ConcurrentHashMap<>();
7679
activeRecordedNodes = Collections.newSetFromMap(new ConcurrentHashMap<>());
7780
recordingAppActivitiesUntilSpecifiedTime = new ConcurrentHashMap<>();
7881
this.rmContext = rmContext;
7982
}
8083

8184
public AppActivitiesInfo getAppActivitiesInfo(ApplicationId applicationId) {
82-
if (rmContext.getRMApps().get(applicationId).getFinalApplicationStatus()
85+
RMApp app = rmContext.getRMApps().get(applicationId);
86+
if (app != null && app.getFinalApplicationStatus()
8387
== FinalApplicationStatus.UNDEFINED) {
84-
List<AppAllocation> allocations = completedAppAllocations.get(
85-
applicationId);
86-
88+
Queue<AppAllocation> curAllocations =
89+
completedAppAllocations.get(applicationId);
90+
List<AppAllocation> allocations = null;
91+
if (curAllocations != null) {
92+
allocations = new ArrayList(curAllocations);
93+
}
8794
return new AppActivitiesInfo(allocations, applicationId);
8895
} else {
8996
return new AppActivitiesInfo(
@@ -135,13 +142,13 @@ public void run() {
135142
}
136143
}
137144

138-
Iterator<Map.Entry<ApplicationId, List<AppAllocation>>> iteApp =
145+
Iterator<Map.Entry<ApplicationId, Queue<AppAllocation>>> iteApp =
139146
completedAppAllocations.entrySet().iterator();
140147
while (iteApp.hasNext()) {
141-
Map.Entry<ApplicationId, List<AppAllocation>> appAllocation =
148+
Map.Entry<ApplicationId, Queue<AppAllocation>> appAllocation =
142149
iteApp.next();
143-
if (rmContext.getRMApps().get(appAllocation.getKey())
144-
.getFinalApplicationStatus()
150+
RMApp rmApp = rmContext.getRMApps().get(appAllocation.getKey());
151+
if (rmApp == null || rmApp.getFinalApplicationStatus()
145152
!= FinalApplicationStatus.UNDEFINED) {
146153
iteApp.remove();
147154
}
@@ -191,18 +198,16 @@ void startAppAllocationRecording(NodeId nodeID, long currTS,
191198
SchedulerApplicationAttempt application) {
192199
ApplicationId applicationId = application.getApplicationId();
193200

194-
if (recordingAppActivitiesUntilSpecifiedTime.containsKey(applicationId)
195-
&& recordingAppActivitiesUntilSpecifiedTime.get(applicationId)
196-
> currTS) {
197-
appsAllocation.put(applicationId,
198-
new AppAllocation(application.getPriority(), nodeID,
199-
application.getQueueName()));
200-
}
201-
202-
if (recordingAppActivitiesUntilSpecifiedTime.containsKey(applicationId)
203-
&& recordingAppActivitiesUntilSpecifiedTime.get(applicationId)
204-
<= currTS) {
205-
turnOffActivityMonitoringForApp(applicationId);
201+
Long turnOffTimestamp =
202+
recordingAppActivitiesUntilSpecifiedTime.get(applicationId);
203+
if (turnOffTimestamp != null) {
204+
if (turnOffTimestamp > currTS) {
205+
appsAllocation.get().put(applicationId,
206+
new AppAllocation(application.getPriority(), nodeID,
207+
application.getQueueName()));
208+
} else {
209+
turnOffActivityMonitoringForApp(applicationId);
210+
}
206211
}
207212
}
208213

@@ -223,7 +228,7 @@ void addSchedulingActivityForApp(ApplicationId applicationId,
223228
ContainerId containerId, String priority, ActivityState state,
224229
String diagnostic, String type) {
225230
if (shouldRecordThisApp(applicationId)) {
226-
AppAllocation appAllocation = appsAllocation.get(applicationId);
231+
AppAllocation appAllocation = appsAllocation.get().get(applicationId);
227232
appAllocation.addAppAllocationActivity(containerId == null ?
228233
"Container-Id-Not-Assigned" :
229234
containerId.toString(), priority, state, diagnostic, type);
@@ -245,24 +250,27 @@ void finishAppAllocationRecording(ApplicationId applicationId,
245250
ContainerId containerId, ActivityState appState, String diagnostic) {
246251
if (shouldRecordThisApp(applicationId)) {
247252
long currTS = SystemClock.getInstance().getTime();
248-
AppAllocation appAllocation = appsAllocation.remove(applicationId);
253+
AppAllocation appAllocation = appsAllocation.get().remove(applicationId);
249254
appAllocation.updateAppContainerStateAndTime(containerId, appState,
250255
currTS, diagnostic);
251256

252-
List<AppAllocation> appAllocations;
253-
if (completedAppAllocations.containsKey(applicationId)) {
254-
appAllocations = completedAppAllocations.get(applicationId);
255-
} else {
256-
appAllocations = new ArrayList<>();
257-
completedAppAllocations.put(applicationId, appAllocations);
257+
Queue<AppAllocation> appAllocations =
258+
completedAppAllocations.get(applicationId);
259+
if (appAllocations == null) {
260+
appAllocations = new ConcurrentLinkedQueue<>();
261+
Queue<AppAllocation> curAppAllocations =
262+
completedAppAllocations.putIfAbsent(applicationId, appAllocations);
263+
if (curAppAllocations != null) {
264+
appAllocations = curAppAllocations;
265+
}
258266
}
259267
if (appAllocations.size() == 1000) {
260-
appAllocations.remove(0);
268+
appAllocations.poll();
261269
}
262270
appAllocations.add(appAllocation);
263-
264-
if (recordingAppActivitiesUntilSpecifiedTime.get(applicationId)
265-
<= currTS) {
271+
Long stopTime =
272+
recordingAppActivitiesUntilSpecifiedTime.get(applicationId);
273+
if (stopTime != null && stopTime <= currTS) {
266274
turnOffActivityMonitoringForApp(applicationId);
267275
}
268276
}
@@ -292,8 +300,12 @@ void finishNodeUpdateRecording(NodeId nodeID) {
292300
}
293301

294302
boolean shouldRecordThisApp(ApplicationId applicationId) {
303+
if (recordingAppActivitiesUntilSpecifiedTime.isEmpty()
304+
|| appsAllocation.get().isEmpty()) {
305+
return false;
306+
}
295307
return recordingAppActivitiesUntilSpecifiedTime.containsKey(applicationId)
296-
&& appsAllocation.containsKey(applicationId);
308+
&& appsAllocation.get().containsKey(applicationId);
297309
}
298310

299311
boolean shouldRecordThisNode(NodeId nodeID) {

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AppAllocation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public void addAppAllocationActivity(String containerId, String priority,
6868
}
6969

7070
public String getNodeId() {
71-
return nodeId.toString();
71+
return nodeId == null ? null : nodeId.toString();
7272
}
7373

7474
public String getQueueName() {

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -746,6 +746,7 @@ public AppActivitiesInfo getAppActivities(@Context HttpServletRequest hsr,
746746
return appActivitiesInfo;
747747
} catch (Exception e) {
748748
String errMessage = "Cannot find application with given appId";
749+
LOG.error(errMessage, e);
749750
return new AppActivitiesInfo(errMessage, appId);
750751
}
751752

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/TestActivitiesManager.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.ArrayList;
2222
import java.util.List;
2323
import java.util.Map;
24+
import java.util.Queue;
2425
import java.util.Random;
2526
import java.util.concurrent.Callable;
2627
import java.util.concurrent.ConcurrentHashMap;
@@ -41,6 +42,8 @@
4142
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
4243
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
4344
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
45+
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
46+
import org.apache.hadoop.yarn.util.SystemClock;
4447
import org.junit.Assert;
4548
import org.junit.Before;
4649
import org.junit.Test;
@@ -189,6 +192,55 @@ public void testRecordingSchedulerActivitiesForMultiNodesInMultiThreads()
189192
Assert.assertEquals(1, activitiesManager.historyNodeAllocations.size());
190193
}
191194

195+
196+
/**
197+
* Test recording app activities in multiple threads,
198+
* every task should record exactly one app allocation that covers all nodes.
199+
*/
200+
@Test
201+
public void testRecordingAppActivitiesInMultiThreads()
202+
throws Exception {
203+
Random rand = new Random();
204+
// start recording activities for a random app
205+
SchedulerApplicationAttempt randomApp = apps.get(rand.nextInt(NUM_APPS));
206+
activitiesManager
207+
.turnOnAppActivitiesRecording(randomApp.getApplicationId(), 3);
208+
List<Future<Void>> futures = new ArrayList<>();
209+
// generate app activities
210+
int nTasks = 20;
211+
for (int i=0; i<nTasks; i++) {
212+
Callable<Void> task = () -> {
213+
ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
214+
(FiCaSchedulerNode) nodes.get(0),
215+
SystemClock.getInstance().getTime(), randomApp);
216+
for (SchedulerNode node : nodes) {
217+
ActivitiesLogger.APP
218+
.recordAppActivityWithoutAllocation(activitiesManager, node,
219+
randomApp, Priority.newInstance(0),
220+
ActivityDiagnosticConstant.FAIL_TO_ALLOCATE,
221+
ActivityState.REJECTED);
222+
}
223+
ActivitiesLogger.APP
224+
.finishAllocatedAppAllocationRecording(activitiesManager,
225+
randomApp.getApplicationId(), null, ActivityState.SKIPPED,
226+
ActivityDiagnosticConstant.SKIPPED_ALL_PRIORITIES);
227+
return null;
228+
};
229+
futures.add(threadPoolExecutor.submit(task));
230+
}
231+
// Wait for all tasks to finish before checking the recorded app activities
232+
for (Future<Void> future : futures) {
233+
future.get();
234+
}
235+
Queue<AppAllocation> appAllocations =
236+
activitiesManager.completedAppAllocations
237+
.get(randomApp.getApplicationId());
238+
Assert.assertEquals(nTasks, appAllocations.size());
239+
for(AppAllocation aa : appAllocations) {
240+
Assert.assertEquals(NUM_NODES, aa.getAllocationAttempts().size());
241+
}
242+
}
243+
192244
/**
193245
* Testing activities manager which can record all history information about
194246
* node allocations.

0 commit comments

Comments
 (0)