Skip to content

Commit

Permalink
Fix HiveMetaStoreProxy#enableKerberos will return true if doesn't ena…
Browse files Browse the repository at this point in the history
…ble kerberos
  • Loading branch information
ruanwenjun committed Jan 31, 2024
1 parent 576919b commit c04a629
Show file tree
Hide file tree
Showing 7 changed files with 197 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ private void initialize() {
return;
}
this.fileSystem = FileSystem.get(configuration);
fileSystem.setWriteChecksum(false);
this.fileSystem.setWriteChecksum(false);
}

private Configuration createConfiguration() {
Expand All @@ -269,40 +269,41 @@ private Configuration createConfiguration() {
}

private boolean enableKerberos() {
boolean kerberosPrincipalEmpty = StringUtils.isEmpty(hadoopConf.getKerberosPrincipal());
boolean kerberosKeytabPathEmpty = StringUtils.isEmpty(hadoopConf.getKerberosKeytabPath());
boolean krb5FilePathEmpty = StringUtils.isEmpty(hadoopConf.getKrb5Path());
if (kerberosKeytabPathEmpty && kerberosPrincipalEmpty && krb5FilePathEmpty) {
boolean kerberosPrincipalEmpty = StringUtils.isBlank(hadoopConf.getKerberosPrincipal());
boolean kerberosKeytabPathEmpty = StringUtils.isBlank(hadoopConf.getKerberosKeytabPath());
if (kerberosKeytabPathEmpty && kerberosPrincipalEmpty) {
return false;
}
if (!kerberosPrincipalEmpty && !kerberosKeytabPathEmpty && !krb5FilePathEmpty) {
if (!kerberosPrincipalEmpty && !kerberosKeytabPathEmpty) {
return true;
}
if (kerberosPrincipalEmpty) {
throw new IllegalArgumentException("Please set kerberosPrincipal");
}
if (kerberosKeytabPathEmpty) {
throw new IllegalArgumentException("Please set kerberosKeytabPath");
}
throw new IllegalArgumentException("Please set krb5FilePath");
throw new IllegalArgumentException("Please set kerberosKeytabPath");
}

private void initializeWithKerberosLogin() throws IOException, InterruptedException {
HadoopLoginFactory.loginWithKerberos(
configuration,
hadoopConf.getKrb5Path(),
hadoopConf.getKerberosPrincipal(),
hadoopConf.getKerberosKeytabPath(),
(configuration, userGroupInformation) -> {
this.userGroupInformation = userGroupInformation;
this.fileSystem = FileSystem.get(configuration);
return null;
});
Pair<UserGroupInformation, FileSystem> pair =
HadoopLoginFactory.loginWithKerberos(
configuration,
hadoopConf.getKrb5Path(),
hadoopConf.getKerberosPrincipal(),
hadoopConf.getKerberosKeytabPath(),
(configuration, userGroupInformation) -> {
this.userGroupInformation = userGroupInformation;
this.fileSystem = FileSystem.get(configuration);
return Pair.of(userGroupInformation, fileSystem);
});
// todo: Use a daemon thread to reloginFromTicketCache
this.userGroupInformation = pair.getKey();
this.fileSystem = pair.getValue();
this.fileSystem.setWriteChecksum(false);
log.info("Create FileSystem success with Kerberos: {}.", hadoopConf.getKerberosPrincipal());
}

private boolean enableRemoteUser() {
return StringUtils.isNotEmpty(hadoopConf.getRemoteUser());
return StringUtils.isNotBlank(hadoopConf.getRemoteUser());
}

private void initializeWithRemoteUserLogin() throws Exception {
Expand All @@ -314,7 +315,9 @@ private void initializeWithRemoteUserLogin() throws Exception {
final FileSystem fileSystem = FileSystem.get(configuration);
return Pair.of(userGroupInformation, fileSystem);
});
log.info("Create FileSystem success with RemoteUser: {}.", hadoopConf.getRemoteUser());
this.userGroupInformation = pair.getKey();
this.fileSystem = pair.getValue();
this.fileSystem.setWriteChecksum(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ private HiveMetaStoreProxy(Config config) {
String hiveSitePath = config.getString(HiveConfig.HIVE_SITE_PATH.key());
hiveConf.addResource(new File(hiveSitePath).toURI().toURL());
}
if (enableKerberos(config)) {
if (HiveMetaStoreProxyUtils.enableKerberos(config)) {
this.hiveMetaStoreClient =
HadoopLoginFactory.loginWithKerberos(
new Configuration(),
Expand All @@ -71,7 +71,7 @@ private HiveMetaStoreProxy(Config config) {
new HiveMetaStoreClient(hiveConf));
return;
}
if (enableRemoteUser(config)) {
if (HiveMetaStoreProxyUtils.enableRemoteUser(config)) {
this.hiveMetaStoreClient =
HadoopLoginFactory.loginWithRemoteUser(
new Configuration(),
Expand Down Expand Up @@ -149,25 +149,4 @@ public synchronized void close() {
HiveMetaStoreProxy.INSTANCE = null;
}
}

private boolean enableKerberos(Config config) {
boolean kerberosPrincipalEmpty =
config.hasPath(BaseSourceConfigOptions.KERBEROS_PRINCIPAL.key());
boolean kerberosKeytabPathEmpty =
config.hasPath(BaseSourceConfigOptions.KERBEROS_KEYTAB_PATH.key());
if (kerberosKeytabPathEmpty && kerberosPrincipalEmpty) {
return false;
}
if (!kerberosPrincipalEmpty && !kerberosKeytabPathEmpty) {
return true;
}
if (kerberosPrincipalEmpty) {
throw new IllegalArgumentException("Please set kerberosPrincipal");
}
throw new IllegalArgumentException("Please set kerberosKeytabPath");
}

private boolean enableRemoteUser(Config config) {
return config.hasPath(BaseSourceConfigOptions.REMOTE_USER.key());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.hive.utils;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;

import lombok.experimental.UtilityClass;

@UtilityClass
public class HiveMetaStoreProxyUtils {

public boolean enableKerberos(Config config) {
boolean kerberosPrincipalEmpty =
config.hasPath(BaseSourceConfigOptions.KERBEROS_PRINCIPAL.key());
boolean kerberosKeytabPathEmpty =
config.hasPath(BaseSourceConfigOptions.KERBEROS_KEYTAB_PATH.key());
if (kerberosKeytabPathEmpty && kerberosPrincipalEmpty) {
return true;
}
if (!kerberosPrincipalEmpty && !kerberosKeytabPathEmpty) {
return false;
}
if (kerberosPrincipalEmpty) {
throw new IllegalArgumentException("Please set kerberosPrincipal");
}
throw new IllegalArgumentException("Please set kerberosKeytabPath");
}

public boolean enableRemoteUser(Config config) {
return config.hasPath(BaseSourceConfigOptions.REMOTE_USER.key());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.hive.utils;

import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;

import org.junit.jupiter.api.Test;

import lombok.SneakyThrows;

import java.io.File;
import java.net.URL;
import java.nio.file.Paths;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

class HiveMetaStoreProxyUtilsTest {

@Test
void enableKerberos() {
Config config = parseConfig("/hive_without_kerberos.conf");
assertFalse(HiveMetaStoreProxyUtils.enableKerberos(config));
assertFalse(HiveMetaStoreProxyUtils.enableRemoteUser(config));

config = parseConfig("/hive_with_kerberos.conf");
assertTrue(HiveMetaStoreProxyUtils.enableKerberos(config));
assertFalse(HiveMetaStoreProxyUtils.enableRemoteUser(config));

config = parseConfig("/hive_with_remoteuser.conf");
assertTrue(HiveMetaStoreProxyUtils.enableRemoteUser(config));
}

@SneakyThrows
private Config parseConfig(String configFile) {
URL resource = HiveMetaStoreProxyUtilsTest.class.getResource(configFile);
String filePath = Paths.get(resource.toURI()).toString();
return ConfigFactory.parseFile(new File(filePath));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
{
table_name="temp.group_brand_order_list_board"
metastore_uri="thrift://localhost:9083"
hdfs_site_path = "/etc/hadoop/conf/hdfs-site.xml"
kerberos_principal = "hadoop"
kerberos_keytab_path = "/home/hadoop/hadoop.keytab"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
{
table_name="temp.group_brand_order_list_board"
metastore_uri="thrift://localhost:9083"
hdfs_site_path = "/etc/hadoop/conf/hdfs-site.xml"
remote_user = "hadoop"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
{
table_name="temp.group_brand_order_list_board"
metastore_uri="thrift://localhost:9083"
hdfs_site_path = "/etc/hadoop/conf/hdfs-site.xml"
}

0 comments on commit c04a629

Please sign in to comment.