Skip to content

Commit 7e63d0a

Browse files
authored
Merge branch 'apache:trunk' into YARN-11359
2 parents 101229d + a48e8c9 commit 7e63d0a

File tree

30 files changed

+2112
-776
lines changed

30 files changed

+2112
-776
lines changed

LICENSE-binary

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -241,8 +241,8 @@ com.google.guava:guava:27.0-jre
241241
com.google.guava:listenablefuture:9999.0-empty-to-avoid-conflict-with-guava
242242
com.microsoft.azure:azure-storage:7.0.0
243243
com.nimbusds:nimbus-jose-jwt:9.8.1
244-
com.squareup.okhttp3:okhttp:4.9.3
245-
com.squareup.okio:okio:1.6.0
244+
com.squareup.okhttp3:okhttp:4.10.0
245+
com.squareup.okio:okio:3.2.0
246246
com.zaxxer:HikariCP:4.0.3
247247
commons-beanutils:commons-beanutils:1.9.3
248248
commons-cli:commons-cli:1.2

hadoop-client-modules/hadoop-client-runtime/pom.xml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@
148148
<!-- Leave javax APIs that are stable -->
149149
<!-- the jdk ships part of the javax.annotation namespace, so if we want to relocate this we'll have to care it out by class :( -->
150150
<exclude>com.google.code.findbugs:jsr305</exclude>
151+
<exclude>io.netty:*</exclude>
151152
<exclude>io.dropwizard.metrics:metrics-core</exclude>
152153
<exclude>org.eclipse.jetty:jetty-servlet</exclude>
153154
<exclude>org.eclipse.jetty:jetty-security</exclude>
@@ -156,6 +157,8 @@
156157
<exclude>org.bouncycastle:*</exclude>
157158
<!-- Leave snappy that includes native methods which cannot be relocated. -->
158159
<exclude>org.xerial.snappy:*</exclude>
160+
<!-- leave out kotlin classes -->
161+
<exclude>org.jetbrains.kotlin:*</exclude>
159162
</excludes>
160163
</artifactSet>
161164
<filters>

hadoop-common-project/hadoop-common/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -383,6 +383,11 @@
383383
<artifactId>mockwebserver</artifactId>
384384
<scope>test</scope>
385385
</dependency>
386+
<dependency>
387+
<groupId>com.squareup.okio</groupId>
388+
<artifactId>okio-jvm</artifactId>
389+
<scope>test</scope>
390+
</dependency>
386391
<dependency>
387392
<groupId>dnsjava</groupId>
388393
<artifactId>dnsjava</artifactId>

hadoop-common-project/hadoop-common/src/site/markdown/DeprecatedProperties.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,8 @@ The following table lists the configuration property names that are deprecated i
208208
| mapred.task.profile.params | mapreduce.task.profile.params |
209209
| mapred.task.profile.reduces | mapreduce.task.profile.reduces |
210210
| mapred.task.timeout | mapreduce.task.timeout |
211-
| mapred.tasktracker.indexcache.mb | mapreduce.tasktracker.indexcache.mb |
211+
| mapred.tasktracker.indexcache.mb | mapreduce.reduce.shuffle.indexcache.mb |
212+
| mapreduce.tasktracker.indexcache.mb | mapreduce.reduce.shuffle.indexcache.mb |
212213
| mapred.tasktracker.map.tasks.maximum | mapreduce.tasktracker.map.tasks.maximum |
213214
| mapred.tasktracker.memory\_calculator\_plugin | mapreduce.tasktracker.resourcecalculatorplugin |
214215
| mapred.tasktracker.memorycalculatorplugin | mapreduce.tasktracker.resourcecalculatorplugin |

hadoop-hdfs-project/hadoop-hdfs-client/pom.xml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,16 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
3737
<dependency>
3838
<groupId>com.squareup.okhttp3</groupId>
3939
<artifactId>okhttp</artifactId>
40+
<exclusions>
41+
<exclusion>
42+
<groupId>com.squareup.okio</groupId>
43+
<artifactId>okio-jvm</artifactId>
44+
</exclusion>
45+
</exclusions>
46+
</dependency>
47+
<dependency>
48+
<groupId>com.squareup.okio</groupId>
49+
<artifactId>okio-jvm</artifactId>
4050
</dependency>
4151
<dependency>
4252
<groupId>org.jetbrains.kotlin</groupId>

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PoolAlignmentContext.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,7 @@ public void receiveResponseState(RpcHeaderProtos.RpcResponseHeaderProto header)
7171
*/
7272
@Override
7373
public void updateRequestState(RpcHeaderProtos.RpcRequestHeaderProto.Builder header) {
74-
long maxStateId = Long.max(poolLocalStateId.get(), sharedGlobalStateId.get());
75-
header.setStateId(maxStateId);
74+
header.setStateId(poolLocalStateId.get());
7675
}
7776

