diff --git a/docs/en/seatunnel-engine/checkpoint-storage.md b/docs/en/seatunnel-engine/checkpoint-storage.md index 332a3951bf0..a04a84197dc 100644 --- a/docs/en/seatunnel-engine/checkpoint-storage.md +++ b/docs/en/seatunnel-engine/checkpoint-storage.md @@ -144,6 +144,8 @@ seatunnel: // if you used kerberos, you can config like this: kerberosPrincipal: your-kerberos-principal kerberosKeytabFilePath: your-kerberos-keytab + // if you need hdfs-site config, you can config like this: + hdfs_site_path: /path/to/your/hdfs_site_path ``` if HDFS is in HA mode , you can config like this: diff --git a/docs/en/seatunnel-engine/deployment.md b/docs/en/seatunnel-engine/deployment.md index 6a39f347d30..412add286a0 100644 --- a/docs/en/seatunnel-engine/deployment.md +++ b/docs/en/seatunnel-engine/deployment.md @@ -197,6 +197,8 @@ map: clusterName: seatunnel-cluster storage.type: hdfs fs.defaultFS: hdfs://localhost:9000 + // if you need hdfs-site config, you can config like this: + hdfs_site_path: /path/to/your/hdfs_site_path ``` If there is no HDFS and your cluster only have one node, you can config to use local file like this: diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/HdfsConfiguration.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/HdfsConfiguration.java index 953da3027bd..2da4c6ad5fe 100644 --- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/HdfsConfiguration.java +++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/HdfsConfiguration.java @@ -24,6 +24,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.UserGroupInformation; import java.io.IOException; @@ -44,11 +45,13 @@ public class HdfsConfiguration extends AbstractConfiguration { private static final String KERBEROS_KEY = "kerberos"; - /** ********* Hdfs constants ************* */ + /** ******** Hdfs constants ************* */ private static final String HDFS_IMPL = "org.apache.hadoop.hdfs.DistributedFileSystem"; private static final String HDFS_IMPL_KEY = "fs.hdfs.impl"; + private static final String HDFS_SITE_PATH = "hdfs_site_path"; + private static final String SEATUNNEL_HADOOP_PREFIX = "seatunnel.hadoop."; @Override @@ -71,6 +74,9 @@ public Configuration buildConfiguration(Map config) authenticateKerberos(kerberosPrincipal, kerberosKeytabFilePath, hadoopConf); } } + if (config.containsKey(HDFS_SITE_PATH)) { + hadoopConf.addResource(new Path(config.get(HDFS_SITE_PATH))); + } // support other hdfs optional config keys config.entrySet().stream() .filter(entry -> entry.getKey().startsWith(SEATUNNEL_HADOOP_PREFIX)) @@ -80,6 +86,7 @@ public Configuration buildConfiguration(Map config) String value = entry.getValue(); hadoopConf.set(key, value); }); + return hadoopConf; } diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorage.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorage.java index 915981e476d..ab125ba5c84 100644 --- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorage.java +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorage.java @@ -49,6 +49,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.DEFAULT_IMAP_FILE_PATH_SPLIT; import static org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.DEFAULT_IMAP_NAMESPACE; @@ -125,8 +126,13 @@ public void initialize(Map configuration) { this.fileConfiguration = FileConfiguration.valueOf(storageType.toUpperCase()); // build configuration AbstractConfiguration fileConfiguration = this.fileConfiguration.getConfiguration(); + Map stringMap = + configuration.entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, entry -> entry.getValue().toString())); - Configuration hadoopConf = fileConfiguration.buildConfiguration(configuration); + Configuration hadoopConf = fileConfiguration.buildConfiguration(stringMap); this.conf = hadoopConf; this.namespace = (String) configuration.getOrDefault(NAMESPACE_KEY, DEFAULT_IMAP_NAMESPACE); this.businessName = (String) configuration.get(BUSINESS_KEY); diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/AbstractConfiguration.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/AbstractConfiguration.java index 8d59ac11b80..8198479ae87 100644 --- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/AbstractConfiguration.java +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/AbstractConfiguration.java @@ -46,7 +46,7 @@ public void setBlockSize(Long blockSize) { * @param config configuration * @param keys keys */ - void checkConfiguration(Map config, String... keys) { + void checkConfiguration(Map config, String... keys) { for (String key : keys) { if (!config.containsKey(key) || null == config.get(key)) { throw new IllegalArgumentException(key + " is required"); @@ -54,7 +54,7 @@ void checkConfiguration(Map config, String... keys) { } } - public abstract Configuration buildConfiguration(Map config) + public abstract Configuration buildConfiguration(Map config) throws IMapStorageException; /** @@ -65,11 +65,11 @@ public abstract Configuration buildConfiguration(Map config) * @param prefix */ void setExtraConfiguration( - Configuration hadoopConf, Map config, String prefix) { + Configuration hadoopConf, Map config, String prefix) { config.forEach( (k, v) -> { if (config.containsKey(BLOCK_SIZE)) { - setBlockSize(Long.parseLong(config.get(BLOCK_SIZE).toString())); + setBlockSize(Long.parseLong(config.get(BLOCK_SIZE))); } if (k.startsWith(prefix)) { hadoopConf.set(k, String.valueOf(v)); diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/HdfsConfiguration.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/HdfsConfiguration.java index 2f98dfa0b40..10592b3da8f 100644 --- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/HdfsConfiguration.java +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/HdfsConfiguration.java @@ -20,21 +20,95 @@ package org.apache.seatunnel.engine.imap.storage.file.config; +import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException; + +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; +import java.io.IOException; import java.util.Map; -import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT; import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY; public class HdfsConfiguration extends AbstractConfiguration { + /** hdfs uri is required */ + private static final String HDFS_DEF_FS_NAME = "fs.defaultFS"; + /** hdfs kerberos principal( is optional) */ + private static final String KERBEROS_PRINCIPAL = "kerberosPrincipal"; + + private static final String KERBEROS_KEYTAB_FILE_PATH = "kerberosKeytabFilePath"; + private static final String HADOOP_SECURITY_AUTHENTICATION_KEY = + "hadoop.security.authentication"; + + private static final String KERBEROS_KEY = "kerberos"; + + /** ******** Hdfs constants ************* */ + private static final String HDFS_IMPL = "org.apache.hadoop.hdfs.DistributedFileSystem"; + + private static final String HDFS_IMPL_KEY = "fs.hdfs.impl"; + + private static final String HDFS_SITE_PATH = "hdfs_site_path"; + + private static final String SEATUNNEL_HADOOP_PREFIX = "seatunnel.hadoop."; + @Override - public Configuration buildConfiguration(Map config) { + public Configuration buildConfiguration(Map config) { Configuration hadoopConf = new Configuration(); - hadoopConf.set( - FS_DEFAULT_NAME_KEY, - String.valueOf(config.getOrDefault(FS_DEFAULT_NAME_KEY, FS_DEFAULT_NAME_DEFAULT))); + if (config.containsKey(HDFS_DEF_FS_NAME)) { + hadoopConf.set(HDFS_DEF_FS_NAME, config.get(HDFS_DEF_FS_NAME)); + } + hadoopConf.set(HDFS_IMPL_KEY, HDFS_IMPL); + hadoopConf.set(FS_DEFAULT_NAME_KEY, config.get(FS_DEFAULT_NAME_KEY)); + if (config.containsKey(KERBEROS_PRINCIPAL) + && config.containsKey(KERBEROS_KEYTAB_FILE_PATH)) { + String kerberosPrincipal = config.get(KERBEROS_PRINCIPAL); + String kerberosKeytabFilePath = config.get(KERBEROS_KEYTAB_FILE_PATH); + if (StringUtils.isNotBlank(kerberosPrincipal) + && StringUtils.isNotBlank(kerberosKeytabFilePath)) { + hadoopConf.set(HADOOP_SECURITY_AUTHENTICATION_KEY, KERBEROS_KEY); + authenticateKerberos(kerberosPrincipal, kerberosKeytabFilePath, hadoopConf); + } + } + if (config.containsKey(HDFS_SITE_PATH)) { + hadoopConf.addResource(new Path(config.get(HDFS_SITE_PATH))); + } + // support other hdfs optional config keys + config.entrySet().stream() + .filter(entry -> entry.getKey().startsWith(SEATUNNEL_HADOOP_PREFIX)) + .forEach( + entry -> { + String key = entry.getKey().replace(SEATUNNEL_HADOOP_PREFIX, ""); + String value = entry.getValue(); + hadoopConf.set(key, value); + }); + return hadoopConf; } + + /** + * Authenticate kerberos + * + * @param kerberosPrincipal kerberos principal + * @param kerberosKeytabFilePath kerberos keytab file path + * @param hdfsConf hdfs configuration + * @throws IMapStorageException authentication exception + */ + private void authenticateKerberos( + String kerberosPrincipal, String kerberosKeytabFilePath, Configuration hdfsConf) + throws IMapStorageException { + UserGroupInformation.setConfiguration(hdfsConf); + try { + UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath); + } catch (IOException e) { + throw new IMapStorageException( + "Failed to login user from keytab : " + + kerberosKeytabFilePath + + " and kerberos principal : " + + kerberosPrincipal, + e); + } + } } diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/OssConfiguration.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/OssConfiguration.java index d36aa341457..71f31063be1 100644 --- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/OssConfiguration.java +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/OssConfiguration.java @@ -36,11 +36,11 @@ public class OssConfiguration extends AbstractConfiguration { private static final String OSS_KEY = "fs.oss."; @Override - public Configuration buildConfiguration(Map config) + public Configuration buildConfiguration(Map config) throws IMapStorageException { checkConfiguration(config, OSS_BUCKET_KEY); Configuration hadoopConf = new Configuration(); - hadoopConf.set(FS_DEFAULT_NAME_KEY, String.valueOf(config.get(OSS_BUCKET_KEY))); + hadoopConf.set(FS_DEFAULT_NAME_KEY, config.get(OSS_BUCKET_KEY)); hadoopConf.set(OSS_IMPL_KEY, HDFS_OSS_IMPL); setExtraConfiguration(hadoopConf, config, OSS_KEY); return hadoopConf; diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/S3Configuration.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/S3Configuration.java index 5d34f7814bb..872120a5baf 100644 --- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/S3Configuration.java +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/S3Configuration.java @@ -39,16 +39,16 @@ public class S3Configuration extends AbstractConfiguration { private static final String FS_KEY = "fs."; @Override - public Configuration buildConfiguration(Map config) + public Configuration buildConfiguration(Map config) throws IMapStorageException { checkConfiguration(config, S3_BUCKET_KEY); String protocol = DEFAULT_PROTOCOL; - if (config.get(S3_BUCKET_KEY).toString().startsWith(S3A_PROTOCOL)) { + if (config.get(S3_BUCKET_KEY).startsWith(S3A_PROTOCOL)) { protocol = S3A_PROTOCOL; } String fsImpl = protocol.equals(S3A_PROTOCOL) ? HDFS_S3A_IMPL : HDFS_S3N_IMPL; Configuration hadoopConf = new Configuration(); - hadoopConf.set(FS_DEFAULT_NAME_KEY, config.get(S3_BUCKET_KEY).toString()); + hadoopConf.set(FS_DEFAULT_NAME_KEY, config.get(S3_BUCKET_KEY)); hadoopConf.set(formatKey(protocol, HDFS_IMPL_KEY), fsImpl); setExtraConfiguration(hadoopConf, config, FS_KEY + protocol + SPLIT_CHAR); return hadoopConf;