diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java index 4215d6b2efd..a43baa7c31c 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java @@ -250,7 +250,7 @@ private void initialize() { return; } this.fileSystem = FileSystem.get(configuration); - fileSystem.setWriteChecksum(false); + this.fileSystem.setWriteChecksum(false); } private Configuration createConfiguration() { @@ -269,40 +269,41 @@ private Configuration createConfiguration() { } private boolean enableKerberos() { - boolean kerberosPrincipalEmpty = StringUtils.isEmpty(hadoopConf.getKerberosPrincipal()); - boolean kerberosKeytabPathEmpty = StringUtils.isEmpty(hadoopConf.getKerberosKeytabPath()); - boolean krb5FilePathEmpty = StringUtils.isEmpty(hadoopConf.getKrb5Path()); - if (kerberosKeytabPathEmpty && kerberosPrincipalEmpty && krb5FilePathEmpty) { + boolean kerberosPrincipalEmpty = StringUtils.isBlank(hadoopConf.getKerberosPrincipal()); + boolean kerberosKeytabPathEmpty = StringUtils.isBlank(hadoopConf.getKerberosKeytabPath()); + if (kerberosKeytabPathEmpty && kerberosPrincipalEmpty) { return false; } - if (!kerberosPrincipalEmpty && !kerberosKeytabPathEmpty && !krb5FilePathEmpty) { + if (!kerberosPrincipalEmpty && !kerberosKeytabPathEmpty) { return true; } if (kerberosPrincipalEmpty) { throw new IllegalArgumentException("Please set kerberosPrincipal"); } - if (kerberosKeytabPathEmpty) { - throw new IllegalArgumentException("Please set kerberosKeytabPath"); - } - throw new IllegalArgumentException("Please set krb5FilePath"); + throw new IllegalArgumentException("Please set kerberosKeytabPath"); } private void initializeWithKerberosLogin() throws IOException, InterruptedException { - HadoopLoginFactory.loginWithKerberos( - configuration, - hadoopConf.getKrb5Path(), - hadoopConf.getKerberosPrincipal(), - hadoopConf.getKerberosKeytabPath(), - (configuration, userGroupInformation) -> { - this.userGroupInformation = userGroupInformation; - this.fileSystem = FileSystem.get(configuration); - return null; - }); + Pair pair = + HadoopLoginFactory.loginWithKerberos( + configuration, + hadoopConf.getKrb5Path(), + hadoopConf.getKerberosPrincipal(), + hadoopConf.getKerberosKeytabPath(), + (configuration, userGroupInformation) -> { + this.userGroupInformation = userGroupInformation; + this.fileSystem = FileSystem.get(configuration); + return Pair.of(userGroupInformation, fileSystem); + }); // todo: Use a daemon thread to reloginFromTicketCache + this.userGroupInformation = pair.getKey(); + this.fileSystem = pair.getValue(); + this.fileSystem.setWriteChecksum(false); + log.info("Create FileSystem success with Kerberos: {}.", hadoopConf.getKerberosPrincipal()); } private boolean enableRemoteUser() { - return StringUtils.isNotEmpty(hadoopConf.getRemoteUser()); + return StringUtils.isNotBlank(hadoopConf.getRemoteUser()); } private void initializeWithRemoteUserLogin() throws Exception { @@ -314,7 +315,9 @@ private void initializeWithRemoteUserLogin() throws Exception { final FileSystem fileSystem = FileSystem.get(configuration); return Pair.of(userGroupInformation, fileSystem); }); + log.info("Create FileSystem success with RemoteUser: {}.", hadoopConf.getRemoteUser()); this.userGroupInformation = pair.getKey(); this.fileSystem = pair.getValue(); + this.fileSystem.setWriteChecksum(false); } } diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java index 6749ce4ece4..34c751f9c48 100644 --- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java +++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java @@ -150,16 +150,16 @@ public synchronized void close() { } } - private boolean enableKerberos(Config config) { + boolean enableKerberos(Config config) { boolean kerberosPrincipalEmpty = config.hasPath(BaseSourceConfigOptions.KERBEROS_PRINCIPAL.key()); boolean kerberosKeytabPathEmpty = config.hasPath(BaseSourceConfigOptions.KERBEROS_KEYTAB_PATH.key()); if (kerberosKeytabPathEmpty && kerberosPrincipalEmpty) { - return false; + return true; } if (!kerberosPrincipalEmpty && !kerberosKeytabPathEmpty) { - return true; + return false; } if (kerberosPrincipalEmpty) { throw new IllegalArgumentException("Please set kerberosPrincipal"); diff --git a/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxyTest.java b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxyTest.java new file mode 100644 index 00000000000..9a99c1efcc4 --- /dev/null +++ b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxyTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.hive.utils; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory; + +import org.junit.jupiter.api.Test; + +import lombok.SneakyThrows; + +import java.io.File; +import java.net.URL; +import java.nio.file.Paths; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class HiveMetaStoreProxyTest { + + @Test + void enableKerberos() { + Config config = parseConfig("/hive_without_kerberos.conf"); + HiveMetaStoreProxy hiveMetaStoreProxy = HiveMetaStoreProxy.getInstance(config); + assertFalse(hiveMetaStoreProxy.enableKerberos(config)); + + config = parseConfig("/hive_with_kerberos.conf"); + hiveMetaStoreProxy = HiveMetaStoreProxy.getInstance(config); + assertTrue(hiveMetaStoreProxy.enableKerberos(config)); + } + + @SneakyThrows + private Config parseConfig(String configFile) { + URL resource = HiveMetaStoreProxyTest.class.getResource(configFile); + String filePath = Paths.get(resource.toURI()).toString(); + return ConfigFactory.parseFile(new File(filePath)); + } +} diff --git a/seatunnel-connectors-v2/connector-hive/src/test/resources/hive_with_kerberos.conf b/seatunnel-connectors-v2/connector-hive/src/test/resources/hive_with_kerberos.conf new file mode 100644 index 00000000000..bc777aa27d2 --- /dev/null +++ b/seatunnel-connectors-v2/connector-hive/src/test/resources/hive_with_kerberos.conf @@ -0,0 +1,23 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +{ + table_name="temp.group_brand_order_list_board" + metastore_uri="thrift://localhost:9083" + hdfs_site_path = "/etc/hadoop/conf/hdfs-site.xml" + kerberos_principal = "hadoop" + kerberos_keytab_path = "/home/hadoop/hadoop.keytab" +} diff --git a/seatunnel-connectors-v2/connector-hive/src/test/resources/hive_without_kerberos.conf b/seatunnel-connectors-v2/connector-hive/src/test/resources/hive_without_kerberos.conf new file mode 100644 index 00000000000..4c16b369e0a --- /dev/null +++ b/seatunnel-connectors-v2/connector-hive/src/test/resources/hive_without_kerberos.conf @@ -0,0 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +{ + table_name="temp.group_brand_order_list_board" + metastore_uri="thrift://localhost:9083" + hdfs_site_path = "/etc/hadoop/conf/hdfs-site.xml" +}