Skip to content

Commit

Permalink
[BugFix]fix some udf bug (DataLinkDC#3256)
Browse files Browse the repository at this point in the history
  • Loading branch information
zackyoungh authored Mar 7, 2024
1 parent 3316f13 commit c2ab8cb
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.dinky.function.pool.UdfCodePool;
import org.dinky.pool.ClassEntity;
import org.dinky.pool.ClassPool;
import org.dinky.utils.URLUtils;

import org.apache.flink.client.python.PythonFunctionFactory;
import org.apache.flink.configuration.Configuration;
Expand Down Expand Up @@ -365,10 +366,10 @@ public static FlinkUdfPathContextHolder createFlinkUdfPathContextHolder() {
});

UdfCodePool.getGitPool().values().forEach(gitPackage -> {
if (FileUtil.getSuffix(gitPackage).equals("jar")) {
udfPathContextHolder.addUdfPath(new File(gitPackage));
if ("jar".equals(FileUtil.getSuffix(gitPackage))) {
udfPathContextHolder.addUdfPath(URLUtils.toFile(gitPackage));
} else {
udfPathContextHolder.addPyUdfPath(new File(gitPackage));
udfPathContextHolder.addPyUdfPath(URLUtils.toFile(gitPackage));
}
});
return udfPathContextHolder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

import org.apache.flink.client.deployment.ClusterRetrieveException;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.GlobalConfiguration;
Expand Down Expand Up @@ -82,8 +83,6 @@
import cn.hutool.http.HttpUtil;

public abstract class YarnGateway extends AbstractGateway {

public static final String HADOOP_CONFIG = "fs.hdfs.hadoopconf";
private static final String HTML_TAG_REGEX = "<pre>(.*)</pre>";

protected YarnConfiguration yarnConfiguration;
Expand Down Expand Up @@ -121,14 +120,16 @@ private void initConfig() {
}

if (Asserts.isNotNullString(clusterConfig.getHadoopConfigPath())) {
configuration.setString(HADOOP_CONFIG, clusterConfig.getHadoopConfigPath());
configuration.setString(
ConfigConstants.PATH_HADOOP_CONFIG,
FileUtil.file(clusterConfig.getHadoopConfigPath()).getAbsolutePath());
}

if (configuration.containsKey(SecurityOptions.KERBEROS_LOGIN_KEYTAB.key())) {
try {
SecurityUtils.install(new SecurityConfiguration(configuration));
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
logger.info("安全认证结束,用户和认证方式:" + currentUser.toString());
logger.info("安全认证结束,用户和认证方式:{}", currentUser.toString());
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
Expand Down

0 comments on commit c2ab8cb

Please sign in to comment.