Skip to content

Commit

Permalink
YARN-8995. Log events info in AsyncDispatcher when event queue size c…
Browse files Browse the repository at this point in the history
…umulatively reaches a certain number every time. Contributed by zhuqi.
  • Loading branch information
TaoYang526 committed Sep 5, 2019
1 parent f347c34 commit 172bcd8
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2483,6 +2483,20 @@ public static boolean isAclEnabled(Configuration conf) {

public static final long DEFAULT_DISPATCHER_DRAIN_EVENTS_TIMEOUT = 300000;

/**
* Threshold that triggers the logging of the queued event types and their
* counts in RM's main event dispatcher. With the default value of 5000,
* the dispatcher logs this information each time the event queue size
* reaches a non-zero multiple of 5000. The output shows which kinds of
* events the RM is mostly busy processing, which helps to narrow down
* certain performance issues.
*/
public static final String
YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD =
YARN_PREFIX + "dispatcher.print-events-info.threshold";
/** Default value of {@link #YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD}. */
public static final int
DEFAULT_YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD = 5000;

/**
* CLASSPATH for YARN applications. A comma-separated list of CLASSPATH
* entries
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
Expand All @@ -55,8 +57,13 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {

private final BlockingQueue<Event> eventQueue;
private volatile int lastEventQueueSizeLogged = 0;
private volatile int lastEventDetailsQueueSizeLogged = 0;
private volatile boolean stopped = false;

// Queue-size interval at which details of the queued events are printed.
private int detailsInterval;
// Raised by the threads that enqueue events and cleared by the dispatcher
// thread, so it must be volatile for the update to be visible across them.
private volatile boolean printTrigger = false;

// Configuration flag for enabling/disabling draining dispatcher's events on
// stop functionality.
private volatile boolean drainEventsOnStop = false;
Expand Down Expand Up @@ -129,6 +136,12 @@ public void run() {
}
if (event != null) {
dispatch(event);
if (printTrigger) {
//Log the latest dispatch event type
// may cause the too many events queued
LOG.info("Latest dispatch event type: " + event.getType());
printTrigger = false;
}
}
}
}
Expand All @@ -140,6 +153,15 @@ public void disableExitOnDispatchException() {
exitOnDispatchException = false;
}

/**
 * Initializes the service and reads the queue-size threshold that decides
 * when details of the queued events are logged.
 *
 * @param conf the configuration this service is initialized with
 * @throws Exception if the parent initialization fails
 */
@Override
protected void serviceInit(Configuration conf) throws Exception {
  super.serviceInit(conf);
  int threshold = getConfig().getInt(
      YarnConfiguration.YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD,
      YarnConfiguration.DEFAULT_YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD);
  if (threshold <= 0) {
    // The value is used as a modulus on the queue size, so zero would make
    // every handled event fail with an ArithmeticException.
    LOG.warn("Invalid value " + threshold + " for "
        + YarnConfiguration.YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD
        + ", using default " + YarnConfiguration.
        DEFAULT_YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD);
    threshold = YarnConfiguration.
        DEFAULT_YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD;
  }
  this.detailsInterval = threshold;
}

@Override
protected void serviceStart() throws Exception {
//start all the components
Expand Down Expand Up @@ -246,6 +268,17 @@ public EventHandler<Event> getEventHandler() {
}

class GenericEventHandler implements EventHandler<Event> {
/**
 * Logs, for every event type found in the given queue, how many events of
 * that type it holds.
 *
 * @param queue the event queue to summarize
 */
private void printEventQueueDetails(BlockingQueue<Event> queue) {
  // Count the queue that was passed in rather than the enclosing field.
  Map<Enum, Long> counterMap = queue.stream()
      .collect(Collectors.groupingBy(
          e -> e.getType(), Collectors.counting()));
  for (Map.Entry<Enum, Long> entry : counterMap.entrySet()) {
    long num = entry.getValue();
    LOG.info("Event type: " + entry.getKey()
        + ", Event record counter: " + num);
  }
}
public void handle(Event event) {
if (blockNewEvents) {
return;
Expand All @@ -259,6 +292,12 @@ public void handle(Event event) {
lastEventQueueSizeLogged = qSize;
LOG.info("Size of event-queue is " + qSize);
}
if (qSize != 0 && qSize % detailsInterval == 0
&& lastEventDetailsQueueSizeLogged != qSize) {
lastEventDetailsQueueSizeLogged = qSize;
printEventQueueDetails(eventQueue);
printTrigger = true;
}
int remCapacity = eventQueue.remainingCapacity();
if (remCapacity < 1000) {
LOG.warn("Very low remaining capacity in the event-queue: "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,19 @@
<value>300000</value>
</property>

<property>
<description>
The threshold used to trigger the logging of event types
and counts in RM's main event dispatcher. The default value is 5000,
which means RM will print events info each time the queue size
reaches a multiple of 5000. Such info reveals what kind of events
the RM is mostly stuck processing, which can help to
narrow down certain performance issues.
</description>
<name>yarn.dispatcher.print-events-info.threshold</name>
<value>5000</value>
</property>

<property>
<description>The expiry interval for application master reporting.</description>
<name>yarn.am.liveness-monitor.expiry-interval-ms</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@

package org.apache.hadoop.yarn.event;

import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.slf4j.Logger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
Expand Down Expand Up @@ -93,6 +96,20 @@ private enum DummyType {
DUMMY
}

/**
 * Event handler that spends 1.5 seconds on every event, so that events
 * posted in the meantime stay in the dispatcher's queue.
 */
private static class TestHandler implements EventHandler<Event> {
  @Override
  public void handle(Event event) {
    try {
      // Handle slowly so that the events posted meanwhile pile up
      Thread.sleep(1500);
    } catch (InterruptedException e) {
      // Keep the interrupt status instead of silently swallowing it
      Thread.currentThread().interrupt();
    }
  }
}

/**
* Event type registered with the dispatcher and returned by the mock
* events in testPrintDispatcherEventDetails.
*/
private enum TestEnum {
TestEventType
}

@SuppressWarnings({ "rawtypes", "unchecked" })
private void dispatchDummyEvents(Dispatcher disp, int count) {
for (int i = 0; i < count; i++) {
Expand All @@ -119,5 +136,45 @@ public void testDrainDispatcherDrainEventsOnStop() throws Exception {
disp.close();
assertEquals(0, queue.size());
}

// Test that the dispatcher prints event details when its queue grows large
@Test(timeout = 10000)
public void testPrintDispatcherEventDetails() throws Exception {
  YarnConfiguration conf = new YarnConfiguration();
  conf.setInt(YarnConfiguration.
      YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD, 5000);
  Logger log = mock(Logger.class);
  AsyncDispatcher dispatcher = new AsyncDispatcher();
  dispatcher.init(conf);

  // Replace the static final LOG of AsyncDispatcher with a mock.
  // NOTE(review): the lookup of Field.modifiers is rejected by newer JDKs
  // (12+) - confirm against the JDK this test is expected to run on.
  Field logger = AsyncDispatcher.class.getDeclaredField("LOG");
  logger.setAccessible(true);
  Field modifiers = Field.class.getDeclaredField("modifiers");
  modifiers.setAccessible(true);
  modifiers.setInt(logger, logger.getModifiers() & ~Modifier.FINAL);
  Object oldLog = logger.get(null);

  try {
    logger.set(null, log);
    dispatcher.register(TestEnum.class, new TestHandler());
    dispatcher.start();

    // All events are identical, so a single mock is posted repeatedly
    Event event = mock(Event.class);
    when(event.getType()).thenReturn(TestEnum.TestEventType);
    for (int i = 0; i < 10000; ++i) {
      dispatcher.getEventHandler().handle(event);
    }
    verify(log, atLeastOnce()).info("Event type: TestEventType, " +
        "Event record counter: 5000");
    // Give the slow handler time to finish dispatching at least one event
    Thread.sleep(2000);
    verify(log, atLeastOnce()).
        info("Latest dispatch event type: TestEventType");
  } finally {
    try {
      // Stop even when a verification failed, so the thread is not leaked
      dispatcher.stop();
    } finally {
      // Restore the original logger object
      logger.set(null, oldLog);
    }
  }
}
}

0 comments on commit 172bcd8

Please sign in to comment.