From 624cee99054f9d389ec6a56b1d16bc4ca1dcdca3 Mon Sep 17 00:00:00 2001
From: Wenjun Ruan
Date: Wed, 22 Nov 2023 20:27:45 +0800
Subject: [PATCH] Support using multiple Hadoop accounts

---
 docs/en/connector-v2/source/HdfsFile.md       |   1 +
 .../file/hdfs/sink/BaseHdfsFileSink.java      |   4 +
 .../file/hdfs/source/BaseHdfsFileSource.java  |  14 +-
 .../connector-file-base/pom.xml               |   8 +
 .../seatunnel/file/config/BaseSinkConfig.java |   6 +
 .../file/config/BaseSourceConfig.java         |   6 +
 .../seatunnel/file/config/HadoopConf.java     |   3 +
 .../HadoopFileSystemProxy.java}               | 226 +++++++++++-------
 .../file/hadoop/HadoopLoginFactory.java       |  68 ++++++
 .../seatunnel/file/sink/BaseFileSink.java     |  13 +-
 .../file/sink/BaseFileSinkWriter.java         |  21 +-
 .../commit/FileSinkAggregatedCommitter.java   |  27 ++-
 .../sink/writer/AbstractWriteStrategy.java    |  22 +-
 .../file/sink/writer/ExcelWriteStrategy.java  |   5 +-
 .../file/sink/writer/JsonWriteStrategy.java   |   7 +-
 .../file/sink/writer/TextWriteStrategy.java   |   7 +-
 .../file/sink/writer/WriteStrategy.java       |  11 +-
 .../seatunnel/file/source/BaseFileSource.java |   4 +-
 .../file/source/BaseFileSourceReader.java     |  10 +-
 .../source/reader/AbstractReadStrategy.java   |  18 +-
 .../file/hadoop/HadoopLoginFactoryTest.java   | 107 +++++++++
 .../file/cos/source/CosFileSource.java        |   5 +-
 .../file/ftp/source/FtpFileSource.java        |   5 +-
 .../file/oss/source/OssFileSource.java        |   5 +-
 .../file/oss/source/OssFileSource.java        |   5 +-
 .../file/s3/source/S3FileSource.java          |   5 +-
 .../file/sftp/source/SftpFileSource.java      |   5 +-
 .../commit/HiveSinkAggregatedCommitter.java   |   6 +-
 .../seatunnel/hive/sink/HiveSink.java         |   5 +-
 .../hive/utils/HiveMetaStoreProxy.java        |  59 ++++-
 .../S3RedshiftSinkAggregatedCommitter.java    |  14 +-
 .../redshift/sink/S3RedshiftSink.java         |   2 +-
 32 files changed, 485 insertions(+), 219 deletions(-)
 rename seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/{sink/util/FileSystemUtils.java => hadoop/HadoopFileSystemProxy.java} (52%)
 create mode 100644 seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopLoginFactory.java
 create mode 100644 seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopLoginFactoryTest.java

diff --git a/docs/en/connector-v2/source/HdfsFile.md b/docs/en/connector-v2/source/HdfsFile.md
index f1ef0aa87741..e97abb3c6727 100644
--- a/docs/en/connector-v2/source/HdfsFile.md
+++ b/docs/en/connector-v2/source/HdfsFile.md
@@ -51,6 +51,7 @@ Read data from hdfs file system.
 | date_format            | string | no       | yyyy-MM-dd          | Date type format, used to tell connector how to convert string to date, supported as the following formats:`yyyy-MM-dd` `yyyy.MM.dd` `yyyy/MM/dd` default `yyyy-MM-dd`.Date type format, used to tell connector how to convert string to date, supported as the following formats:`yyyy-MM-dd` `yyyy.MM.dd` `yyyy/MM/dd` default `yyyy-MM-dd` |
 | datetime_format        | string | no       | yyyy-MM-dd HH:mm:ss | Datetime type format, used to tell connector how to convert string to datetime, supported as the following formats:`yyyy-MM-dd HH:mm:ss` `yyyy.MM.dd HH:mm:ss` `yyyy/MM/dd HH:mm:ss` `yyyyMMddHHmmss` .default `yyyy-MM-dd HH:mm:ss` |
 | time_format            | string | no       | HH:mm:ss            | Time type format, used to tell connector how to convert string to time, supported as the following formats:`HH:mm:ss` `HH:mm:ss.SSS`.default `HH:mm:ss` |
+| remote_user            | string | no       | -                   | The login user used to connect to the Hadoop cluster. It is intended for remote users in RPC and will not carry any credentials. |
 | kerberos_principal     | string | no       | -                   | The principal of kerberos |
 | kerberos_keytab_path   | string | no       | -                   | The keytab path of kerberos |
 | skip_header_row_number | long   | no       | 0                   | Skip the first few lines, but only for the txt and csv.For example, set like following:`skip_header_row_number = 2`.then Seatunnel will skip the first 2 lines from source files |
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java
index 763a13aac5f6..5f78389455c5 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java
@@ -48,6 +48,10 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
         if (pluginConfig.hasPath(BaseSinkConfig.HDFS_SITE_PATH.key())) {
             hadoopConf.setHdfsSitePath(pluginConfig.getString(BaseSinkConfig.HDFS_SITE_PATH.key()));
         }
