Skip to content

Commit 10ddb9f

Browse files
authored
Merge branch 'apache:trunk' into YARN-11235
2 parents fd81fc1 + 123d1aa commit 10ddb9f

File tree

39 files changed

+882
-545
lines changed

39 files changed

+882
-545
lines changed

LICENSE-binary

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ com.aliyun:aliyun-java-sdk-ecs:4.2.0
215215
com.aliyun:aliyun-java-sdk-ram:3.0.0
216216
com.aliyun:aliyun-java-sdk-sts:3.0.0
217217
com.aliyun.oss:aliyun-sdk-oss:3.13.2
218-
com.amazonaws:aws-java-sdk-bundle:1.11.901
218+
com.amazonaws:aws-java-sdk-bundle:1.12.262
219219
com.cedarsoftware:java-util:1.9.0
220220
com.cedarsoftware:json-io:2.5.1
221221
com.fasterxml.jackson.core:jackson-annotations:2.12.7
@@ -309,7 +309,7 @@ org.apache.commons:commons-configuration2:2.1.1
309309
org.apache.commons:commons-csv:1.0
310310
org.apache.commons:commons-digester:1.8.1
311311
org.apache.commons:commons-lang3:3.12.0
312-
org.apache.commons:commons-math3:3.1.1
312+
org.apache.commons:commons-math3:3.6.1
313313
org.apache.commons:commons-text:1.4
314314
org.apache.commons:commons-validator:1.6
315315
org.apache.curator:curator-client:5.2.0

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public final class StreamStatisticNames {
4747
public static final String STREAM_READ_ABORTED = "stream_aborted";
4848

4949
/**
50-
* Bytes read from an input stream in read() calls.
50+
* Bytes read from an input stream in read()/readVectored() calls.
5151
* Does not include bytes read and then discarded in seek/close etc.
5252
* These are the bytes returned to the caller.
5353
* Value: {@value}.
@@ -110,6 +110,34 @@ public final class StreamStatisticNames {
110110
public static final String STREAM_READ_OPERATIONS =
111111
"stream_read_operations";
112112

113+
/**
114+
* Count of readVectored() operations in an input stream.
115+
* Value: {@value}.
116+
*/
117+
public static final String STREAM_READ_VECTORED_OPERATIONS =
118+
"stream_read_vectored_operations";
119+
120+
/**
121+
* Count of bytes discarded during readVectored() operation
122+
* in an input stream.
123+
* Value: {@value}.
124+
*/
125+
public static final String STREAM_READ_VECTORED_READ_BYTES_DISCARDED =
126+
"stream_read_vectored_read_bytes_discarded";
127+
128+
/**
129+
* Count of incoming file ranges during readVectored() operation.
130+
* Value: {@value}
131+
*/
132+
public static final String STREAM_READ_VECTORED_INCOMING_RANGES =
133+
"stream_read_vectored_incoming_ranges";
134+
/**
135+
* Count of combined file ranges during readVectored() operation.
136+
* Value: {@value}
137+
*/
138+
public static final String STREAM_READ_VECTORED_COMBINED_RANGES =
139+
"stream_read_vectored_combined_ranges";
140+
113141
/**
114142
* Count of incomplete read() operations in an input stream,
115143
* that is, when the bytes returned were less than that requested.

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,10 @@ public IntFunction<ByteBuffer> getAllocate() {
8484
return allocate;
8585
}
8686

87+
public WeakReferencedElasticByteBufferPool getPool() {
88+
return pool;
89+
}
90+
8791
@Override
8892
public void setup() throws Exception {
8993
super.setup();
@@ -382,6 +386,13 @@ protected List<FileRange> getSampleOverlappingRanges() {
382386
return fileRanges;
383387
}
384388

389+
protected List<FileRange> getConsecutiveRanges() {
390+
List<FileRange> fileRanges = new ArrayList<>();
391+
fileRanges.add(FileRange.createFileRange(100, 500));
392+
fileRanges.add(FileRange.createFileRange(600, 500));
393+
return fileRanges;
394+
}
395+
385396
/**
386397
* Validate that exceptions must be thrown during a vectored
387398
* read operation with specific input ranges.

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractRouterRpcFairnessPolicyController.java

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@
2929
import org.slf4j.Logger;
3030
import org.slf4j.LoggerFactory;
3131

32+
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT;
33+
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT_DEFAULT;
34+
3235
/**
3336
* Base fairness policy that implements @RouterRpcFairnessPolicyController.
3437
* Internally a map of nameservice to Semaphore is used to control permits.
@@ -42,15 +45,26 @@ public class AbstractRouterRpcFairnessPolicyController
4245
/** Hash table to hold semaphore for each configured name service. */
4346
private Map<String, Semaphore> permits;
4447

48+
private long acquireTimeoutMs = DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT_DEFAULT;
49+
4550
public void init(Configuration conf) {
4651
this.permits = new HashMap<>();
52+
long timeoutMs = conf.getTimeDuration(DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT,
53+
DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
54+
if (timeoutMs >= 0) {
55+
acquireTimeoutMs = timeoutMs;
56+
} else {
57+
LOG.warn("Invalid value {} configured for {} should be greater than or equal to 0. " +
58+
"Using default value of : {}ms instead.", timeoutMs,
59+
DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT, DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT_DEFAULT);
60+
}
4761
}
4862

4963
@Override
5064
public boolean acquirePermit(String nsId) {
5165
try {
5266
LOG.debug("Taking lock for nameservice {}", nsId);
53-
return this.permits.get(nsId).tryAcquire(1, TimeUnit.SECONDS);
67+
return this.permits.get(nsId).tryAcquire(acquireTimeoutMs, TimeUnit.MILLISECONDS);
5468
} catch (InterruptedException e) {
5569
LOG.debug("Cannot get a permit for nameservice {}", nsId);
5670
}
@@ -82,15 +96,13 @@ protected int getAvailablePermits(String nsId) {
8296
@Override
8397
public String getAvailableHandlerOnPerNs() {
8498
JSONObject json = new JSONObject();
85-
for (Map.Entry<String, Semaphore> entry : permits.entrySet()) {
99+
permits.forEach((k, v) -> {
86100
try {
87-
String nsId = entry.getKey();
88-
int availableHandler = entry.getValue().availablePermits();
89-
json.put(nsId, availableHandler);
101+
json.put(k, v.availablePermits());
90102
} catch (JSONException e) {
91-
LOG.warn("Cannot put {} into JSONObject", entry.getKey(), e);
103+
LOG.warn("Cannot put {} into JSONObject", k, e);
92104
}
93-
}
105+
});
94106
return json.toString();
95107
}
96108
}

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/StaticRouterRpcFairnessPolicyController.java

Lines changed: 11 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,10 @@ public StaticRouterRpcFairnessPolicyController(Configuration conf) {
5050
init(conf);
5151
}
5252

53-
public void init(Configuration conf)
54-
throws IllegalArgumentException {
53+
public void init(Configuration conf) throws IllegalArgumentException {
5554
super.init(conf);
5655
// Total handlers configured to process all incoming Rpc.
57-
int handlerCount = conf.getInt(
58-
DFS_ROUTER_HANDLER_COUNT_KEY,
59-
DFS_ROUTER_HANDLER_COUNT_DEFAULT);
56+
int handlerCount = conf.getInt(DFS_ROUTER_HANDLER_COUNT_KEY, DFS_ROUTER_HANDLER_COUNT_DEFAULT);
6057

6158
LOG.info("Handlers available for fairness assignment {} ", handlerCount);
6259

@@ -71,8 +68,7 @@ public void init(Configuration conf)
7168
allConfiguredNS.add(CONCURRENT_NS);
7269
validateHandlersCount(conf, handlerCount, allConfiguredNS);
7370
for (String nsId : allConfiguredNS) {
74-
int dedicatedHandlers =
75-
conf.getInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId, 0);
71+
int dedicatedHandlers = conf.getInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId, 0);
7672
LOG.info("Dedicated handlers {} for ns {} ", dedicatedHandlers, nsId);
7773
if (dedicatedHandlers > 0) {
7874
handlerCount -= dedicatedHandlers;
@@ -86,7 +82,7 @@ public void init(Configuration conf)
8682
// Assign remaining handlers equally to remaining name services and
8783
// general pool if applicable.
8884
if (!unassignedNS.isEmpty()) {
89-
LOG.info("Unassigned ns {}", unassignedNS.toString());
85+
LOG.info("Unassigned ns {}", unassignedNS);
9086
int handlersPerNS = handlerCount / unassignedNS.size();
9187
LOG.info("Handlers available per ns {}", handlersPerNS);
9288
for (String nsId : unassignedNS) {
@@ -101,24 +97,20 @@ public void init(Configuration conf)
10197
int existingPermits = getAvailablePermits(CONCURRENT_NS);
10298
if (leftOverHandlers > 0) {
10399
LOG.info("Assigned extra {} handlers to commons pool", leftOverHandlers);
104-
insertNameServiceWithPermits(CONCURRENT_NS,
105-
existingPermits + leftOverHandlers);
100+
insertNameServiceWithPermits(CONCURRENT_NS, existingPermits + leftOverHandlers);
106101
}
107-
LOG.info("Final permit allocation for concurrent ns: {}",
108-
getAvailablePermits(CONCURRENT_NS));
102+
LOG.info("Final permit allocation for concurrent ns: {}", getAvailablePermits(CONCURRENT_NS));
109103
}
110104

111105
private static void logAssignment(String nsId, int count) {
112-
LOG.info("Assigned {} handlers to nsId {} ",
113-
count, nsId);
106+
LOG.info("Assigned {} handlers to nsId {} ", count, nsId);
114107
}
115108

116-
private void validateHandlersCount(Configuration conf, int handlerCount,
117-
Set<String> allConfiguredNS) {
109+
private void validateHandlersCount(Configuration conf,
110+
int handlerCount, Set<String> allConfiguredNS) {
118111
int totalDedicatedHandlers = 0;
119112
for (String nsId : allConfiguredNS) {
120-
int dedicatedHandlers =
121-
conf.getInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId, 0);
113+
int dedicatedHandlers = conf.getInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId, 0);
122114
if (dedicatedHandlers > 0) {
123115
// Total handlers should not be less than sum of dedicated handlers.
124116
totalDedicatedHandlers += dedicatedHandlers;
@@ -128,8 +120,7 @@ private void validateHandlersCount(Configuration conf, int handlerCount,
128120
}
129121
}
130122
if (totalDedicatedHandlers > handlerCount) {
131-
String msg = String.format(ERROR_MSG, handlerCount,
132-
totalDedicatedHandlers);
123+
String msg = String.format(ERROR_MSG, handlerCount, totalDedicatedHandlers);
133124
LOG.error(msg);
134125
throw new IllegalArgumentException(msg);
135126
}

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,10 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
354354
NoRouterRpcFairnessPolicyController.class;
355355
public static final String DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX =
356356
FEDERATION_ROUTER_FAIRNESS_PREFIX + "handler.count.";
357+
public static final String DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT =
358+
FEDERATION_ROUTER_FAIRNESS_PREFIX + "acquire.timeout";
359+
public static final long DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT_DEFAULT =
360+
TimeUnit.SECONDS.toMillis(1);
357361

358362
// HDFS Router Federation Rename.
359363
public static final String DFS_ROUTER_FEDERATION_RENAME_PREFIX =

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -723,6 +723,14 @@
723723
</description>
724724
</property>
725725

726+
<property>
727+
<name>dfs.federation.router.fairness.acquire.timeout</name>
728+
<value>1s</value>
729+
<description>
730+
The maximum time to wait for a permit.
731+
</description>
732+
</property>
733+
726734
<property>
727735
<name>dfs.federation.router.federation.rename.bandwidth</name>
728736
<value>10</value>

hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRpcFairnessPolicyController.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,14 @@
2323
import org.apache.hadoop.hdfs.server.federation.router.FederationUtil;
2424
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
2525
import org.apache.hadoop.test.GenericTestUtils;
26+
import org.apache.hadoop.util.Time;
2627
import org.junit.Test;
2728
import org.slf4j.LoggerFactory;
2829

30+
import java.util.concurrent.TimeUnit;
31+
2932
import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
33+
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT;
3034
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
3135
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
3236
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX;
@@ -83,6 +87,26 @@ public void testHandlerAllocationPreconfigured() {
8387
assertFalse(routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS));
8488
}
8589

90+
@Test
91+
public void testAcquireTimeout() {
92+
Configuration conf = createConf(40);
93+
conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "ns1", 30);
94+
conf.setTimeDuration(DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT, 100, TimeUnit.MILLISECONDS);
95+
RouterRpcFairnessPolicyController routerRpcFairnessPolicyController =
96+
FederationUtil.newFairnessPolicyController(conf);
97+
98+
// ns1 should have 30 permits allocated
99+
for (int i = 0; i < 30; i++) {
100+
assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns1"));
101+
}
102+
long acquireBeginTimeMs = Time.monotonicNow();
103+
assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns1"));
104+
long acquireTimeMs = Time.monotonicNow() - acquireBeginTimeMs;
105+
106+
// There are some other operations, so acquireTimeMs >= 100ms.
107+
assertTrue(acquireTimeMs >= 100);
108+
}
109+
86110
@Test
87111
public void testAllocationErrorWithZeroHandlers() {
88112
Configuration conf = createConf(0);

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3616,7 +3616,7 @@ private Block addStoredBlock(final BlockInfo block,
36163616
curReplicaDelta =
36173617
(node.isDecommissioned() || node.isDecommissionInProgress()) ? 0 : 1;
36183618
if (logEveryBlock) {
3619-
blockLog.debug("BLOCK* addStoredBlock: {} is added to {} (size={})",
3619+
blockLog.info("BLOCK* addStoredBlock: {} is added to {} (size={})",
36203620
node, storedBlock, storedBlock.getNumBytes());
36213621
}
36223622
} else if (result == AddBlockResult.REPLACED) {

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/HostRestrictingAuthorizationFilterHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@
4343
import java.util.List;
4444
import java.util.Map;
4545

46-
import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
47-
import static io.netty.handler.codec.http.HttpHeaders.Values.CLOSE;
46+
import static io.netty.handler.codec.http.HttpHeaderNames.CONNECTION;
47+
import static io.netty.handler.codec.http.HttpHeaderValues.CLOSE;
4848
import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
4949
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
5050

0 commit comments

Comments
 (0)