From 4d1fac5df2508011adfc4c5d683beb00fd5ced45 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Fri, 31 Mar 2017 10:05:34 -0500 Subject: [PATCH] YARN-6411. Clean up the overwrite of createDispatcher() in subclass of MockRM. Contributed by Yufei Gu --- .../v2/app/rm/TestRMContainerAllocator.java | 453 ++++++++---------- .../api/impl/TestAMRMClientOnRMRestart.java | 59 +-- .../server/resourcemanager/ACLsTestBase.java | 10 - .../server/resourcemanager/RMHATestBase.java | 20 +- .../ReservationACLsTestBase.java | 5 +- .../TestApplicationCleanup.java | 44 +- .../TestApplicationMasterLauncher.java | 11 +- .../TestApplicationMasterService.java | 19 +- .../TestNodeBlacklistingOnAMFailures.java | 41 +- .../TestReservationSystemWithRMHA.java | 5 +- .../TestAMRMRPCNodeUpdates.java | 18 +- .../resourcetracker/TestNMReconnect.java | 14 +- .../rmcontainer/TestRMContainerImpl.java | 1 - .../capacity/TestApplicationPriority.java | 29 +- .../security/TestClientToAMTokens.java | 23 +- 15 files changed, 277 insertions(+), 475 deletions(-) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java index e6aee6ea997f4..933bd013ef64e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java @@ -179,21 +179,19 @@ public void testSimple() throws Exception { Configuration conf = new Configuration(); MyResourceManager rm = new MyResourceManager(conf); rm.start(); - DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() - .getDispatcher(); // Submit the application RMApp app = rm.submitApp(1024); - dispatcher.await(); + rm.drainEvents(); MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); amNodeManager.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() .getAppAttemptId(); rm.sendAMLaunched(appAttemptId); - dispatcher.await(); + rm.drainEvents(); JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); @@ -207,7 +205,7 @@ public void testSimple() throws Exception { MockNM nodeManager1 = rm.registerNode("h1:1234", 10240); MockNM nodeManager2 = rm.registerNode("h2:1234", 10240); MockNM nodeManager3 = rm.registerNode("h3:1234", 10240); - dispatcher.await(); + rm.drainEvents(); // create the container request ContainerRequestEvent event1 = createReq(jobId, 1, 1024, @@ -222,7 +220,7 @@ public void testSimple() throws Exception { // this tells the scheduler about the requests // as nodes are not added, no allocations List assigned = allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); Assert.assertEquals(4, rm.getMyFifoScheduler().lastAsk.size()); @@ -234,7 +232,7 @@ public void testSimple() throws Exception { // this tells the scheduler about the requests // as nodes are not added, no allocations assigned = allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); Assert.assertEquals(3, rm.getMyFifoScheduler().lastAsk.size()); @@ -242,18 +240,18 @@ public void testSimple() throws Exception { nodeManager1.nodeHeartbeat(true); // Node heartbeat nodeManager2.nodeHeartbeat(true); // Node heartbeat nodeManager3.nodeHeartbeat(true); // Node heartbeat - dispatcher.await(); + rm.drainEvents(); assigned = allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); Assert.assertEquals(0, rm.getMyFifoScheduler().lastAsk.size()); checkAssignments(new ContainerRequestEvent[] { event1, event2, event3 }, assigned, false); // check that the assigned container requests are cancelled - assigned = allocator.schedule(); - dispatcher.await(); - Assert.assertEquals(5, rm.getMyFifoScheduler().lastAsk.size()); + allocator.schedule(); + rm.drainEvents(); + Assert.assertEquals(5, rm.getMyFifoScheduler().lastAsk.size()); } @Test @@ -269,21 +267,19 @@ public void testMapNodeLocality() throws Exception { Configuration conf = new Configuration(); MyResourceManager rm = new MyResourceManager(conf); rm.start(); - DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() - .getDispatcher(); // Submit the application RMApp app = rm.submitApp(1024); - dispatcher.await(); + rm.drainEvents(); MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); amNodeManager.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() .getAppAttemptId(); rm.sendAMLaunched(appAttemptId); - dispatcher.await(); + rm.drainEvents(); JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); @@ -297,7 +293,7 @@ public void testMapNodeLocality() throws Exception { MockNM nodeManager1 = rm.registerNode("h1:1234", 3072); // can assign 2 maps rm.registerNode("h2:1234", 10240); // wont heartbeat on node local node MockNM nodeManager3 = rm.registerNode("h3:1234", 1536); // assign 1 map - dispatcher.await(); + rm.drainEvents(); // create the container requests for maps ContainerRequestEvent event1 = createReq(jobId, 1, 1024, @@ -313,7 +309,7 @@ public void testMapNodeLocality() throws Exception { // this tells the scheduler about the requests // as nodes are not added, no allocations List assigned = allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); // update resources in scheduler @@ -323,10 +319,10 @@ public void testMapNodeLocality() throws Exception { // Node heartbeat from node-local next. This allocates 2 node local // containers for task1 and task2. These should be matched with those tasks. nodeManager1.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); assigned = allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); checkAssignments(new ContainerRequestEvent[] { event1, event2, event3 }, assigned, false); // remove the rack-local assignment that should have happened for task3 @@ -350,21 +346,19 @@ public void testResource() throws Exception { Configuration conf = new Configuration(); MyResourceManager rm = new MyResourceManager(conf); rm.start(); - DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() - .getDispatcher(); // Submit the application RMApp app = rm.submitApp(1024); - dispatcher.await(); + rm.drainEvents(); MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); amNodeManager.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() .getAppAttemptId(); rm.sendAMLaunched(appAttemptId); - dispatcher.await(); + rm.drainEvents(); JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); @@ -378,7 +372,7 @@ public void testResource() throws Exception { MockNM nodeManager1 = rm.registerNode("h1:1234", 10240); MockNM nodeManager2 = rm.registerNode("h2:1234", 10240); MockNM nodeManager3 = rm.registerNode("h3:1234", 10240); - dispatcher.await(); + rm.drainEvents(); // create the container request ContainerRequestEvent event1 = createReq(jobId, 1, 1024, @@ -393,17 +387,17 @@ public void testResource() throws Exception { // this tells the scheduler about the requests // as nodes are not added, no allocations List assigned = allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); // update resources in scheduler nodeManager1.nodeHeartbeat(true); // Node heartbeat nodeManager2.nodeHeartbeat(true); // Node heartbeat nodeManager3.nodeHeartbeat(true); // Node heartbeat - dispatcher.await(); + rm.drainEvents(); assigned = allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); checkAssignments(new ContainerRequestEvent[] { event1, event2 }, assigned, false); } @@ -416,19 +410,17 @@ public void testReducerRampdownDiagnostics() throws Exception { conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.0f); final MyResourceManager rm = new MyResourceManager(conf); rm.start(); - final DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() - .getDispatcher(); final RMApp app = rm.submitApp(1024); - dispatcher.await(); + rm.drainEvents(); final String host = "host1"; final MockNM nm = rm.registerNode(String.format("%s:1234", host), 2048); nm.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); final ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() .getAppAttemptId(); rm.sendAMLaunched(appAttemptId); - dispatcher.await(); + rm.drainEvents(); final JobId jobId = MRBuilderUtils .newJobId(appAttemptId.getApplicationId(), 0); final Job mockJob = mock(Job.class); @@ -438,20 +430,20 @@ public void testReducerRampdownDiagnostics() throws Exception { final MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, appAttemptId, mockJob, SystemClock.getInstance()); // add resources to scheduler - dispatcher.await(); + rm.drainEvents(); // create the container request final String[] locations = new String[] { host }; allocator.sendRequest(createReq(jobId, 0, 1024, locations, false, true)); for (int i = 0; i < 1;) { - dispatcher.await(); + rm.drainEvents(); i += allocator.schedule().size(); nm.nodeHeartbeat(true); } allocator.sendRequest(createReq(jobId, 0, 1024, locations, true, false)); while (allocator.getTaskAttemptKillEvents().size() == 0) { - dispatcher.await(); + rm.drainEvents(); allocator.schedule().size(); nm.nodeHeartbeat(true); } @@ -468,21 +460,19 @@ public void testPreemptReducers() throws Exception { Configuration conf = new Configuration(); MyResourceManager rm = new MyResourceManager(conf); rm.start(); - DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() - .getDispatcher(); // Submit the application RMApp app = rm.submitApp(1024); - dispatcher.await(); + rm.drainEvents(); MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); amNodeManager.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() .getAppAttemptId(); rm.sendAMLaunched(appAttemptId); - dispatcher.await(); + rm.drainEvents(); JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); @@ -521,21 +511,19 @@ public void testNonAggressivelyPreemptReducers() throws Exception { MyResourceManager rm = new MyResourceManager(conf); rm.start(); - DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() - .getDispatcher(); // Submit the application RMApp app = rm.submitApp(1024); - dispatcher.await(); + rm.drainEvents(); MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); amNodeManager.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() .getAppAttemptId(); rm.sendAMLaunched(appAttemptId); - dispatcher.await(); + rm.drainEvents(); JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); @@ -584,21 +572,19 @@ public void testUnconditionalPreemptReducers() throws Exception { MyResourceManager rm = new MyResourceManager(conf); rm.start(); rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(8192, 8)); - DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() - .getDispatcher(); // Submit the application RMApp app = rm.submitApp(1024); - dispatcher.await(); + rm.drainEvents(); MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); amNodeManager.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() .getAppAttemptId(); rm.sendAMLaunched(appAttemptId); - dispatcher.await(); + rm.drainEvents(); JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); @@ -639,18 +625,16 @@ public void testExcessReduceContainerAssign() throws Exception { conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.0f); final MyResourceManager2 rm = new MyResourceManager2(conf); rm.start(); - final DrainDispatcher dispatcher = (DrainDispatcher)rm.getRMContext() - .getDispatcher(); final RMApp app = rm.submitApp(2048); - dispatcher.await(); + rm.drainEvents(); final String host = "host1"; final MockNM nm = rm.registerNode(String.format("%s:1234", host), 4096); nm.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); final ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() .getAppAttemptId(); rm.sendAMLaunched(appAttemptId); - dispatcher.await(); + rm.drainEvents(); final JobId jobId = MRBuilderUtils .newJobId(appAttemptId.getApplicationId(), 0); final Job mockJob = mock(Job.class); @@ -666,14 +650,14 @@ public void testExcessReduceContainerAssign() throws Exception { allocator.scheduleAllReduces(); allocator.makeRemoteRequest(); nm.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); allocator.sendRequest(createReq(jobId, 1, 1024, locations, false, false)); int assignedContainer; for (assignedContainer = 0; assignedContainer < 1;) { assignedContainer += allocator.schedule().size(); nm.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); } // only 1 allocated container should be assigned Assert.assertEquals(assignedContainer, 1); @@ -773,21 +757,19 @@ public void testMapReduceScheduling() throws Exception { Configuration conf = new Configuration(); MyResourceManager rm = new MyResourceManager(conf); rm.start(); - DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() - .getDispatcher(); // Submit the application RMApp app = rm.submitApp(1024); - dispatcher.await(); + rm.drainEvents(); MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); amNodeManager.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() .getAppAttemptId(); rm.sendAMLaunched(appAttemptId); - dispatcher.await(); + rm.drainEvents(); JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); @@ -801,7 +783,7 @@ public void testMapReduceScheduling() throws Exception { MockNM nodeManager1 = rm.registerNode("h1:1234", 1024); MockNM nodeManager2 = rm.registerNode("h2:1234", 10240); MockNM nodeManager3 = rm.registerNode("h3:1234", 10240); - dispatcher.await(); + rm.drainEvents(); // create the container request // send MAP request @@ -822,17 +804,17 @@ public void testMapReduceScheduling() throws Exception { // this tells the scheduler about the requests // as nodes are not added, no allocations List assigned = allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); // update resources in scheduler nodeManager1.nodeHeartbeat(true); // Node heartbeat nodeManager2.nodeHeartbeat(true); // Node heartbeat nodeManager3.nodeHeartbeat(true); // Node heartbeat - dispatcher.await(); + rm.drainEvents(); assigned = allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); checkAssignments(new ContainerRequestEvent[] { event1, event3 }, assigned, false); @@ -863,11 +845,6 @@ public void serviceStart() throws Exception { MyResourceManager.setClusterTimeStamp(fakeClusterTimeStamp); } - @Override - protected Dispatcher createDispatcher() { - return new DrainDispatcher(); - } - @Override protected EventHandler createSchedulerEventDispatcher() { // Dispatch inline for test sanity @@ -912,16 +889,16 @@ public void testReportedAppProgress() throws Exception { // Submit the application RMApp rmApp = rm.submitApp(1024); - rmDispatcher.await(); + rm.drainEvents(); MockNM amNodeManager = rm.registerNode("amNM:1234", 21504); amNodeManager.nodeHeartbeat(true); - rmDispatcher.await(); + rm.drainEvents(); final ApplicationAttemptId appAttemptId = rmApp.getCurrentAppAttempt() .getAppAttemptId(); rm.sendAMLaunched(appAttemptId); - rmDispatcher.await(); + rm.drainEvents(); MRApp mrApp = new MRApp(appAttemptId, ContainerId.newContainerId( appAttemptId, 0), 10, 10, false, this.getClass().getName(), true, 1) { @@ -959,11 +936,11 @@ protected ContainerAllocator createContainerAllocator( amDispatcher.await(); allocator.schedule(); - rmDispatcher.await(); + rm.drainEvents(); amNodeManager.nodeHeartbeat(true); - rmDispatcher.await(); + rm.drainEvents(); allocator.schedule(); - rmDispatcher.await(); + rm.drainEvents(); // Wait for all map-tasks to be running for (Task t : job.getTasks().values()) { @@ -973,7 +950,7 @@ protected ContainerAllocator createContainerAllocator( } allocator.schedule(); // Send heartbeat - rmDispatcher.await(); + rm.drainEvents(); Assert.assertEquals(0.05f, job.getProgress(), 0.001f); Assert.assertEquals(0.05f, rmApp.getProgress(), 0.001f); @@ -981,14 +958,14 @@ protected ContainerAllocator createContainerAllocator( Iterator it = job.getTasks().values().iterator(); finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 1); allocator.schedule(); - rmDispatcher.await(); + rm.drainEvents(); Assert.assertEquals(0.095f, job.getProgress(), 0.001f); Assert.assertEquals(0.095f, rmApp.getProgress(), 0.001f); // Finish off 7 more so that map-progress is 80% finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 7); allocator.schedule(); - rmDispatcher.await(); + rm.drainEvents(); Assert.assertEquals(0.41f, job.getProgress(), 0.001f); Assert.assertEquals(0.41f, rmApp.getProgress(), 0.001f); @@ -996,11 +973,11 @@ protected ContainerAllocator createContainerAllocator( finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 2); allocator.schedule(); - rmDispatcher.await(); + rm.drainEvents(); amNodeManager.nodeHeartbeat(true); - rmDispatcher.await(); + rm.drainEvents(); allocator.schedule(); - rmDispatcher.await(); + rm.drainEvents(); // Wait for all reduce-tasks to be running for (Task t : job.getTasks().values()) { @@ -1013,14 +990,14 @@ protected ContainerAllocator createContainerAllocator( finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 2); allocator.schedule(); - rmDispatcher.await(); + rm.drainEvents(); Assert.assertEquals(0.59f, job.getProgress(), 0.001f); Assert.assertEquals(0.59f, rmApp.getProgress(), 0.001f); // Finish off the remaining 8 reduces. finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 8); allocator.schedule(); - rmDispatcher.await(); + rm.drainEvents(); // Remaining is JobCleanup Assert.assertEquals(0.95f, job.getProgress(), 0.001f); Assert.assertEquals(0.95f, rmApp.getProgress(), 0.001f); @@ -1064,16 +1041,16 @@ public void testReportedAppProgressWithOnlyMaps() throws Exception { // Submit the application RMApp rmApp = rm.submitApp(1024); - rmDispatcher.await(); + rm.drainEvents(); MockNM amNodeManager = rm.registerNode("amNM:1234", 11264); amNodeManager.nodeHeartbeat(true); - rmDispatcher.await(); + rm.drainEvents(); final ApplicationAttemptId appAttemptId = rmApp.getCurrentAppAttempt() .getAppAttemptId(); rm.sendAMLaunched(appAttemptId); - rmDispatcher.await(); + rm.drainEvents(); MRApp mrApp = new MRApp(appAttemptId, ContainerId.newContainerId( appAttemptId, 0), 10, 0, false, this.getClass().getName(), true, 1) { @@ -1109,11 +1086,11 @@ protected ContainerAllocator createContainerAllocator( amDispatcher.await(); allocator.schedule(); - rmDispatcher.await(); + rm.drainEvents(); amNodeManager.nodeHeartbeat(true); - rmDispatcher.await(); + rm.drainEvents(); allocator.schedule(); - rmDispatcher.await(); + rm.drainEvents(); // Wait for all map-tasks to be running for (Task t : job.getTasks().values()) { @@ -1121,7 +1098,7 @@ protected ContainerAllocator createContainerAllocator( } allocator.schedule(); // Send heartbeat - rmDispatcher.await(); + rm.drainEvents(); Assert.assertEquals(0.05f, job.getProgress(), 0.001f); Assert.assertEquals(0.05f, rmApp.getProgress(), 0.001f); @@ -1130,21 +1107,21 @@ protected ContainerAllocator createContainerAllocator( // Finish off 1 map so that map-progress is 10% finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 1); allocator.schedule(); - rmDispatcher.await(); + rm.drainEvents(); Assert.assertEquals(0.14f, job.getProgress(), 0.001f); Assert.assertEquals(0.14f, rmApp.getProgress(), 0.001f); // Finish off 5 more map so that map-progress is 60% finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 5); allocator.schedule(); - rmDispatcher.await(); + rm.drainEvents(); Assert.assertEquals(0.59f, job.getProgress(), 0.001f); Assert.assertEquals(0.59f, rmApp.getProgress(), 0.001f); // Finish off remaining map so that map-progress is 100% finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 4); allocator.schedule(); - rmDispatcher.await(); + rm.drainEvents(); Assert.assertEquals(0.95f, job.getProgress(), 0.001f); Assert.assertEquals(0.95f, rmApp.getProgress(), 0.001f); } @@ -1154,21 +1131,19 @@ public void testUpdatedNodes() throws Exception { Configuration conf = new Configuration(); MyResourceManager rm = new MyResourceManager(conf); rm.start(); - DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() - .getDispatcher(); // Submit the application RMApp app = rm.submitApp(1024); - dispatcher.await(); + rm.drainEvents(); MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); amNodeManager.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() .getAppAttemptId(); rm.sendAMLaunched(appAttemptId); - dispatcher.await(); - + rm.drainEvents(); + JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, @@ -1177,7 +1152,7 @@ public void testUpdatedNodes() throws Exception { // add resources to scheduler MockNM nm1 = rm.registerNode("h1:1234", 10240); MockNM nm2 = rm.registerNode("h2:1234", 10240); - dispatcher.await(); + rm.drainEvents(); // create the map container request ContainerRequestEvent event = createReq(jobId, 1, 1024, @@ -1193,16 +1168,16 @@ public void testUpdatedNodes() throws Exception { // this tells the scheduler about the requests List assigned = allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); nm1.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); Assert.assertEquals(1, allocator.getJobUpdatedNodeEvents().size()); Assert.assertEquals(3, allocator.getJobUpdatedNodeEvents().get(0).getUpdatedNodes().size()); allocator.getJobUpdatedNodeEvents().clear(); // get the assignment assigned = allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); Assert.assertEquals(1, assigned.size()); Assert.assertEquals(nm1.getNodeId(), assigned.get(0).getContainer().getNodeId()); // no updated nodes reported @@ -1212,11 +1187,11 @@ public void testUpdatedNodes() throws Exception { // mark nodes bad nm1.nodeHeartbeat(false); nm2.nodeHeartbeat(false); - dispatcher.await(); - + rm.drainEvents(); + // schedule response returns updated nodes assigned = allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); Assert.assertEquals(0, assigned.size()); // updated nodes are reported Assert.assertEquals(1, allocator.getJobUpdatedNodeEvents().size()); @@ -1227,7 +1202,7 @@ public void testUpdatedNodes() throws Exception { allocator.getTaskAttemptKillEvents().clear(); assigned = allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); Assert.assertEquals(0, assigned.size()); // no updated nodes reported Assert.assertTrue(allocator.getJobUpdatedNodeEvents().isEmpty()); @@ -1247,21 +1222,19 @@ public void testBlackListedNodes() throws Exception { MyResourceManager rm = new MyResourceManager(conf); rm.start(); - DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() - .getDispatcher(); // Submit the application RMApp app = rm.submitApp(1024); - dispatcher.await(); + rm.drainEvents(); MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); amNodeManager.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() .getAppAttemptId(); rm.sendAMLaunched(appAttemptId); - dispatcher.await(); + rm.drainEvents(); JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); @@ -1275,7 +1248,7 @@ public void testBlackListedNodes() throws Exception { MockNM nodeManager1 = rm.registerNode("h1:1234", 10240); MockNM nodeManager2 = rm.registerNode("h2:1234", 10240); MockNM nodeManager3 = rm.registerNode("h3:1234", 10240); - dispatcher.await(); + rm.drainEvents(); // create the container request ContainerRequestEvent event1 = createReq(jobId, 1, 1024, @@ -1295,7 +1268,7 @@ public void testBlackListedNodes() throws Exception { // this tells the scheduler about the requests // as nodes are not added, no allocations List assigned = allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); // Send events to blacklist nodes h1 and h2 @@ -1307,28 +1280,28 @@ public void testBlackListedNodes() throws Exception { // update resources in scheduler nodeManager1.nodeHeartbeat(true); // Node heartbeat nodeManager2.nodeHeartbeat(true); // Node heartbeat - dispatcher.await(); + rm.drainEvents(); assigned = allocator.schedule(); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); - dispatcher.await(); + rm.drainEvents(); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); assertBlacklistAdditionsAndRemovals(2, 0, rm); // mark h1/h2 as bad nodes nodeManager1.nodeHeartbeat(false); nodeManager2.nodeHeartbeat(false); - dispatcher.await(); + rm.drainEvents(); assigned = allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); assertBlacklistAdditionsAndRemovals(0, 0, rm); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); nodeManager3.nodeHeartbeat(true); // Node heartbeat - dispatcher.await(); + rm.drainEvents(); assigned = allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); assertBlacklistAdditionsAndRemovals(0, 0, rm); Assert.assertTrue("No of assignments must be 3", assigned.size() == 3); @@ -1352,24 +1325,22 @@ public void testIgnoreBlacklisting() throws Exception { MyResourceManager rm = new MyResourceManager(conf); rm.start(); - DrainDispatcher dispatcher = - (DrainDispatcher) rm.getRMContext().getDispatcher(); // Submit the application RMApp app = rm.submitApp(1024); - dispatcher.await(); + rm.drainEvents(); MockNM[] nodeManagers = new MockNM[10]; int nmNum = 0; List assigned = null; - nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher); + nodeManagers[nmNum] = registerNodeManager(nmNum++, rm); nodeManagers[0].nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt().getAppAttemptId(); rm.sendAMLaunched(appAttemptId); - dispatcher.await(); + rm.drainEvents(); JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); @@ -1382,7 +1353,7 @@ public void testIgnoreBlacklisting() throws Exception { // Known=1, blacklisted=0, ignore should be false - assign first container assigned = getContainerOnHost(jobId, 1, 1024, new String[] { "h1" }, - nodeManagers[0], dispatcher, allocator, 0, 0, 0, 0, rm); + nodeManagers[0], allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); LOG.info("Failing container _1 on H1 (Node should be blacklisted and" @@ -1397,47 +1368,47 @@ public void testIgnoreBlacklisting() throws Exception { // The current call will send blacklisted node "h1" to RM assigned = getContainerOnHost(jobId, 2, 1024, new String[] { "h1" }, - nodeManagers[0], dispatcher, allocator, 1, 0, 0, 1, rm); + nodeManagers[0], allocator, 1, 0, 0, 1, rm); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); // Known=1, blacklisted=1, ignore should be true - assign 1 assigned = getContainerOnHost(jobId, 2, 1024, new String[] { "h1" }, - nodeManagers[0], dispatcher, allocator, 0, 0, 0, 0, rm); + nodeManagers[0], allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); - nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher); + nodeManagers[nmNum] = registerNodeManager(nmNum++, rm); // Known=2, blacklisted=1, ignore should be true - assign 1 anyway. assigned = getContainerOnHost(jobId, 3, 1024, new String[] { "h2" }, - nodeManagers[1], dispatcher, allocator, 0, 0, 0, 0, rm); + nodeManagers[1], allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); - nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher); + nodeManagers[nmNum] = registerNodeManager(nmNum++, rm); // Known=3, blacklisted=1, ignore should be true - assign 1 anyway. assigned = getContainerOnHost(jobId, 4, 1024, new String[] { "h3" }, - nodeManagers[2], dispatcher, allocator, 0, 0, 0, 0, rm); + nodeManagers[2], allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); // Known=3, blacklisted=1, ignore should be true - assign 1 assigned = getContainerOnHost(jobId, 5, 1024, new String[] { "h1" }, - nodeManagers[0], dispatcher, allocator, 0, 0, 0, 0, rm); + nodeManagers[0], allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); - nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher); + nodeManagers[nmNum] = registerNodeManager(nmNum++, rm); // Known=4, blacklisted=1, ignore should be false - assign 1 anyway assigned = getContainerOnHost(jobId, 6, 1024, new String[] { "h4" }, - nodeManagers[3], dispatcher, allocator, 0, 0, 1, 0, rm); + nodeManagers[3], allocator, 0, 0, 1, 0, rm); Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); // Test blacklisting re-enabled. // Known=4, blacklisted=1, ignore should be false - no assignment on h1 assigned = getContainerOnHost(jobId, 7, 1024, new String[] { "h1" }, - nodeManagers[0], dispatcher, allocator, 0, 0, 0, 0, rm); + nodeManagers[0], allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); // RMContainerRequestor would have created a replacement request. @@ -1450,61 +1421,61 @@ public void testIgnoreBlacklisting() throws Exception { // container for the same reason above. assigned = getContainerOnHost(jobId, 8, 1024, new String[] { "h1" }, - nodeManagers[0], dispatcher, allocator, 1, 0, 0, 2, rm); + nodeManagers[0], allocator, 1, 0, 0, 2, rm); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); // Known=4, blacklisted=2, ignore should be true. Should assign 2 // containers. assigned = getContainerOnHost(jobId, 8, 1024, new String[] { "h1" }, - nodeManagers[0], dispatcher, allocator, 0, 0, 0, 0, rm); + nodeManagers[0], allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 2", 2, assigned.size()); // Known=4, blacklisted=2, ignore should be true. assigned = getContainerOnHost(jobId, 9, 1024, new String[] { "h2" }, - nodeManagers[1], dispatcher, allocator, 0, 0, 0, 0, rm); + nodeManagers[1], allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); // Test blacklist while ignore blacklisting enabled ContainerFailedEvent f3 = createFailEvent(jobId, 4, "h3", false); allocator.sendFailure(f3); - nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher); + nodeManagers[nmNum] = registerNodeManager(nmNum++, rm); // Known=5, blacklisted=3, ignore should be true. assigned = getContainerOnHost(jobId, 10, 1024, new String[] { "h3" }, - nodeManagers[2], dispatcher, allocator, 0, 0, 0, 0, rm); + nodeManagers[2], allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); // Assign on 5 more nodes - to re-enable blacklisting for (int i = 0; i < 5; i++) { - nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher); + nodeManagers[nmNum] = registerNodeManager(nmNum++, rm); assigned = getContainerOnHost(jobId, 11 + i, 1024, new String[] { String.valueOf(5 + i) }, nodeManagers[4 + i], - dispatcher, allocator, 0, 0, (i == 4 ? 3 : 0), 0, rm); + allocator, 0, 0, (i == 4 ? 3 : 0), 0, rm); Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); } // Test h3 (blacklisted while ignoring blacklisting) is blacklisted. assigned = getContainerOnHost(jobId, 20, 1024, new String[] { "h3" }, - nodeManagers[2], dispatcher, allocator, 0, 0, 0, 0, rm); + nodeManagers[2], allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); } - private MockNM registerNodeManager(int i, MyResourceManager rm, - DrainDispatcher dispatcher) throws Exception { + private MockNM registerNodeManager(int i, MyResourceManager rm) + throws Exception { MockNM nm = rm.registerNode("h" + (i + 1) + ":1234", 10240); - dispatcher.await(); + rm.drainEvents(); return nm; } private List getContainerOnHost(JobId jobId, int taskAttemptId, int memory, String[] hosts, MockNM mockNM, - DrainDispatcher dispatcher, MyContainerAllocator allocator, + MyContainerAllocator allocator, int expectedAdditions1, int expectedRemovals1, int expectedAdditions2, int expectedRemovals2, MyResourceManager rm) throws Exception { @@ -1514,17 +1485,17 @@ List getContainerOnHost(JobId jobId, // Send the request to the RM List assigned = allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); assertBlacklistAdditionsAndRemovals( expectedAdditions1, expectedRemovals1, rm); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); // Heartbeat from the required nodeManager mockNM.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); assigned = allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); assertBlacklistAdditionsAndRemovals( expectedAdditions2, expectedRemovals2, rm); return assigned; @@ -1542,21 +1513,19 @@ public void testBlackListedNodesWithSchedulingToThatNode() throws Exception { MyResourceManager rm = new MyResourceManager(conf); rm.start(); - DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() - .getDispatcher(); // Submit the application RMApp app = rm.submitApp(1024); - dispatcher.await(); + rm.drainEvents(); MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); amNodeManager.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() .getAppAttemptId(); rm.sendAMLaunched(appAttemptId); - dispatcher.await(); + rm.drainEvents(); JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); @@ -1569,7 +1538,7 @@ public void testBlackListedNodesWithSchedulingToThatNode() throws Exception { // add resources to scheduler MockNM nodeManager1 = rm.registerNode("h1:1234", 10240); MockNM nodeManager3 = rm.registerNode("h3:1234", 10240); - dispatcher.await(); + rm.drainEvents(); LOG.info("Requesting 1 Containers _1 on H1"); // create the container request @@ -1581,17 +1550,17 @@ public void testBlackListedNodesWithSchedulingToThatNode() throws Exception { // this tells the scheduler about the requests // as nodes are not added, no allocations List assigned = allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); LOG.info("h1 Heartbeat (To actually schedule the containers)"); // update resources in scheduler nodeManager1.nodeHeartbeat(true); // Node heartbeat - dispatcher.await(); + rm.drainEvents(); LOG.info("RM Heartbeat (To process the scheduled containers)"); assigned = allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); assertBlacklistAdditionsAndRemovals(0, 0, rm); Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); @@ -1608,7 +1577,7 @@ public void testBlackListedNodesWithSchedulingToThatNode() throws Exception { //Update the Scheduler with the new requests. assigned = allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); assertBlacklistAdditionsAndRemovals(1, 0, rm); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); @@ -1623,11 +1592,11 @@ public void testBlackListedNodesWithSchedulingToThatNode() throws Exception { LOG.info("h1 Heartbeat (To actually schedule the containers)"); // update resources in scheduler nodeManager1.nodeHeartbeat(true); // Node heartbeat - dispatcher.await(); + rm.drainEvents(); LOG.info("RM Heartbeat (To process the scheduled containers)"); assigned = allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); assertBlacklistAdditionsAndRemovals(0, 0, rm); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); @@ -1636,19 +1605,19 @@ public void testBlackListedNodesWithSchedulingToThatNode() throws Exception { //Send a release for the p:5 container + another request. LOG.info("RM Heartbeat (To process the re-scheduled containers)"); assigned = allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); assertBlacklistAdditionsAndRemovals(0, 0, rm); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); //Hearbeat from H3 to schedule on this host. LOG.info("h3 Heartbeat (To re-schedule the containers)"); nodeManager3.nodeHeartbeat(true); // Node heartbeat - dispatcher.await(); + rm.drainEvents(); LOG.info("RM Heartbeat (To process the re-scheduled containers for H3)"); assigned = allocator.schedule(); assertBlacklistAdditionsAndRemovals(0, 0, rm); - dispatcher.await(); + rm.drainEvents(); // For debugging for (TaskAttemptContainerAssignedEvent assig : assigned) { @@ -2229,22 +2198,20 @@ public void testCompletedTasksRecalculateSchedule() throws Exception { Configuration conf = new Configuration(); final MyResourceManager rm = new MyResourceManager(conf); rm.start(); - DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() - .getDispatcher(); // Submit the application RMApp app = rm.submitApp(1024); - dispatcher.await(); + rm.drainEvents(); // Make a node to register so as to launch the AM. MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); amNodeManager.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() .getAppAttemptId(); rm.sendAMLaunched(appAttemptId); - dispatcher.await(); + rm.drainEvents(); JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job job = mock(Job.class); @@ -2381,21 +2348,19 @@ public void testUnregistrationOnlyIfRegistered() throws Exception { Configuration conf = new Configuration(); final MyResourceManager rm = new MyResourceManager(conf); rm.start(); - DrainDispatcher rmDispatcher = - (DrainDispatcher) rm.getRMContext().getDispatcher(); // Submit the application RMApp rmApp = rm.submitApp(1024); - rmDispatcher.await(); + rm.drainEvents(); MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 11264); amNodeManager.nodeHeartbeat(true); - rmDispatcher.await(); + rm.drainEvents(); final ApplicationAttemptId appAttemptId = rmApp.getCurrentAppAttempt().getAppAttemptId(); rm.sendAMLaunched(appAttemptId); - rmDispatcher.await(); + rm.drainEvents(); MRApp mrApp = new MRApp(appAttemptId, ContainerId.newContainerId(appAttemptId, 0), 10, @@ -2454,22 +2419,20 @@ public void testRMContainerAllocatorResendsRequestsOnRMRestart() MyResourceManager rm1 = new MyResourceManager(conf, memStore); rm1.start(); - DrainDispatcher dispatcher = - (DrainDispatcher) rm1.getRMContext().getDispatcher(); // Submit the application RMApp app = rm1.submitApp(1024); - dispatcher.await(); + rm1.drainEvents(); MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService()); nm1.registerNode(); nm1.nodeHeartbeat(true); // Node heartbeat - dispatcher.await(); + rm1.drainEvents(); ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt().getAppAttemptId(); rm1.sendAMLaunched(appAttemptId); - dispatcher.await(); + rm1.drainEvents(); JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); @@ -2498,7 +2461,7 @@ public void testRMContainerAllocatorResendsRequestsOnRMRestart() // send allocate request and 1 blacklisted nodes List assignedContainers = allocator.schedule(); - dispatcher.await(); + rm1.drainEvents(); Assert.assertEquals("No of assignments must be 0", 0, assignedContainers.size()); // Why ask is 3, not 4? --> ask from blacklisted node h2 is removed @@ -2506,11 +2469,11 @@ public void testRMContainerAllocatorResendsRequestsOnRMRestart() assertBlacklistAdditionsAndRemovals(1, 0, rm1); nm1.nodeHeartbeat(true); // Node heartbeat - dispatcher.await(); + rm1.drainEvents(); // Step-2 : 2 containers are allocated by RM. assignedContainers = allocator.schedule(); - dispatcher.await(); + rm1.drainEvents(); Assert.assertEquals("No of assignments must be 2", 2, assignedContainers.size()); assertAsksAndReleases(0, 0, rm1); @@ -2545,7 +2508,6 @@ public void testRMContainerAllocatorResendsRequestsOnRMRestart() rm2.start(); nm1.setResourceTrackerService(rm2.getResourceTrackerService()); allocator.updateSchedulerProxy(rm2); - dispatcher = (DrainDispatcher) rm2.getRMContext().getDispatcher(); // NM should be rebooted on heartbeat, even first heartbeat for nm2 NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true); @@ -2555,7 +2517,7 @@ public void testRMContainerAllocatorResendsRequestsOnRMRestart() nm1 = new MockNM("h1:1234", 10240, rm2.getResourceTrackerService()); nm1.registerNode(); nm1.nodeHeartbeat(true); - dispatcher.await(); + rm2.drainEvents(); // Step-4 : On RM restart, AM(does not know RM is restarted) sends // additional containerRequest(event4) and blacklisted nodes. @@ -2576,7 +2538,7 @@ public void testRMContainerAllocatorResendsRequestsOnRMRestart() // send allocate request to 2nd RM and get resync command allocator.schedule(); - dispatcher.await(); + rm2.drainEvents(); // Step-5 : On Resync,AM sends all outstanding // asks,release,blacklistAaddition @@ -2587,16 +2549,16 @@ public void testRMContainerAllocatorResendsRequestsOnRMRestart() // send all outstanding request again. assignedContainers = allocator.schedule(); - dispatcher.await(); + rm2.drainEvents(); assertAsksAndReleases(3, 2, rm2); assertBlacklistAdditionsAndRemovals(2, 0, rm2); nm1.nodeHeartbeat(true); - dispatcher.await(); + rm2.drainEvents(); // Step-6 : RM allocates containers i.e event3,event4 and cRequest5 assignedContainers = allocator.schedule(); - dispatcher.await(); + rm2.drainEvents(); Assert.assertEquals("Number of container should be 3", 3, assignedContainers.size()); @@ -2699,20 +2661,19 @@ public void testRMUnavailable() MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS, 0); MyResourceManager rm1 = new MyResourceManager(conf); rm1.start(); - DrainDispatcher dispatcher = - (DrainDispatcher) rm1.getRMContext().getDispatcher(); + RMApp app = rm1.submitApp(1024); - dispatcher.await(); + rm1.drainEvents(); MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService()); nm1.registerNode(); nm1.nodeHeartbeat(true); - dispatcher.await(); + rm1.drainEvents(); ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt().getAppAttemptId(); rm1.sendAMLaunched(appAttemptId); - dispatcher.await(); + rm1.drainEvents(); JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); @@ -2728,7 +2689,7 @@ public void testRMUnavailable() } catch (RMContainerAllocationException e) { Assert.assertTrue(e.getMessage().contains("Could not contact RM after")); } - dispatcher.await(); + rm1.drainEvents(); Assert.assertEquals("Should Have 1 Job Event", 1, allocator.jobEvents.size()); JobEvent event = allocator.jobEvents.get(0); @@ -2750,22 +2711,20 @@ public void testAMRMTokenUpdate() throws Exception { rm.start(); AMRMTokenSecretManager secretMgr = rm.getRMContext().getAMRMTokenSecretManager(); - DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() - .getDispatcher(); // Submit the application RMApp app = rm.submitApp(1024); - dispatcher.await(); + rm.drainEvents(); MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); amNodeManager.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); final ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() .getAppAttemptId(); final ApplicationId appId = app.getApplicationId(); rm.sendAMLaunched(appAttemptId); - dispatcher.await(); + rm.drainEvents(); JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); final Job mockJob = mock(Job.class); @@ -2953,21 +2912,19 @@ public void testAttemptNotFoundCausesRMCommunicatorException() Configuration conf = new Configuration(); MyResourceManager rm = new MyResourceManager(conf); rm.start(); - DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() - .getDispatcher(); // Submit the application RMApp app = rm.submitApp(1024); - dispatcher.await(); + rm.drainEvents(); MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); amNodeManager.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() .getAppAttemptId(); rm.sendAMLaunched(appAttemptId); - dispatcher.await(); + rm.drainEvents(); JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); @@ -2989,21 +2946,19 @@ public void testUpdateAskOnRampDownAllReduces() throws Exception { Configuration conf = new Configuration(); MyResourceManager rm = new MyResourceManager(conf); rm.start(); - DrainDispatcher dispatcher = - (DrainDispatcher) rm.getRMContext().getDispatcher(); // Submit the application RMApp app = rm.submitApp(1024); - dispatcher.await(); + rm.drainEvents(); MockNM amNodeManager = rm.registerNode("amNM:1234", 1260); amNodeManager.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() .getAppAttemptId(); rm.sendAMLaunched(appAttemptId); - dispatcher.await(); + rm.drainEvents(); JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); @@ -3018,7 +2973,7 @@ public void testUpdateAskOnRampDownAllReduces() throws Exception { // Register nodes to RM. MockNM nodeManager = rm.registerNode("h1:1234", 1024); - dispatcher.await(); + rm.drainEvents(); // Request 2 maps and 1 reducer(sone on nodes which are not registered). ContainerRequestEvent event1 = @@ -3034,7 +2989,7 @@ public void testUpdateAskOnRampDownAllReduces() throws Exception { // This will tell the scheduler about the requests but there will be no // allocations as nodes are not added. allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); // Advance clock so that maps can be considered as hanging. clock.setTime(System.currentTimeMillis() + 500000L); @@ -3045,15 +3000,15 @@ public void testUpdateAskOnRampDownAllReduces() throws Exception { allocator.sendRequest(event4); allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); // Update resources in scheduler through node heartbeat from h1. nodeManager.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(1024, 1)); allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); // One map is assigned. Assert.assertEquals(1, allocator.getAssignedRequests().maps.size()); @@ -3087,7 +3042,7 @@ public void testUpdateAskOnRampDownAllReduces() throws Exception { // On next allocate request to scheduler, headroom reported will be 0. rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(0, 0)); allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); // After allocate response from scheduler, all scheduled reduces are ramped // down and move to pending. 3 asks are also updated with 0 containers to // indicate ramping down of reduces to scheduler. @@ -3155,21 +3110,19 @@ public void testAvoidAskMoreReducersWhenReducerPreemptionIsRequired() Configuration conf = new Configuration(); MyResourceManager rm = new MyResourceManager(conf); rm.start(); - DrainDispatcher dispatcher = - (DrainDispatcher) rm.getRMContext().getDispatcher(); // Submit the application RMApp app = rm.submitApp(1024); - dispatcher.await(); + rm.drainEvents(); MockNM amNodeManager = rm.registerNode("amNM:1234", 1260); amNodeManager.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() .getAppAttemptId(); rm.sendAMLaunched(appAttemptId); - dispatcher.await(); + rm.drainEvents(); JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); @@ -3184,7 +3137,7 @@ public void testAvoidAskMoreReducersWhenReducerPreemptionIsRequired() // Register nodes to RM. MockNM nodeManager = rm.registerNode("h1:1234", 1024); - dispatcher.await(); + rm.drainEvents(); // Request 2 maps and 1 reducer(sone on nodes which are not registered). ContainerRequestEvent event1 = @@ -3200,7 +3153,7 @@ public void testAvoidAskMoreReducersWhenReducerPreemptionIsRequired() // This will tell the scheduler about the requests but there will be no // allocations as nodes are not added. allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); // Advance clock so that maps can be considered as hanging. clock.setTime(System.currentTimeMillis() + 500000L); @@ -3211,15 +3164,15 @@ public void testAvoidAskMoreReducersWhenReducerPreemptionIsRequired() allocator.sendRequest(event4); allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); // Update resources in scheduler through node heartbeat from h1. nodeManager.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(1024, 1)); allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); // One map is assigned. Assert.assertEquals(1, allocator.getAssignedRequests().maps.size()); @@ -3256,7 +3209,7 @@ public void testAvoidAskMoreReducersWhenReducerPreemptionIsRequired() // On next allocate request to scheduler, headroom reported will be 2048. rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(2048, 0)); allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); // After allocate response from scheduler, all scheduled reduces are ramped // down and move to pending. 3 asks are also updated with 0 containers to // indicate ramping down of reduces to scheduler. @@ -3285,21 +3238,19 @@ public void testExcludeSchedReducesFromHeadroom() throws Exception { conf.setInt(MRJobConfig.MR_JOB_REDUCER_UNCONDITIONAL_PREEMPT_DELAY_SEC, -1); MyResourceManager rm = new MyResourceManager(conf); rm.start(); - DrainDispatcher dispatcher = - (DrainDispatcher) rm.getRMContext().getDispatcher(); // Submit the application RMApp app = rm.submitApp(1024); - dispatcher.await(); + rm.drainEvents(); MockNM amNodeManager = rm.registerNode("amNM:1234", 1260); amNodeManager.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() .getAppAttemptId(); rm.sendAMLaunched(appAttemptId); - dispatcher.await(); + rm.drainEvents(); JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); @@ -3315,10 +3266,10 @@ public void testExcludeSchedReducesFromHeadroom() throws Exception { appAttemptId, mockJob); MockNM nodeManager = rm.registerNode("h1:1234", 4096); - dispatcher.await(); + rm.drainEvents(); // Register nodes to RM. MockNM nodeManager2 = rm.registerNode("h2:1234", 1024); - dispatcher.await(); + rm.drainEvents(); // Request 2 maps and 1 reducer(sone on nodes which are not registered). ContainerRequestEvent event1 = @@ -3334,7 +3285,7 @@ public void testExcludeSchedReducesFromHeadroom() throws Exception { // This will tell the scheduler about the requests but there will be no // allocations as nodes are not added. allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); // Request for another reducer on h3 which has not registered. ContainerRequestEvent event4 = @@ -3342,15 +3293,15 @@ public void testExcludeSchedReducesFromHeadroom() throws Exception { allocator.sendRequest(event4); allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); // Update resources in scheduler through node heartbeat from h1. nodeManager.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(3072, 3)); allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); // Two maps are assigned. Assert.assertEquals(2, allocator.getAssignedRequests().maps.size()); @@ -3363,15 +3314,15 @@ public void testExcludeSchedReducesFromHeadroom() throws Exception { Assert.assertEquals(0, allocator.getAssignedRequests().maps.size()); nodeManager.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(1024, 1)); allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); // h2 heartbeats. nodeManager2.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); // Send request for one more mapper. ContainerRequestEvent event5 = @@ -3380,7 +3331,7 @@ public void testExcludeSchedReducesFromHeadroom() throws Exception { rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(2048, 2)); allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); // One reducer is assigned and one map is scheduled Assert.assertEquals(1, allocator.getScheduledRequests().maps.size()); Assert.assertEquals(1, allocator.getAssignedRequests().reduces.size()); @@ -3388,7 +3339,7 @@ public void testExcludeSchedReducesFromHeadroom() throws Exception { // enough if scheduled reducers resources are deducted. rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(1260, 2)); allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); // After allocate response, the one assigned reducer is preempted and killed Assert.assertEquals(1, MyContainerAllocator.getTaskAttemptKillEvents().size()); Assert.assertEquals(RMContainerAllocator.RAMPDOWN_DIAGNOSTIC, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java index 39a76333ab87a..fa3c6afeb36af 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java @@ -49,8 +49,6 @@ import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; @@ -126,22 +124,20 @@ public void testAMRMClientResendsRequestsOnRMRestart() throws Exception { // Phase-1 Start 1st RM MyResourceManager rm1 = new MyResourceManager(conf, memStore); rm1.start(); - DrainDispatcher dispatcher = - (DrainDispatcher) rm1.getRMContext().getDispatcher(); // Submit the application RMApp app = rm1.submitApp(1024); - dispatcher.await(); + rm1.drainEvents(); MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService()); nm1.registerNode(); nm1.nodeHeartbeat(true); // Node heartbeat - dispatcher.await(); + rm1.drainEvents(); ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt().getAppAttemptId(); rm1.sendAMLaunched(appAttemptId); - dispatcher.await(); + rm1.drainEvents(); org.apache.hadoop.security.token.Token token = rm1.getRMContext().getRMApps().get(appAttemptId.getApplicationId()) @@ -176,7 +172,7 @@ public void testAMRMClientResendsRequestsOnRMRestart() throws Exception { blacklistAdditions.remove("h2");// remove from local list AllocateResponse allocateResponse = amClient.allocate(0.1f); - dispatcher.await(); + rm1.drainEvents(); Assert.assertEquals("No of assignments must be 0", 0, allocateResponse .getAllocatedContainers().size()); @@ -189,10 +185,10 @@ public void testAMRMClientResendsRequestsOnRMRestart() throws Exception { // Step-2 : NM heart beat is sent. // On 2nd AM allocate request, RM allocates 3 containers to AM nm1.nodeHeartbeat(true); // Node heartbeat - dispatcher.await(); + rm1.drainEvents(); allocateResponse = amClient.allocate(0.2f); - dispatcher.await(); + rm1.drainEvents(); // 3 containers are allocated i.e for cRequest1, cRequest2 and cRequest3. Assert.assertEquals("No of assignments must be 0", 3, allocateResponse .getAllocatedContainers().size()); @@ -207,7 +203,7 @@ public void testAMRMClientResendsRequestsOnRMRestart() throws Exception { amClient.removeContainerRequest(cRequest3); allocateResponse = amClient.allocate(0.2f); - dispatcher.await(); + rm1.drainEvents(); Assert.assertEquals("No of assignments must be 0", 0, allocateResponse .getAllocatedContainers().size()); assertAsksAndReleases(4, 0, rm1); @@ -233,7 +229,7 @@ public void testAMRMClientResendsRequestsOnRMRestart() throws Exception { // request nm1.nodeHeartbeat(containerId.getApplicationAttemptId(), containerId.getContainerId(), ContainerState.RUNNING); - dispatcher.await(); + rm1.drainEvents(); amClient.requestContainerUpdate( container, UpdateContainerRequest.newInstance( container.getVersion(), container.getId(), @@ -242,7 +238,7 @@ public void testAMRMClientResendsRequestsOnRMRestart() throws Exception { it.remove(); allocateResponse = amClient.allocate(0.3f); - dispatcher.await(); + rm1.drainEvents(); Assert.assertEquals("No of assignments must be 0", 0, allocateResponse .getAllocatedContainers().size()); assertAsksAndReleases(3, pendingRelease, rm1); @@ -258,7 +254,6 @@ public void testAMRMClientResendsRequestsOnRMRestart() throws Exception { rm2.start(); nm1.setResourceTrackerService(rm2.getResourceTrackerService()); ((MyAMRMClientImpl) amClient).updateRMProxy(rm2); - dispatcher = (DrainDispatcher) rm2.getRMContext().getDispatcher(); // NM should be rebooted on heartbeat, even first heartbeat for nm2 NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true); @@ -274,7 +269,7 @@ public void testAMRMClientResendsRequestsOnRMRestart() throws Exception { Collections.singletonList( containerId.getApplicationAttemptId().getApplicationId())); nm1.nodeHeartbeat(true); - dispatcher.await(); + rm2.drainEvents(); blacklistAdditions.add("h3"); amClient.updateBlacklist(blacklistAdditions, null); @@ -296,7 +291,7 @@ public void testAMRMClientResendsRequestsOnRMRestart() throws Exception { // containerRequest and blacklisted nodes. // Intern RM send resync command,AMRMClient resend allocate request allocateResponse = amClient.allocate(0.3f); - dispatcher.await(); + rm2.drainEvents(); completedContainer = allocateResponse.getCompletedContainersStatuses().size(); @@ -313,7 +308,7 @@ public void testAMRMClientResendsRequestsOnRMRestart() throws Exception { // Step-5 : Allocater after resync command allocateResponse = amClient.allocate(0.5f); - dispatcher.await(); + rm2.drainEvents(); Assert.assertEquals("No of assignments must be 0", 0, allocateResponse .getAllocatedContainers().size()); @@ -326,10 +321,10 @@ public void testAMRMClientResendsRequestsOnRMRestart() throws Exception { int count = 5; while (count-- > 0) { nm1.nodeHeartbeat(true); - dispatcher.await(); + rm2.drainEvents(); allocateResponse = amClient.allocate(0.5f); - dispatcher.await(); + rm2.drainEvents(); noAssignedContainer += allocateResponse.getAllocatedContainers().size(); if (noAssignedContainer == 3) { break; @@ -358,22 +353,20 @@ public void testAMRMClientForUnregisterAMOnRMRestart() throws Exception { // Phase-1 Start 1st RM MyResourceManager rm1 = new MyResourceManager(conf, memStore); rm1.start(); - DrainDispatcher dispatcher = - (DrainDispatcher) rm1.getRMContext().getDispatcher(); // Submit the application RMApp app = rm1.submitApp(1024); - dispatcher.await(); + rm1.drainEvents(); MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService()); nm1.registerNode(); nm1.nodeHeartbeat(true); // Node heartbeat - dispatcher.await(); + rm1.drainEvents(); ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt().getAppAttemptId(); rm1.sendAMLaunched(appAttemptId); - dispatcher.await(); + rm1.drainEvents(); org.apache.hadoop.security.token.Token token = rm1.getRMContext().getRMApps().get(appAttemptId.getApplicationId()) @@ -393,7 +386,6 @@ public void testAMRMClientForUnregisterAMOnRMRestart() throws Exception { rm2.start(); nm1.setResourceTrackerService(rm2.getResourceTrackerService()); ((MyAMRMClientImpl) amClient).updateRMProxy(rm2); - dispatcher = (DrainDispatcher) rm2.getRMContext().getDispatcher(); // NM should be rebooted on heartbeat, even first heartbeat for nm2 NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true); @@ -409,7 +401,7 @@ public void testAMRMClientForUnregisterAMOnRMRestart() throws Exception { Priority.newInstance(0), 0); nm1.registerNode(Arrays.asList(containerReport), null); nm1.nodeHeartbeat(true); - dispatcher.await(); + rm2.drainEvents(); amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, null); @@ -421,7 +413,6 @@ public void testAMRMClientForUnregisterAMOnRMRestart() throws Exception { amClient.stop(); rm1.stop(); rm2.stop(); - } @@ -439,22 +430,20 @@ public void testAMRMClientOnAMRMTokenRollOverOnRMRestart() throws Exception { // start first RM MyResourceManager2 rm1 = new MyResourceManager2(conf, memStore); rm1.start(); - DrainDispatcher dispatcher = - (DrainDispatcher) rm1.getRMContext().getDispatcher(); Long startTime = System.currentTimeMillis(); // Submit the application RMApp app = rm1.submitApp(1024); - dispatcher.await(); + rm1.drainEvents(); MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService()); nm1.registerNode(); nm1.nodeHeartbeat(true); // Node heartbeat - dispatcher.await(); + rm1.drainEvents(); ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt().getAppAttemptId(); rm1.sendAMLaunched(appAttemptId); - dispatcher.await(); + rm1.drainEvents(); AMRMTokenSecretManager amrmTokenSecretManagerForRM1 = rm1.getRMContext().getAMRMTokenSecretManager(); @@ -513,7 +502,6 @@ public void testAMRMClientOnAMRMTokenRollOverOnRMRestart() throws Exception { rm2.start(); nm1.setResourceTrackerService(rm2.getResourceTrackerService()); ((MyAMRMClientImpl) amClient).updateRMProxy(rm2); - dispatcher = (DrainDispatcher) rm2.getRMContext().getDispatcher(); AMRMTokenSecretManager amrmTokenSecretManagerForRM2 = rm2.getRMContext().getAMRMTokenSecretManager(); @@ -615,11 +603,6 @@ public void serviceStart() throws Exception { MyResourceManager.setClusterTimeStamp(fakeClusterTimeStamp); } - @Override - protected Dispatcher createDispatcher() { - return new DrainDispatcher(); - } - @Override protected EventHandler createSchedulerEventDispatcher() { // Dispatch inline for test sanity diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ACLsTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ACLsTestBase.java index fbd5ac38dc008..100eb7f21a9c8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ACLsTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ACLsTestBase.java @@ -30,14 +30,9 @@ import org.apache.hadoop.service.Service.STATE; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.junit.Before; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - public abstract class ACLsTestBase { protected static final String COMMON_USER = "common_user"; @@ -80,11 +75,6 @@ this.queueACLsManager, getRMContext() .getRMDelegationTokenSecretManager()); } - @Override - protected Dispatcher createDispatcher() { - return new DrainDispatcher(); - } - @Override protected void doSecureLogin() throws IOException { } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java index c9ce7d7a061e7..c95bcdfca9762 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java @@ -26,22 +26,17 @@ import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; -import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; -import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -108,20 +103,9 @@ protected MockAM launchAM(RMApp app, MockRM rm, MockNM nm) } protected void startRMs() throws IOException { - rm1 = new MockRM(confForRM1, null, false, false){ - @Override - protected Dispatcher createDispatcher() { - return new DrainDispatcher(); - } - }; - rm2 = new MockRM(confForRM2, null, false, false){ - @Override - protected Dispatcher createDispatcher() { - return new DrainDispatcher(); - } - }; + rm1 = new MockRM(confForRM1, null, false, false); + rm2 = new MockRM(confForRM2, null, false, false); startRMs(rm1, confForRM1, rm2, confForRM2); - } protected void startRMsWithCustomizedRMAppManager() throws IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ReservationACLsTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ReservationACLsTestBase.java index 03bc8897ab4dd..c8ee00e60bcb6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ReservationACLsTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ReservationACLsTestBase.java @@ -47,7 +47,6 @@ import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; import org.apache.hadoop.yarn.api.records.ReservationRequests; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; @@ -463,9 +462,7 @@ private void registerNode(String host, int memory, int vCores) throws int attempts = 10; Collection plans; do { - DrainDispatcher dispatcher = - (DrainDispatcher) resourceManager.getRMContext().getDispatcher(); - dispatcher.await(); + resourceManager.drainEvents(); LOG.info("Waiting for node capacity to be added to plan"); plans = resourceManager.getRMContext().getReservationSystem() .getAllPlans().values(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java index c4197a1c1f8b3..422b7eb88a218 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java @@ -40,8 +40,6 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; @@ -161,13 +159,7 @@ public void testContainerCleanup() throws Exception { Logger rootLogger = LogManager.getRootLogger(); rootLogger.setLevel(Level.DEBUG); - final DrainDispatcher dispatcher = new DrainDispatcher(); - MockRM rm = new MockRM() { - @Override - protected Dispatcher createDispatcher() { - return dispatcher; - } - }; + MockRM rm = new MockRM(); rm.start(); MockNM nm1 = rm.registerNode("127.0.0.1:1234", 5000); @@ -185,8 +177,8 @@ protected Dispatcher createDispatcher() { int request = 2; am.allocate("127.0.0.1" , 1000, request, new ArrayList()); - dispatcher.await(); - + rm.drainEvents(); + //kick the scheduler nm1.nodeHeartbeat(true); List conts = am.allocate(new ArrayList(), @@ -199,7 +191,7 @@ protected Dispatcher createDispatcher() { Thread.sleep(100); conts = am.allocate(new ArrayList(), new ArrayList()).getAllocatedContainers(); - dispatcher.await(); + rm.drainEvents(); contReceived += conts.size(); nm1.nodeHeartbeat(true); } @@ -209,7 +201,7 @@ protected Dispatcher createDispatcher() { ArrayList release = new ArrayList(); release.add(conts.get(0).getId()); am.allocate(new ArrayList(), release); - dispatcher.await(); + rm.drainEvents(); // Send one more heartbeat with a fake running container. This is to // simulate the situation that can happen if the NM reports that container @@ -224,7 +216,7 @@ protected Dispatcher createDispatcher() { containerStatuses.put(app.getApplicationId(), containerStatusList); NodeHeartbeatResponse resp = nm1.nodeHeartbeat(containerStatuses, true); - waitForContainerCleanup(dispatcher, nm1, resp); + waitForContainerCleanup(rm, nm1, resp); // Now to test the case when RM already gave cleanup, and NM suddenly // realizes that the container is running. @@ -240,17 +232,17 @@ protected Dispatcher createDispatcher() { resp = nm1.nodeHeartbeat(containerStatuses, true); // The cleanup list won't be instantaneous as it is given out by scheduler // and not RMNodeImpl. - waitForContainerCleanup(dispatcher, nm1, resp); + waitForContainerCleanup(rm, nm1, resp); rm.stop(); } - protected void waitForContainerCleanup(DrainDispatcher dispatcher, MockNM nm, + protected void waitForContainerCleanup(MockRM rm, MockNM nm, NodeHeartbeatResponse resp) throws Exception { int waitCount = 0, cleanedConts = 0; List contsToClean; do { - dispatcher.await(); + rm.drainEvents(); contsToClean = resp.getContainersToCleanup(); cleanedConts += contsToClean.size(); if (cleanedConts >= 1) { @@ -400,13 +392,7 @@ public void testContainerCleanupWhenRMRestartedAppNotRegistered() throws memStore.init(conf); // start RM - final DrainDispatcher dispatcher = new DrainDispatcher(); - MockRM rm1 = new MockRM(conf, memStore) { - @Override - protected Dispatcher createDispatcher() { - return dispatcher; - } - }; + MockRM rm1 = new MockRM(conf, memStore); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); @@ -419,13 +405,7 @@ protected Dispatcher createDispatcher() { rm1.waitForState(app0.getApplicationId(), RMAppState.RUNNING); // start new RM - final DrainDispatcher dispatcher2 = new DrainDispatcher(); - MockRM rm2 = new MockRM(conf, memStore) { - @Override - protected Dispatcher createDispatcher() { - return dispatcher2; - } - }; + MockRM rm2 = new MockRM(conf, memStore); rm2.start(); // nm1 register to rm2, and do a heartbeat @@ -437,7 +417,7 @@ protected Dispatcher createDispatcher() { NodeHeartbeatResponse response = nm1.nodeHeartbeat(am0 .getApplicationAttemptId(), 2, ContainerState.RUNNING); - waitForContainerCleanup(dispatcher2, nm1, response); + waitForContainerCleanup(rm2, nm1, response); rm1.stop(); rm2.stop(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java index 08b180fc413a5..9e8401027e948 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java @@ -59,8 +59,6 @@ import org.apache.hadoop.yarn.api.records.SerializedException; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException; import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException; @@ -260,7 +258,6 @@ public void testRetriesOnFailures() throws Exception { Configuration conf = new Configuration(); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); conf.setInt(YarnConfiguration.CLIENT_NM_CONNECT_RETRY_INTERVAL_MS, 1); - final DrainDispatcher dispatcher = new DrainDispatcher(); MockRM rm = new MockRMWithCustomAMLauncher(conf, null) { @Override protected ApplicationMasterLauncher createAMLauncher() { @@ -284,12 +281,8 @@ protected YarnRPC getYarnRPC() { } }; } - - @Override - protected Dispatcher createDispatcher() { - return dispatcher; - } }; + rm.start(); MockNM nm1 = rm.registerNode("127.0.0.1:1234", 5120); @@ -297,7 +290,7 @@ protected Dispatcher createDispatcher() { // kick the scheduling nm1.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); MockRM.waitForState(app.getCurrentAppAttempt(), RMAppAttemptState.LAUNCHED, 500); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java index 23bed228e19bb..18c49bdddcd6f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java @@ -42,8 +42,6 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; @@ -327,10 +325,8 @@ public void testResourceTypes() throws Exception { @Test(timeout=1200000) public void testAllocateAfterUnregister() throws Exception { - MyResourceManager rm = new MyResourceManager(conf); + MockRM rm = new MockRM(conf); rm.start(); - DrainDispatcher rmDispatcher = (DrainDispatcher) rm.getRMContext() - .getDispatcher(); // Register node1 MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB); @@ -351,7 +347,7 @@ public void testAllocateAfterUnregister() throws Exception { AllocateResponse alloc1Response = am1.schedule(); nm1.nodeHeartbeat(true); - rmDispatcher.await(); + rm.drainEvents(); alloc1Response = am1.schedule(); Assert.assertEquals(0, alloc1Response.getAllocatedContainers().size()); } @@ -474,17 +470,6 @@ public void testPriorityInAllocatedResponse() throws Exception { rm.stop(); } - private static class MyResourceManager extends MockRM { - - public MyResourceManager(YarnConfiguration conf) { - super(conf); - } - @Override - protected Dispatcher createDispatcher() { - return new DrainDispatcher(); - } - } - private void sentRMContainerLaunched(MockRM rm, ContainerId containerId) { CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); RMContainer rmContainer = cs.getRMContainer(containerId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java index b4adf480b35fc..75ef5c775be99 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java @@ -33,8 +33,6 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.TestAMRestart; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -65,8 +63,7 @@ public void testNodeBlacklistingOnAMFailure() throws Exception { conf.setBoolean(YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_ENABLED, true); - DrainDispatcher dispatcher = new DrainDispatcher(); - MockRM rm = startRM(conf, dispatcher); + MockRM rm = startRM(conf); CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler(); // Register 5 nodes, so that we can blacklist atleast one if AM container @@ -122,7 +119,7 @@ public void testNodeBlacklistingOnAMFailure() throws Exception { // Try the current node a few times for (int i = 0; i <= 2; i++) { currentNode.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); Assert.assertEquals( "AppAttemptState should still be SCHEDULED if currentNode is " @@ -132,7 +129,7 @@ public void testNodeBlacklistingOnAMFailure() throws Exception { // Now try the other node otherNode.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); // Now the AM container should be allocated MockRM.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000); @@ -169,8 +166,7 @@ public void testNodeBlacklistingOnAMFailureStrictNodeLocality() conf.setBoolean(YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_ENABLED, true); - DrainDispatcher dispatcher = new DrainDispatcher(); - MockRM rm = startRM(conf, dispatcher); + MockRM rm = startRM(conf); CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler(); // Register 5 nodes, so that we can blacklist atleast one if AM container @@ -227,7 +223,7 @@ public void testNodeBlacklistingOnAMFailureStrictNodeLocality() System.out.println("New AppAttempt launched " + attempt.getAppAttemptId()); nm2.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); // Now the AM container should be allocated MockRM.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000); @@ -257,8 +253,7 @@ public void testNodeBlacklistingOnAMFailureRelaxedNodeLocality() conf.setBoolean(YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_ENABLED, true); - DrainDispatcher dispatcher = new DrainDispatcher(); - MockRM rm = startRM(conf, dispatcher); + MockRM rm = startRM(conf); CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler(); // Register 5 nodes, so that we can blacklist atleast one if AM container @@ -319,7 +314,7 @@ public void testNodeBlacklistingOnAMFailureRelaxedNodeLocality() nm3.nodeHeartbeat(true); nm4.nodeHeartbeat(true); nm5.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); // Now the AM container should be allocated MockRM.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000); @@ -352,8 +347,7 @@ public void testNoBlacklistingForNonSystemErrors() throws Exception { 1.5f); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 100); - DrainDispatcher dispatcher = new DrainDispatcher(); - MockRM rm = startRM(conf, dispatcher); + MockRM rm = startRM(conf); MockNM node = new MockNM("127.0.0.1:1234", 8000, rm.getResourceTrackerService()); @@ -367,7 +361,7 @@ public void testNoBlacklistingForNonSystemErrors() throws Exception { // Now the AM container should be allocated RMAppAttempt attempt = MockRM.waitForAttemptScheduled(app, rm); node.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); MockRM.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000); rm.sendAMLaunched(attempt.getAppAttemptId()); rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED); @@ -394,7 +388,7 @@ public void testNoBlacklistingForNonSystemErrors() throws Exception { .println("New AppAttempt launched " + attempt.getAppAttemptId()); node.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); MockRM.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000); rm.sendAMLaunched(attempt.getAppAttemptId()); @@ -418,20 +412,13 @@ private void makeAMContainerExit(MockRM rm, ContainerId amContainer, rm.waitForState(amAttemptID.getApplicationId(), RMAppState.ACCEPTED); } - private MockRM startRM(YarnConfiguration conf, - final DrainDispatcher dispatcher) { - + private MockRM startRM(YarnConfiguration conf) { MemoryRMStateStore memStore = new MemoryRMStateStore(); memStore.init(conf); - MockRM rm1 = new MockRM(conf, memStore) { - @Override - protected Dispatcher createDispatcher() { - return dispatcher; - } - }; + MockRM rm = new MockRM(conf, memStore); - rm1.start(); - return rm1; + rm.start(); + return rm; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestReservationSystemWithRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestReservationSystemWithRMHA.java index 5a6fe67e2b15a..f746dc2f1883a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestReservationSystemWithRMHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestReservationSystemWithRMHA.java @@ -29,7 +29,6 @@ import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; @@ -186,9 +185,7 @@ private void addNodeCapacityToPlan(MockRM rm, int memory, int vCores) { rm.registerNode("127.0.0.1:1", memory, vCores); int attempts = 10; do { - DrainDispatcher dispatcher = - (DrainDispatcher) rm1.getRMContext().getDispatcher(); - dispatcher.await(); + rm1.drainEvents(); rm.getRMContext().getReservationSystem() .synchronizePlan(ReservationSystemTestUtil.reservationQ, false); if (rm.getRMContext().getReservationSystem() diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java index c8baa607bf054..f9f0b746233a8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java @@ -31,8 +31,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; @@ -47,12 +45,10 @@ public class TestAMRMRPCNodeUpdates { private MockRM rm; - ApplicationMasterService amService = null; - DrainDispatcher dispatcher = null; + private ApplicationMasterService amService; @Before public void setUp() { - dispatcher = new DrainDispatcher(); this.rm = new MockRM() { @Override public void init(Configuration conf) { @@ -61,12 +57,8 @@ public void init(Configuration conf) { "1.0"); super.init(conf); } - - @Override - protected Dispatcher createDispatcher() { - return dispatcher; - } }; + rm.start(); amService = rm.getApplicationMasterService(); } @@ -80,14 +72,14 @@ public void tearDown() { private void syncNodeHeartbeat(MockNM nm, boolean health) throws Exception { nm.nodeHeartbeat(health); - dispatcher.await(); + rm.drainEvents(); } private void syncNodeLost(MockNM nm) throws Exception { rm.sendNodeStarted(nm); rm.waitForState(nm.getNodeId(), NodeState.RUNNING); rm.sendNodeLost(nm); - dispatcher.await(); + rm.drainEvents(); } private AllocateResponse allocate(final ApplicationAttemptId attemptId, @@ -113,7 +105,7 @@ public void testAMRMUnusableNodes() throws Exception { MockNM nm2 = rm.registerNode("127.0.0.2:1234", 10000); MockNM nm3 = rm.registerNode("127.0.0.3:1234", 10000); MockNM nm4 = rm.registerNode("127.0.0.4:1234", 10000); - dispatcher.await(); + rm.drainEvents(); RMApp app1 = rm.submitApp(2000); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java index e7c7e51bf2c4c..6a7325c25c89a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java @@ -28,7 +28,6 @@ import org.apache.hadoop.yarn.conf.ConfigurationProviderFactory; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.InlineDispatcher; import org.apache.hadoop.yarn.factories.RecordFactory; @@ -228,21 +227,16 @@ public void testRMNodeStatusAfterReconnect() throws Exception { // The node(127.0.0.1:1234) reconnected with RM. When it registered with // RM, RM set its lastNodeHeartbeatResponse's id to 0 asynchronously. But // the node's heartbeat come before RM succeeded setting the id to 0. - final DrainDispatcher dispatcher = new DrainDispatcher(); - MockRM rm = new MockRM(){ - @Override - protected Dispatcher createDispatcher() { - return dispatcher; - } - }; + MockRM rm = new MockRM(); rm.start(); + MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService()); nm1.registerNode(); int i = 0; while(i < 3) { nm1.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); i++; } @@ -251,7 +245,7 @@ protected Dispatcher createDispatcher() { nm2.registerNode(); RMNode rmNode = rm.getRMContext().getRMNodes().get(nm2.getNodeId()); nm2.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); Assert.assertEquals("Node is Not in Running state.", NodeState.RUNNING, rmNode.getState()); rm.stop(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java index 893f802ade334..db3144898fd18 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java @@ -46,7 +46,6 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java index ff52efd89be91..fd17bd91a3dc3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java @@ -36,8 +36,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; @@ -612,24 +610,17 @@ public void testOrderOfActivatingThePriorityApplicationOnRMRestart() conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10); - final DrainDispatcher dispatcher = new DrainDispatcher(); MemoryRMStateStore memStore = new MemoryRMStateStore(); memStore.init(conf); - MockRM rm1 = new MockRM(conf, memStore) { - @Override - protected Dispatcher createDispatcher() { - return dispatcher; - } - }; + MockRM rm1 = new MockRM(conf, memStore); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 16384, rm1.getResourceTrackerService()); nm1.registerNode(); - - dispatcher.await(); + rm1.drainEvents(); ResourceScheduler scheduler = rm1.getRMContext().getScheduler(); LeafQueue defaultQueue = @@ -648,7 +639,7 @@ protected Dispatcher createDispatcher() { MockAM am2 = MockRM.launchAM(app2, rm1, nm1); am2.registerAppAttempt(); - dispatcher.await(); + rm1.drainEvents(); Assert.assertEquals(2, defaultQueue.getNumActiveApplications()); Assert.assertEquals(0, defaultQueue.getNumPendingApplications()); @@ -657,7 +648,7 @@ protected Dispatcher createDispatcher() { Priority appPriority3 = Priority.newInstance(7); RMApp app3 = rm1.submitApp(memory, appPriority3); - dispatcher.await(); + rm1.drainEvents(); Assert.assertEquals(2, defaultQueue.getNumActiveApplications()); Assert.assertEquals(1, defaultQueue.getNumPendingApplications()); @@ -676,14 +667,8 @@ protected Dispatcher createDispatcher() { Assert.assertEquals(app3.getCurrentAppAttempt().getAppAttemptId(), fcApp3.getApplicationAttemptId()); - final DrainDispatcher dispatcher1 = new DrainDispatcher(); // create new RM to represent restart and recover state - MockRM rm2 = new MockRM(conf, memStore) { - @Override - protected Dispatcher createDispatcher() { - return dispatcher1; - } - }; + MockRM rm2 = new MockRM(conf, memStore); // start new RM rm2.start(); @@ -693,7 +678,7 @@ protected Dispatcher createDispatcher() { // Verify RM Apps after this restart Assert.assertEquals(3, rm2.getRMContext().getRMApps().size()); - dispatcher1.await(); + rm2.drainEvents(); scheduler = rm2.getRMContext().getScheduler(); defaultQueue = (LeafQueue) ((CapacityScheduler) scheduler).getQueue("default"); @@ -714,7 +699,7 @@ protected Dispatcher createDispatcher() { // NM resync to new RM nm1.registerNode(); - dispatcher1.await(); + rm2.drainEvents(); // wait for activating applications count = 50; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java index a4513561d2c6e..d4e7727ad5e0c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java @@ -50,8 +50,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier; import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager; @@ -199,7 +197,6 @@ public void testClientToAMTokens() throws Exception { StartContainersResponse mockResponse = mock(StartContainersResponse.class); when(containerManager.startContainers((StartContainersRequest) any())) .thenReturn(mockResponse); - final DrainDispatcher dispatcher = new DrainDispatcher(); MockRM rm = new MockRMWithCustomAMLauncher(conf, containerManager) { protected ClientRMService createClientRMService() { @@ -208,11 +205,6 @@ protected ClientRMService createClientRMService() { getRMContext().getRMDelegationTokenSecretManager()); }; - @Override - protected Dispatcher createDispatcher() { - return dispatcher; - } - @Override protected void doSecureLogin() throws IOException { } @@ -225,11 +217,10 @@ protected void doSecureLogin() throws IOException { // Set up a node. MockNM nm1 = rm.registerNode("localhost:1234", 3072); nm1.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); - nm1.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); ApplicationAttemptId appAttempt = app.getCurrentAppAttempt().getAppAttemptId(); final MockAM mockAM = @@ -436,7 +427,6 @@ public void testClientTokenRace() throws Exception { StartContainersResponse mockResponse = mock(StartContainersResponse.class); when(containerManager.startContainers((StartContainersRequest) any())) .thenReturn(mockResponse); - final DrainDispatcher dispatcher = new DrainDispatcher(); MockRM rm = new MockRMWithCustomAMLauncher(conf, containerManager) { protected ClientRMService createClientRMService() { @@ -445,11 +435,6 @@ protected ClientRMService createClientRMService() { getRMContext().getRMDelegationTokenSecretManager()); }; - @Override - protected Dispatcher createDispatcher() { - return dispatcher; - } - @Override protected void doSecureLogin() throws IOException { } @@ -462,10 +447,10 @@ protected void doSecureLogin() throws IOException { // Set up a node. MockNM nm1 = rm.registerNode("localhost:1234", 3072); nm1.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); nm1.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); ApplicationAttemptId appAttempt = app.getCurrentAppAttempt().getAppAttemptId(); final MockAM mockAM =