Skip to content

Commit 386da87

Browse files
Merge branch 'apache:trunk' into HADOOP-18325_footerMetrics
2 parents eb381e1 + d55d76e commit 386da87

File tree

263 files changed

+114652
-3757
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

263 files changed

+114652
-3757
lines changed

.asf.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,4 @@ notifications:
2222
commits: common-commits@hadoop.apache.org
2323
issues: common-issues@hadoop.apache.org
2424
pullrequests: common-issues@hadoop.apache.org
25-
jira_options: link label worklog
25+
jira_options: comment link label

LICENSE-binary

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ com.aliyun:aliyun-java-sdk-ecs:4.2.0
215215
com.aliyun:aliyun-java-sdk-ram:3.0.0
216216
com.aliyun:aliyun-java-sdk-sts:3.0.0
217217
com.aliyun.oss:aliyun-sdk-oss:3.13.2
218-
com.amazonaws:aws-java-sdk-bundle:1.11.901
218+
com.amazonaws:aws-java-sdk-bundle:1.12.262
219219
com.cedarsoftware:java-util:1.9.0
220220
com.cedarsoftware:json-io:2.5.1
221221
com.fasterxml.jackson.core:jackson-annotations:2.12.7
@@ -309,7 +309,7 @@ org.apache.commons:commons-configuration2:2.1.1
309309
org.apache.commons:commons-csv:1.0
310310
org.apache.commons:commons-digester:1.8.1
311311
org.apache.commons:commons-lang3:3.12.0
312-
org.apache.commons:commons-math3:3.1.1
312+
org.apache.commons:commons-math3:3.6.1
313313
org.apache.commons:commons-text:1.4
314314
org.apache.commons:commons-validator:1.6
315315
org.apache.curator:curator-client:5.2.0

hadoop-common-project/hadoop-common/dev-support/jdiff/Apache_Hadoop_Common_3.3.4.xml

Lines changed: 39037 additions & 0 deletions
Large diffs are not rendered by default.

hadoop-common-project/hadoop-common/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1151,7 +1151,7 @@
11511151
<id>src-test-compile-protoc-legacy</id>
11521152
<phase>generate-test-sources</phase>
11531153
<goals>
1154-
<goal>compile</goal>
1154+
<goal>test-compile</goal>
11551155
</goals>
11561156
<configuration>
11571157
<skip>false</skip>
@@ -1160,7 +1160,7 @@
11601160
com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}
11611161
</protocArtifact>
11621162
<includeDependenciesInDescriptorSet>false</includeDependenciesInDescriptorSet>
1163-
<protoSourceRoot>${basedir}/src/test/proto</protoSourceRoot>
1163+
<protoTestSourceRoot>${basedir}/src/test/proto</protoTestSourceRoot>
11641164
<outputDirectory>${project.build.directory}/generated-test-sources/java</outputDirectory>
11651165
<clearOutputDirectory>false</clearOutputDirectory>
11661166
<includes>

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -475,4 +475,18 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
475475
* default hadoop temp dir on local system: {@value}.
476476
*/
477477
public static final String HADOOP_TMP_DIR = "hadoop.tmp.dir";
478+
479+
/**
480+
* Thread-level IOStats Support.
481+
* {@value}
482+
*/
483+
public static final String IOSTATISTICS_THREAD_LEVEL_ENABLED =
484+
"fs.iostatistics.thread.level.enabled";
485+
486+
/**
487+
* Default value for Thread-level IOStats Support is true.
488+
*/
489+
public static final boolean IOSTATISTICS_THREAD_LEVEL_ENABLED_DEFAULT =
490+
true;
491+
478492
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DelegationTokenRenewer.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -256,9 +256,8 @@ public <T extends FileSystem & Renewable> void removeRenewAction(
256256
try {
257257
action.cancel();
258258
} catch (InterruptedException ie) {
259-
LOG.error("Interrupted while canceling token for " + fs.getUri()
260-
+ "filesystem");
261-
LOG.debug("Exception in removeRenewAction: {}", ie);
259+
LOG.error("Interrupted while canceling token for {} filesystem.", fs.getUri());
260+
LOG.debug("Exception in removeRenewAction.", ie);
262261
}
263262
}
264263
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@
5757
import org.apache.hadoop.fs.impl.StoreImplementationUtils;
5858
import org.apache.hadoop.fs.permission.FsPermission;
5959
import org.apache.hadoop.fs.statistics.IOStatistics;
60+
import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
61+
import org.apache.hadoop.fs.statistics.IOStatisticsContext;
6062
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
6163
import org.apache.hadoop.fs.statistics.BufferedIOStatisticsOutputStream;
6264
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
@@ -156,11 +158,19 @@ class LocalFSFileInputStream extends FSInputStream implements
156158
/** Reference to the bytes read counter for slightly faster counting. */
157159
private final AtomicLong bytesRead;
158160

