
Commit 4cdab98

Merge branch 'apache:trunk' into yarnuijs
2 parents 7327eb9 + 43b5183 commit 4cdab98

File tree

16 files changed: +454 -133 lines changed


hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/PrometheusMetricsSink.java

Lines changed: 51 additions & 1 deletion
@@ -19,7 +19,13 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
 import java.util.regex.Matcher;
+
+import org.apache.hadoop.thirdparty.com.google.common.cache.CacheBuilder;
+import org.apache.hadoop.thirdparty.com.google.common.cache.CacheLoader;
+import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.UncheckedExecutionException;
 import org.apache.commons.configuration2.SubsetConfiguration;
 import org.apache.hadoop.metrics2.AbstractMetric;
 import org.apache.hadoop.metrics2.MetricType;
@@ -35,13 +41,16 @@
 import java.util.regex.Pattern;
 
 import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Metrics sink for prometheus exporter.
  * <p>
 * Stores the metric data in-memory and return with it on request.
 */
 public class PrometheusMetricsSink implements MetricsSink {
+  private static final Logger LOG = LoggerFactory.getLogger(PrometheusMetricsSink.class);
 
   /**
    * Cached output lines for each metrics.
@@ -62,6 +71,17 @@ public class PrometheusMetricsSink implements MetricsSink {
       Pattern
           .compile("^op=(?<op>\\w+)(.user=(?<user>.*)|)\\.(TotalCount|count)$");
 
+  /**
+   * A fixed cache for Hadoop metric to Prometheus metric name conversion.
+   */
+  private static final int NORMALIZED_NAME_CACHE_MAX_SIZE = 100_000;
+  private static final CacheLoader<String, String> NORMALIZED_NAME_CACHE_LOADER =
+      CacheLoader.from(PrometheusMetricsSink::normalizeImpl);
+  private static final LoadingCache<String, String> NORMALIZED_NAME_CACHE =
+      CacheBuilder.newBuilder()
+          .maximumSize(NORMALIZED_NAME_CACHE_MAX_SIZE)
+          .build(NORMALIZED_NAME_CACHE_LOADER);
+
   public PrometheusMetricsSink() {
   }
 
@@ -83,7 +103,21 @@ public void putMetrics(MetricsRecord metricsRecord) {
 
   /**
    * Convert CamelCase based names to lower-case names where the separator
-   * is the underscore, to follow prometheus naming conventions.
+   * is the underscore, to follow prometheus naming conventions. This method
+   * utilizes a cache to improve performance.
+   *
+   * <p>
+   * Reference:
+   * <ul>
+   *   <li>
+   *     <a href="https://prometheus.io/docs/practices/naming/">
+   *       Metrics and Label Naming</a>
+   *   </li>
+   *   <li>
+   *     <a href="https://prometheus.io/docs/instrumenting/exposition_formats/">
+   *       Exposition formats</a>
+   *   </li>
+   * </ul>
    *
    * @param metricName metricName.
    * @param recordName recordName.
@@ -93,6 +127,22 @@ public String prometheusName(String recordName,
       String metricName) {
     String baseName = StringUtils.capitalize(recordName)
         + StringUtils.capitalize(metricName);
+    try {
+      return NORMALIZED_NAME_CACHE.get(baseName);
+    } catch (ExecutionException | UncheckedExecutionException e) {
+      // This should not happen since normalization function do not throw any exception
+      // Nevertheless, we can fall back to uncached implementation if it somehow happens.
+      LOG.warn("Exception encountered when loading metric with base name {} from cache, " +
+          "fall back to uncached normalization implementation", baseName, e);
+      return normalizeImpl(baseName);
+    }
+  }
+
+  /**
+   * Underlying Prometheus normalization implementation.
+   * See {@link PrometheusMetricsSink#prometheusName(String, String)} for more information.
+   */
+  private static String normalizeImpl(String baseName) {
     String[] parts = SPLIT_PATTERN.split(baseName);
     String joined = String.join("_", parts).toLowerCase();
     return DELIMITERS.matcher(joined).replaceAll("_");
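
Note on the change above: the Hadoop-to-Prometheus name conversion is memoized with a Guava LoadingCache so repeated metric names skip the regex-based normalization. A minimal standalone sketch of the same pattern follows; it uses the unshaded com.google.common.cache package and illustrative names (NameNormalizer, normalize), whereas the commit uses Hadoop's shaded org.apache.hadoop.thirdparty classes.

import java.util.concurrent.ExecutionException;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

public final class NameNormalizer {
  // Bounded cache: once the maximum size is reached, older entries are evicted.
  private static final LoadingCache<String, String> CACHE =
      CacheBuilder.newBuilder()
          .maximumSize(100_000)
          .build(CacheLoader.from(NameNormalizer::normalizeImpl));

  private NameNormalizer() {
  }

  public static String normalize(String name) {
    try {
      return CACHE.get(name);
    } catch (ExecutionException e) {
      // The loader is a pure function that never throws; fall back to the
      // direct computation just in case.
      return normalizeImpl(name);
    }
  }

  // The expensive pure function being memoized: CamelCase -> snake_case.
  private static String normalizeImpl(String name) {
    return name.replaceAll("([a-z0-9])([A-Z])", "$1_$2").toLowerCase();
  }
}

Because the loader never throws, the exception branch should be unreachable, which is why the commit only logs a warning there and falls back to the uncached path.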

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java

Lines changed: 6 additions & 4 deletions
@@ -57,6 +57,7 @@
 import software.amazon.awssdk.transfer.s3.model.FileUpload;
 import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
@@ -287,10 +288,11 @@ public boolean inputStreamHasCapability(final String capability) {
    * Initialize dir allocator if not already initialized.
    */
   private void initLocalDirAllocator() {
-    String bufferDir = getConfig().get(BUFFER_DIR) != null
-        ? BUFFER_DIR
-        : HADOOP_TMP_DIR;
-    directoryAllocator = new LocalDirAllocator(bufferDir);
+    String key = BUFFER_DIR;
+    if (StringUtils.isEmpty(getConfig().getTrimmed(key))) {
+      key = HADOOP_TMP_DIR;
+    }
+    directoryAllocator = new LocalDirAllocator(key);
   }
 
   /** Acquire write capacity for rate limiting {@inheritDoc}. */
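
Note on the change above: initLocalDirAllocator() now treats an empty or whitespace-only fs.s3a.buffer.dir the same as an unset one and falls back to hadoop.tmp.dir. A hedged sketch of that selection logic, with the key names written out for illustration (the real constants come from org.apache.hadoop.fs.s3a.Constants):

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;

public class BufferDirSelection {
  // Illustrative key names; the commit uses the BUFFER_DIR and HADOOP_TMP_DIR constants.
  static final String BUFFER_DIR = "fs.s3a.buffer.dir";
  static final String HADOOP_TMP_DIR = "hadoop.tmp.dir";

  static LocalDirAllocator createAllocator(Configuration conf) {
    // Fall back to hadoop.tmp.dir when fs.s3a.buffer.dir is unset, empty, or whitespace.
    String key = BUFFER_DIR;
    if (StringUtils.isEmpty(conf.getTrimmed(key))) {
      key = HADOOP_TMP_DIR;
    }
    // The allocator is handed the key name; directories are resolved from the
    // Configuration passed to its allocation calls.
    return new LocalDirAllocator(key);
  }
}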

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java

Lines changed: 9 additions & 1 deletion
@@ -21,6 +21,8 @@
 
 import java.io.IOException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import software.amazon.s3.analyticsaccelerator.S3SdkObjectClient;
 import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
 import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
@@ -40,6 +42,8 @@
  * {@code S3AStore}, if fs.s3a.input.stream.type is set to Analytics.
 */
 public class AnalyticsStreamFactory extends AbstractObjectInputStreamFactory {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AnalyticsStreamFactory.class);
 
   private S3SeekableInputStreamConfiguration seekableInputStreamConfiguration;
   private LazyAutoCloseableReference<S3SeekableInputStreamFactory> s3SeekableInputStreamFactory;
@@ -98,7 +102,11 @@ public StreamFactoryRequirements factoryRequirements() {
 
   @Override
   protected void serviceStop() throws Exception {
-    this.s3SeekableInputStreamFactory.close();
+    try {
+      s3SeekableInputStreamFactory.close();
+    } catch (Exception ignored) {
+      LOG.debug("Ignored exception while closing stream factory", ignored);
+    }
     callbacks().incrementFactoryStatistic(ANALYTICS_STREAM_FACTORY_CLOSED);
     super.serviceStop();
   }
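
Note on the change above: serviceStop() now catches and logs any exception from closing the seekable stream factory, so the factory statistic is still incremented and super.serviceStop() still runs. A generic sketch of that "close quietly, keep shutting down" pattern (closeQuietly is an illustrative helper, not part of the commit):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class QuietClose {
  private static final Logger LOG = LoggerFactory.getLogger(QuietClose.class);

  // Close a resource without letting its failure abort the rest of a shutdown sequence.
  static void closeQuietly(AutoCloseable resource) {
    if (resource == null) {
      return;
    }
    try {
      resource.close();
    } catch (Exception e) {
      LOG.debug("Ignored exception while closing {}", resource, e);
    }
  }
}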

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java

Lines changed: 35 additions & 8 deletions
@@ -49,6 +49,7 @@
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3a.auth.STSClientFactory;
@@ -63,6 +64,7 @@
 import org.apache.hadoop.util.VersionInfo;
 import org.apache.http.HttpStatus;
 
+import static java.lang.String.format;
 import static java.util.Objects.requireNonNull;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
 import static org.apache.hadoop.fs.s3a.Constants.*;
@@ -485,12 +487,29 @@ public void testCloseIdempotent() throws Throwable {
 
   @Test
   public void testDirectoryAllocatorDefval() throws Throwable {
+    removeAllocatorContexts();
     conf = new Configuration();
-    conf.unset(Constants.BUFFER_DIR);
-    fs = S3ATestUtils.createTestFileSystem(conf);
-    File tmp = createTemporaryFileForWriting();
-    assertTrue("not found: " + tmp, tmp.exists());
-    tmp.delete();
+    final String bucketName = getTestBucketName(conf);
+    final String blank = " ";
+    conf.set(Constants.BUFFER_DIR, blank);
+    conf.set(format("fs.s3a.bucket.%s.buffer.dir", bucketName), blank);
+    try {
+      fs = S3ATestUtils.createTestFileSystem(conf);
+      final Configuration fsConf = fs.getConf();
+      Assertions.assertThat(fsConf.get(Constants.BUFFER_DIR))
+          .describedAs("Config option %s", Constants.BUFFER_DIR)
+          .isEqualTo(blank);
+      File tmp = createTemporaryFileForWriting();
+      assertTrue("not found: " + tmp, tmp.exists());
+      tmp.delete();
+    } finally {
+      removeAllocatorContexts();
+    }
+  }
+
+  private static void removeAllocatorContexts() {
+    LocalDirAllocator.removeContext(BUFFER_DIR);
+    LocalDirAllocator.removeContext(HADOOP_TMP_DIR);
   }
 
   /**
@@ -504,13 +523,21 @@ private File createTemporaryFileForWriting() throws IOException {
 
   @Test
   public void testDirectoryAllocatorRR() throws Throwable {
+    removeAllocatorContexts();
     File dir1 = GenericTestUtils.getRandomizedTestDir();
     File dir2 = GenericTestUtils.getRandomizedTestDir();
     dir1.mkdirs();
     dir2.mkdirs();
     conf = new Configuration();
-    conf.set(Constants.BUFFER_DIR, dir1 + ", " + dir2);
+    final String bucketName = getTestBucketName(conf);
+    final String dirs = dir1 + ", " + dir2;
+    conf.set(Constants.BUFFER_DIR, dirs);
+    conf.set(format("fs.s3a.bucket.%s.buffer.dir", bucketName), dirs);
     fs = S3ATestUtils.createTestFileSystem(conf);
+    final Configuration fsConf = fs.getConf();
+    Assertions.assertThat(fsConf.get(Constants.BUFFER_DIR))
+        .describedAs("Config option %s", Constants.BUFFER_DIR)
+        .isEqualTo(dirs);
     File tmp1 = createTemporaryFileForWriting();
     tmp1.delete();
     File tmp2 = createTemporaryFileForWriting();
@@ -552,10 +579,10 @@ public S3AFileSystem run() throws Exception{
   private static <T> T getField(Object target, Class<T> fieldType,
       String fieldName) throws IllegalAccessException {
     Object obj = FieldUtils.readField(target, fieldName, true);
-    assertNotNull(String.format(
+    assertNotNull(format(
        "Could not read field named %s in object with class %s.", fieldName,
         target.getClass().getName()), obj);
-    assertTrue(String.format(
+    assertTrue(format(
        "Unexpected type found for field named %s, expected %s, actual %s.",
         fieldName, fieldType.getName(), obj.getClass().getName()),
         fieldType.isAssignableFrom(obj.getClass()));
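
Note on the tests above: the buffer-dir value is set on both the base key and the per-bucket key because a per-bucket fs.s3a.bucket.&lt;bucket&gt;.buffer.dir option in the test configuration would otherwise override the base fs.s3a.buffer.dir when the test filesystem is created. A hedged sketch of that per-bucket override pattern (setBufferDir is an illustrative helper, not part of the commit):

import static java.lang.String.format;

import org.apache.hadoop.conf.Configuration;

public class PerBucketOverride {

  // Set an S3A option both globally and for a single bucket.
  // Per-bucket keys (fs.s3a.bucket.<bucket>.<option>) take precedence over the
  // base fs.s3a.<option> when the filesystem for that bucket is instantiated.
  static void setBufferDir(Configuration conf, String bucket, String value) {
    conf.set("fs.s3a.buffer.dir", value);
    conf.set(format("fs.s3a.bucket.%s.buffer.dir", bucket), value);
  }
}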

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

Lines changed: 2 additions & 9 deletions
@@ -76,7 +76,6 @@
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TrileanConversionException;
 import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
-import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient;
 import org.apache.hadoop.fs.azurebfs.services.ListResponseData;
 import org.apache.hadoop.fs.azurebfs.enums.Trilean;
 import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
@@ -118,7 +117,6 @@
 import org.apache.hadoop.fs.azurebfs.utils.CRC64;
 import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils;
 import org.apache.hadoop.fs.azurebfs.utils.EncryptionType;
-import org.apache.hadoop.fs.azurebfs.utils.ListUtils;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
 import org.apache.hadoop.fs.impl.BackReference;
@@ -1298,13 +1296,8 @@ public String listStatus(final Path path, final String startFrom,
       }
     } while (shouldContinue);
 
-    if (listingClient instanceof AbfsBlobClient) {
-      fileStatuses.addAll(ListUtils.getUniqueListResult(fileStatusList));
-      LOG.debug("ListBlob API returned a total of {} elements including duplicates."
-          + "Number of unique Elements are {}", fileStatusList.size(), fileStatuses.size());
-    } else {
-      fileStatuses.addAll(fileStatusList);
-    }
+    fileStatuses.addAll(listingClient.postListProcessing(
+        relativePath, fileStatusList, tracingContext, uri));
 
     return continuation;
   }
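
Note on the change above: the instanceof AbfsBlobClient branch is replaced by a postListProcessing call on the listing client, so blob-specific de-duplication lives behind the client abstraction instead of in listStatus. A simplified, purely illustrative sketch of pushing such a branch into an overridable method (these types are stand-ins, not the real AbfsClient hierarchy):

import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

// Illustrative only: simplified stand-ins for the client hierarchy touched by the commit.
interface ListingClient {
  // Hook for client-specific cleanup of a raw listing; the default is a pass-through.
  default List<String> postListProcessing(List<String> rawListing) {
    return rawListing;
  }
}

// Blob endpoint listings can contain duplicates, so this client de-duplicates them
// while preserving order.
class BlobListingClient implements ListingClient {
  @Override
  public List<String> postListProcessing(List<String> rawListing) {
    return new ArrayList<>(new LinkedHashSet<>(rawListing));
  }
}

Callers then invoke postListProcessing on whatever client they hold, and new client implementations choose their own behaviour without listStatus changing.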
