From c2ab8cb0d7694daee3acecc4a9ffcbf907de40dc Mon Sep 17 00:00:00 2001 From: ZackYoung Date: Thu, 7 Mar 2024 21:09:43 +0800 Subject: [PATCH] [BugFix]fix some udf bug (#3256) --- .../src/main/java/org/dinky/function/util/UDFUtil.java | 7 ++++--- .../main/java/org/dinky/gateway/yarn/YarnGateway.java | 9 +++++---- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/dinky-function/src/main/java/org/dinky/function/util/UDFUtil.java b/dinky-function/src/main/java/org/dinky/function/util/UDFUtil.java index 3e54a9b75a..427b1dec2f 100644 --- a/dinky-function/src/main/java/org/dinky/function/util/UDFUtil.java +++ b/dinky-function/src/main/java/org/dinky/function/util/UDFUtil.java @@ -36,6 +36,7 @@ import org.dinky.function.pool.UdfCodePool; import org.dinky.pool.ClassEntity; import org.dinky.pool.ClassPool; +import org.dinky.utils.URLUtils; import org.apache.flink.client.python.PythonFunctionFactory; import org.apache.flink.configuration.Configuration; @@ -365,10 +366,10 @@ public static FlinkUdfPathContextHolder createFlinkUdfPathContextHolder() { }); UdfCodePool.getGitPool().values().forEach(gitPackage -> { - if (FileUtil.getSuffix(gitPackage).equals("jar")) { - udfPathContextHolder.addUdfPath(new File(gitPackage)); + if ("jar".equals(FileUtil.getSuffix(gitPackage))) { + udfPathContextHolder.addUdfPath(URLUtils.toFile(gitPackage)); } else { - udfPathContextHolder.addPyUdfPath(new File(gitPackage)); + udfPathContextHolder.addPyUdfPath(URLUtils.toFile(gitPackage)); } }); return udfPathContextHolder; diff --git a/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnGateway.java b/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnGateway.java index a5a05d2d3a..573fa835cc 100644 --- a/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnGateway.java +++ b/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnGateway.java @@ -38,6 +38,7 @@ import org.apache.flink.client.deployment.ClusterRetrieveException; import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.configuration.GlobalConfiguration; @@ -82,8 +83,6 @@ import cn.hutool.http.HttpUtil; public abstract class YarnGateway extends AbstractGateway { - - public static final String HADOOP_CONFIG = "fs.hdfs.hadoopconf"; private static final String HTML_TAG_REGEX = "
(.*)
"; protected YarnConfiguration yarnConfiguration; @@ -121,14 +120,16 @@ private void initConfig() { } if (Asserts.isNotNullString(clusterConfig.getHadoopConfigPath())) { - configuration.setString(HADOOP_CONFIG, clusterConfig.getHadoopConfigPath()); + configuration.setString( + ConfigConstants.PATH_HADOOP_CONFIG, + FileUtil.file(clusterConfig.getHadoopConfigPath()).getAbsolutePath()); } if (configuration.containsKey(SecurityOptions.KERBEROS_LOGIN_KEYTAB.key())) { try { SecurityUtils.install(new SecurityConfiguration(configuration)); UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); - logger.info("安全认证结束,用户和认证方式:" + currentUser.toString()); + logger.info("安全认证结束,用户和认证方式:{}", currentUser.toString()); } catch (Exception e) { logger.error(e.getMessage(), e); }