161+
/**
162+
* Thread level IOStatistics aggregator to update in close().
163+
*/
164+
private final IOStatisticsAggregator
165+
ioStatisticsAggregator;
166+
159167
public LocalFSFileInputStream(Path f) throws IOException {
160168
name = pathToFile(f);
161169
fis = new FileInputStream(name);
162170
bytesRead = ioStatistics.getCounterReference(
163171
STREAM_READ_BYTES);
172+
ioStatisticsAggregator =
173+
IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator();
164174
}
165175

166176
@Override
@@ -193,9 +203,13 @@ public boolean seekToNewSource(long targetPos) throws IOException {
193203

194204
@Override
195205
public void close() throws IOException {
196-
fis.close();
197-
if (asyncChannel != null) {
198-
asyncChannel.close();
206+
try {
207+
fis.close();
208+
if (asyncChannel != null) {
209+
asyncChannel.close();
210+
}
211+
} finally {
212+
ioStatisticsAggregator.aggregate(ioStatistics);
199213
}
200214
}
201215

@@ -278,6 +292,7 @@ public boolean hasCapability(String capability) {
278292
// new capabilities.
279293
switch (capability.toLowerCase(Locale.ENGLISH)) {
280294
case StreamCapabilities.IOSTATISTICS:
295+
case StreamCapabilities.IOSTATISTICS_CONTEXT:
281296
case StreamCapabilities.VECTOREDIO:
282297
return true;
283298
default:
@@ -407,9 +422,19 @@ final class LocalFSFileOutputStream extends OutputStream implements
407422
STREAM_WRITE_EXCEPTIONS)
408423
.build();
409424

425+
/**
426+
* Thread level IOStatistics aggregator to update in close().
427+
*/
428+
private final IOStatisticsAggregator
429+
ioStatisticsAggregator;
430+
410431
private LocalFSFileOutputStream(Path f, boolean append,
411432
FsPermission permission) throws IOException {
412433
File file = pathToFile(f);
434+
// store the aggregator before attempting any IO.
435+
ioStatisticsAggregator =
436+
IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator();
437+
413438
if (!append && permission == null) {
414439
permission = FsPermission.getFileDefault();
415440
}
@@ -436,10 +461,17 @@ private LocalFSFileOutputStream(Path f, boolean append,
436461
}
437462

438463
/*
439-
* Just forward to the fos
464+
* Close the fos; update the IOStatisticsContext.
440465
*/
441466
@Override
442-
public void close() throws IOException { fos.close(); }
467+
public void close() throws IOException {
468+
try {
469+
fos.close();
470+
} finally {
471+
ioStatisticsAggregator.aggregate(ioStatistics);
472+
}
473+
}
474+
443475
@Override
444476
public void flush() throws IOException { fos.flush(); }
445477
@Override
@@ -485,6 +517,7 @@ public boolean hasCapability(String capability) {
485517
// new capabilities.
486518
switch (capability.toLowerCase(Locale.ENGLISH)) {
487519
case StreamCapabilities.IOSTATISTICS:
520+
case StreamCapabilities.IOSTATISTICS_CONTEXT:
488521
return true;
489522
default:
490523
return StoreImplementationUtils.isProbeForSyncable(capability);

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,12 @@ public interface StreamCapabilities {
9393
*/
9494
String ABORTABLE_STREAM = CommonPathCapabilities.ABORTABLE_STREAM;
9595

96+
/**
97+
* Streams that support IOStatistics context and capture thread-level
98+
* IOStatistics.
99+
*/
100+
String IOSTATISTICS_CONTEXT = "fs.capability.iocontext.supported";
101+
96102
/**
97103
* Capabilities that a stream can support and be queried for.
98104
*/

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,7 @@ public static List<? extends FileRange> validateNonOverlappingAndReturnSortedRan
210210
if (sortedRanges[i].getOffset() < prev.getOffset() + prev.getLength()) {
211211
throw new UnsupportedOperationException("Overlapping ranges are not supported");
212212
}
213+
prev = sortedRanges[i];
213214
}
214215
return Arrays.asList(sortedRanges);
215216
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakReferenceThreadMap.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.hadoop.fs.impl;
2020

21+
import java.lang.ref.WeakReference;
2122
import java.util.function.Consumer;
2223
import java.util.function.Function;
2324
import javax.annotation.Nullable;
@@ -48,7 +49,17 @@ public long currentThreadId() {
4849
}
4950

5051
public V setForCurrentThread(V newVal) {
51-
return put(currentThreadId(), newVal);
52+
long id = currentThreadId();
53+
54+
// if the same object is already in the map, just return it.
55+
WeakReference<V> ref = lookup(id);
56+
// Reference value could be set to null. Thus, ref.get() could return
57+
// null. Should be handled accordingly while using the returned value.
58+
if (ref != null && ref.get() == newVal) {
59+
return ref.get();
60+
}
61+
62+
return put(id, newVal);
5263
}
5364

5465
}

0 commit comments

Comments
 (0)