Skip to content

Commit

Permalink
Merge branch 'trunk' into YARN-10885
Browse files Browse the repository at this point in the history
  • Loading branch information
slfan1989 authored Aug 13, 2022
2 parents 7a9c9a5 + b737869 commit 0ed4dd4
Show file tree
Hide file tree
Showing 141 changed files with 107,594 additions and 2,612 deletions.
2 changes: 1 addition & 1 deletion .asf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ notifications:
commits: common-commits@hadoop.apache.org
issues: common-issues@hadoop.apache.org
pullrequests: common-issues@hadoop.apache.org
jira_options: link label worklog
jira_options: comment link label
39,037 changes: 39,037 additions & 0 deletions hadoop-common-project/hadoop-common/dev-support/jdiff/Apache_Hadoop_Common_3.3.4.xml

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions hadoop-common-project/hadoop-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1151,7 +1151,7 @@
<id>src-test-compile-protoc-legacy</id>
<phase>generate-test-sources</phase>
<goals>
<goal>compile</goal>
<goal>test-compile</goal>
</goals>
<configuration>
<skip>false</skip>
Expand All @@ -1160,7 +1160,7 @@
com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}
</protocArtifact>
<includeDependenciesInDescriptorSet>false</includeDependenciesInDescriptorSet>
<protoSourceRoot>${basedir}/src/test/proto</protoSourceRoot>
<protoTestSourceRoot>${basedir}/src/test/proto</protoTestSourceRoot>
<outputDirectory>${project.build.directory}/generated-test-sources/java</outputDirectory>
<clearOutputDirectory>false</clearOutputDirectory>
<includes>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -480,13 +480,13 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
* Thread-level IOStats Support.
* {@value}
*/
public static final String THREAD_LEVEL_IOSTATISTICS_ENABLED =
"fs.thread.level.iostatistics.enabled";
public static final String IOSTATISTICS_THREAD_LEVEL_ENABLED =
"fs.iostatistics.thread.level.enabled";

/**
* Default value for Thread-level IOStats Support is true.
*/
public static final boolean THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT =
public static final boolean IOSTATISTICS_THREAD_LEVEL_ENABLED_DEFAULT =
true;

}
Original file line number Diff line number Diff line change
Expand Up @@ -256,9 +256,8 @@ public <T extends FileSystem & Renewable> void removeRenewAction(
try {
action.cancel();
} catch (InterruptedException ie) {
LOG.error("Interrupted while canceling token for " + fs.getUri()
+ "filesystem");
LOG.debug("Exception in removeRenewAction: {}", ie);
LOG.error("Interrupted while canceling token for {} filesystem.", fs.getUri());
LOG.debug("Exception in removeRenewAction.", ie);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,14 @@ static void setThreadIOStatisticsContext(
IOStatisticsContextIntegration.setThreadIOStatisticsContext(
statisticsContext);
}

/**
* Static probe to check if the thread-level IO statistics enabled.
*
* @return if the thread-level IO statistics enabled.
*/
static boolean enabled() {
return IOStatisticsContextIntegration.isIOStatisticsThreadLevelEnabled();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import org.apache.hadoop.fs.impl.WeakReferenceThreadMap;
import org.apache.hadoop.fs.statistics.IOStatisticsContext;

import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED;
import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_THREAD_LEVEL_ENABLED;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_THREAD_LEVEL_ENABLED_DEFAULT;

/**
* A Utility class for IOStatisticsContext, which helps in creating and
Expand Down Expand Up @@ -76,8 +76,17 @@ public final class IOStatisticsContextIntegration {
// Work out if the current context has thread level IOStatistics enabled.
final Configuration configuration = new Configuration();
isThreadIOStatsEnabled =
configuration.getBoolean(THREAD_LEVEL_IOSTATISTICS_ENABLED,
THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT);
configuration.getBoolean(IOSTATISTICS_THREAD_LEVEL_ENABLED,
IOSTATISTICS_THREAD_LEVEL_ENABLED_DEFAULT);
}

/**
* Static probe to check if the thread-level IO statistics enabled.
*
* @return if the thread-level IO statistics enabled.
*/
public static boolean isIOStatisticsThreadLevelEnabled() {
return isThreadIOStatsEnabled;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.io.compress;

import java.io.IOException;

/**
* An exception class for when a closed compressor/decopressor is being used
* {@link org.apache.hadoop.io.compress.Compressor}
* {@link org.apache.hadoop.io.compress.Decompressor}
*/
public class AlreadyClosedException extends IOException {

public AlreadyClosedException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ public static void returnCompressor(Compressor compressor) {
}
// if the compressor can't be reused, don't pool it.
if (compressor.getClass().isAnnotationPresent(DoNotPool.class)) {
compressor.end();
return;
}
compressor.reset();
Expand All @@ -225,6 +226,7 @@ public static void returnDecompressor(Decompressor decompressor) {
}
// if the decompressor can't be reused, don't pool it.
if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
decompressor.end();
return;
}
decompressor.reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.zip.GZIPOutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.AlreadyClosedException;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.DoNotPool;
import org.apache.hadoop.util.DataChecksum;
Expand Down Expand Up @@ -83,6 +84,10 @@ public int compress(byte[] b, int off, int len) throws IOException {
throw new IOException("compress called on finished compressor");
}

if (state == BuiltInGzipDecompressor.GzipStateLabel.ENDED) {
throw new AlreadyClosedException("compress called on closed compressor");
}

int compressedBytesWritten = 0;

// If we are not within uncompressed data yet, output the header.
Expand Down Expand Up @@ -139,6 +144,8 @@ public long getBytesWritten() {
@Override
public void end() {
deflater.end();

state = BuiltInGzipDecompressor.GzipStateLabel.ENDED;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;

import org.apache.hadoop.io.compress.AlreadyClosedException;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DoNotPool;
import org.apache.hadoop.util.DataChecksum;
Expand Down Expand Up @@ -109,7 +110,11 @@ public enum GzipStateLabel {
* Immediately after the trailer (and potentially prior to the next gzip
* member/substream header), without reset() having been called.
*/
FINISHED;
FINISHED,
/**
* Immediately after end() has been called.
*/
ENDED;
}

/**
Expand Down Expand Up @@ -186,6 +191,10 @@ public synchronized int decompress(byte[] b, int off, int len)
throws IOException {
int numAvailBytes = 0;

if (state == GzipStateLabel.ENDED) {
throw new AlreadyClosedException("decompress called on closed decompressor");
}

if (state != GzipStateLabel.DEFLATE_STREAM) {
executeHeaderState();

Expand Down Expand Up @@ -476,6 +485,8 @@ public synchronized void reset() {
@Override
public synchronized void end() {
inflater.end();

state = GzipStateLabel.ENDED;
}

/**
Expand Down
Loading

0 comments on commit 0ed4dd4

Please sign in to comment.