Skip to content

Commit

Permalink
YARN-4412. Create ClusterMonitor to compute ordered list of preferred…
Browse files Browse the repository at this point in the history
… NMs for OPPORTUNITIC containers (asuresh)
  • Loading branch information
xslogic committed Apr 22, 2016
1 parent 77d3280 commit c725b97
Show file tree
Hide file tree
Showing 33 changed files with 1,441 additions and 288 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode
Expand Down Expand Up @@ -190,6 +191,10 @@ public List<Container> pullNewlyIncreasedContainers() {
return null;
}

public QueuedContainersStatus getQueuedContainersStatus() {
return null;
}

@Override
public ResourceUtilization getAggregatedContainersUtilization() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode
Expand Down Expand Up @@ -179,6 +180,10 @@ public List<Container> pullNewlyIncreasedContainers() {
return Collections.EMPTY_LIST;
}

public QueuedContainersStatus getQueuedContainersStatus() {
return null;
}

@Override
public ResourceUtilization getAggregatedContainersUtilization() {
return node.getAggregatedContainersUtilization();
Expand Down
2 changes: 2 additions & 0 deletions hadoop-yarn-project/CHANGES-yarn-2877.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,5 @@ yarn-2877 distributed scheduling (Unreleased)
YARN-2885. Create AMRMProxy request interceptor and ContainerAllocator
to distribute OPPORTUNISTIC containers to appropriate Nodes (asuresh)

YARN-4412. Create ClusterMonitor to compute ordered list of preferred
NMs for OPPORTUNITIC containers (asuresh)
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,21 @@ public static boolean isAclEnabled(Configuration conf) {
YARN_PREFIX + "distributed-scheduling.incr-vcores";
public static final int DIST_SCHEDULING_INCR_VCORES_DEFAULT = 1;

/** Container token expiry for container allocated via Distributed
* Scheduling. */
public static final String DIST_SCHEDULING_TOP_K =
YARN_PREFIX + "distributed-scheduling.top-k";
public static final int DIST_SCHEDULING_TOP_K_DEFAULT = 10;

public static final String DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS =
YARN_PREFIX + "distributed-scheduling.top-k-compute-interval-ms";
public static final long DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS_DEFAULT = 1000;

public static final String DIST_SCHEDULING_TOP_K_COMPARATOR =
YARN_PREFIX + "distributed-scheduling.top-k-comparator";
public static final String DIST_SCHEDULING_TOP_K_COMPARATOR_DEFAULT =
"WAIT_TIME";

/** Container token expiry for container allocated via
* Distributed Scheduling */
public static String DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS =
YARN_PREFIX + "distributed-scheduling.container-token-expiry";
public static final int DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.event;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

/**
* This is a specialized EventHandler to be used by Services that are expected
* handle a large number of events efficiently by ensuring that the caller
* thread is not blocked. Events are immediately stored in a BlockingQueue and
* a separate dedicated Thread consumes events from the queue and handles
* appropriately
* @param <T> Type of Event
*/
public class EventDispatcher<T extends Event> extends
AbstractService implements EventHandler<T> {

private final EventHandler<T> handler;
private final BlockingQueue<T> eventQueue =
new LinkedBlockingDeque<>();
private final Thread eventProcessor;
private volatile boolean stopped = false;
private boolean shouldExitOnError = false;

private static final Log LOG = LogFactory.getLog(EventDispatcher.class);

private final class EventProcessor implements Runnable {
@Override
public void run() {

T event;

while (!stopped && !Thread.currentThread().isInterrupted()) {
try {
event = eventQueue.take();
} catch (InterruptedException e) {
LOG.error("Returning, interrupted : " + e);
return; // TODO: Kill RM.
}

try {
handler.handle(event);
} catch (Throwable t) {
// An error occurred, but we are shutting down anyway.
// If it was an InterruptedException, the very act of
// shutdown could have caused it and is probably harmless.
if (stopped) {
LOG.warn("Exception during shutdown: ", t);
break;
}
LOG.fatal("Error in handling event type " + event.getType()
+ " to the Event Dispatcher", t);
if (shouldExitOnError
&& !ShutdownHookManager.get().isShutdownInProgress()) {
LOG.info("Exiting, bbye..");
System.exit(-1);
}
}
}
}
}

public EventDispatcher(EventHandler<T> handler, String name) {
super(name);
this.handler = handler;
this.eventProcessor = new Thread(new EventProcessor());
this.eventProcessor.setName(getName() + ":Event Processor");
}

@Override
protected void serviceInit(Configuration conf) throws Exception {
this.shouldExitOnError =
conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
super.serviceInit(conf);
}

@Override
protected void serviceStart() throws Exception {
this.eventProcessor.start();
super.serviceStart();
}

@Override
protected void serviceStop() throws Exception {
this.stopped = true;
this.eventProcessor.interrupt();
try {
this.eventProcessor.join();
} catch (InterruptedException e) {
throw new YarnRuntimeException(e);
}
super.serviceStop();
}

@Override
public void handle(T event) {
try {
int qSize = eventQueue.size();
if (qSize !=0 && qSize %1000 == 0) {
LOG.info("Size of " + getName() + " event-queue is " + qSize);
}
int remCapacity = eventQueue.remainingCapacity();
if (remCapacity < 1000) {
LOG.info("Very low remaining capacity on " + getName() + "" +
"event queue: " + remCapacity);
}
this.eventQueue.put(event);
} catch (InterruptedException e) {
LOG.info("Interrupted. Trying to exit gracefully.");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2317,6 +2317,36 @@
<value>4096</value>
</property>

<!-- Distributed Scheduling configuration -->
<property>
<description>
The interval in milliseconds specifying the frequency at which the
Distributed Scheduling Cluster Monitor will recomute the top K
viable nodes on which OPPORTUNISTIC containers can be scheduled
</description>
<name>yarn.distributed-scheduling.top-k-compute-interval-ms</name>
<value>1000</value>
</property>

<property>
<description>
The Default comparator used by the Distributed Scheduling Cluster
Monitor to order the top K nodes on which OPPORTUNISTIC containers can
be scheduled. The allowed values are "WAIT_TIME" and "QUEUE_LENGTH"
</description>
<name>yarn.distributed-scheduling.top-k-comparator</name>
<value>WAIT_TIME</value>
</property>

<property>
<description>
The max number of nodes returned by the Distributed Scheduling Cluster
Monitor. (The value of K in top-K)
</description>
<name>yarn.distributed-scheduling.top-k</name>
<value>10</value>
</property>

<!-- Node Labels Configuration -->

<property>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,4 +122,13 @@ public abstract void setNodeUtilization(
@Unstable
public abstract void setIncreasedContainers(
List<Container> increasedContainers);

@Private
@Unstable
public abstract QueuedContainersStatus getQueuedContainersStatus();

@Private
@Unstable
public abstract void setQueuedContainersStatus(
QueuedContainersStatus queuedContainersStatus);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.api.records;

import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.yarn.util.Records;

/**
* <p>
* <code>QueuedContainersStatus</code> captures information pertaining to the
* state of execution of the Queueable containers within a node.
* </p>
*/
@Private
@Evolving
public abstract class QueuedContainersStatus {
public static QueuedContainersStatus newInstance() {
return Records.newRecord(QueuedContainersStatus.class);
}

public abstract int getEstimatedQueueWaitTime();

public abstract void setEstimatedQueueWaitTime(int queueWaitTime);

public abstract int getWaitQueueLength();

public abstract void setWaitQueueLength(int queueWaitTime);
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,17 @@
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourceUtilizationPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeHealthStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceUtilizationProto;

import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;

Expand Down Expand Up @@ -400,6 +403,27 @@ public synchronized void setIncreasedContainers(
this.increasedContainers = increasedContainers;
}

@Override
public QueuedContainersStatus getQueuedContainersStatus() {
NodeStatusProtoOrBuilder p =
this.viaProto ? this.proto : this.builder;
if (!p.hasQueuedContainerStatus()) {
return null;
}
return convertFromProtoFormat(p.getQueuedContainerStatus());
}

@Override
public void setQueuedContainersStatus(QueuedContainersStatus queuedContainersStatus) {
maybeInitBuilder();
if (queuedContainersStatus == null) {
this.builder.clearQueuedContainerStatus();
return;
}
this.builder.setQueuedContainerStatus(
convertToProtoFormat(queuedContainersStatus));
}

private NodeIdProto convertToProtoFormat(NodeId nodeId) {
return ((NodeIdPBImpl)nodeId).getProto();
}
Expand Down Expand Up @@ -433,15 +457,25 @@ private ApplicationIdProto convertToProtoFormat(ApplicationId c) {
return ((ApplicationIdPBImpl)c).getProto();
}

private ResourceUtilizationProto convertToProtoFormat(ResourceUtilization r) {
private YarnProtos.ResourceUtilizationProto convertToProtoFormat(ResourceUtilization r) {
return ((ResourceUtilizationPBImpl) r).getProto();
}

private ResourceUtilizationPBImpl convertFromProtoFormat(
ResourceUtilizationProto p) {
YarnProtos.ResourceUtilizationProto p) {
return new ResourceUtilizationPBImpl(p);
}

private YarnServerCommonProtos.QueuedContainersStatusProto convertToProtoFormat(
QueuedContainersStatus r) {
return ((QueuedContainersStatusPBImpl) r).getProto();
}

private QueuedContainersStatus convertFromProtoFormat(
YarnServerCommonProtos.QueuedContainersStatusProto p) {
return new QueuedContainersStatusPBImpl(p);
}

private ContainerPBImpl convertFromProtoFormat(
ContainerProto c) {
return new ContainerPBImpl(c);
Expand Down
Loading

0 comments on commit c725b97

Please sign in to comment.