+
+        if (pluginConfig.hasPath(BaseSinkConfig.REMOTE_USER.key())) {
+            hadoopConf.setRemoteUser(pluginConfig.getString(BaseSinkConfig.REMOTE_USER.key()));
+        }
         if (pluginConfig.hasPath(BaseSinkConfig.KERBEROS_PRINCIPAL.key())) {
             hadoopConf.setKerberosPrincipal(
                     pluginConfig.getString(BaseSinkConfig.KERBEROS_PRINCIPAL.key()));
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
index d351dbb2525c..73d4dd209722 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
@@ -55,16 +55,17 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
                             "PluginName: %s, PluginType: %s, Message: %s",
                             getPluginName(),
PluginType.SOURCE, result.getMsg())); } - readStrategy = - ReadStrategyFactory.of( - pluginConfig.getString(HdfsSourceConfig.FILE_FORMAT_TYPE.key())); - readStrategy.setPluginConfig(pluginConfig); String path = pluginConfig.getString(HdfsSourceConfig.FILE_PATH.key()); hadoopConf = new HadoopConf(pluginConfig.getString(HdfsSourceConfig.DEFAULT_FS.key())); if (pluginConfig.hasPath(HdfsSourceConfig.HDFS_SITE_PATH.key())) { hadoopConf.setHdfsSitePath( pluginConfig.getString(HdfsSourceConfig.HDFS_SITE_PATH.key())); } + + if (pluginConfig.hasPath(HdfsSourceConfig.REMOTE_USER.key())) { + hadoopConf.setRemoteUser(pluginConfig.getString(HdfsSourceConfig.REMOTE_USER.key())); + } + if (pluginConfig.hasPath(HdfsSourceConfig.KERBEROS_PRINCIPAL.key())) { hadoopConf.setKerberosPrincipal( pluginConfig.getString(HdfsSourceConfig.KERBEROS_PRINCIPAL.key())); @@ -73,6 +74,11 @@ public void prepare(Config pluginConfig) throws PrepareFailException { hadoopConf.setKerberosKeytabPath( pluginConfig.getString(HdfsSourceConfig.KERBEROS_KEYTAB_PATH.key())); } + readStrategy = + ReadStrategyFactory.of( + pluginConfig.getString(HdfsSourceConfig.FILE_FORMAT_TYPE.key())); + readStrategy.setPluginConfig(pluginConfig); + readStrategy.init(hadoopConf); try { filePaths = readStrategy.getFileNamesByPath(hadoopConf, path); } catch (IOException e) { diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml index 9260b9f14064..486b75939af1 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml @@ -36,6 +36,7 @@ 1.12.3 4.1.2 4.1.2 + 3.1.4 @@ -136,6 +137,13 @@ poi-ooxml ${poi-ooxml.version} + + + org.apache.hadoop + hadoop-minikdc + ${hadoop-minikdc.version} + test + diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java index 4f4d09d75cd3..4909508028df 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java @@ -209,6 +209,12 @@ public class BaseSinkConfig { .noDefaultValue() .withDescription("The path of hdfs-site.xml"); + public static final Option REMOTE_USER = + Options.key("remote_user") + .stringType() + .noDefaultValue() + .withDescription("The remote user name of hdfs"); + public static final Option KERBEROS_PRINCIPAL = Options.key("kerberos_principal") .stringType() diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java index 6e2818d4597f..eff54a09241d 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java @@ -78,6 +78,12 @@ 
public class BaseSourceConfig { .noDefaultValue() .withDescription("The path of hdfs-site.xml"); + public static final Option REMOTE_USER = + Options.key("remote_user") + .stringType() + .noDefaultValue() + .withDescription("The remote user name of hdfs"); + public static final Option KERBEROS_PRINCIPAL = Options.key("kerberos_principal") .stringType() diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java index 197e70e04885..380effdfd227 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java @@ -33,6 +33,9 @@ public class HadoopConf implements Serializable { protected Map extraOptions = new HashMap<>(); protected String hdfsNameKey; protected String hdfsSitePath; + + protected String remoteUser; + protected String kerberosPrincipal; protected String kerberosKeytabPath; diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java similarity index 52% rename from seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java rename to seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java index 64f246a115b6..ebe472d1e7cd 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java @@ -15,13 +15,14 @@ * limitations under the License. 
*/ -package org.apache.seatunnel.connectors.seatunnel.file.sink.util; +package org.apache.seatunnel.connectors.seatunnel.file.hadoop; import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf; import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FSDataOutputStream; @@ -31,85 +32,42 @@ import org.apache.hadoop.security.UserGroupInformation; import lombok.NonNull; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import java.io.Closeable; import java.io.IOException; import java.io.Serializable; -import java.net.URI; import java.util.ArrayList; import java.util.List; @Slf4j -public class FileSystemUtils implements Serializable { - private static final int WRITE_BUFFER_SIZE = 2048; +public class HadoopFileSystemProxy implements Serializable, Closeable { - private final HadoopConf hadoopConf; + private transient UserGroupInformation userGroupInformation; private transient Configuration configuration; - public FileSystemUtils(HadoopConf hadoopConf) { - this.hadoopConf = hadoopConf; - } + private transient FileSystem fileSystem; - public static void doKerberosAuthentication( - Configuration configuration, String principal, String keytabPath) { - if (StringUtils.isBlank(principal) || StringUtils.isBlank(keytabPath)) { - log.warn( - "Principal [{}] or keytabPath [{}] is empty, it will skip kerberos authentication", - principal, - keytabPath); - } else { - configuration.set("hadoop.security.authentication", "kerberos"); - UserGroupInformation.setConfiguration(configuration); - try { - log.info( - "Start Kerberos authentication using principal {} and keytab {}", - principal, - keytabPath); - UserGroupInformation.loginUserFromKeytab(principal, keytabPath); - log.info("Kerberos authentication successful"); - } catch (IOException e) { - String errorMsg = - String.format( - "Kerberos authentication failed using this " - + "principal [%s] and keytab path [%s]", - principal, keytabPath); - throw new FileConnectorException( - CommonErrorCodeDeprecated.KERBEROS_AUTHORIZED_FAILED, errorMsg, e); - } - } - } + private final HadoopConf hadoopConf; - public Configuration getConfiguration(HadoopConf hadoopConf) { - Configuration configuration = new Configuration(); - configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, hadoopConf.getHdfsNameKey()); - configuration.set( - String.format("fs.%s.impl", hadoopConf.getSchema()), hadoopConf.getFsHdfsImpl()); - hadoopConf.setExtraOptionsForConfiguration(configuration); - String principal = hadoopConf.getKerberosPrincipal(); - String keytabPath = hadoopConf.getKerberosKeytabPath(); - doKerberosAuthentication(configuration, principal, keytabPath); - return configuration; + public HadoopFileSystemProxy(@NonNull HadoopConf hadoopConf) { + this.hadoopConf = hadoopConf; } - public FileSystem getFileSystem(@NonNull String path) throws IOException { - if (configuration == null) { - configuration = getConfiguration(hadoopConf); + public boolean fileExist(@NonNull String filePath) throws IOException { + if (fileSystem == null) { + initialize(); } - FileSystem fileSystem = - FileSystem.get(URI.create(path.replaceAll("\\\\", "/")), configuration); - fileSystem.setWriteChecksum(false); - return fileSystem; - } - - public 
FSDataOutputStream getOutputStream(@NonNull String outFilePath) throws IOException { - FileSystem fileSystem = getFileSystem(outFilePath); - Path path = new Path(outFilePath); - return fileSystem.create(path, true, WRITE_BUFFER_SIZE); + Path fileName = new Path(filePath); + return fileSystem.exists(fileName); } public void createFile(@NonNull String filePath) throws IOException { - FileSystem fileSystem = getFileSystem(filePath); + if (fileSystem == null) { + initialize(); + } Path path = new Path(filePath); if (!fileSystem.createNewFile(path)) { throw new FileConnectorException( @@ -118,33 +76,30 @@ public void createFile(@NonNull String filePath) throws IOException { } } - public void deleteFile(@NonNull String file) throws IOException { - FileSystem fileSystem = getFileSystem(file); - Path path = new Path(file); + public void deleteFile(@NonNull String filePath) throws IOException { + if (fileSystem == null) { + initialize(); + } + Path path = new Path(filePath); if (fileSystem.exists(path)) { if (!fileSystem.delete(path, true)) { throw new FileConnectorException( CommonErrorCodeDeprecated.FILE_OPERATION_FAILED, - "delete file " + file + " error"); + "delete file " + filePath + " error"); } } } - /** - * rename file - * - * @param oldName old file name - * @param newName target file name - * @param rmWhenExist if this is true, we will delete the target file when it already exists - * @throws IOException throw IOException - */ - public void renameFile(@NonNull String oldName, @NonNull String newName, boolean rmWhenExist) + public void renameFile( + @NonNull String oldFilePath, + @NonNull String newFilePath, + boolean removeWhenNewFilePathExist) throws IOException { - FileSystem fileSystem = getFileSystem(newName); - log.info("begin rename file oldName :[" + oldName + "] to newName :[" + newName + "]"); - - Path oldPath = new Path(oldName); - Path newPath = new Path(newName); + if (fileSystem == null) { + initialize(); + } + Path oldPath = new Path(oldFilePath); + Path newPath = new Path(newFilePath); if (!fileExist(oldPath.toString())) { log.warn( @@ -156,8 +111,8 @@ public void renameFile(@NonNull String oldName, @NonNull String newName, boolean return; } - if (rmWhenExist) { - if (fileExist(newName) && fileExist(oldName)) { + if (removeWhenNewFilePathExist) { + if (fileExist(newFilePath)) { fileSystem.delete(newPath, true); log.info("Delete already file: {}", newPath); } @@ -176,7 +131,9 @@ public void renameFile(@NonNull String oldName, @NonNull String newName, boolean } public void createDir(@NonNull String filePath) throws IOException { - FileSystem fileSystem = getFileSystem(filePath); + if (fileSystem == null) { + initialize(); + } Path dfs = new Path(filePath); if (!fileSystem.mkdirs(dfs)) { throw new FileConnectorException( @@ -185,15 +142,10 @@ public void createDir(@NonNull String filePath) throws IOException { } } - public boolean fileExist(@NonNull String filePath) throws IOException { - FileSystem fileSystem = getFileSystem(filePath); - Path fileName = new Path(filePath); - return fileSystem.exists(fileName); - } - - /** get the dir in filePath */ - public List dirList(@NonNull String filePath) throws IOException { - FileSystem fileSystem = getFileSystem(filePath); + public List getAllSubFiles(@NonNull String filePath) throws IOException { + if (fileSystem == null) { + initialize(); + } List pathList = new ArrayList<>(); if (!fileExist(filePath)) { return pathList; @@ -209,4 +161,98 @@ public List dirList(@NonNull String filePath) throws IOException { } return pathList; } + 
+ public FileStatus[] listStatus(String filePath) throws IOException { + if (fileSystem == null) { + initialize(); + } + return fileSystem.listStatus(new Path(filePath)); + } + + public FSDataOutputStream getOutputStream(String filePath) throws IOException { + if (fileSystem == null) { + initialize(); + } + return fileSystem.create(new Path(filePath), true); + } + + @Override + public void close() throws IOException { + synchronized (UserGroupInformation.class) { + try (FileSystem closedFileSystem = fileSystem) { + if (userGroupInformation != null) { + userGroupInformation.logoutUserFromKeytab(); + } + } + } + } + + @SneakyThrows + private void initialize() { + this.configuration = createConfiguration(); + if (enableKerberos()) { + configuration.set("hadoop.security.authentication", "kerberos"); + initializeWithKerberosLogin(); + return; + } + if (enableRemoteUser()) { + initializeWithRemoteUserLogin(); + return; + } + this.fileSystem = FileSystem.get(configuration); + fileSystem.setWriteChecksum(false); + } + + private Configuration createConfiguration() { + Configuration configuration = new Configuration(); + configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, hadoopConf.getHdfsNameKey()); + configuration.set( + String.format("fs.%s.impl", hadoopConf.getSchema()), hadoopConf.getFsHdfsImpl()); + hadoopConf.setExtraOptionsForConfiguration(configuration); + return configuration; + } + + private boolean enableKerberos() { + boolean kerberosPrincipalEmpty = StringUtils.isEmpty(hadoopConf.getKerberosPrincipal()); + boolean kerberosKeytabPathEmpty = StringUtils.isEmpty(hadoopConf.getKerberosKeytabPath()); + if (kerberosKeytabPathEmpty && kerberosPrincipalEmpty) { + return false; + } + if (!kerberosPrincipalEmpty && !kerberosKeytabPathEmpty) { + return true; + } + if (kerberosPrincipalEmpty) { + throw new IllegalArgumentException("Please set kerberosPrincipal"); + } + throw new IllegalArgumentException("Please set kerberosKeytabPath"); + } + + private void initializeWithKerberosLogin() throws IOException, InterruptedException { + HadoopLoginFactory.loginWithKerberos( + configuration, + hadoopConf.getKerberosPrincipal(), + hadoopConf.getKerberosKeytabPath(), + (userGroupInformation) -> { + this.userGroupInformation = userGroupInformation; + this.fileSystem = FileSystem.get(configuration); + return null; + }); + // todo: Use a daemon thread to reloginFromTicketCache + } + + private boolean enableRemoteUser() { + return StringUtils.isNotEmpty(hadoopConf.getRemoteUser()); + } + + private void initializeWithRemoteUserLogin() throws Exception { + final Pair pair = + HadoopLoginFactory.loginWithRemoteUser( + hadoopConf.getRemoteUser(), + (userGroupInformation) -> { + final FileSystem fileSystem = FileSystem.get(configuration); + return Pair.of(userGroupInformation, fileSystem); + }); + this.userGroupInformation = pair.getKey(); + this.fileSystem = pair.getValue(); + } } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopLoginFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopLoginFactory.java new file mode 100644 index 000000000000..d932cdeb9d29 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopLoginFactory.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more 
+ * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.file.hadoop; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; + +public class HadoopLoginFactory { + + /** Login with kerberos, and do the given action after login successfully. */ + public static T loginWithKerberos( + Configuration configuration, + String kerberosPrincipal, + String kerberosKeytabPath, + LoginFunction action) + throws IOException, InterruptedException { + if (!configuration.get("hadoop.security.authentication").equals("kerberos")) { + throw new IllegalArgumentException("hadoop.security.authentication must be kerberos"); + } + // Use global lock to avoid multiple threads to execute setConfiguration at the same time + synchronized (UserGroupInformation.class) { + // init configuration + UserGroupInformation.setConfiguration(configuration); + UserGroupInformation userGroupInformation = + UserGroupInformation.loginUserFromKeytabAndReturnUGI( + kerberosPrincipal, kerberosKeytabPath); + return userGroupInformation.doAs( + (PrivilegedExceptionAction) () -> action.run(userGroupInformation)); + } + } + + /** Login with remote user, and do the given action after login successfully. 
*/ + public static T loginWithRemoteUser(String remoteUser, LoginFunction action) + throws Exception { + + // Use global lock to avoid multiple threads to execute setConfiguration at the same time + synchronized (UserGroupInformation.class) { + // init configuration + UserGroupInformation userGroupInformation = + UserGroupInformation.createRemoteUser(remoteUser); + return userGroupInformation.doAs( + (PrivilegedExceptionAction) () -> action.run(userGroupInformation)); + } + } + + public interface LoginFunction { + + T run(UserGroupInformation userGroupInformation) throws Exception; + } +} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java index 4d9244f779a8..6686da988066 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java @@ -34,11 +34,9 @@ import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter; import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig; import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState; -import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils; import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy; import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategyFactory; -import java.io.IOException; import java.util.List; import java.util.Optional; @@ -48,7 +46,6 @@ public abstract class BaseFileSink protected SeaTunnelRowType seaTunnelRowType; protected Config pluginConfig; protected HadoopConf hadoopConf; - protected FileSystemUtils fileSystemUtils; protected FileSinkConfig fileSinkConfig; protected JobContext jobContext; protected String jobId; @@ -63,24 +60,23 @@ public void setJobContext(JobContext jobContext) { public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) { this.seaTunnelRowType = seaTunnelRowType; this.fileSinkConfig = new FileSinkConfig(pluginConfig, seaTunnelRowType); - this.fileSystemUtils = new FileSystemUtils(hadoopConf); } @Override public SinkWriter restoreWriter( - SinkWriter.Context context, List states) throws IOException { + SinkWriter.Context context, List states) { return new BaseFileSinkWriter(createWriteStrategy(), hadoopConf, context, jobId, states); } @Override public Optional> - createAggregatedCommitter() throws IOException { - return Optional.of(new FileSinkAggregatedCommitter(fileSystemUtils)); + createAggregatedCommitter() { + return Optional.of(new FileSinkAggregatedCommitter(hadoopConf)); } @Override public SinkWriter createWriter( - SinkWriter.Context context) throws IOException { + SinkWriter.Context context) { return new BaseFileSinkWriter(createWriteStrategy(), hadoopConf, context, jobId); } @@ -115,7 +111,6 @@ protected WriteStrategy createWriteStrategy() { WriteStrategy writeStrategy = WriteStrategyFactory.of(fileSinkConfig.getFileFormat(), fileSinkConfig); writeStrategy.setSeaTunnelRowTypeInfo(seaTunnelRowType); - writeStrategy.setFileSystemUtils(fileSystemUtils); return writeStrategy; } } diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java index eaf4a36b10c3..27a36e245bed 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java @@ -22,11 +22,11 @@ import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf; import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException; +import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy; import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo; import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo; import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter; import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState; -import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils; import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.AbstractWriteStrategy; import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy; @@ -42,7 +42,6 @@ public class BaseFileSinkWriter implements SinkWriter { protected final WriteStrategy writeStrategy; - private final FileSystemUtils fileSystemUtils; public BaseFileSinkWriter( WriteStrategy writeStrategy, @@ -51,7 +50,6 @@ public BaseFileSinkWriter( String jobId, List fileSinkStates) { this.writeStrategy = writeStrategy; - this.fileSystemUtils = writeStrategy.getFileSystemUtils(); int subTaskIndex = context.getIndexOfSubtask(); String uuidPrefix; if (!fileSinkStates.isEmpty()) { @@ -59,13 +57,16 @@ public BaseFileSinkWriter( } else { uuidPrefix = UUID.randomUUID().toString().replaceAll("-", "").substring(0, 10); } - writeStrategy.init(hadoopConf, jobId, uuidPrefix, subTaskIndex); + final HadoopFileSystemProxy hadoopFileSystemProxy = + writeStrategy.getHadoopFileSystemProxy(); if (!fileSinkStates.isEmpty()) { try { - List transactions = findTransactionList(jobId, uuidPrefix); + List transactions = + findTransactionList(jobId, uuidPrefix, hadoopFileSystemProxy); FileSinkAggregatedCommitter fileSinkAggregatedCommitter = - new FileSinkAggregatedCommitter(fileSystemUtils); + new FileSinkAggregatedCommitter(hadoopConf); + fileSinkAggregatedCommitter.init(); LinkedHashMap fileStatesMap = new LinkedHashMap<>(); fileSinkStates.forEach( fileSinkState -> @@ -100,9 +101,11 @@ public BaseFileSinkWriter( } } - private List findTransactionList(String jobId, String uuidPrefix) throws IOException { - return fileSystemUtils - .dirList( + private List findTransactionList( + String jobId, String uuidPrefix, HadoopFileSystemProxy hadoopFileSystemProxy) + throws IOException { + return hadoopFileSystemProxy + .getAllSubFiles( AbstractWriteStrategy.getTransactionDirPrefix( writeStrategy.getFileSinkConfig().getTmpPath(), jobId, uuidPrefix)) .stream() diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java index a076188e2a2c..26d2e4f5f1ba 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java @@ -18,7 +18,8 @@ package org.apache.seatunnel.connectors.seatunnel.file.sink.commit; import org.apache.seatunnel.api.sink.SinkAggregatedCommitter; -import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils; +import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf; +import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy; import lombok.extern.slf4j.Slf4j; @@ -31,10 +32,10 @@ @Slf4j public class FileSinkAggregatedCommitter implements SinkAggregatedCommitter { - protected final FileSystemUtils fileSystemUtils; + protected HadoopFileSystemProxy hadoopFileSystemProxy; - public FileSinkAggregatedCommitter(FileSystemUtils fileSystemUtils) { - this.fileSystemUtils = fileSystemUtils; + public FileSinkAggregatedCommitter(HadoopConf hadoopConf) { + this.hadoopFileSystemProxy = new HadoopFileSystemProxy(hadoopConf); } @Override @@ -49,13 +50,13 @@ public List commit( for (Map.Entry mvFileEntry : entry.getValue().entrySet()) { // first rename temp file - fileSystemUtils.renameFile( + hadoopFileSystemProxy.renameFile( mvFileEntry.getKey(), mvFileEntry.getValue(), true); } // second delete transaction directory - fileSystemUtils.deleteFile(entry.getKey()); + hadoopFileSystemProxy.deleteFile(entry.getKey()); } - } catch (Exception e) { + } catch (Throwable e) { log.error( "commit aggregatedCommitInfo error, aggregatedCommitInfo = {} ", aggregatedCommitInfo, @@ -115,14 +116,14 @@ public void abort(List aggregatedCommitInfos) throws E // rollback the file for (Map.Entry mvFileEntry : entry.getValue().entrySet()) { - if (fileSystemUtils.fileExist(mvFileEntry.getValue()) - && !fileSystemUtils.fileExist(mvFileEntry.getKey())) { - fileSystemUtils.renameFile( + if (hadoopFileSystemProxy.fileExist(mvFileEntry.getValue()) + && !hadoopFileSystemProxy.fileExist(mvFileEntry.getKey())) { + hadoopFileSystemProxy.renameFile( mvFileEntry.getValue(), mvFileEntry.getKey(), true); } } // delete the transaction dir - fileSystemUtils.deleteFile(entry.getKey()); + hadoopFileSystemProxy.deleteFile(entry.getKey()); } } catch (Exception e) { log.error("abort aggregatedCommitInfo error ", e); @@ -136,5 +137,7 @@ public void abort(List aggregatedCommitInfos) throws E * @throws IOException throw IOException when close failed. 
*/ @Override - public void close() throws IOException {} + public void close() throws IOException { + hadoopFileSystemProxy.close(); + } } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java index f932d1651778..b18846bbca60 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java @@ -28,10 +28,10 @@ import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat; import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf; import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException; +import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy; import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo; import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig; import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState; -import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -71,7 +71,7 @@ public abstract class AbstractWriteStrategy implements WriteStrategy { protected String jobId; protected int subTaskIndex; protected HadoopConf hadoopConf; - protected FileSystemUtils fileSystemUtils; + protected HadoopFileSystemProxy hadoopFileSystemProxy; protected String transactionId; /** The uuid prefix to make sure same job different file sink will not conflict. 
*/ protected String uuidPrefix; @@ -104,6 +104,7 @@ public AbstractWriteStrategy(FileSinkConfig fileSinkConfig) { @Override public void init(HadoopConf conf, String jobId, String uuidPrefix, int subTaskIndex) { this.hadoopConf = conf; + this.hadoopFileSystemProxy = new HadoopFileSystemProxy(conf); this.jobId = jobId; this.subTaskIndex = subTaskIndex; this.uuidPrefix = uuidPrefix; @@ -157,12 +158,6 @@ public Configuration getConfiguration(HadoopConf hadoopConf) { configuration.set( String.format("fs.%s.impl", hadoopConf.getSchema()), hadoopConf.getFsHdfsImpl()); this.hadoopConf.setExtraOptionsForConfiguration(configuration); - String principal = hadoopConf.getKerberosPrincipal(); - String keytabPath = hadoopConf.getKerberosKeytabPath(); - if (!isKerberosAuthorization) { - FileSystemUtils.doKerberosAuthentication(configuration, principal, keytabPath); - isKerberosAuthorization = true; - } return configuration; } @@ -289,7 +284,7 @@ public void abortPrepare() { */ public void abortPrepare(String transactionId) { try { - fileSystemUtils.deleteFile(getTransactionDir(transactionId)); + hadoopFileSystemProxy.deleteFile(getTransactionDir(transactionId)); } catch (IOException e) { throw new FileConnectorException( CommonErrorCodeDeprecated.FILE_OPERATION_FAILED, @@ -417,12 +412,7 @@ public FileSinkConfig getFileSinkConfig() { } @Override - public FileSystemUtils getFileSystemUtils() { - return fileSystemUtils; - } - - @Override - public void setFileSystemUtils(FileSystemUtils fileSystemUtils) { - this.fileSystemUtils = fileSystemUtils; + public HadoopFileSystemProxy getHadoopFileSystemProxy() { + return hadoopFileSystemProxy; } } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ExcelWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ExcelWriteStrategy.java index 66150422222f..5d7f2ab95241 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ExcelWriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ExcelWriteStrategy.java @@ -51,8 +51,9 @@ public void finishAndCloseFile() { this.beingWrittenWriter.forEach( (k, v) -> { try { - fileSystemUtils.createFile(k); - FSDataOutputStream fileOutputStream = fileSystemUtils.getOutputStream(k); + hadoopFileSystemProxy.createFile(k); + FSDataOutputStream fileOutputStream = + hadoopFileSystemProxy.getOutputStream(k); v.flushAndCloseExcel(fileOutputStream); fileOutputStream.close(); } catch (IOException e) { diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java index c3a3a975ca22..c44de690b155 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java @@ -115,17 +115,18 @@ private FSDataOutputStream getOrCreateOutputStream(@NonNull String filePath) 
{ case LZO: LzopCodec lzo = new LzopCodec(); OutputStream out = - lzo.createOutputStream(fileSystemUtils.getOutputStream(filePath)); + lzo.createOutputStream( + hadoopFileSystemProxy.getOutputStream(filePath)); fsDataOutputStream = new FSDataOutputStream(out, null); break; case NONE: - fsDataOutputStream = fileSystemUtils.getOutputStream(filePath); + fsDataOutputStream = hadoopFileSystemProxy.getOutputStream(filePath); break; default: log.warn( "Json file does not support this compress type: {}", compressFormat.getCompressCodec()); - fsDataOutputStream = fileSystemUtils.getOutputStream(filePath); + fsDataOutputStream = hadoopFileSystemProxy.getOutputStream(filePath); break; } beingWrittenOutputStream.put(filePath, fsDataOutputStream); diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java index 020d02fb5baf..eeb85e976d2d 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java @@ -136,19 +136,20 @@ private FSDataOutputStream getOrCreateOutputStream(@NonNull String filePath) { case LZO: LzopCodec lzo = new LzopCodec(); OutputStream out = - lzo.createOutputStream(fileSystemUtils.getOutputStream(filePath)); + lzo.createOutputStream( + hadoopFileSystemProxy.getOutputStream(filePath)); fsDataOutputStream = new FSDataOutputStream(out, null); enableWriteHeader(fsDataOutputStream); break; case NONE: - fsDataOutputStream = fileSystemUtils.getOutputStream(filePath); + fsDataOutputStream = hadoopFileSystemProxy.getOutputStream(filePath); enableWriteHeader(fsDataOutputStream); break; default: log.warn( "Text file does not support this compress type: {}", compressFormat.getCompressCodec()); - fsDataOutputStream = fileSystemUtils.getOutputStream(filePath); + fsDataOutputStream = hadoopFileSystemProxy.getOutputStream(filePath); enableWriteHeader(fsDataOutputStream); break; } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java index a64af87d061d..a2fb5c1510db 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java @@ -21,8 +21,8 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf; import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException; +import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy; import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig; -import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils; import org.apache.hadoop.conf.Configuration; @@ -99,12 +99,5 
@@ public interface WriteStrategy extends Transaction, Serializable { * * @return file system utils */ - FileSystemUtils getFileSystemUtils(); - - /** - * set file system utils - * - * @param fileSystemUtils fileSystemUtils - */ - void setFileSystemUtils(FileSystemUtils fileSystemUtils); + HadoopFileSystemProxy getHadoopFileSystemProxy(); } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSource.java index c6034f06b8ec..28ab46e4fb2b 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSource.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSource.java @@ -55,8 +55,8 @@ public SeaTunnelDataType getProducedType() { @Override public SourceReader createReader( - SourceReader.Context readerContext) throws Exception { - return new BaseFileSourceReader(readStrategy, hadoopConf, readerContext); + SourceReader.Context readerContext) { + return new BaseFileSourceReader(readStrategy, readerContext); } @Override diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSourceReader.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSourceReader.java index 7082b6171ea9..36eefba7228e 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSourceReader.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSourceReader.java @@ -21,7 +21,6 @@ import org.apache.seatunnel.api.source.SourceReader; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; -import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf; import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException; import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy; import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit; @@ -37,22 +36,17 @@ @Slf4j public class BaseFileSourceReader implements SourceReader { private final ReadStrategy readStrategy; - private final HadoopConf hadoopConf; private final SourceReader.Context context; private final Deque sourceSplits = new ConcurrentLinkedDeque<>(); private volatile boolean noMoreSplit; - public BaseFileSourceReader( - ReadStrategy readStrategy, HadoopConf hadoopConf, SourceReader.Context context) { + public BaseFileSourceReader(ReadStrategy readStrategy, SourceReader.Context context) { this.readStrategy = readStrategy; - this.hadoopConf = hadoopConf; this.context = context; } @Override - public void open() throws Exception { - readStrategy.init(hadoopConf); - } + public void open() throws Exception {} @Override public void close() throws IOException {} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java index 7de0a242ccfd..c42d798c7d0f 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java @@ -24,13 +24,11 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig; import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf; -import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils; +import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import lombok.extern.slf4j.Slf4j; @@ -75,13 +73,14 @@ public abstract class AbstractReadStrategy implements ReadStrategy { protected List readColumns = new ArrayList<>(); protected boolean isMergePartition = true; protected long skipHeaderNumber = BaseSourceConfig.SKIP_HEADER_ROW_NUMBER.defaultValue(); - protected transient boolean isKerberosAuthorization = false; + protected transient HadoopFileSystemProxy hadoopFileSystemProxy; protected Pattern pattern; @Override public void init(HadoopConf conf) { this.hadoopConf = conf; + this.hadoopFileSystemProxy = new HadoopFileSystemProxy(hadoopConf); } @Override @@ -102,12 +101,6 @@ public Configuration getConfiguration(HadoopConf hadoopConf) { configuration.set( String.format("fs.%s.impl", hadoopConf.getSchema()), hadoopConf.getFsHdfsImpl()); hadoopConf.setExtraOptionsForConfiguration(configuration); - String principal = hadoopConf.getKerberosPrincipal(); - String keytabPath = hadoopConf.getKerberosKeytabPath(); - if (!isKerberosAuthorization) { - FileSystemUtils.doKerberosAuthentication(configuration, principal, keytabPath); - isKerberosAuthorization = true; - } return configuration; } @@ -121,11 +114,8 @@ boolean checkFileType(String path) { @Override public List getFileNamesByPath(HadoopConf hadoopConf, String path) throws IOException { - Configuration configuration = getConfiguration(hadoopConf); - FileSystem hdfs = FileSystem.get(configuration); ArrayList fileNames = new ArrayList<>(); - Path listFiles = new Path(path); - FileStatus[] stats = hdfs.listStatus(listFiles); + FileStatus[] stats = hadoopFileSystemProxy.listStatus(path); for (FileStatus fileStatus : stats) { if (fileStatus.isDirectory()) { fileNames.addAll(getFileNamesByPath(hadoopConf, fileStatus.getPath().toString())); diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopLoginFactoryTest.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopLoginFactoryTest.java new file mode 100644 index 000000000000..8e7e3f166ec0 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopLoginFactoryTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or 
more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.file.hadoop; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.minikdc.MiniKdc; +import org.apache.hadoop.security.UserGroupInformation; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.File; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +class HadoopLoginFactoryTest { + + private MiniKdc miniKdc; + + private File workDir; + + @BeforeEach + public void startMiniKdc() throws Exception { + workDir = new File(System.getProperty("test.dir", "target")); + miniKdc = new MiniKdc(MiniKdc.createConf(), workDir); + miniKdc.start(); + } + + @AfterEach + public void stopMiniKdc() { + if (miniKdc != null) { + miniKdc.stop(); + } + } + + @Test + void loginWithKerberos_success() throws Exception { + miniKdc.createPrincipal(new File(workDir, "tom.keytab"), "tom"); + + UserGroupInformation userGroupInformation = + HadoopLoginFactory.loginWithKerberos( + new Configuration(), + "tom", + workDir.getPath() + "/" + "tom.keytab", + ugi -> ugi); + + assertNotNull(userGroupInformation); + assertEquals("tom@EXAMPLE.COM", userGroupInformation.getUserName()); + } + + @Test + void loginWithKerberos_multiple_times() throws Exception { + miniKdc.createPrincipal(new File(workDir, "tom1.keytab"), "tom1"); + miniKdc.createPrincipal(new File(workDir, "tom2.keytab"), "tom2"); + + UserGroupInformation tom1 = + HadoopLoginFactory.loginWithKerberos( + new Configuration(), + "tom1", + workDir.getPath() + "/" + "tom1.keytab", + ugi -> ugi); + + assertNotNull(tom1); + assertEquals("tom1@EXAMPLE.COM", tom1.getUserName()); + + UserGroupInformation tom2 = + HadoopLoginFactory.loginWithKerberos( + new Configuration(), + "tom2", + workDir.getPath() + "/" + "tom2.keytab", + ugi -> ugi); + + assertNotNull(tom2); + assertEquals("tom2@EXAMPLE.COM", tom2.getUserName()); + } + + @Test + void loginWithKerberos_fail() { + Configuration configuration = new Configuration(); + Assertions.assertThrows( + Exception.class, + () -> + HadoopLoginFactory.loginWithKerberos( + configuration, + "tom", + workDir.getPath() + "/" + "tom.keytab", + ugi -> ugi)); + } +} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSource.java index 27414893365f..15eeeb55701c 100644 --- 
a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSource.java
@@ -67,11 +67,12 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
                             "PluginName: %s, PluginType: %s, Message: %s",
                             getPluginName(), PluginType.SOURCE, result.getMsg()));
         }
+        String path = pluginConfig.getString(CosConfig.FILE_PATH.key());
+        hadoopConf = CosConf.buildWithConfig(pluginConfig);
         readStrategy =
                 ReadStrategyFactory.of(pluginConfig.getString(CosConfig.FILE_FORMAT_TYPE.key()));
         readStrategy.setPluginConfig(pluginConfig);
-        String path = pluginConfig.getString(CosConfig.FILE_PATH.key());
-        hadoopConf = CosConf.buildWithConfig(pluginConfig);
+        readStrategy.init(hadoopConf);
         try {
             filePaths = readStrategy.getFileNamesByPath(hadoopConf, path);
         } catch (IOException e) {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
index da3b46703d03..4c073d74fe70 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
@@ -75,11 +75,12 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
                     CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
                     "Ftp file source connector only support read [text, csv, json] files");
         }
+        String path = pluginConfig.getString(FtpConfig.FILE_PATH.key());
+        hadoopConf = FtpConf.buildWithConfig(pluginConfig);
         readStrategy =
                 ReadStrategyFactory.of(pluginConfig.getString(FtpConfig.FILE_FORMAT_TYPE.key()));
         readStrategy.setPluginConfig(pluginConfig);
-        String path = pluginConfig.getString(FtpConfig.FILE_PATH.key());
-        hadoopConf = FtpConf.buildWithConfig(pluginConfig);
+        readStrategy.init(hadoopConf);
         try {
             filePaths = readStrategy.getFileNamesByPath(hadoopConf, path);
         } catch (IOException e) {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
index 4265e8358f59..264f380413df 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
@@ -68,11 +68,12 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
                             "PluginName: %s, PluginType: %s, Message: %s",
                             getPluginName(), PluginType.SOURCE, result.getMsg()));
         }
+        String path = pluginConfig.getString(OssConfig.FILE_PATH.key());
+        hadoopConf = OssConf.buildWithConfig(pluginConfig);
         readStrategy =
                 ReadStrategyFactory.of(pluginConfig.getString(OssConfig.FILE_FORMAT_TYPE.key()));
         readStrategy.setPluginConfig(pluginConfig);
-        String path = pluginConfig.getString(OssConfig.FILE_PATH.key());
-        hadoopConf = OssConf.buildWithConfig(pluginConfig);
+        readStrategy.init(hadoopConf);
         try {
             filePaths = readStrategy.getFileNamesByPath(hadoopConf, path);
         } catch (IOException e) {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
index d0bcd943412c..87bb7f5d4916 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
@@ -67,11 +67,12 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
                             "PluginName: %s, PluginType: %s, Message: %s",
                             getPluginName(), PluginType.SOURCE, result.getMsg()));
         }
+        String path = pluginConfig.getString(OssConfig.FILE_PATH.key());
+        hadoopConf = OssConf.buildWithConfig(pluginConfig);
         readStrategy =
                 ReadStrategyFactory.of(pluginConfig.getString(OssConfig.FILE_FORMAT_TYPE.key()));
         readStrategy.setPluginConfig(pluginConfig);
-        String path = pluginConfig.getString(OssConfig.FILE_PATH.key());
-        hadoopConf = OssConf.buildWithConfig(pluginConfig);
+        readStrategy.init(hadoopConf);
         try {
             filePaths = readStrategy.getFileNamesByPath(hadoopConf, path);
         } catch (IOException e) {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
index 4d64558c25c3..1d78f9a749e7 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
@@ -64,11 +64,12 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
                             "PluginName: %s, PluginType: %s, Message: %s",
                             getPluginName(), PluginType.SOURCE, result.getMsg()));
         }
+        String path = pluginConfig.getString(S3Config.FILE_PATH.key());
+        hadoopConf = S3Conf.buildWithConfig(pluginConfig);
         readStrategy =
                 ReadStrategyFactory.of(pluginConfig.getString(S3Config.FILE_FORMAT_TYPE.key()));
         readStrategy.setPluginConfig(pluginConfig);
-        String path = pluginConfig.getString(S3Config.FILE_PATH.key());
-        hadoopConf = S3Conf.buildWithConfig(pluginConfig);
+        readStrategy.init(hadoopConf);
         try {
             filePaths = readStrategy.getFileNamesByPath(hadoopConf, path);
         } catch (IOException e) {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java
index 1a8126c261c1..afad0b5f37a7 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java
@@ -75,11 +75,12 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
                     CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
                     "Sftp file source connector only support read [text, csv, json] files");
         }
+        String path = pluginConfig.getString(SftpConfig.FILE_PATH.key());
+        hadoopConf = SftpConf.buildWithConfig(pluginConfig);
         readStrategy =
                 ReadStrategyFactory.of(pluginConfig.getString(SftpConfig.FILE_FORMAT_TYPE.key()));
         readStrategy.setPluginConfig(pluginConfig);
-        String path = pluginConfig.getString(SftpConfig.FILE_PATH.key());
-        hadoopConf = SftpConf.buildWithConfig(pluginConfig);
+        readStrategy.init(hadoopConf);
         try {
             filePaths = readStrategy.getFileNamesByPath(hadoopConf, path);
         } catch (IOException e) {
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
index 4934cc2aa120..0e423f3e8758 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
@@ -19,9 +19,9 @@
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
 import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;
 
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
@@ -44,8 +44,8 @@ public class HiveSinkAggregatedCommitter extends FileSinkAggregatedCommitter {
     private final boolean abortDropPartitionMetadata;
 
     public HiveSinkAggregatedCommitter(
-            Config pluginConfig, String dbName, String tableName, FileSystemUtils fileSystemUtils) {
-        super(fileSystemUtils);
+            Config pluginConfig, String dbName, String tableName, HadoopConf hadoopConf) {
+        super(hadoopConf);
         this.pluginConfig = pluginConfig;
         this.dbName = dbName;
         this.tableName = tableName;
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
index 86b353226624..1c208b47da4f 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
@@ -44,7 +44,6 @@
 
 import com.google.auto.service.AutoService;
 
-import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.List;
@@ -204,8 +203,8 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
 
     @Override
     public Optional<SinkAggregatedCommitter<FileCommitInfo, FileAggregatedCommitInfo>>
-            createAggregatedCommitter() throws IOException {
+            createAggregatedCommitter() {
         return Optional.of(
-                new HiveSinkAggregatedCommitter(pluginConfig, dbName, tableName, fileSystemUtils));
+                new HiveSinkAggregatedCommitter(pluginConfig, dbName, tableName, hadoopConf));
     }
 }
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
index 788fe38dc1cd..b86b79839e4e 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
@@ -20,7 +20,7 @@
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
+import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopLoginFactory;
 import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
 import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException;
@@ -42,26 +42,36 @@
 @Slf4j
 public class HiveMetaStoreProxy {
-    private final HiveMetaStoreClient hiveMetaStoreClient;
+    private HiveMetaStoreClient hiveMetaStoreClient;
     private static volatile HiveMetaStoreProxy INSTANCE = null;
 
     private HiveMetaStoreProxy(Config config) {
         String metastoreUri = config.getString(HiveConfig.METASTORE_URI.key());
 
-        HiveConf hiveConf = new HiveConf();
-        hiveConf.set("hive.metastore.uris", metastoreUri);
-        if (config.hasPath(BaseSourceConfig.KERBEROS_PRINCIPAL.key())
-                && config.hasPath(BaseSourceConfig.KERBEROS_KEYTAB_PATH.key())) {
-            String principal = config.getString(BaseSourceConfig.KERBEROS_PRINCIPAL.key());
-            String keytabPath = config.getString(BaseSourceConfig.KERBEROS_KEYTAB_PATH.key());
-            Configuration configuration = new Configuration();
-            FileSystemUtils.doKerberosAuthentication(configuration, principal, keytabPath);
-        }
+        try {
+            HiveConf hiveConf = new HiveConf();
+            hiveConf.set("hive.metastore.uris", metastoreUri);
             if (config.hasPath(HiveConfig.HIVE_SITE_PATH.key())) {
                 String hiveSitePath = config.getString(HiveConfig.HIVE_SITE_PATH.key());
                 hiveConf.addResource(new File(hiveSitePath).toURI().toURL());
             }
-        hiveMetaStoreClient = new HiveMetaStoreClient(hiveConf);
+            if (enableKerberos(config)) {
+                this.hiveMetaStoreClient =
+                        HadoopLoginFactory.loginWithKerberos(
+                                new Configuration(),
+                                config.getString(BaseSourceConfig.KERBEROS_PRINCIPAL.key()),
+                                config.getString(BaseSourceConfig.KERBEROS_KEYTAB_PATH.key()),
+                                (userGroupInformation) -> new HiveMetaStoreClient(hiveConf));
+                return;
+            }
+            if (enableRemoteUser(config)) {
+                this.hiveMetaStoreClient =
+                        HadoopLoginFactory.loginWithRemoteUser(
+                                config.getString(BaseSourceConfig.REMOTE_USER.key()),
+                                (userGroupInformation) -> new HiveMetaStoreClient(hiveConf));
+                return;
+            }
+            this.hiveMetaStoreClient = new HiveMetaStoreClient(hiveConf);
         } catch (MetaException e) {
             String errorMsg =
                     String.format(
@@ -78,6 +88,11 @@ private HiveMetaStoreProxy(Config config) {
                             metastoreUri, config.getString(HiveConfig.HIVE_SITE_PATH.key()));
             throw new HiveConnectorException(
                     HiveConnectorErrorCode.INITIALIZE_HIVE_METASTORE_CLIENT_FAILED, errorMsg, e);
+        } catch (Exception e) {
+            throw new HiveConnectorException(
+                    HiveConnectorErrorCode.INITIALIZE_HIVE_METASTORE_CLIENT_FAILED,
+                    "Kerberos login failed",
+                    e);
         }
     }
 
@@ -125,4 +140,24 @@ public synchronized void close() {
             HiveMetaStoreProxy.INSTANCE = null;
         }
     }
+
+    private boolean enableKerberos(Config config) {
+        boolean kerberosPrincipalEmpty = !config.hasPath(BaseSourceConfig.KERBEROS_PRINCIPAL.key());
+        boolean kerberosKeytabPathEmpty =
+                !config.hasPath(BaseSourceConfig.KERBEROS_KEYTAB_PATH.key());
+        if (kerberosKeytabPathEmpty && kerberosPrincipalEmpty) {
+            return false;
+        }
+        if (!kerberosPrincipalEmpty && !kerberosKeytabPathEmpty) {
+            return true;
+        }
+        if (kerberosPrincipalEmpty) {
+            throw new IllegalArgumentException("Please set kerberosPrincipal");
+        }
+        throw new IllegalArgumentException("Please set kerberosKeytabPath");
+    }
+
+    private boolean enableRemoteUser(Config config) {
+        return config.hasPath(BaseSourceConfig.REMOTE_USER.key());
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/commit/S3RedshiftSinkAggregatedCommitter.java b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/commit/S3RedshiftSinkAggregatedCommitter.java
index f78d6107a4a7..94fe62163473 100644
--- a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/commit/S3RedshiftSinkAggregatedCommitter.java
+++ b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/commit/S3RedshiftSinkAggregatedCommitter.java
@@ -20,9 +20,9 @@
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
 import org.apache.seatunnel.connectors.seatunnel.redshift.RedshiftJdbcClient;
 import org.apache.seatunnel.connectors.seatunnel.redshift.config.S3RedshiftConfig;
 import org.apache.seatunnel.connectors.seatunnel.redshift.exception.S3RedshiftConnectorErrorCode;
@@ -46,8 +46,8 @@ public class S3RedshiftSinkAggregatedCommitter extends FileSinkAggregatedCommitt
 
     private Config pluginConfig;
 
-    public S3RedshiftSinkAggregatedCommitter(FileSystemUtils fileSystemUtils, Config pluginConfig) {
-        super(fileSystemUtils);
+    public S3RedshiftSinkAggregatedCommitter(HadoopConf hadoopConf, Config pluginConfig) {
+        super(hadoopConf);
         this.pluginConfig = pluginConfig;
         this.executeSql = pluginConfig.getString(S3RedshiftConfig.EXECUTE_SQL.key());
     }
@@ -64,15 +64,15 @@ public List<FileAggregatedCommitInfo> commit(
                 for (Map.Entry<String, String> mvFileEntry : entry.getValue().entrySet()) {
                     // first rename temp file
-                    fileSystemUtils.renameFile(
+                    hadoopFileSystemProxy.renameFile(
                             mvFileEntry.getKey(), mvFileEntry.getValue(), true);
                     String sql = convertSql(mvFileEntry.getValue());
                     log.debug("execute redshift sql is:" + sql);
                     RedshiftJdbcClient.getInstance(pluginConfig).execute(sql);
-                    fileSystemUtils.deleteFile(mvFileEntry.getValue());
+                    hadoopFileSystemProxy.deleteFile(mvFileEntry.getValue());
                 }
                 // second delete transaction directory
-                fileSystemUtils.deleteFile(entry.getKey());
+                hadoopFileSystemProxy.deleteFile(entry.getKey());
             }
         } catch (Exception e) {
             log.error("commit aggregatedCommitInfo error ", e);
@@ -96,7 +96,7 @@ public void abort(List<FileAggregatedCommitInfo> aggregatedCommitInfos) {
                 for (Map.Entry<String, LinkedHashMap<String, String>> entry :
                         aggregatedCommitInfo.getTransactionMap().entrySet()) {
                     // delete the transaction dir
-                    fileSystemUtils.deleteFile(entry.getKey());
+                    hadoopFileSystemProxy.deleteFile(entry.getKey());
                 }
             } catch (Exception e) {
                 log.error("abort aggregatedCommitInfo error ", e);
diff --git a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftSink.java b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftSink.java
index 4ccf1f8f313f..ac009dbc950b 100644
--- a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftSink.java
+++ b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftSink.java
@@ -72,6 +72,6 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
     @Override
     public Optional<SinkAggregatedCommitter<FileCommitInfo, FileAggregatedCommitInfo>>
             createAggregatedCommitter() {
-        return Optional.of(new S3RedshiftSinkAggregatedCommitter(fileSystemUtils, pluginConfig));
+        return Optional.of(new S3RedshiftSinkAggregatedCommitter(hadoopConf, pluginConfig));
     }
 }