Skip to content

Commit 6b4bd87

Browse files
steveloughran authored and kkolman committed
HADOOP-18526. Leak of S3AInstrumentation instances via hadoop Metrics references (apache#5144)
This has triggered an OOM in a process which was churning through s3a fs instances; the increased memory footprint of IOStatistics amplified what must have been a long-standing issue with FS instances being created and not closed().

* Makes sure instrumentation is closed when the FS is closed.
* Uses a weak reference from metrics to instrumentation, so even if the FS wasn't closed (see HADOOP-18478), this back reference would not cause the S3AInstrumentation reference to be retained.
* If S3AFileSystem is configured to log at TRACE it will log the calling stack of initialize(), to help identify where the instance is being created. This should help track down the cause of instance leakage.

Contributed by Steve Loughran.

(cherry picked from commit aaf92fe)
1 parent a6df05b commit 6b4bd87

File tree

2 files changed

+152
-14
lines changed

2 files changed

+152
-14
lines changed

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java

Lines changed: 55 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,10 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
9999
private static int metricsSourceNameCounter = 0;
100100
private static int metricsSourceActiveCounter = 0;
101101

102-
private String metricsSourceName;
102+
/**
103+
* Weak reference so there's no back reference to the instrumentation.
104+
*/
105+
private WeakRefMetricsSource metricsSourceReference;
103106

104107
private final MetricsRegistry registry =
105108
new MetricsRegistry("s3aFileSystem").setContext(CONTEXT);
@@ -245,19 +248,33 @@ public S3AInstrumentation(URI name) {
245248
registerAsMetricsSource(name);
246249
}
247250

251+
/**
252+
* Get the current metrics system, creating it on demand.
253+
* @return a metric system, creating if need be.
254+
*/
248255
@VisibleForTesting
249-
public MetricsSystem getMetricsSystem() {
256+
static MetricsSystem getMetricsSystem() {
250257
synchronized (metricsSystemLock) {
251258
if (metricsSystem == null) {
252259
metricsSystem = new MetricsSystemImpl();
253260
metricsSystem.init(METRICS_SYSTEM_NAME);
261+
LOG.debug("Metrics system inited {}", metricsSystem);
254262
}
255263
}
256264
return metricsSystem;
257265
}
258266

259267
/**
260-
* Register this instance as a metrics source.
268+
* Does the instrumentation have a metrics system?
269+
* @return true if the metrics system is present.
270+
*/
271+
@VisibleForTesting
272+
static boolean hasMetricSystem() {
273+
return metricsSystem != null;
274+
}
275+
276+
/**
277+
* Register this instance as a metrics source via a weak reference.
261278
* @param name s3a:// URI for the associated FileSystem instance
262279
*/
263280
private void registerAsMetricsSource(URI name) {
@@ -269,8 +286,9 @@ private void registerAsMetricsSource(URI name) {
269286
number = ++metricsSourceNameCounter;
270287
}
271288
String msName = METRICS_SOURCE_BASENAME + number;
272-
metricsSourceName = msName + "-" + name.getHost();
273-
metricsSystem.register(metricsSourceName, "", this);
289+
String metricsSourceName = msName + "-" + name.getHost();
290+
metricsSourceReference = new WeakRefMetricsSource(metricsSourceName, this);
291+
metricsSystem.register(metricsSourceName, "", metricsSourceReference);
274292
}
275293

276294
/**
@@ -602,16 +620,39 @@ public void getMetrics(MetricsCollector collector, boolean all) {
602620
registry.snapshot(collector.addRecord(registry.info().name()), true);
603621
}
604622

623+
/**
624+
* if registered with the metrics, return the
625+
* name of the source.
626+
* @return the name of the metrics source, or null if this instance is not registered.
627+
*/
628+
public String getMetricSourceName() {
629+
return metricsSourceReference != null
630+
? metricsSourceReference.getName()
631+
: null;
632+
}
633+
605634
public void close() {
606-
synchronized (metricsSystemLock) {
607-
putLatencyQuantile.stop();
608-
throttleRateQuantile.stop();
609-
metricsSystem.unregisterSource(metricsSourceName);
610-
int activeSources = --metricsSourceActiveCounter;
611-
if (activeSources == 0) {
612-
metricsSystem.publishMetricsNow();
613-
metricsSystem.shutdown();
614-
metricsSystem = null;
635+
if (metricsSourceReference != null) {
636+
// get the name
637+
String name = metricsSourceReference.getName();
638+
LOG.debug("Unregistering metrics for {}", name);
639+
// then set to null so a second close() is a noop here.
640+
metricsSourceReference = null;
641+
synchronized (metricsSystemLock) {
642+
if (metricsSystem == null) {
643+
LOG.debug("there is no metric system to unregister {} from", name);
644+
return;
645+
}
646+
putLatencyQuantile.stop();
647+
throttleRateQuantile.stop();
648+
649+
metricsSystem.unregisterSource(name);
650+
int activeSources = --metricsSourceActiveCounter;
651+
if (activeSources == 0) {
652+
metricsSystem.publishMetricsNow();
653+
metricsSystem.shutdown();
654+
metricsSystem = null;
655+
}
615656
}
616657
}
617658
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.s3a;
20+
21+
import java.lang.ref.WeakReference;
22+
23+
import org.apache.hadoop.classification.InterfaceAudience;
24+
import org.apache.hadoop.metrics2.MetricsCollector;
25+
import org.apache.hadoop.metrics2.MetricsSource;
26+
27+
import static java.util.Objects.requireNonNull;
28+
29+
/**
30+
* A weak referenced metrics source which avoids hanging on to large objects
31+
* if somehow they don't get fully closed/cleaned up.
32+
* The JVM may clean up all objects which are only weakly referenced whenever
33+
* it does a GC, <i>even if there is no memory pressure</i>.
34+
* To avoid these refs being removed, always keep a strong reference around
35+
* somewhere.
36+
*/
37+
@InterfaceAudience.Private
38+
public class WeakRefMetricsSource implements MetricsSource {
39+
40+
/**
41+
* Name to know when unregistering.
42+
*/
43+
private final String name;
44+
45+
/**
46+
* Underlying metrics source.
47+
*/
48+
private final WeakReference<MetricsSource> sourceWeakReference;
49+
50+
/**
51+
* Constructor.
52+
* @param name Name to know when unregistering.
53+
* @param source metrics source
54+
*/
55+
public WeakRefMetricsSource(final String name, final MetricsSource source) {
56+
this.name = name;
57+
this.sourceWeakReference = new WeakReference<>(requireNonNull(source));
58+
}
59+
60+
/**
61+
* If the weak reference is non null, update the metrics.
62+
* @param collector to contain the resulting metrics snapshot
63+
* @param all if true, return all metrics even if unchanged.
64+
*/
65+
@Override
66+
public void getMetrics(final MetricsCollector collector, final boolean all) {
67+
MetricsSource metricsSource = sourceWeakReference.get();
68+
if (metricsSource != null) {
69+
metricsSource.getMetrics(collector, all);
70+
}
71+
}
72+
73+
/**
74+
* Name to know when unregistering.
75+
* @return the name passed in during construction.
76+
*/
77+
public String getName() {
78+
return name;
79+
}
80+
81+
/**
82+
* Get the source, will be null if the reference has been GC'd
83+
* @return the source reference
84+
*/
85+
public MetricsSource getSource() {
86+
return sourceWeakReference.get();
87+
}
88+
89+
@Override
90+
public String toString() {
91+
return "WeakRefMetricsSource{" +
92+
"name='" + name + '\'' +
93+
", sourceWeakReference is " +
94+
(sourceWeakReference.get() == null ? "unset" : "set") +
95+
'}';
96+
}
97+
}

0 commit comments

Comments
 (0)