7877
/**
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hdfs.server.federation.router;
19+
20+
import org.apache.hadoop.conf.Configuration;
21+
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
22+
import org.junit.jupiter.api.Assertions;
23+
import org.junit.jupiter.api.Test;
24+
25+
26+
public class TestPoolAlignmentContext {
27+
@Test
28+
public void testNamenodeRequestsOnlyUsePoolLocalStateID() {
29+
RouterStateIdContext routerStateIdContext = new RouterStateIdContext(new Configuration());
30+
String namespaceId = "namespace1";
31+
routerStateIdContext.getNamespaceStateId(namespaceId).accumulate(20L);
32+
PoolAlignmentContext poolContext1 = new PoolAlignmentContext(routerStateIdContext, namespaceId);
33+
PoolAlignmentContext poolContext2 = new PoolAlignmentContext(routerStateIdContext, namespaceId);
34+
35+
assertRequestHeaderStateId(poolContext1, Long.MIN_VALUE);
36+
assertRequestHeaderStateId(poolContext2, Long.MIN_VALUE);
37+
Assertions.assertEquals(20L, poolContext1.getLastSeenStateId());
38+
Assertions.assertEquals(20L, poolContext2.getLastSeenStateId());
39+
40+
poolContext1.advanceClientStateId(30L);
41+
assertRequestHeaderStateId(poolContext1, 30L);
42+
assertRequestHeaderStateId(poolContext2, Long.MIN_VALUE);
43+
Assertions.assertEquals(20L, poolContext1.getLastSeenStateId());
44+
Assertions.assertEquals(20L, poolContext2.getLastSeenStateId());
45+
}
46+
47+
private void assertRequestHeaderStateId(PoolAlignmentContext poolAlignmentContext,
48+
Long expectedValue) {
49+
RpcRequestHeaderProto.Builder builder = RpcRequestHeaderProto.newBuilder();
50+
poolAlignmentContext.updateRequestState(builder);
51+
Assertions.assertEquals(expectedValue, builder.getStateId());
52+
}
53+
}

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -520,7 +520,7 @@ public void addVolume(final StorageLocation location,
520520

521521
for (final NamespaceInfo nsInfo : nsInfos) {
522522
String bpid = nsInfo.getBlockPoolID();
523-
try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
523+
try {
524524
fsVolume.addBlockPool(bpid, this.conf, this.timer);
525525
fsVolume.getVolumeMap(bpid, tempVolumeMap, ramDiskReplicaTracker);
526526
} catch (IOException e) {

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexCache.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import java.util.concurrent.atomic.AtomicInteger;
2424

2525
import org.apache.hadoop.fs.Path;
26-
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
26+
import org.apache.hadoop.mapreduce.MRJobConfig;
2727
import org.slf4j.Logger;
2828
import org.slf4j.LoggerFactory;
2929

@@ -43,7 +43,7 @@ class IndexCache {
4343
public IndexCache(JobConf conf) {
4444
this.conf = conf;
4545
totalMemoryAllowed =
46-
conf.getInt(TTConfig.TT_INDEX_CACHE, 10) * 1024 * 1024;
46+
conf.getInt(MRJobConfig.SHUFFLE_INDEX_CACHE, 10) * 1024 * 1024;
4747
LOG.info("IndexCache created with max memory = " + totalMemoryAllowed);
4848
}
4949

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -577,6 +577,8 @@ public interface MRJobConfig {
577577
public static final String MAX_SHUFFLE_FETCH_HOST_FAILURES = "mapreduce.reduce.shuffle.max-host-failures";
578578
public static final int DEFAULT_MAX_SHUFFLE_FETCH_HOST_FAILURES = 5;
579579

580+
public static final String SHUFFLE_INDEX_CACHE = "mapreduce.reduce.shuffle.indexcache.mb";
581+
580582
public static final String REDUCE_SKIP_INCR_PROC_COUNT = "mapreduce.reduce.skip.proc-count.auto-incr";
581583

582584
public static final String REDUCE_SKIP_MAXGROUPS = "mapreduce.reduce.skip.maxgroups";

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,12 @@
2929
@InterfaceStability.Evolving
3030
public interface TTConfig extends MRConfig {
3131

32+
/**
33+
* @deprecated Use
34+
* {@link org.apache.hadoop.mapreduce.MRJobConfig#SHUFFLE_INDEX_CACHE}
35+
* instead
36+
*/
37+
@Deprecated
3238
public static final String TT_INDEX_CACHE =
3339
"mapreduce.tasktracker.indexcache.mb";
3440
public static final String TT_MAP_SLOTS =

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -53,14 +53,15 @@
5353

