Skip to content

Commit

Permalink
YARN-6411. Clean up the overwrite of createDispatcher() in subclass o…
Browse files Browse the repository at this point in the history
…f MockRM. Contributed by Yufei Gu
  • Loading branch information
jlowe committed Mar 31, 2017
1 parent 28cdc5a commit 4d1fac5
Show file tree
Hide file tree
Showing 15 changed files with 277 additions and 475 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
Expand Down Expand Up @@ -126,22 +124,20 @@ public void testAMRMClientResendsRequestsOnRMRestart() throws Exception {
// Phase-1 Start 1st RM
MyResourceManager rm1 = new MyResourceManager(conf, memStore);
rm1.start();
DrainDispatcher dispatcher =
(DrainDispatcher) rm1.getRMContext().getDispatcher();

// Submit the application
RMApp app = rm1.submitApp(1024);
dispatcher.await();
rm1.drainEvents();

MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
nm1.registerNode();
nm1.nodeHeartbeat(true); // Node heartbeat
dispatcher.await();
rm1.drainEvents();

ApplicationAttemptId appAttemptId =
app.getCurrentAppAttempt().getAppAttemptId();
rm1.sendAMLaunched(appAttemptId);
dispatcher.await();
rm1.drainEvents();

org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> token =
rm1.getRMContext().getRMApps().get(appAttemptId.getApplicationId())
Expand Down Expand Up @@ -176,7 +172,7 @@ public void testAMRMClientResendsRequestsOnRMRestart() throws Exception {
blacklistAdditions.remove("h2");// remove from local list

AllocateResponse allocateResponse = amClient.allocate(0.1f);
dispatcher.await();
rm1.drainEvents();
Assert.assertEquals("No of assignments must be 0", 0, allocateResponse
.getAllocatedContainers().size());

Expand All @@ -189,10 +185,10 @@ public void testAMRMClientResendsRequestsOnRMRestart() throws Exception {
// Step-2 : NM heart beat is sent.
// On 2nd AM allocate request, RM allocates 3 containers to AM
nm1.nodeHeartbeat(true); // Node heartbeat
dispatcher.await();
rm1.drainEvents();

allocateResponse = amClient.allocate(0.2f);
dispatcher.await();
rm1.drainEvents();
// 3 containers are allocated i.e for cRequest1, cRequest2 and cRequest3.
Assert.assertEquals("No of assignments must be 0", 3, allocateResponse
.getAllocatedContainers().size());
Expand All @@ -207,7 +203,7 @@ public void testAMRMClientResendsRequestsOnRMRestart() throws Exception {
amClient.removeContainerRequest(cRequest3);

allocateResponse = amClient.allocate(0.2f);
dispatcher.await();
rm1.drainEvents();
Assert.assertEquals("No of assignments must be 0", 0, allocateResponse
.getAllocatedContainers().size());
assertAsksAndReleases(4, 0, rm1);
Expand All @@ -233,7 +229,7 @@ public void testAMRMClientResendsRequestsOnRMRestart() throws Exception {
// request
nm1.nodeHeartbeat(containerId.getApplicationAttemptId(),
containerId.getContainerId(), ContainerState.RUNNING);
dispatcher.await();
rm1.drainEvents();
amClient.requestContainerUpdate(
container, UpdateContainerRequest.newInstance(
container.getVersion(), container.getId(),
Expand All @@ -242,7 +238,7 @@ public void testAMRMClientResendsRequestsOnRMRestart() throws Exception {
it.remove();

allocateResponse = amClient.allocate(0.3f);
dispatcher.await();
rm1.drainEvents();
Assert.assertEquals("No of assignments must be 0", 0, allocateResponse
.getAllocatedContainers().size());
assertAsksAndReleases(3, pendingRelease, rm1);
Expand All @@ -258,7 +254,6 @@ public void testAMRMClientResendsRequestsOnRMRestart() throws Exception {
rm2.start();
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
((MyAMRMClientImpl) amClient).updateRMProxy(rm2);
dispatcher = (DrainDispatcher) rm2.getRMContext().getDispatcher();

// NM should be rebooted on heartbeat, even first heartbeat for nm2
NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);
Expand All @@ -274,7 +269,7 @@ public void testAMRMClientResendsRequestsOnRMRestart() throws Exception {
Collections.singletonList(
containerId.getApplicationAttemptId().getApplicationId()));
nm1.nodeHeartbeat(true);
dispatcher.await();
rm2.drainEvents();

blacklistAdditions.add("h3");
amClient.updateBlacklist(blacklistAdditions, null);
Expand All @@ -296,7 +291,7 @@ public void testAMRMClientResendsRequestsOnRMRestart() throws Exception {
// containerRequest and blacklisted nodes.
// Intern RM send resync command,AMRMClient resend allocate request
allocateResponse = amClient.allocate(0.3f);
dispatcher.await();
rm2.drainEvents();

completedContainer =
allocateResponse.getCompletedContainersStatuses().size();
Expand All @@ -313,7 +308,7 @@ public void testAMRMClientResendsRequestsOnRMRestart() throws Exception {

// Step-5 : Allocater after resync command
allocateResponse = amClient.allocate(0.5f);
dispatcher.await();
rm2.drainEvents();
Assert.assertEquals("No of assignments must be 0", 0, allocateResponse
.getAllocatedContainers().size());

Expand All @@ -326,10 +321,10 @@ public void testAMRMClientResendsRequestsOnRMRestart() throws Exception {
int count = 5;
while (count-- > 0) {
nm1.nodeHeartbeat(true);
dispatcher.await();
rm2.drainEvents();

allocateResponse = amClient.allocate(0.5f);
dispatcher.await();
rm2.drainEvents();
noAssignedContainer += allocateResponse.getAllocatedContainers().size();
if (noAssignedContainer == 3) {
break;
Expand Down Expand Up @@ -358,22 +353,20 @@ public void testAMRMClientForUnregisterAMOnRMRestart() throws Exception {
// Phase-1 Start 1st RM
MyResourceManager rm1 = new MyResourceManager(conf, memStore);
rm1.start();
DrainDispatcher dispatcher =
(DrainDispatcher) rm1.getRMContext().getDispatcher();

// Submit the application
RMApp app = rm1.submitApp(1024);
dispatcher.await();
rm1.drainEvents();

MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
nm1.registerNode();
nm1.nodeHeartbeat(true); // Node heartbeat
dispatcher.await();
rm1.drainEvents();

ApplicationAttemptId appAttemptId =
app.getCurrentAppAttempt().getAppAttemptId();
rm1.sendAMLaunched(appAttemptId);
dispatcher.await();
rm1.drainEvents();

org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> token =
rm1.getRMContext().getRMApps().get(appAttemptId.getApplicationId())
Expand All @@ -393,7 +386,6 @@ public void testAMRMClientForUnregisterAMOnRMRestart() throws Exception {
rm2.start();
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
((MyAMRMClientImpl) amClient).updateRMProxy(rm2);
dispatcher = (DrainDispatcher) rm2.getRMContext().getDispatcher();

// NM should be rebooted on heartbeat, even first heartbeat for nm2
NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);
Expand All @@ -409,7 +401,7 @@ public void testAMRMClientForUnregisterAMOnRMRestart() throws Exception {
Priority.newInstance(0), 0);
nm1.registerNode(Arrays.asList(containerReport), null);
nm1.nodeHeartbeat(true);
dispatcher.await();
rm2.drainEvents();

amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
null, null);
Expand All @@ -421,7 +413,6 @@ public void testAMRMClientForUnregisterAMOnRMRestart() throws Exception {
amClient.stop();
rm1.stop();
rm2.stop();

}


Expand All @@ -439,22 +430,20 @@ public void testAMRMClientOnAMRMTokenRollOverOnRMRestart() throws Exception {
// start first RM
MyResourceManager2 rm1 = new MyResourceManager2(conf, memStore);
rm1.start();
DrainDispatcher dispatcher =
(DrainDispatcher) rm1.getRMContext().getDispatcher();
Long startTime = System.currentTimeMillis();
// Submit the application
RMApp app = rm1.submitApp(1024);
dispatcher.await();
rm1.drainEvents();

MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
nm1.registerNode();
nm1.nodeHeartbeat(true); // Node heartbeat
dispatcher.await();
rm1.drainEvents();

ApplicationAttemptId appAttemptId =
app.getCurrentAppAttempt().getAppAttemptId();
rm1.sendAMLaunched(appAttemptId);
dispatcher.await();
rm1.drainEvents();

AMRMTokenSecretManager amrmTokenSecretManagerForRM1 =
rm1.getRMContext().getAMRMTokenSecretManager();
Expand Down Expand Up @@ -513,7 +502,6 @@ public void testAMRMClientOnAMRMTokenRollOverOnRMRestart() throws Exception {
rm2.start();
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
((MyAMRMClientImpl) amClient).updateRMProxy(rm2);
dispatcher = (DrainDispatcher) rm2.getRMContext().getDispatcher();

AMRMTokenSecretManager amrmTokenSecretManagerForRM2 =
rm2.getRMContext().getAMRMTokenSecretManager();
Expand Down Expand Up @@ -615,11 +603,6 @@ public void serviceStart() throws Exception {
MyResourceManager.setClusterTimeStamp(fakeClusterTimeStamp);
}

@Override
protected Dispatcher createDispatcher() {
return new DrainDispatcher();
}

@Override
protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
// Dispatch inline for test sanity
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,9 @@
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.junit.Before;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public abstract class ACLsTestBase {

protected static final String COMMON_USER = "common_user";
Expand Down Expand Up @@ -80,11 +75,6 @@ this.queueACLsManager, getRMContext()
.getRMDelegationTokenSecretManager());
}

@Override
protected Dispatcher createDispatcher() {
return new DrainDispatcher();
}

@Override
protected void doSecureLogin() throws IOException {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,17 @@
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -108,20 +103,9 @@ protected MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
}

protected void startRMs() throws IOException {
rm1 = new MockRM(confForRM1, null, false, false){
@Override
protected Dispatcher createDispatcher() {
return new DrainDispatcher();
}
};
rm2 = new MockRM(confForRM2, null, false, false){
@Override
protected Dispatcher createDispatcher() {
return new DrainDispatcher();
}
};
rm1 = new MockRM(confForRM1, null, false, false);
rm2 = new MockRM(confForRM2, null, false, false);
startRMs(rm1, confForRM1, rm2, confForRM2);

}

protected void startRMsWithCustomizedRMAppManager() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
Expand Down Expand Up @@ -463,9 +462,7 @@ private void registerNode(String host, int memory, int vCores) throws
int attempts = 10;
Collection<Plan> plans;
do {
DrainDispatcher dispatcher =
(DrainDispatcher) resourceManager.getRMContext().getDispatcher();
dispatcher.await();
resourceManager.drainEvents();
LOG.info("Waiting for node capacity to be added to plan");
plans = resourceManager.getRMContext().getReservationSystem()
.getAllPlans().values();
Expand Down
Loading

0 comments on commit 4d1fac5

Please sign in to comment.