Skip to content

HADOOP-16290. Enable RpcMetrics units to be configurable #3198

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jul 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,9 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
public static final boolean RPC_METRICS_QUANTILE_ENABLE_DEFAULT = false;
public static final String RPC_METRICS_PERCENTILES_INTERVALS_KEY =
"rpc.metrics.percentiles.intervals";


public static final String RPC_METRICS_TIME_UNIT = "rpc.metrics.timeunit";

/** Allowed hosts for nfs exports */
public static final String NFS_EXPORTS_ALLOWED_HOSTS_SEPARATOR = ";";
public static final String NFS_EXPORTS_ALLOWED_HOSTS_KEY = "nfs.exports.allowed.hosts";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ public class DecayRpcScheduler implements RpcScheduler,
private final String namespace;
private final int topUsersCount; // e.g., report top 10 users' metrics
private static final double PRECISION = 0.0001;
private final TimeUnit metricsTimeUnit;
private MetricsProxy metricsProxy;
private final CostProvider costProvider;
private final Map<String, Integer> staticPriorities = new HashMap<>();
Expand Down Expand Up @@ -266,6 +267,8 @@ public DecayRpcScheduler(int numLevels, String ns, Configuration conf) {
DecayRpcSchedulerDetailedMetrics.create(ns);
decayRpcSchedulerDetailedMetrics.init(numLevels);

metricsTimeUnit = RpcMetrics.getMetricsTimeUnit(conf);

// Setup delay timer
Timer timer = new Timer(true);
DecayTask task = new DecayTask(this, timer);
Expand Down Expand Up @@ -725,8 +728,9 @@ public void addResponseTime(String callName, Schedulable schedulable,
addCost(user, processingCost);

int priorityLevel = schedulable.getPriorityLevel();
long queueTime = details.get(Timing.QUEUE, RpcMetrics.TIMEUNIT);
long processingTime = details.get(Timing.PROCESSING, RpcMetrics.TIMEUNIT);
long queueTime = details.get(Timing.QUEUE, metricsTimeUnit);
long processingTime = details.get(Timing.PROCESSING,
metricsTimeUnit);

this.decayRpcSchedulerDetailedMetrics.addQueueTime(
priorityLevel, queueTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,10 @@ default void addResponseTime(String callName, Schedulable schedulable,
// this interface, a default implementation is supplied which uses the old
// method. All new implementations MUST override this interface and should
// NOT use the other addResponseTime method.
int queueTime = (int)
details.get(ProcessingDetails.Timing.QUEUE, RpcMetrics.TIMEUNIT);
int processingTime = (int)
details.get(ProcessingDetails.Timing.PROCESSING, RpcMetrics.TIMEUNIT);
int queueTime = (int) details.get(ProcessingDetails.Timing.QUEUE,
RpcMetrics.DEFAULT_METRIC_TIME_UNIT);
int processingTime = (int) details.get(ProcessingDetails.Timing.PROCESSING,
RpcMetrics.DEFAULT_METRIC_TIME_UNIT);
addResponseTime(callName, schedulable.getPriorityLevel(),
queueTime, processingTime);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -544,13 +544,13 @@ void logSlowRpcCalls(String methodName, Call call,
(rpcMetrics.getProcessingStdDev() * deviation);

long processingTime =
details.get(Timing.PROCESSING, RpcMetrics.TIMEUNIT);
details.get(Timing.PROCESSING, rpcMetrics.getMetricsTimeUnit());
if ((rpcMetrics.getProcessingSampleCount() > minSampleSize) &&
(processingTime > threeSigma)) {
LOG.warn(
"Slow RPC : {} took {} {} to process from client {},"
+ " the processing detail is {}",
methodName, processingTime, RpcMetrics.TIMEUNIT, call,
methodName, processingTime, rpcMetrics.getMetricsTimeUnit(), call,
details.toString());
rpcMetrics.incrSlowRpc();
}
Expand All @@ -570,7 +570,7 @@ void updateMetrics(Call call, long startTime, boolean connDropped) {
deltaNanos -= details.get(Timing.RESPONSE);
details.set(Timing.HANDLER, deltaNanos);

long queueTime = details.get(Timing.QUEUE, RpcMetrics.TIMEUNIT);
long queueTime = details.get(Timing.QUEUE, rpcMetrics.getMetricsTimeUnit());
rpcMetrics.addRpcQueueTime(queueTime);

if (call.isResponseDeferred() || connDropped) {
Expand All @@ -579,9 +579,9 @@ void updateMetrics(Call call, long startTime, boolean connDropped) {
}

long processingTime =
details.get(Timing.PROCESSING, RpcMetrics.TIMEUNIT);
details.get(Timing.PROCESSING, rpcMetrics.getMetricsTimeUnit());
long waitTime =
details.get(Timing.LOCKWAIT, RpcMetrics.TIMEUNIT);
details.get(Timing.LOCKWAIT, rpcMetrics.getMetricsTimeUnit());
rpcMetrics.addRpcLockWaitTime(waitTime);
rpcMetrics.addRpcProcessingTime(processingTime);
// don't include lock wait for detailed metrics.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ipc.Server;
Expand Down Expand Up @@ -48,9 +49,12 @@ public class RpcMetrics {
final MetricsRegistry registry;
final String name;
final boolean rpcQuantileEnable;

public static final TimeUnit DEFAULT_METRIC_TIME_UNIT =
TimeUnit.MILLISECONDS;
/** The time unit used when storing/accessing time durations. */
public final static TimeUnit TIMEUNIT = TimeUnit.MILLISECONDS;
private final TimeUnit metricsTimeUnit;

RpcMetrics(Server server, Configuration conf) {
String port = String.valueOf(server.getListenerAddress().getPort());
name = "RpcActivityForPort" + port;
Expand All @@ -63,6 +67,7 @@ public class RpcMetrics {
rpcQuantileEnable = (intervals.length > 0) && conf.getBoolean(
CommonConfigurationKeys.RPC_METRICS_QUANTILE_ENABLE,
CommonConfigurationKeys.RPC_METRICS_QUANTILE_ENABLE_DEFAULT);
metricsTimeUnit = getMetricsTimeUnit(conf);
if (rpcQuantileEnable) {
rpcQueueTimeQuantiles =
new MutableQuantiles[intervals.length];
Expand All @@ -75,19 +80,19 @@ public class RpcMetrics {
for (int i = 0; i < intervals.length; i++) {
int interval = intervals[i];
rpcQueueTimeQuantiles[i] = registry.newQuantiles("rpcQueueTime"
+ interval + "s", "rpc queue time in " + TIMEUNIT, "ops",
+ interval + "s", "rpc queue time in " + metricsTimeUnit, "ops",
"latency", interval);
rpcLockWaitTimeQuantiles[i] = registry.newQuantiles(
"rpcLockWaitTime" + interval + "s",
"rpc lock wait time in " + TIMEUNIT, "ops",
"rpc lock wait time in " + metricsTimeUnit, "ops",
"latency", interval);
rpcProcessingTimeQuantiles[i] = registry.newQuantiles(
"rpcProcessingTime" + interval + "s",
"rpc processing time in " + TIMEUNIT, "ops",
"rpc processing time in " + metricsTimeUnit, "ops",
"latency", interval);
deferredRpcProcessingTimeQuantiles[i] = registry.newQuantiles(
"deferredRpcProcessingTime" + interval + "s",
"deferred rpc processing time in " + TIMEUNIT, "ops",
"deferred rpc processing time in " + metricsTimeUnit, "ops",
"latency", interval);
}
}
Expand Down Expand Up @@ -141,6 +146,27 @@ public String numOpenConnectionsPerUser() {
return server.getNumDroppedConnections();
}

public TimeUnit getMetricsTimeUnit() {
return metricsTimeUnit;
}

public static TimeUnit getMetricsTimeUnit(Configuration conf) {
TimeUnit metricsTimeUnit = RpcMetrics.DEFAULT_METRIC_TIME_UNIT;
String timeunit = conf.get(CommonConfigurationKeys.RPC_METRICS_TIME_UNIT);
if (StringUtils.isNotEmpty(timeunit)) {
try {
metricsTimeUnit = TimeUnit.valueOf(timeunit);
} catch (IllegalArgumentException e) {
LOG.info("Config key {} 's value {} does not correspond to enum values"
+ " of java.util.concurrent.TimeUnit. Hence default unit"
+ " {} will be used",
CommonConfigurationKeys.RPC_METRICS_TIME_UNIT, timeunit,
RpcMetrics.DEFAULT_METRIC_TIME_UNIT);
}
}
return metricsTimeUnit;
}

// Public instrumentation methods that could be extracted to an
// abstract class if we decide to do custom instrumentation classes a la
// JobTrackerInstrumentation. The methods with //@Override comment are
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3312,6 +3312,21 @@
</description>
</property>

<property>
<name>rpc.metrics.timeunit</name>
<value>MILLISECONDS</value>
<description>
This property is used to configure timeunit for various RPC Metrics
e.g rpcQueueTime, rpcLockWaitTime, rpcProcessingTime,
deferredRpcProcessingTime. In the absence of this property,
default timeunit used is milliseconds.
The value of this property should match to any one value of enum:
java.util.concurrent.TimeUnit.
Some of the valid values: NANOSECONDS, MICROSECONDS, MILLISECONDS,
SECONDS etc.
</description>
</property>

<property>
<name>rpc.metrics.percentiles.intervals</name>
<value></value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ rpc
---

Each metrics record contains tags such as Hostname and port (number to which server is bound) as additional information along with metrics.
`rpc.metrics.timeunit` config can be used to configure timeunit for RPC metrics.
The default timeunit used for RPC metrics is milliseconds (as per the below description).

| Name | Description |
|:---- |:---- |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.Server.Call;
import org.apache.hadoop.ipc.Server.Connection;
import org.apache.hadoop.ipc.metrics.RpcMetrics;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
import org.apache.hadoop.ipc.protobuf.TestProtos;
Expand Down Expand Up @@ -1098,8 +1097,8 @@ public TestRpcService run() {
proxy.lockAndSleep(null, newSleepRequest(5));
rpcMetrics = getMetrics(server.getRpcMetrics().name());
assertGauge("RpcLockWaitTimeAvgTime",
(double)(RpcMetrics.TIMEUNIT.convert(10L, TimeUnit.SECONDS)),
rpcMetrics);
(double)(server.getRpcMetrics().getMetricsTimeUnit().convert(10L,
TimeUnit.SECONDS)), rpcMetrics);
} finally {
if (proxy2 != null) {
RPC.stopProxy(proxy2);
Expand Down Expand Up @@ -1603,6 +1602,70 @@ public void testSetProtocolEngine() {
assertTrue(rpcEngine instanceof StoppedRpcEngine);
}

@Test
public void testRpcMetricsInNanos() throws Exception {
final Server server;
TestRpcService proxy = null;

final int interval = 1;
conf.setBoolean(CommonConfigurationKeys.
RPC_METRICS_QUANTILE_ENABLE, true);
conf.set(CommonConfigurationKeys.
RPC_METRICS_PERCENTILES_INTERVALS_KEY, "" + interval);
conf.set(CommonConfigurationKeys.RPC_METRICS_TIME_UNIT, "NANOSECONDS");

server = setupTestServer(conf, 5);
String testUser = "testUserInNanos";
UserGroupInformation anotherUser =
UserGroupInformation.createRemoteUser(testUser);
TestRpcService proxy2 =
anotherUser.doAs((PrivilegedAction<TestRpcService>) () -> {
try {
return RPC.getProxy(TestRpcService.class, 0,
server.getListenerAddress(), conf);
} catch (IOException e) {
LOG.error("Something went wrong.", e);
}
return null;
});
try {
proxy = getClient(addr, conf);
for (int i = 0; i < 100; i++) {
proxy.ping(null, newEmptyRequest());
proxy.echo(null, newEchoRequest("" + i));
proxy2.echo(null, newEchoRequest("" + i));
}
MetricsRecordBuilder rpcMetrics =
getMetrics(server.getRpcMetrics().name());
assertEquals("Expected zero rpc lock wait time",
0, getDoubleGauge("RpcLockWaitTimeAvgTime", rpcMetrics), 0.001);
MetricsAsserts.assertQuantileGauges("RpcQueueTime" + interval + "s",
rpcMetrics);
MetricsAsserts.assertQuantileGauges("RpcProcessingTime" + interval + "s",
rpcMetrics);

proxy.lockAndSleep(null, newSleepRequest(5));
rpcMetrics = getMetrics(server.getRpcMetrics().name());
assertGauge("RpcLockWaitTimeAvgTime",
(double)(server.getRpcMetrics().getMetricsTimeUnit().convert(10L,
TimeUnit.SECONDS)), rpcMetrics);
LOG.info("RpcProcessingTimeAvgTime: {} , RpcQueueTimeAvgTime: {}",
getDoubleGauge("RpcProcessingTimeAvgTime", rpcMetrics),
getDoubleGauge("RpcQueueTimeAvgTime", rpcMetrics));

assertTrue(getDoubleGauge("RpcProcessingTimeAvgTime", rpcMetrics)
> 4000000D);
assertTrue(getDoubleGauge("RpcQueueTimeAvgTime", rpcMetrics)
> 4000D);
} finally {
if (proxy2 != null) {
RPC.stopProxy(proxy2);
}
stop(server, proxy);
}
}


public static void main(String[] args) throws Exception {
new TestRPC().testCallsInternal(conf);
}
Expand Down