YARN-10550. Decouple NM runner logic from SLSRunner. Contributed by Szilard Nemeth

szilard-nemeth authored and brumi1024 committed Mar 30, 2022
1 parent 6e00a79 commit ab8c360
Showing 6 changed files with 296 additions and 179 deletions.
@@ -63,7 +63,6 @@ public class AMRunner {
  private Map<String, Class> amClassMap;
  private TraceType inputType;
  private String[] inputTraces;
- private SynthTraceJobProducer stjp;
  private TaskRunner runner;
  private SLSRunner slsRunner;
  private int numAMs, numTasks;
@@ -148,16 +147,15 @@ private void startAMFromSLSTrace(String inputTrace) throws IOException {
  private void startAMFromSynthGenerator() throws YarnException, IOException {
    Configuration localConf = new Configuration();
    localConf.set("fs.defaultFS", "file:///");
-    // if we use the nodeFile this could have been not initialized yet.
-    if (stjp == null) {
-      stjp = new SynthTraceJobProducer(conf, new Path(inputTraces[0]));
-      slsRunner.setStjp(stjp);
+    //if we use the nodeFile this could have been not initialized yet.
+    if (slsRunner.getStjp() == null) {
+      slsRunner.setStjp(new SynthTraceJobProducer(conf, new Path(inputTraces[0])));
    }

    SynthJob job;
    // we use stjp, a reference to the job producer instantiated during node
    // creation
-    while ((job = (SynthJob) stjp.getNextJob()) != null) {
+    while ((job = (SynthJob) slsRunner.getStjp().getNextJob()) != null) {
      ReservationId reservationId = null;
      if (job.hasDeadline()) {
        reservationId = ReservationId
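The hunk above removes AMRunner's own SynthTraceJobProducer field: the producer is now reached through SLSRunner, so the AM and NM sides share a single instance. A minimal sketch of the accessor pair assumed on SLSRunner (both methods are called in the hunk, so they must exist with these signatures; the backing field shown here is an assumption):

// Sketch only: the field name and placement inside SLSRunner are assumed.
public SynthTraceJobProducer getStjp() {
  return stjp;
}

public void setStjp(SynthTraceJobProducer stjp) {
  this.stjp = stjp;
}

The new NMRunner class below is the other half of the decoupling: it owns node start-up and, for SYNTH traces, creates the producer in the first place.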
@@ -0,0 +1,238 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.sls;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.sls.SLSRunner.TraceType;
import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator;
import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer;
import org.apache.hadoop.yarn.sls.utils.SLSUtils;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class NMRunner {
  private static final Logger LOG = LoggerFactory.getLogger(NMRunner.class);

  // other simulation information
  private int numNMs, numRacks;

  // NM simulator
  private Map<NodeId, NMSimulator> nmMap;
  private Resource nodeManagerResource;
  private String nodeFile;
  private TaskRunner taskRunner;
  private Configuration conf;
  private ResourceManager rm;
  private String tableMapping;
  private int threadPoolSize;
  private TraceType inputType;
  private String[] inputTraces;
  private SynthTraceJobProducer stjp;

  public NMRunner(TaskRunner taskRunner, Configuration conf, ResourceManager rm,
      String tableMapping, int threadPoolSize) {
    this.taskRunner = taskRunner;
    this.conf = conf;
    this.rm = rm;
    this.tableMapping = tableMapping;
    this.threadPoolSize = threadPoolSize;
    this.nmMap = new ConcurrentHashMap<>();
    this.nodeManagerResource = getNodeManagerResourceFromConf();
  }

  public void startNM() throws YarnException, IOException,
      InterruptedException {
    // nm configuration
    int heartbeatInterval = conf.getInt(
        SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS,
        SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS_DEFAULT);
    float resourceUtilizationRatio = conf.getFloat(
        SLSConfiguration.NM_RESOURCE_UTILIZATION_RATIO,
        SLSConfiguration.NM_RESOURCE_UTILIZATION_RATIO_DEFAULT);
    // nm information (fetch from topology file, or from sls/rumen json file)
    Set<SLSRunner.NodeDetails> nodeSet = null;
    if (nodeFile.isEmpty()) {
      for (String inputTrace : inputTraces) {
        switch (inputType) {
        case SLS:
          nodeSet = SLSUtils.parseNodesFromSLSTrace(inputTrace);
          break;
        case RUMEN:
          nodeSet = SLSUtils.parseNodesFromRumenTrace(inputTrace);
          break;
        case SYNTH:
          stjp = new SynthTraceJobProducer(conf, new Path(inputTraces[0]));
          nodeSet = SLSUtils.generateNodes(stjp.getNumNodes(),
              stjp.getNumNodes()/stjp.getNodesPerRack());
          break;
        default:
          throw new YarnException("Input configuration not recognized, "
              + "trace type should be SLS, RUMEN, or SYNTH");
        }
      }
    } else {
      nodeSet = SLSUtils.parseNodesFromNodeFile(nodeFile,
          nodeManagerResource);
    }

    if (nodeSet == null || nodeSet.isEmpty()) {
      throw new YarnException("No node! Please configure nodes.");
    }

    SLSUtils.generateNodeTableMapping(nodeSet, tableMapping);

    // create NM simulators
    Random random = new Random();
    Set<String> rackSet = ConcurrentHashMap.newKeySet();
    int threadPoolSize = Math.max(this.threadPoolSize,
        SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT);
    ExecutorService executorService = Executors.
        newFixedThreadPool(threadPoolSize);
    for (SLSRunner.NodeDetails nodeDetails : nodeSet) {
      executorService.submit(new Runnable() {
        @Override public void run() {
          try {
            // we randomize the heartbeat start time from zero to 1 interval
            NMSimulator nm = new NMSimulator();
            Resource nmResource = nodeManagerResource;
            String hostName = nodeDetails.getHostname();
            if (nodeDetails.getNodeResource() != null) {
              nmResource = nodeDetails.getNodeResource();
            }
            Set<NodeLabel> nodeLabels = nodeDetails.getLabels();
            nm.init(hostName, nmResource,
                random.nextInt(heartbeatInterval),
                heartbeatInterval, rm, resourceUtilizationRatio, nodeLabels);
            nmMap.put(nm.getNode().getNodeID(), nm);
            taskRunner.schedule(nm);
            rackSet.add(nm.getNode().getRackName());
          } catch (IOException | YarnException e) {
            LOG.error("Got an error while adding node", e);
          }
        }
      });
    }
    executorService.shutdown();
    executorService.awaitTermination(10, TimeUnit.MINUTES);
    numRacks = rackSet.size();
    numNMs = nmMap.size();
  }

  void waitForNodesRunning() throws InterruptedException {
    long startTimeMS = System.currentTimeMillis();
    while (true) {
      int numRunningNodes = 0;
      for (RMNode node : rm.getRMContext().getRMNodes().values()) {
        if (node.getState() == NodeState.RUNNING) {
          numRunningNodes++;
        }
      }
      if (numRunningNodes == numNMs) {
        break;
      }
      LOG.info("SLSRunner is waiting for all nodes RUNNING."
          + " {} of {} NMs initialized.", numRunningNodes, numNMs);
      Thread.sleep(1000);
    }
    LOG.info("SLSRunner takes {} ms to launch all nodes.",
        System.currentTimeMillis() - startTimeMS);
  }

  private Resource getNodeManagerResourceFromConf() {
    Resource resource = Resources.createResource(0);
    ResourceInformation[] infors = ResourceUtils.getResourceTypesArray();
    for (ResourceInformation info : infors) {
      long value;
      if (info.getName().equals(ResourceInformation.MEMORY_URI)) {
        value = conf.getInt(SLSConfiguration.NM_MEMORY_MB,
            SLSConfiguration.NM_MEMORY_MB_DEFAULT);
      } else if (info.getName().equals(ResourceInformation.VCORES_URI)) {
        value = conf.getInt(SLSConfiguration.NM_VCORES,
            SLSConfiguration.NM_VCORES_DEFAULT);
      } else {
        value = conf.getLong(SLSConfiguration.NM_PREFIX +
            info.getName(), SLSConfiguration.NM_RESOURCE_DEFAULT);
      }

      resource.setResourceValue(info.getName(), value);
    }

    return resource;
  }

  public void setNodeFile(String nodeFile) {
    this.nodeFile = nodeFile;
  }

  public void setInputType(TraceType inputType) {
    this.inputType = inputType;
  }

  public void setInputTraces(String[] inputTraces) {
    this.inputTraces = inputTraces;
  }

  public int getNumNMs() {
    return numNMs;
  }

  public int getNumRacks() {
    return numRacks;
  }

  public Resource getNodeManagerResource() {
    return nodeManagerResource;
  }

  public Map<NodeId, NMSimulator> getNmMap() {
    return nmMap;
  }

  public SynthTraceJobProducer getStjp() {
    return stjp;
  }

  public void setTableMapping(String tableMapping) {
    this.tableMapping = tableMapping;
  }

  public void setRm(ResourceManager rm) {
    this.rm = rm;
  }
}
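The class above is what SLSRunner now delegates node simulation to. The sketch below shows how a driver might wire it up, using only the constructor, setters, and lifecycle methods visible in this diff. The surrounding variables (taskRunner, conf, rm, tableMapping, threadPoolSize), the literal resource values, and the trace file name are illustrative assumptions, and exception handling is omitted.

// Size the simulated NodeManagers; these are the keys read by
// getNodeManagerResourceFromConf() above. The values are examples only.
conf.setInt(SLSConfiguration.NM_MEMORY_MB, 10240);
conf.setInt(SLSConfiguration.NM_VCORES, 10);

// Construct and start the simulated nodes, roughly as a driver such as
// SLSRunner might do it.
NMRunner nmRunner =
    new NMRunner(taskRunner, conf, rm, tableMapping, threadPoolSize);
nmRunner.setNodeFile("");                        // empty: derive nodes from the trace
nmRunner.setInputType(SLSRunner.TraceType.SLS);  // or RUMEN / SYNTH
nmRunner.setInputTraces(new String[] {"jobs.json"}); // hypothetical trace file
nmRunner.startNM();
nmRunner.waitForNodesRunning(); // package-private; callable from org.apache.hadoop.yarn.sls

// When the SYNTH trace type is used, the shared producer created during
// node generation can then be handed to the AM side:
SynthTraceJobProducer producer = nmRunner.getStjp();

Getters such as getNumNMs(), getNumRacks() and getNmMap() then give SLSRunner the counts and handles it previously tracked itself.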
