
Commit 595cdd1

[Improve][Zeta][storage] update hdfs configuration, support more parameters (apache#6547)
liunaijie authored Apr 7, 2024
1 parent ba1b191 commit 595cdd1
Showing 8 changed files with 107 additions and 16 deletions.
2 changes: 2 additions & 0 deletions docs/en/seatunnel-engine/checkpoint-storage.md
@@ -144,6 +144,8 @@ seatunnel:
// if you use kerberos, you can config like this:
kerberosPrincipal: your-kerberos-principal
kerberosKeytabFilePath: your-kerberos-keytab
// if you need hdfs-site config, you can config like this:
hdfs_site_path: /path/to/your/hdfs_site_path
```

if HDFS is in HA mode, you can config like this:
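For context on what `hdfs_site_path` does at runtime: Hadoop merges the properties from the referenced `hdfs-site.xml` into the live `Configuration` object via `Configuration.addResource`. A minimal sketch of that mechanism, with a placeholder path and property name:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class HdfsSiteExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Properties defined in the XML file override same-named
        // properties already present in the Configuration.
        conf.addResource(new Path("/path/to/your/hdfs-site.xml"));
        // Prints null unless the file defines dfs.nameservices
        // (typically set when HDFS runs in HA mode).
        System.out.println(conf.get("dfs.nameservices"));
    }
}
```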
2 changes: 2 additions & 0 deletions docs/en/seatunnel-engine/deployment.md
@@ -197,6 +197,8 @@ map:
clusterName: seatunnel-cluster
storage.type: hdfs
fs.defaultFS: hdfs://localhost:9000
// if you need hdfs-site config, you can config like this:
hdfs_site_path: /path/to/your/hdfs_site_path
```

If there is no HDFS and your cluster only has one node, you can config to use local file like this:
@@ -24,6 +24,7 @@

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

import java.io.IOException;
@@ -44,11 +45,13 @@ public class HdfsConfiguration extends AbstractConfiguration {

private static final String KERBEROS_KEY = "kerberos";

/** ********* Hdfs constants ************* */
/** ******** Hdfs constants ************* */
private static final String HDFS_IMPL = "org.apache.hadoop.hdfs.DistributedFileSystem";

private static final String HDFS_IMPL_KEY = "fs.hdfs.impl";

private static final String HDFS_SITE_PATH = "hdfs_site_path";

private static final String SEATUNNEL_HADOOP_PREFIX = "seatunnel.hadoop.";

@Override
@@ -71,6 +74,9 @@ public Configuration buildConfiguration(Map<String, String> config)
authenticateKerberos(kerberosPrincipal, kerberosKeytabFilePath, hadoopConf);
}
}
if (config.containsKey(HDFS_SITE_PATH)) {
hadoopConf.addResource(new Path(config.get(HDFS_SITE_PATH)));
}
// support other hdfs optional config keys
config.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(SEATUNNEL_HADOOP_PREFIX))
@@ -80,6 +86,7 @@ public Configuration buildConfiguration(Map<String, String> config)
String value = entry.getValue();
hadoopConf.set(key, value);
});

return hadoopConf;
}

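The `seatunnel.hadoop.` block above is the "more parameters" part of this change: any key carrying that prefix is copied into the Hadoop `Configuration` with the prefix stripped, so `seatunnel.hadoop.dfs.replication` becomes `dfs.replication`. A self-contained sketch of that pass-through (the sample keys and values are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;

public class PrefixPassThroughExample {
    private static final String SEATUNNEL_HADOOP_PREFIX = "seatunnel.hadoop.";

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("seatunnel.hadoop.dfs.replication", "2");
        config.put("storage.type", "hdfs"); // no prefix, so it is not forwarded

        Configuration hadoopConf = new Configuration();
        config.entrySet().stream()
                .filter(entry -> entry.getKey().startsWith(SEATUNNEL_HADOOP_PREFIX))
                .forEach(
                        entry -> {
                            // Strip the prefix and hand the key to Hadoop as-is.
                            String key = entry.getKey().replace(SEATUNNEL_HADOOP_PREFIX, "");
                            hadoopConf.set(key, entry.getValue());
                        });

        System.out.println(hadoopConf.get("dfs.replication")); // prints 2
    }
}
```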
@@ -49,6 +49,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.DEFAULT_IMAP_FILE_PATH_SPLIT;
import static org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.DEFAULT_IMAP_NAMESPACE;
@@ -125,8 +126,13 @@ public void initialize(Map<String, Object> configuration) {
this.fileConfiguration = FileConfiguration.valueOf(storageType.toUpperCase());
// build configuration
AbstractConfiguration fileConfiguration = this.fileConfiguration.getConfiguration();
Map<String, String> stringMap =
configuration.entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey, entry -> entry.getValue().toString()));

Configuration hadoopConf = fileConfiguration.buildConfiguration(configuration);
Configuration hadoopConf = fileConfiguration.buildConfiguration(stringMap);
this.conf = hadoopConf;
this.namespace = (String) configuration.getOrDefault(NAMESPACE_KEY, DEFAULT_IMAP_NAMESPACE);
this.businessName = (String) configuration.get(BUSINESS_KEY);
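The new `stringMap` above bridges the storage API's `Map<String, Object>` to the `Map<String, String>` now expected by `buildConfiguration`. Note that `Collectors.toMap` throws a `NullPointerException` on null values, so configuration entries are assumed non-null. A standalone sketch of the conversion:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class MapConversionExample {
    public static void main(String[] args) {
        Map<String, Object> configuration = new HashMap<>();
        configuration.put("storage.type", "hdfs");
        configuration.put("block.size", 1024 * 1024L); // a Long, not a String

        // Every value is rendered with toString(), so mixed-type maps
        // flatten cleanly as long as no value is null.
        Map<String, String> stringMap =
                configuration.entrySet().stream()
                        .collect(
                                Collectors.toMap(
                                        Map.Entry::getKey,
                                        entry -> entry.getValue().toString()));

        System.out.println(stringMap.get("block.size")); // prints 1048576
    }
}
```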
@@ -46,15 +46,15 @@ public void setBlockSize(Long blockSize) {
* @param config configuration
* @param keys keys
*/
void checkConfiguration(Map<String, Object> config, String... keys) {
void checkConfiguration(Map<String, String> config, String... keys) {
for (String key : keys) {
if (!config.containsKey(key) || null == config.get(key)) {
throw new IllegalArgumentException(key + " is required");
}
}
}

public abstract Configuration buildConfiguration(Map<String, Object> config)
public abstract Configuration buildConfiguration(Map<String, String> config)
throws IMapStorageException;

/**
@@ -65,11 +65,11 @@ public abstract Configuration buildConfiguration(Map<String, Object> config)
* @param prefix
*/
void setExtraConfiguration(
Configuration hadoopConf, Map<String, Object> config, String prefix) {
Configuration hadoopConf, Map<String, String> config, String prefix) {
config.forEach(
(k, v) -> {
if (config.containsKey(BLOCK_SIZE)) {
setBlockSize(Long.parseLong(config.get(BLOCK_SIZE).toString()));
setBlockSize(Long.parseLong(config.get(BLOCK_SIZE)));
}
if (k.startsWith(prefix)) {
hadoopConf.set(k, String.valueOf(v));
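`checkConfiguration` gives every concrete configuration class a fail-fast guard for required keys, which matters more now that values arrive as plain strings. An isolated illustration of the pattern (the key names here are made up, not the real constants):

```java
import java.util.HashMap;
import java.util.Map;

public class CheckConfigurationExample {
    static void checkConfiguration(Map<String, String> config, String... keys) {
        for (String key : keys) {
            if (!config.containsKey(key) || null == config.get(key)) {
                throw new IllegalArgumentException(key + " is required");
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("oss.bucket", "oss://example-bucket/");

        checkConfiguration(config, "oss.bucket"); // passes silently
        checkConfiguration(config, "fs.defaultFS"); // throws: "fs.defaultFS is required"
    }
}
```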
@@ -20,21 +20,95 @@

package org.apache.seatunnel.engine.imap.storage.file.config;

import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

import java.io.IOException;
import java.util.Map;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT;
import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;

public class HdfsConfiguration extends AbstractConfiguration {

/** hdfs uri is required */
private static final String HDFS_DEF_FS_NAME = "fs.defaultFS";
/** hdfs kerberos principal (is optional) */
private static final String KERBEROS_PRINCIPAL = "kerberosPrincipal";

private static final String KERBEROS_KEYTAB_FILE_PATH = "kerberosKeytabFilePath";
private static final String HADOOP_SECURITY_AUTHENTICATION_KEY =
"hadoop.security.authentication";

private static final String KERBEROS_KEY = "kerberos";

/** ******** Hdfs constants ************* */
private static final String HDFS_IMPL = "org.apache.hadoop.hdfs.DistributedFileSystem";

private static final String HDFS_IMPL_KEY = "fs.hdfs.impl";

private static final String HDFS_SITE_PATH = "hdfs_site_path";

private static final String SEATUNNEL_HADOOP_PREFIX = "seatunnel.hadoop.";

@Override
public Configuration buildConfiguration(Map<String, Object> config) {
public Configuration buildConfiguration(Map<String, String> config) {
Configuration hadoopConf = new Configuration();
hadoopConf.set(
FS_DEFAULT_NAME_KEY,
String.valueOf(config.getOrDefault(FS_DEFAULT_NAME_KEY, FS_DEFAULT_NAME_DEFAULT)));
if (config.containsKey(HDFS_DEF_FS_NAME)) {
hadoopConf.set(HDFS_DEF_FS_NAME, config.get(HDFS_DEF_FS_NAME));
}
hadoopConf.set(HDFS_IMPL_KEY, HDFS_IMPL);
hadoopConf.set(FS_DEFAULT_NAME_KEY, config.get(FS_DEFAULT_NAME_KEY));
if (config.containsKey(KERBEROS_PRINCIPAL)
&& config.containsKey(KERBEROS_KEYTAB_FILE_PATH)) {
String kerberosPrincipal = config.get(KERBEROS_PRINCIPAL);
String kerberosKeytabFilePath = config.get(KERBEROS_KEYTAB_FILE_PATH);
if (StringUtils.isNotBlank(kerberosPrincipal)
&& StringUtils.isNotBlank(kerberosKeytabFilePath)) {
hadoopConf.set(HADOOP_SECURITY_AUTHENTICATION_KEY, KERBEROS_KEY);
authenticateKerberos(kerberosPrincipal, kerberosKeytabFilePath, hadoopConf);
}
}
if (config.containsKey(HDFS_SITE_PATH)) {
hadoopConf.addResource(new Path(config.get(HDFS_SITE_PATH)));
}
// support other hdfs optional config keys
config.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(SEATUNNEL_HADOOP_PREFIX))
.forEach(
entry -> {
String key = entry.getKey().replace(SEATUNNEL_HADOOP_PREFIX, "");
String value = entry.getValue();
hadoopConf.set(key, value);
});

return hadoopConf;
}

/**
* Authenticate kerberos
*
* @param kerberosPrincipal kerberos principal
* @param kerberosKeytabFilePath kerberos keytab file path
* @param hdfsConf hdfs configuration
* @throws IMapStorageException authentication exception
*/
private void authenticateKerberos(
String kerberosPrincipal, String kerberosKeytabFilePath, Configuration hdfsConf)
throws IMapStorageException {
UserGroupInformation.setConfiguration(hdfsConf);
try {
UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath);
} catch (IOException e) {
throw new IMapStorageException(
"Failed to login user from keytab : "
+ kerberosKeytabFilePath
+ " and kerberos principal : "
+ kerberosPrincipal,
e);
}
}
}
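`authenticateKerberos` follows the standard UserGroupInformation login sequence: enable kerberos authentication on the configuration, register it with UGI, then log in from the keytab. A minimal sketch with placeholder principal and keytab path (a real run needs a reachable KDC and a valid keytab):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

public class KerberosLoginExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("hadoop.security.authentication", "kerberos");

        // UGI reads the authentication mode from the configuration,
        // then the keytab login establishes the current user.
        UserGroupInformation.setConfiguration(conf);
        UserGroupInformation.loginUserFromKeytab(
                "seatunnel/host.example.com@EXAMPLE.COM",
                "/etc/security/keytabs/seatunnel.keytab");

        System.out.println(UserGroupInformation.getLoginUser());
    }
}
```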
@@ -36,11 +36,11 @@ public class OssConfiguration extends AbstractConfiguration {
private static final String OSS_KEY = "fs.oss.";

@Override
public Configuration buildConfiguration(Map<String, Object> config)
public Configuration buildConfiguration(Map<String, String> config)
throws IMapStorageException {
checkConfiguration(config, OSS_BUCKET_KEY);
Configuration hadoopConf = new Configuration();
hadoopConf.set(FS_DEFAULT_NAME_KEY, String.valueOf(config.get(OSS_BUCKET_KEY)));
hadoopConf.set(FS_DEFAULT_NAME_KEY, config.get(OSS_BUCKET_KEY));
hadoopConf.set(OSS_IMPL_KEY, HDFS_OSS_IMPL);
setExtraConfiguration(hadoopConf, config, OSS_KEY);
return hadoopConf;
@@ -39,16 +39,16 @@ public class S3Configuration extends AbstractConfiguration {
private static final String FS_KEY = "fs.";

@Override
public Configuration buildConfiguration(Map<String, Object> config)
public Configuration buildConfiguration(Map<String, String> config)
throws IMapStorageException {
checkConfiguration(config, S3_BUCKET_KEY);
String protocol = DEFAULT_PROTOCOL;
if (config.get(S3_BUCKET_KEY).toString().startsWith(S3A_PROTOCOL)) {
if (config.get(S3_BUCKET_KEY).startsWith(S3A_PROTOCOL)) {
protocol = S3A_PROTOCOL;
}
String fsImpl = protocol.equals(S3A_PROTOCOL) ? HDFS_S3A_IMPL : HDFS_S3N_IMPL;
Configuration hadoopConf = new Configuration();
hadoopConf.set(FS_DEFAULT_NAME_KEY, config.get(S3_BUCKET_KEY).toString());
hadoopConf.set(FS_DEFAULT_NAME_KEY, config.get(S3_BUCKET_KEY));
hadoopConf.set(formatKey(protocol, HDFS_IMPL_KEY), fsImpl);
setExtraConfiguration(hadoopConf, config, FS_KEY + protocol + SPLIT_CHAR);
return hadoopConf;
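`S3Configuration` derives the filesystem implementation from the bucket URI's scheme: an `s3a://` bucket selects the S3A implementation, anything else falls back to the default protocol. A toy sketch of just that selection step (the constant values are assumptions for illustration, not copied from the class):

```java
public class S3ProtocolExample {
    private static final String S3A_PROTOCOL = "s3a";
    private static final String DEFAULT_PROTOCOL = "s3n"; // assumed default

    static String chooseProtocol(String bucket) {
        // Mirrors the startsWith check above, minus the removed toString().
        return bucket.startsWith(S3A_PROTOCOL) ? S3A_PROTOCOL : DEFAULT_PROTOCOL;
    }

    public static void main(String[] args) {
        System.out.println(chooseProtocol("s3a://my-bucket")); // s3a
        System.out.println(chooseProtocol("s3n://my-bucket")); // s3n
    }
}
```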