5454
import org.apache.hadoop.classification.VisibleForTesting;
5555

56-
class Fetcher<K,V> extends Thread {
56+
@VisibleForTesting
57+
public class Fetcher<K, V> extends Thread {
5758

5859
private static final Logger LOG = LoggerFactory.getLogger(Fetcher.class);
5960

60-
/** Number of ms before timing out a copy */
61+
/** Number of ms before timing out a copy. */
6162
private static final int DEFAULT_STALLED_COPY_TIMEOUT = 3 * 60 * 1000;
6263

63-
/** Basic/unit connection timeout (in milliseconds) */
64+
/** Basic/unit connection timeout (in milliseconds). */
6465
private final static int UNIT_CONNECT_TIMEOUT = 60 * 1000;
6566

6667
/* Default read timeout (in milliseconds) */
@@ -72,19 +73,21 @@ class Fetcher<K,V> extends Thread {
7273
private static final String FETCH_RETRY_AFTER_HEADER = "Retry-After";
7374

7475
protected final Reporter reporter;
75-
private enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
76+
@VisibleForTesting
77+
public enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
7678
CONNECTION, WRONG_REDUCE}
77-
78-
private final static String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";
79+
80+
@VisibleForTesting
81+
public final static String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";
7982
private final JobConf jobConf;
8083
private final Counters.Counter connectionErrs;
8184
private final Counters.Counter ioErrs;
8285
private final Counters.Counter wrongLengthErrs;
8386
private final Counters.Counter badIdErrs;
8487
private final Counters.Counter wrongMapErrs;
8588
private final Counters.Counter wrongReduceErrs;
86-
protected final MergeManager<K,V> merger;
87-
protected final ShuffleSchedulerImpl<K,V> scheduler;
89+
protected final MergeManager<K, V> merger;
90+
protected final ShuffleSchedulerImpl<K, V> scheduler;
8891
protected final ShuffleClientMetrics metrics;
8992
protected final ExceptionReporter exceptionReporter;
9093
protected final int id;
@@ -111,7 +114,7 @@ private enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
111114
private static SSLFactory sslFactory;
112115

113116
public Fetcher(JobConf job, TaskAttemptID reduceId,
114-
ShuffleSchedulerImpl<K,V> scheduler, MergeManager<K,V> merger,
117+
ShuffleSchedulerImpl<K, V> scheduler, MergeManager<K, V> merger,
115118
Reporter reporter, ShuffleClientMetrics metrics,
116119
ExceptionReporter exceptionReporter, SecretKey shuffleKey) {
117120
this(job, reduceId, scheduler, merger, reporter, metrics,
@@ -120,7 +123,7 @@ public Fetcher(JobConf job, TaskAttemptID reduceId,
120123

121124
@VisibleForTesting
122125
Fetcher(JobConf job, TaskAttemptID reduceId,
123-
ShuffleSchedulerImpl<K,V> scheduler, MergeManager<K,V> merger,
126+
ShuffleSchedulerImpl<K, V> scheduler, MergeManager<K, V> merger,
124127
Reporter reporter, ShuffleClientMetrics metrics,
125128
ExceptionReporter exceptionReporter, SecretKey shuffleKey,
126129
int id) {
@@ -315,9 +318,8 @@ protected void copyFromHost(MapHost host) throws IOException {
315318
return;
316319
}
317320

318-
if(LOG.isDebugEnabled()) {
319-
LOG.debug("Fetcher " + id + " going to fetch from " + host + " for: "
320-
+ maps);
321+
if (LOG.isDebugEnabled()) {
322+
LOG.debug("Fetcher " + id + " going to fetch from " + host + " for: " + maps);
321323
}
322324

323325
// List of maps to be fetched yet
@@ -411,8 +413,8 @@ private void openConnectionWithRetry(URL url) throws IOException {
411413
shouldWait = false;
412414
} catch (IOException e) {
413415
if (!fetchRetryEnabled) {
414-
// throw exception directly if fetch's retry is not enabled
415-
throw e;
416+
// throw exception directly if fetch's retry is not enabled
417+
throw e;
416418
}
417419
if ((Time.monotonicNow() - startTime) >= this.fetchRetryTimeout) {
418420
LOG.warn("Failed to connect to host: " + url + "after "
@@ -489,7 +491,7 @@ private TaskAttemptID[] copyMapOutput(MapHost host,
489491
DataInputStream input,
490492
Set<TaskAttemptID> remaining,
491493
boolean canRetry) throws IOException {
492-
MapOutput<K,V> mapOutput = null;
494+
MapOutput<K, V> mapOutput = null;
493495
TaskAttemptID mapId = null;
494496
long decompressedLength = -1;
495497
long compressedLength = -1;
@@ -611,7 +613,7 @@ private void checkTimeoutOrRetry(MapHost host, IOException ioe)
611613
// First time to retry.
612614
long currentTime = Time.monotonicNow();
613615
if (retryStartTime == 0) {
614-
retryStartTime = currentTime;
616+
retryStartTime = currentTime;
615617
}
616618

617619
// Retry is not timeout, let's do retry with throwing an exception.
@@ -628,7 +630,7 @@ private void checkTimeoutOrRetry(MapHost host, IOException ioe)
628630
}
629631

630632
/**
631-
* Do some basic verification on the input received -- Being defensive
633+
* Do some basic verification on the input received -- Being defensive.
632634
* @param compressedLength
633635
* @param decompressedLength
634636
* @param forReduce
@@ -695,8 +697,7 @@ private URL getMapOutputURL(MapHost host, Collection<TaskAttemptID> maps
695697
* only on the last failure. Instead of connecting with a timeout of
696698
* X, we try connecting with a timeout of x < X but multiple times.
697699
*/
698-
private void connect(URLConnection connection, int connectionTimeout)
699-
throws IOException {
700+
private void connect(URLConnection connection, int connectionTimeout) throws IOException {
700701
int unit = 0;
701702
if (connectionTimeout < 0) {
702703
throw new IOException("Invalid timeout "

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -879,4 +879,9 @@ public int compareTo(Path obj) {
879879
return super.compareTo(obj);
880880
}
881881
}
882+
883+
@VisibleForTesting
884+
OnDiskMerger getOnDiskMerger() {
885+
return onDiskMerger;
886+
}
882887
}

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.Set;
2626
import java.util.concurrent.atomic.AtomicInteger;
2727

28+
import org.apache.hadoop.classification.VisibleForTesting;
2829
import org.slf4j.Logger;
2930
import org.slf4j.LoggerFactory;
3031

@@ -109,4 +110,14 @@ public void run() {
109110
}
110111

111112
public abstract void merge(List<T> inputs) throws IOException;
113+
114+
@VisibleForTesting
115+
int getMergeFactor() {
116+
return mergeFactor;
117+
}
118+
119+
@VisibleForTesting
120+
LinkedList<List<T>> getPendingToBeMerged() {
121+
return pendingToBeMerged;
122+
}
112123
}

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,6 @@ private static void addDeprecatedKeys() {
8080
JTConfig.JT_TASKCACHE_LEVELS),
8181
new DeprecationDelta("mapred.job.tracker.retire.jobs",
8282
JTConfig.JT_RETIREJOBS),
83-
new DeprecationDelta("mapred.tasktracker.indexcache.mb",
84-
TTConfig.TT_INDEX_CACHE),
8583
new DeprecationDelta("mapred.tasktracker.map.tasks.maximum",
8684
TTConfig.TT_MAP_SLOTS),
8785
new DeprecationDelta("mapred.tasktracker.memory_calculator_plugin",
@@ -290,6 +288,10 @@ private static void addDeprecatedKeys() {
290288
MRJobConfig.REDUCE_LOG_LEVEL),
291289
new DeprecationDelta("mapreduce.job.counters.limit",
292290
MRJobConfig.COUNTERS_MAX_KEY),
291+
new DeprecationDelta("mapred.tasktracker.indexcache.mb",
292+
MRJobConfig.SHUFFLE_INDEX_CACHE),
293+
new DeprecationDelta("mapreduce.tasktracker.indexcache.mb",
294+
MRJobConfig.SHUFFLE_INDEX_CACHE),
293295
new DeprecationDelta("jobclient.completion.poll.interval",
294296
Job.COMPLETION_POLL_INTERVAL_KEY),
295297
new DeprecationDelta("jobclient.progress.monitor.poll.interval",

0 commit comments

Comments
 (0)