Skip to content

Commit

Permalink
YARN-4000. RM crashes with NPE if leaf queue becomes parent queue during restart. Contributed by Varun Saxena
Browse files Browse the repository at this point in the history
  • Loading branch information
jian-he committed Oct 21, 2015
1 parent 0dc69af commit 7021e01
Show file tree
Hide file tree
Showing 24 changed files with 395 additions and 317 deletions.
3 changes: 3 additions & 0 deletions hadoop-yarn-project/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ Release 2.7.2 - UNRELEASED

YARN-4281. 2.7 RM app page is broken (Chang Li via jlowe)

YARN-4000. RM crashes with NPE if leaf queue becomes parent queue during restart.
(Varun Saxena via jianhe)

Release 2.7.1 - 2015-07-06

INCOMPATIBLE CHANGES
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -643,8 +643,9 @@ public KillApplicationResponse forceKillApplication(
return KillApplicationResponse.newInstance(true);
}

this.rmContext.getDispatcher().getEventHandler()
.handle(new RMAppEvent(applicationId, RMAppEventType.KILL));
this.rmContext.getDispatcher().getEventHandler().handle(
new RMAppEvent(applicationId, RMAppEventType.KILL,
"Application killed by user."));

// For UnmanagedAMs, return true so they don't retry
return KillApplicationResponse.newInstance(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRecoverEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
Expand Down Expand Up @@ -296,7 +295,8 @@ protected void submitApplication(
// scheduler about the existence of the application
assert application.getState() == RMAppState.NEW;
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMAppRejectedEvent(applicationId, e.getMessage()));
.handle(new RMAppEvent(applicationId,
RMAppEventType.APP_REJECTED, e.getMessage()));
throw RPCUtil.getRemoteException(e);
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
import org.apache.hadoop.yarn.util.ConverterUtils;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -255,8 +254,8 @@ public void run() {
String message = "Error launching " + application.getAppAttemptId()
+ ". Got exception: " + StringUtils.stringifyException(ie);
LOG.info(message);
handler.handle(new RMAppAttemptLaunchFailedEvent(application
.getAppAttemptId(), message));
handler.handle(new RMAppAttemptEvent(application
.getAppAttemptId(), RMAppAttemptEventType.LAUNCH_FAILED, message));
}
break;
case CLEANUP:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,24 @@
/**
 * Event of type {@link RMAppEventType} that carries the
 * {@link ApplicationId} it refers to together with a diagnostic message.
 */
public class RMAppEvent extends AbstractEvent<RMAppEventType> {

  /** Id of the application this event refers to. */
  private final ApplicationId appId;

  /** Diagnostic text attached to the event; empty when none was supplied. */
  private final String diagnosticMsg;

  /**
   * Creates an event whose diagnostic message is the empty string.
   *
   * @param appId id of the application the event refers to
   * @param type  type of the event
   */
  public RMAppEvent(ApplicationId appId, RMAppEventType type) {
    this(appId, type, "");
  }

  /**
   * Creates an event with an explicit diagnostic message.
   *
   * @param appId      id of the application the event refers to
   * @param type       type of the event
   * @param diagnostic diagnostic message stored as-is on the event
   */
  public RMAppEvent(ApplicationId appId, RMAppEventType type,
      String diagnostic) {
    super(type);
    this.diagnosticMsg = diagnostic;
    this.appId = appId;
  }

  /**
   * @return the application id given at construction time
   */
  public ApplicationId getApplicationId() {
    return appId;
  }

  /**
   * @return the diagnostic message given at construction time
   */
  public String getDiagnosticMsg() {
    return diagnosticMsg;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,14 @@

public class RMAppFailedAttemptEvent extends RMAppEvent {

private final String diagnostics;
private final boolean transferStateFromPreviousAttempt;

public RMAppFailedAttemptEvent(ApplicationId appId, RMAppEventType event,
String diagnostics, boolean transferStateFromPreviousAttempt) {
super(appId, event);
this.diagnostics = diagnostics;
super(appId, event, diagnostics);
this.transferStateFromPreviousAttempt = transferStateFromPreviousAttempt;
}

public String getDiagnostics() {
return this.diagnostics;
}

public boolean getTransferStateFromPreviousAttempt() {
return transferStateFromPreviousAttempt;
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -957,12 +957,12 @@ private String getAppAttemptFailedDiagnostics(RMAppEvent event) {
if (this.submissionContext.getUnmanagedAM()) {
// RM does not manage the AM. Do not retry
msg = "Unmanaged application " + this.getApplicationId()
+ " failed due to " + failedEvent.getDiagnostics()
+ " failed due to " + failedEvent.getDiagnosticMsg()
+ ". Failing the application.";
} else if (this.isNumAttemptsBeyondThreshold) {
msg = "Application " + this.getApplicationId() + " failed "
+ this.maxAppAttempts + " times due to "
+ failedEvent.getDiagnostics() + ". Failing the application.";
+ failedEvent.getDiagnosticMsg() + ". Failing the application.";
}
return msg;
}
Expand Down Expand Up @@ -1003,21 +1003,14 @@ private void rememberTargetTransitionsAndStoreState(RMAppEvent event,
String diags = null;
switch (event.getType()) {
case APP_REJECTED:
RMAppRejectedEvent rejectedEvent = (RMAppRejectedEvent) event;
diags = rejectedEvent.getMessage();
break;
case ATTEMPT_FINISHED:
RMAppFinishedAttemptEvent finishedEvent =
(RMAppFinishedAttemptEvent) event;
diags = finishedEvent.getDiagnostics();
case ATTEMPT_KILLED:
diags = event.getDiagnosticMsg();
break;
case ATTEMPT_FAILED:
RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
diags = getAppAttemptFailedDiagnostics(failedEvent);
break;
case ATTEMPT_KILLED:
diags = getAppKilledDiagnostics();
break;
default:
break;
}
Expand Down Expand Up @@ -1065,9 +1058,7 @@ public AppFinishedTransition() {
}

public void transition(RMAppImpl app, RMAppEvent event) {
RMAppFinishedAttemptEvent finishedEvent =
(RMAppFinishedAttemptEvent)event;
app.diagnostics.append(finishedEvent.getDiagnostics());
app.diagnostics.append(event.getDiagnosticMsg());
super.transition(app, event);
};
}
Expand Down Expand Up @@ -1113,21 +1104,21 @@ public AppKilledTransition() {

@Override
public void transition(RMAppImpl app, RMAppEvent event) {
app.diagnostics.append(getAppKilledDiagnostics());
app.diagnostics.append(event.getDiagnosticMsg());
super.transition(app, event);
};
}

private static String getAppKilledDiagnostics() {
return "Application killed by user.";
}

private static class KillAttemptTransition extends RMAppTransition {
@Override
public void transition(RMAppImpl app, RMAppEvent event) {
app.stateBeforeKilling = app.getState();
app.handler.handle(new RMAppAttemptEvent(app.currentAttempt
.getAppAttemptId(), RMAppAttemptEventType.KILL));
// Forward app kill diagnostics in the event to kill app attempt.
// These diagnostics will be returned back in ATTEMPT_KILLED event sent by
// RMAppAttemptImpl.
app.handler.handle(
new RMAppAttemptEvent(app.currentAttempt.getAppAttemptId(),
RMAppAttemptEventType.KILL, event.getDiagnosticMsg()));
}
}

Expand All @@ -1138,8 +1129,7 @@ public AppRejectedTransition() {
}

public void transition(RMAppImpl app, RMAppEvent event) {
RMAppRejectedEvent rejectedEvent = (RMAppRejectedEvent)event;
app.diagnostics.append(rejectedEvent.getMessage());
app.diagnostics.append(event.getDiagnosticMsg());
super.transition(app, event);
};
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,25 @@
/**
 * Event of type {@link RMAppAttemptEventType} that carries the
 * {@link ApplicationAttemptId} it refers to together with a diagnostic
 * message.
 */
public class RMAppAttemptEvent extends AbstractEvent<RMAppAttemptEventType> {

  /** Id of the application attempt this event refers to. */
  private final ApplicationAttemptId appAttemptId;

  /** Diagnostic text attached to the event; empty when none was supplied. */
  private final String diagnosticMsg;

  /**
   * Creates an event whose diagnostic message is the empty string.
   *
   * @param appAttemptId id of the application attempt the event refers to
   * @param type         type of the event
   */
  public RMAppAttemptEvent(ApplicationAttemptId appAttemptId,
      RMAppAttemptEventType type) {
    this(appAttemptId, type, "");
  }

  /**
   * Creates an event with an explicit diagnostic message.
   *
   * @param appAttemptId id of the application attempt the event refers to
   * @param type         type of the event
   * @param diagnostics  diagnostic message stored as-is on the event
   */
  public RMAppAttemptEvent(ApplicationAttemptId appAttemptId,
      RMAppAttemptEventType type, String diagnostics) {
    super(type);
    this.diagnosticMsg = diagnostics;
    this.appAttemptId = appAttemptId;
  }

  /**
   * @return the application attempt id given at construction time
   */
  public ApplicationAttemptId getApplicationAttemptId() {
    return appAttemptId;
  }

  /**
   * @return the diagnostic message given at construction time
   */
  public String getDiagnosticMsg() {
    return this.diagnosticMsg;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFinishedAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
Expand Down Expand Up @@ -1021,8 +1018,9 @@ public void run() {
LOG.warn("Interrupted while waiting to resend the"
+ " ContainerAllocated Event.");
}
appAttempt.eventHandler.handle(new RMAppAttemptContainerAllocatedEvent(
appAttempt.applicationAttemptId));
appAttempt.eventHandler.handle(
new RMAppAttemptEvent(appAttempt.applicationAttemptId,
RMAppAttemptEventType.CONTAINER_ALLOCATED));
}
}.start();
}
Expand Down Expand Up @@ -1124,17 +1122,15 @@ private void rememberTargetTransitionsAndStoreState(RMAppAttemptEvent event,
int exitStatus = ContainerExitStatus.INVALID;
switch (event.getType()) {
case LAUNCH_FAILED:
RMAppAttemptLaunchFailedEvent launchFaileEvent =
(RMAppAttemptLaunchFailedEvent) event;
diags = launchFaileEvent.getMessage();
diags = event.getDiagnosticMsg();
break;
case REGISTERED:
diags = getUnexpectedAMRegisteredDiagnostics();
break;
case UNREGISTERED:
RMAppAttemptUnregistrationEvent unregisterEvent =
(RMAppAttemptUnregistrationEvent) event;
diags = unregisterEvent.getDiagnostics();
diags = unregisterEvent.getDiagnosticMsg();
// reset finalTrackingUrl to url sent by am
finalTrackingUrl = sanitizeTrackingUrl(unregisterEvent.getFinalTrackingUrl());
finalStatus = unregisterEvent.getFinalApplicationStatus();
Expand Down Expand Up @@ -1233,17 +1229,19 @@ public void transition(RMAppAttemptImpl appAttempt,
switch (finalAttemptState) {
case FINISHED:
{
appEvent = new RMAppFinishedAttemptEvent(applicationId,
appEvent =
new RMAppEvent(applicationId, RMAppEventType.ATTEMPT_FINISHED,
appAttempt.getDiagnostics());
}
break;
case KILLED:
{
appAttempt.invalidateAMHostAndPort();
// Forward diagnostics received in attempt kill event.
appEvent =
new RMAppFailedAttemptEvent(applicationId,
RMAppEventType.ATTEMPT_KILLED,
"Application killed by user.", false);
event.getDiagnosticMsg(), false);
}
break;
case FAILED:
Expand Down Expand Up @@ -1355,9 +1353,7 @@ public void transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {

// Use diagnostic from launcher
RMAppAttemptLaunchFailedEvent launchFaileEvent
= (RMAppAttemptLaunchFailedEvent) event;
appAttempt.diagnostics.append(launchFaileEvent.getMessage());
appAttempt.diagnostics.append(event.getDiagnosticMsg());

// Tell the app, scheduler
super.transition(appAttempt, event);
Expand Down Expand Up @@ -1612,7 +1608,7 @@ private void updateInfoOnAMUnregister(RMAppAttemptEvent event) {
progress = 1.0f;
RMAppAttemptUnregistrationEvent unregisterEvent =
(RMAppAttemptUnregistrationEvent) event;
diagnostics.append(unregisterEvent.getDiagnostics());
diagnostics.append(unregisterEvent.getDiagnosticMsg());
originalTrackingUrl = sanitizeTrackingUrl(unregisterEvent.getFinalTrackingUrl());
finalStatus = unregisterEvent.getFinalApplicationStatus();
}
Expand Down
Loading

0 comments on commit 7021e01

Please sign in to comment.