
Commit f39dc0f

Merge branch 'apache:trunk' into YARN-11509
2 parents 9efbda8 + e14c52c commit f39dc0f


21 files changed, +582 -54 lines changed


.gitignore

Lines changed: 1 addition & 0 deletions

@@ -11,6 +11,7 @@
 *.patch
 *.diff
 .idea
+.vscode
 .svn
 .classpath
 .project

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProcessingDetails.java

Lines changed: 14 additions & 1 deletion

@@ -20,13 +20,15 @@
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.TimeUnit;
 
 /**
- * Stores the times that a call takes to be processed through each step.
+ * Stores the times that a call takes to be processed through each step and
+ * its response status.
  */
 @InterfaceStability.Unstable
 @InterfaceAudience.Private
@@ -53,6 +55,9 @@ public enum Timing {
 
   private long[] timings = new long[Timing.values().length];
 
+  // Rpc return status of this call
+  private RpcStatusProto returnStatus = RpcStatusProto.SUCCESS;
+
   ProcessingDetails(TimeUnit timeUnit) {
     this.valueTimeUnit = timeUnit;
   }
@@ -81,6 +86,14 @@ public void add(Timing type, long value, TimeUnit timeUnit) {
     timings[type.ordinal()] += valueTimeUnit.convert(value, timeUnit);
   }
 
+  public void setReturnStatus(RpcStatusProto status) {
+    this.returnStatus = status;
+  }
+
+  public RpcStatusProto getReturnStatus() {
+    return returnStatus;
+  }
+
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder(256);
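Taken together, the new field and accessors let the RPC server stamp each call's response status and read it back when per-call metrics are rolled up. A minimal sketch of that flow, mirroring the Server.java change below (the call and rpcMetrics references are assumed to be in scope):

    // Record the status decided while sending the response; SUCCESS is the default.
    ProcessingDetails details = call.getProcessingDetails();
    details.setReturnStatus(RpcStatusProto.SUCCESS);

    // Later, when metrics are updated, count only calls that returned SUCCESS.
    if (details.getReturnStatus() == RpcStatusProto.SUCCESS) {
      rpcMetrics.incrRpcCallSuccesses();
    }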

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

Lines changed: 13 additions & 4 deletions

@@ -600,17 +600,18 @@ void logSlowRpcCalls(String methodName, Call call,
     }
   }
 
-  void updateMetrics(Call call, long startTime, boolean connDropped) {
+  void updateMetrics(Call call, long processingStartTimeNanos, boolean connDropped) {
     totalRequests.increment();
     // delta = handler + processing + response
-    long deltaNanos = Time.monotonicNowNanos() - startTime;
-    long timestampNanos = call.timestampNanos;
+    long completionTimeNanos = Time.monotonicNowNanos();
+    long deltaNanos = completionTimeNanos - processingStartTimeNanos;
+    long arrivalTimeNanos = call.timestampNanos;
 
     ProcessingDetails details = call.getProcessingDetails();
     // queue time is the delta between when the call first arrived and when it
     // began being serviced, minus the time it took to be put into the queue
     details.set(Timing.QUEUE,
-        startTime - timestampNanos - details.get(Timing.ENQUEUE));
+        processingStartTimeNanos - arrivalTimeNanos - details.get(Timing.ENQUEUE));
     deltaNanos -= details.get(Timing.PROCESSING);
     deltaNanos -= details.get(Timing.RESPONSE);
     details.set(Timing.HANDLER, deltaNanos);
@@ -636,10 +637,17 @@ void updateMetrics(Call call, long startTime, boolean connDropped) {
     processingTime -= waitTime;
     String name = call.getDetailedMetricsName();
     rpcDetailedMetrics.addProcessingTime(name, processingTime);
+    // Overall processing time is from arrival to completion.
+    long overallProcessingTime = rpcMetrics.getMetricsTimeUnit()
+        .convert(completionTimeNanos - arrivalTimeNanos, TimeUnit.NANOSECONDS);
+    rpcDetailedMetrics.addOverallProcessingTime(name, overallProcessingTime);
     callQueue.addResponseTime(name, call, details);
     if (isLogSlowRPC()) {
       logSlowRpcCalls(name, call, details);
     }
+    if (details.getReturnStatus() == RpcStatusProto.SUCCESS) {
+      rpcMetrics.incrRpcCallSuccesses();
+    }
   }
 
   void updateDeferredMetrics(String name, long processingTime) {
@@ -1237,6 +1245,7 @@ public Void run() throws Exception {
           setResponseFields(value, responseParams);
           sendResponse();
 
+          details.setReturnStatus(responseParams.returnStatus);
           deltaNanos = Time.monotonicNowNanos() - startNanos;
           details.set(Timing.RESPONSE, deltaNanos, TimeUnit.NANOSECONDS);
         } else {

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java

Lines changed: 24 additions & 1 deletion

@@ -33,14 +33,27 @@
 @InterfaceAudience.Private
 @Metrics(about="Per method RPC metrics", context="rpcdetailed")
 public class RpcDetailedMetrics {
+  static final String DEFERRED_PREFIX = "Deferred";
+  static final String OVERALL_PROCESSING_PREFIX = "Overall";
 
+  // per-method RPC processing time
   @Metric MutableRatesWithAggregation rates;
   @Metric MutableRatesWithAggregation deferredRpcRates;
+  /**
+   * per-method overall RPC processing time, from request arrival to when the
+   * response is sent back.
+   */
+  @Metric MutableRatesWithAggregation overallRpcProcessingRates;
 
   static final Logger LOG = LoggerFactory.getLogger(RpcDetailedMetrics.class);
   final MetricsRegistry registry;
   final String name;
 
+  // Mainly to facilitate testing in TestRPC.java
+  public MutableRatesWithAggregation getOverallRpcProcessingRates() {
+    return overallRpcProcessingRates;
+  }
+
   RpcDetailedMetrics(int port) {
     name = "RpcDetailedActivityForPort"+ port;
     registry = new MetricsRegistry("rpcdetailed")
@@ -61,7 +74,8 @@ public static RpcDetailedMetrics create(int port) {
    */
   public void init(Class<?> protocol) {
     rates.init(protocol);
-    deferredRpcRates.init(protocol, "Deferred");
+    deferredRpcRates.init(protocol, DEFERRED_PREFIX);
+    overallRpcProcessingRates.init(protocol, OVERALL_PROCESSING_PREFIX);
   }
 
   /**
@@ -78,6 +92,15 @@ public void addDeferredProcessingTime(String name, long processingTime) {
     deferredRpcRates.add(name, processingTime);
   }
 
+  /**
+   * Add an overall RPC processing time sample.
+   * @param rpcCallName of the RPC call
+   * @param overallProcessingTime the overall RPC processing time
+   */
+  public void addOverallProcessingTime(String rpcCallName, long overallProcessingTime) {
+    overallRpcProcessingRates.add(rpcCallName, overallProcessingTime);
+  }
+
   /**
    * Shutdown the instrumentation for the process
    */

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java

Lines changed: 9 additions & 0 deletions

@@ -138,6 +138,8 @@ public static RpcMetrics create(Server server, Configuration conf) {
   MutableCounterLong rpcSlowCalls;
   @Metric("Number of requeue calls")
   MutableCounterLong rpcRequeueCalls;
+  @Metric("Number of successful RPC calls")
+  MutableCounterLong rpcCallSuccesses;
 
   @Metric("Number of open connections") public int numOpenConnections() {
     return server.getNumOpenConnections();
@@ -330,6 +332,13 @@ public void incrRequeueCalls() {
     rpcRequeueCalls.incr();
   }
 
+  /**
+   * One RPC call success event.
+   */
+  public void incrRpcCallSuccesses() {
+    rpcCallSuccesses.incr();
+  }
+
   /**
    * Returns a MutableRate Counter.
    * @return Mutable Rate

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java

Lines changed: 3 additions & 1 deletion

@@ -33,6 +33,7 @@
 import org.apache.hadoop.metrics2.util.SampleStat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import static org.apache.commons.lang3.StringUtils.capitalize;
 
 
 /**
@@ -162,7 +163,8 @@ Map<String, MutableRate> getGlobalMetrics() {
   private synchronized MutableRate addMetricIfNotExists(String name) {
     MutableRate metric = globalMetrics.get(name);
     if (metric == null) {
-      metric = new MutableRate(name + typePrefix, name + typePrefix, false);
+      String metricName = typePrefix + capitalize(name);
+      metric = new MutableRate(metricName, metricName, false);
       metric.setUpdateTimeStamp(true);
      globalMetrics.put(name, metric);
     }
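The practical effect is that the type prefix now leads the aggregated metric name and the method name is capitalized, instead of the prefix being appended. A small illustration, assuming the Deferred prefix and a getLong method as in the existing tests:

    // Old naming: name + typePrefix             -> "getLongDeferred"  (exposed as "GetLongDeferredNumOps")
    // New naming: typePrefix + capitalize(name) -> "DeferredGetLong"  (exposed as "DeferredGetLongNumOps")
    String typePrefix = "Deferred";
    String rpcName = "getLong";
    String metricName = typePrefix + org.apache.commons.lang3.StringUtils.capitalize(rpcName);
    System.out.println(metricName); // DeferredGetLong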

hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md

Lines changed: 6 additions & 1 deletion

@@ -82,6 +82,9 @@ The default timeunit used for RPC metrics is milliseconds (as per the below desc
 | `RpcAuthenticationSuccesses` | Total number of authentication successes |
 | `RpcAuthorizationFailures` | Total number of authorization failures |
 | `RpcAuthorizationSuccesses` | Total number of authorization successes |
+| `RpcClientBackoff` | Total number of client backoff requests |
+| `RpcSlowCalls` | Total number of slow RPC calls |
+| `RpcCallSuccesses` | Total number of RPC calls that are successfully processed |
 | `NumOpenConnections` | Current number of open connections |
 | `NumInProcessHandler` | Current number of handlers on working |
 | `CallQueueLength` | Current length of the call queue |
@@ -142,8 +145,10 @@ to FairCallQueue metrics. For each level of priority, rpcqueue and rpcprocessing
 rpcdetailed context
 ===================
 
-Metrics of rpcdetailed context are exposed in unified manner by RPC layer. Two metrics are exposed for each RPC based on its name. Metrics named "(RPC method name)NumOps" indicates total number of method calls, and metrics named "(RPC method name)AvgTime" shows average turn around time for method calls in milliseconds.
+Metrics of rpcdetailed context are exposed in unified manner by RPC layer. Two metrics are exposed for each RPC based on its name. Metrics named "(RPC method name)NumOps" indicate the total number of method calls, and metrics named "(RPC method name)AvgTime" show the average processing time for method calls in milliseconds.
 Please note that the AvgTime metrics do not include time spent waiting to acquire locks on data structures (see RpcLockWaitTimeAvgTime).
+Metrics named "Overall(RPC method name)AvgTime" show the average overall processing time for method calls
+in milliseconds. It is measured from request arrival to when the response is sent back to the client.
 
 rpcdetailed
 -----------
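To make the naming concrete, here is a hedged sketch of checking these names with the MetricsAsserts helpers used in the tests below; the method name getBlockLocations, the serverPort variable, and the expected values are purely illustrative placeholders:

    // For an RPC method named getBlockLocations, the rpcdetailed record would carry
    // GetBlockLocationsNumOps, GetBlockLocationsAvgTime and OverallGetBlockLocationsAvgTime.
    MetricsRecordBuilder rb = getMetrics("RpcDetailedActivityForPort" + serverPort);
    assertCounter("GetBlockLocationsNumOps", expectedNumOps, rb);
    assertGauge("GetBlockLocationsAvgTime", expectedAvgTimeMillis, rb);
    assertGauge("OverallGetBlockLocationsAvgTime", expectedOverallAvgTimeMillis, rb);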

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java

Lines changed: 80 additions & 0 deletions

@@ -39,6 +39,7 @@
 import org.apache.hadoop.ipc.protobuf.TestProtos;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableRatesWithAggregation;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.SecurityUtil;
@@ -95,6 +96,9 @@
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantLock;
 
+import static org.apache.hadoop.test.MetricsAsserts.assertGaugeGt;
+import static org.apache.hadoop.test.MetricsAsserts.assertGaugeGte;
+import static org.apache.hadoop.test.MetricsAsserts.mockMetricsRecordBuilder;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
 import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
@@ -1397,6 +1401,82 @@ public void testNumInProcessHandlerMetrics() throws Exception {
     }
   }
 
+  /**
+   * Test the rpcCallSuccesses metric in RpcMetrics.
+   */
+  @Test
+  public void testRpcCallSuccessesMetric() throws Exception {
+    final Server server;
+    TestRpcService proxy = null;
+
+    server = setupTestServer(conf, 5);
+    try {
+      proxy = getClient(addr, conf);
+
+      // 10 successful responses
+      for (int i = 0; i < 10; i++) {
+        proxy.ping(null, newEmptyRequest());
+      }
+      MetricsRecordBuilder rpcMetrics =
+          getMetrics(server.getRpcMetrics().name());
+      assertCounter("RpcCallSuccesses", 10L, rpcMetrics);
+      // rpcQueueTimeNumOps equals total number of RPC calls.
+      assertCounter("RpcQueueTimeNumOps", 10L, rpcMetrics);
+
+      // 2 failed responses with ERROR status and 1 more successful response.
+      for (int i = 0; i < 2; i++) {
+        try {
+          proxy.error(null, newEmptyRequest());
+        } catch (ServiceException ignored) {
+        }
+      }
+      proxy.ping(null, newEmptyRequest());
+
+      rpcMetrics = getMetrics(server.getRpcMetrics().name());
+      assertCounter("RpcCallSuccesses", 11L, rpcMetrics);
+      assertCounter("RpcQueueTimeNumOps", 13L, rpcMetrics);
+    } finally {
+      stop(server, proxy);
+    }
+  }
+
+  /**
+   * Test per-type overall RPC processing time metric.
+   */
+  @Test
+  public void testOverallRpcProcessingTimeMetric() throws Exception {
+    final Server server;
+    TestRpcService proxy = null;
+
+    server = setupTestServer(conf, 5);
+    try {
+      proxy = getClient(addr, conf);
+
+      // Sent 1 ping request and 2 lockAndSleep requests
+      proxy.ping(null, newEmptyRequest());
+      proxy.lockAndSleep(null, newSleepRequest(10));
+      proxy.lockAndSleep(null, newSleepRequest(12));
+
+      MetricsRecordBuilder rb = mockMetricsRecordBuilder();
+      MutableRatesWithAggregation rates =
+          server.rpcDetailedMetrics.getOverallRpcProcessingRates();
+      rates.snapshot(rb, true);
+
+      // Verify the ping request.
+      // Overall processing time for ping is zero when this test is run together with
+      // the rest of tests. Thus, we use assertGaugeGte() for OverallPingAvgTime.
+      assertCounter("OverallPingNumOps", 1L, rb);
+      assertGaugeGte("OverallPingAvgTime", 0.0, rb);
+
+      // Verify lockAndSleep requests. AvgTime should be greater than 10 ms,
+      // since we sleep for 10 and 12 ms respectively.
+      assertCounter("OverallLockAndSleepNumOps", 2L, rb);
+      assertGaugeGt("OverallLockAndSleepAvgTime", 10.0, rb);
+
+    } finally {
+      stop(server, proxy);
+    }
+  }
   /**
    * Test RPC backoff by queue full.
    */

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestMutableMetrics.java

Lines changed: 14 additions & 1 deletion

@@ -328,7 +328,20 @@ public void testDuplicateMetrics() {
     verify(rb, times(1))
         .addCounter(info("GetLongNumOps", "Number of ops for getLong"), 0L);
     verify(rb, times(1)).addCounter(
-        info("GetLongDeferredNumOps", "Number of ops for getLongDeferred"), 0L);
+        info("DeferredGetLongNumOps", "Number of ops for deferredGetLong"), 0L);
+
+    // Add some samples and verify
+    rb = mockMetricsRecordBuilder();
+    rates.add("testRpcMethod", 10);
+    deferredRpcRates.add("testRpcMethod", 100);
+    deferredRpcRates.add("testRpcMethod", 500);
+    rates.snapshot(rb, true);
+    deferredRpcRates.snapshot(rb, true);
+
+    assertCounter("TestRpcMethodNumOps", 1L, rb);
+    assertGauge("TestRpcMethodAvgTime", 10.0, rb);
+    assertCounter("DeferredTestRpcMethodNumOps", 2L, rb);
+    assertGauge("DeferredTestRpcMethodAvgTime", 300.0, rb);
   }
 
   /**

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java

Lines changed: 13 additions & 0 deletions

@@ -358,6 +358,19 @@ public static void assertGaugeGt(String name, double greater,
       getDoubleGauge(name, rb) > greater);
   }
 
+  /**
+   * Assert that a double gauge metric is greater than or equal to a value.
+   * @param name of the metric
+   * @param greater value of the metric should be greater than or equal to this
+   * @param rb the record builder mock used to getMetrics
+   */
+  public static void assertGaugeGte(String name, double greater,
+      MetricsRecordBuilder rb) {
+    double curValue = getDoubleGauge(name, rb);
+    Assert.assertTrue("Bad value for metric " + name,
+        curValue >= greater);
+  }
+
   /**
    * Assert that a double gauge metric is greater than a value
    * @param name of the metric

hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSFileSystem.java

Lines changed: 13 additions & 0 deletions

@@ -23,6 +23,7 @@
 import java.util.EnumSet;
 import java.util.List;
 
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
 import org.apache.hadoop.thirdparty.com.google.common.base.Charsets;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.type.MapType;
@@ -284,6 +285,7 @@ public enum Operation {
     HTTP_POST), SATISFYSTORAGEPOLICY(HTTP_PUT), GETSNAPSHOTDIFFLISTING(HTTP_GET),
     GETFILELINKSTATUS(HTTP_GET),
     GETSTATUS(HTTP_GET),
+    GETECPOLICIES(HTTP_GET),
     GET_BLOCK_LOCATIONS(HTTP_GET);
 
     private String httpMethod;
@@ -1773,6 +1775,17 @@ public FsStatus getStatus(final Path path) throws IOException {
     return JsonUtilClient.toFsStatus(json);
   }
 
+  public Collection<ErasureCodingPolicyInfo> getAllErasureCodingPolicies() throws IOException {
+    Map<String, String> params = new HashMap<>();
+    params.put(OP_PARAM, Operation.GETECPOLICIES.toString());
+    Path path = new Path(getUri().toString(), "/");
+    HttpURLConnection conn =
+        getConnection(Operation.GETECPOLICIES.getMethod(), params, path, false);
+    HttpExceptionUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
+    JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn);
+    return JsonUtilClient.getAllErasureCodingPolicies(json);
+  }
+
   @VisibleForTesting
   static BlockLocation[] toBlockLocations(JSONObject json) throws IOException {
     ObjectMapper mapper = new ObjectMapper();
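A rough usage sketch of the new client call; the endpoint URI is a placeholder and the snippet assumes a reachable HttpFS server (error handling omitted):

    // List the erasure coding policies known to the HttpFS server.
    HttpFSFileSystem fs = new HttpFSFileSystem();
    fs.initialize(URI.create("webhdfs://httpfs-host:14000"), new Configuration());
    try {
      for (ErasureCodingPolicyInfo info : fs.getAllErasureCodingPolicies()) {
        System.out.println(info.getPolicy().getName() + " -> " + info.getState());
      }
    } finally {
      fs.close();
    }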
