Skip to content

Commit

Permalink
[Fix](catalog) Remove the fs.disable.cache parameter to prevent excessive FS-associated objects and memory leaks (apache#46184)
Browse files Browse the repository at this point in the history

### Background
In the current file system implementation, the fs.&lt;scheme&gt;.impl.disable.cache
parameter allows disabling FS caching. While this provides flexibility,
it introduces several critical issues, as shown by the heap histogram below:
```

1:      22537201      721190432  java.util.HashMap$Node
2:      21559238      689895616  javax.management.MBeanAttributeInfo
3:      21559098      517418352  javax.management.Attribute
4:      19380247      465125928  org.apache.hadoop.metrics2.impl.MetricCounterLong
5:        122603      461180096  [J
6:        294309      255533536  [B
7:        724598      252264048  [Ljava.lang.Object;
8:       2012368      189047432  [C
9:        159442      131064400  [Ljava.util.HashMap$Node;
10:        114752       88075072  [Ljavax.management.MBeanAttributeInfo;
11:       1899581       45589944  java.lang.String
12:       1720140       41283360  org.apache.hadoop.metrics2.impl.MetricGaugeLong
```

#### Unbounded FS Instance Creation
When fs.&lt;scheme&gt;.impl.disable.cache is set to true, a new FS instance is
created for every access, preventing instance reuse:
```

    String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
    if (conf.getBoolean(disableCacheName, false)) {
      LOGGER.debug("Bypassing cache to create filesystem {}", uri);
      return createFileSystem(uri, conf);
    }
```

#### Resource Leakage
Associated objects, such as thread metrics and connection pools, are not
properly released due to excessive FS instance creation, leading to
memory leaks.

#### Performance Degradation
Frequent creation and destruction of FS instances impose significant
overhead, especially in high-concurrency scenarios.



### Release note

None

### Check List (For Author)

- Test <!-- At least one of them must be included. -->
    - [ ] Regression test
    - [ ] Unit Test
    - [x] Manual test (add detailed scripts or steps below)
```
CREATE CATALOG `iceberg_cos` PROPERTIES (
"warehouse" = "cosn://ha/ha/ha/stress/multi_fs",
"type" = "iceberg",
"iceberg.catalog.type" = "hadoop",
"cos.secret_key" = "*XXX",
"cos.region" = "ap-beijing",
"cos.endpoint" = "cos.ap-beijing.myqcloud.com",
"cos.access_key" = "**************"
);

Create a catalog using object storage, then write a scheduled script to continuously refresh the catalog. Query the catalog periodically and monitor whether the thread memory behaves as expected.
```
<img width="1131" alt="image"
src="https://github.com/user-attachments/assets/c7b04a5a-449f-432c-975b-524fdb81247a"
/>

At 22:30, I replaced it with the fixed version.
  • Loading branch information
CalvinKirs authored Dec 31, 2024
1 parent 266a224 commit 17667ae
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,6 @@ private FileCacheValue loadFiles(FileCacheKey key) {
URI uri = finalLocation.getPath().toUri();
if (uri.getScheme() != null) {
String scheme = uri.getScheme();
updateJobConf("fs." + scheme + ".impl.disable.cache", "true");
if (jobConf.get("fs." + scheme + ".impl") == null) {
if (!scheme.equals("hdfs") && !scheme.equals("viewfs")) {
updateJobConf("fs." + scheme + ".impl", PropertyConverter.getHadoopFSImplByScheme(scheme));
Expand Down Expand Up @@ -451,8 +450,6 @@ private synchronized void setJobConf() {
// https://blog.actorsfit.com/a?ID=00550-ce56ec63-1bff-4b0c-a6f7-447b93efaa31
jobConf.set("mapreduce.input.fileinputformat.input.dir.recursive", "true");
// disable FileSystem's cache
jobConf.set("fs.hdfs.impl.disable.cache", "true");
jobConf.set("fs.file.impl.disable.cache", "true");
}

private synchronized void updateJobConf(String key, String value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ private static Map<String, String> convertToOBSProperties(Map<String, String> pr
CloudCredential credential) {
Map<String, String> obsProperties = Maps.newHashMap();
obsProperties.put(OBSConstants.ENDPOINT, props.get(ObsProperties.ENDPOINT));
obsProperties.put(ObsProperties.FS.IMPL_DISABLE_CACHE, "true");
obsProperties.put("fs.obs.impl", getHadoopFSImplByScheme("obs"));
if (credential.isWhole()) {
obsProperties.put(OBSConstants.ACCESS_KEY, credential.getAccessKey());
Expand Down Expand Up @@ -275,7 +274,6 @@ public static String checkRegion(String endpoint, String region, String regionKe
private static void setS3FsAccess(Map<String, String> s3Properties, Map<String, String> properties,
CloudCredential credential) {
s3Properties.put(Constants.MAX_ERROR_RETRIES, "2");
s3Properties.put("fs.s3.impl.disable.cache", "true");
s3Properties.putIfAbsent("fs.s3.impl", S3AFileSystem.class.getName());
String credentialsProviders = getAWSCredentialsProviders(properties);
s3Properties.put(Constants.AWS_CREDENTIALS_PROVIDER, credentialsProviders);
Expand All @@ -286,8 +284,6 @@ private static void setS3FsAccess(Map<String, String> s3Properties, Map<String,
if (credential.isTemporary()) {
s3Properties.put(Constants.SESSION_TOKEN, credential.getSessionToken());
s3Properties.put(Constants.AWS_CREDENTIALS_PROVIDER, TemporaryAWSCredentialsProvider.class.getName());
s3Properties.put("fs.s3.impl.disable.cache", "true");
s3Properties.put("fs.s3a.impl.disable.cache", "true");
}
s3Properties.put(Constants.PATH_STYLE_ACCESS, properties.getOrDefault(USE_PATH_STYLE, "false"));
for (Map.Entry<String, String> entry : properties.entrySet()) {
Expand Down Expand Up @@ -362,7 +358,6 @@ private static void rewriteHdfsOnOssProperties(Map<String, String> ossProperties
private static Map<String, String> convertToCOSProperties(Map<String, String> props, CloudCredential credential) {
Map<String, String> cosProperties = Maps.newHashMap();
cosProperties.put(CosNConfigKeys.COSN_ENDPOINT_SUFFIX_KEY, props.get(CosProperties.ENDPOINT));
cosProperties.put("fs.cosn.impl.disable.cache", "true");
cosProperties.put("fs.cosn.impl", getHadoopFSImplByScheme("cosn"));
cosProperties.put("fs.lakefs.impl", getHadoopFSImplByScheme("lakefs"));
if (credential.isWhole()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ public void testAWSOldCatalogPropertiesConverter() throws Exception {
Assertions.assertEquals(13, properties.size());

Map<String, String> hdProps = catalog.getCatalogProperty().getHadoopProperties();
Assertions.assertEquals(22, hdProps.size());
Assertions.assertEquals(21, hdProps.size());
}

@Test
Expand All @@ -291,7 +291,8 @@ public void testS3CatalogPropertiesConverter() throws Exception {
Assertions.assertEquals(12, properties.size());

Map<String, String> hdProps = catalog.getCatalogProperty().getHadoopProperties();
Assertions.assertEquals(21, hdProps.size());
Assertions.assertNull(hdProps.get("fs.s3.impl.disable.cache"));
Assertions.assertEquals(20, hdProps.size());
}

@Test
Expand Down Expand Up @@ -442,7 +443,8 @@ public void testGlueCatalogPropertiesConverter() throws Exception {
Assertions.assertEquals("s3.us-east-1.amazonaws.com", properties.get(S3Properties.ENDPOINT));

Map<String, String> hdProps = catalog.getCatalogProperty().getHadoopProperties();
Assertions.assertEquals(30, hdProps.size());
Assertions.assertEquals(29, hdProps.size());
Assertions.assertNull(hdProps.get("fs.s3.impl.disable.cache"));

String query = "create catalog hms_glue properties (\n"
+ " 'type'='hms',\n"
Expand All @@ -460,7 +462,8 @@ public void testGlueCatalogPropertiesConverter() throws Exception {
Assertions.assertEquals("s3.us-east-1.amazonaws.com.cn", propertiesNew.get(S3Properties.ENDPOINT));

Map<String, String> hdPropsNew = catalogNew.getCatalogProperty().getHadoopProperties();
Assertions.assertEquals(30, hdPropsNew.size());
Assertions.assertNull(hdPropsNew.get("fs.s3.impl.disable.cache"));
Assertions.assertEquals(29, hdPropsNew.size());
}

@Test
Expand All @@ -474,7 +477,7 @@ public void testS3CompatibleCatalogPropertiesConverter() throws Exception {
+ " 'cos.secret_key' = 'skk'\n"
+ ");";
testS3CompatibleCatalogProperties(catalogName0, CosProperties.COS_PREFIX,
"cos.ap-beijing.myqcloud.com", query0, 12, 18);
"cos.ap-beijing.myqcloud.com", query0, 12, 17);

String catalogName1 = "hms_oss";
String query1 = "create catalog " + catalogName1 + " properties (\n"
Expand All @@ -496,7 +499,7 @@ public void testS3CompatibleCatalogPropertiesConverter() throws Exception {
+ " 'minio.secret_key' = 'skk'\n"
+ ");";
testS3CompatibleCatalogProperties(catalogName2, MinioProperties.MINIO_PREFIX,
"http://127.0.0.1", query2, 12, 21);
"http://127.0.0.1", query2, 12, 20);

String catalogName3 = "hms_obs";
String query3 = "create catalog hms_obs properties (\n"
Expand All @@ -507,7 +510,7 @@ public void testS3CompatibleCatalogPropertiesConverter() throws Exception {
+ " 'obs.secret_key' = 'skk'\n"
+ ");";
testS3CompatibleCatalogProperties(catalogName3, ObsProperties.OBS_PREFIX,
"obs.cn-north-4.myhuaweicloud.com", query3, 12, 17);
"obs.cn-north-4.myhuaweicloud.com", query3, 12, 16);
}

private void testS3CompatibleCatalogProperties(String catalogName, String prefix,
Expand All @@ -521,6 +524,7 @@ private void testS3CompatibleCatalogProperties(String catalogName, String prefix

Map<String, String> hdProps = catalog.getCatalogProperty().getHadoopProperties();
Assertions.assertEquals(bePropsSize, hdProps.size());
Assertions.assertNull(hdProps.get(String.format("fs.%s.impl.disable.cache", prefix)));

Map<String, String> expectedMetaProperties = new HashMap<>();
expectedMetaProperties.put("endpoint", endpoint);
Expand Down

0 comments on commit 17667ae

Please sign in to comment.