diff --git a/dinky-admin/src/main/java/org/dinky/configure/AppConfig.java b/dinky-admin/src/main/java/org/dinky/configure/AppConfig.java index cac4573a84..e05affac11 100644 --- a/dinky-admin/src/main/java/org/dinky/configure/AppConfig.java +++ b/dinky-admin/src/main/java/org/dinky/configure/AppConfig.java @@ -81,8 +81,7 @@ public void addInterceptors(InterceptorRegistry registry) { })) .addPathPatterns("/api/**") .excludePathPatterns( - "/api/login", "/api/ldap/ldapEnableStatus", - "/druid/**", "/openapi/**"); + "/api/login", "/api/ldap/ldapEnableStatus", "/download/**", "/druid/**", "/openapi/**"); registry.addInterceptor(new TenantInterceptor()) .addPathPatterns("/api/**") diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java index 2c21c91437..fbd4b48212 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java @@ -596,17 +596,22 @@ public boolean saveOrUpdateTask(Task task) { task.setStatement(code); } } + String className = ""; // to compiler udf if (Asserts.isNotNullString(task.getDialect()) && Dialect.JAVA.isDialect(task.getDialect()) && Asserts.isNotNullString(task.getStatement())) { CustomStringJavaCompiler compiler = new CustomStringJavaCompiler(task.getStatement()); - task.setSavePointPath(compiler.getFullClassName()); + className = compiler.getFullClassName(); } else if (Dialect.PYTHON.isDialect(task.getDialect())) { - task.setSavePointPath(task.getName() + "." + UDFUtil.getPyUDFAttr(task.getStatement())); + className = task.getName() + "." + UDFUtil.getPyUDFAttr(task.getStatement()); } else if (Dialect.SCALA.isDialect(task.getDialect())) { - task.setSavePointPath(UDFUtil.getScalaFullClassName(task.getStatement())); + className = UDFUtil.getScalaFullClassName(task.getStatement()); } + if (!task.getConfigJson().getUdfConfig().getClassName().equals(className)) { + UdfCodePool.remove(task.getConfigJson().getUdfConfig().getClassName()); + } + task.getConfigJson().getUdfConfig().setClassName(className); UdfCodePool.addOrUpdate(UDFUtils.taskToUDF(task)); } diff --git a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java b/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java index f953edebbe..351811b9dd 100644 --- a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java +++ b/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java @@ -46,7 +46,6 @@ import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.core.execution.JobClient; import org.apache.flink.python.PythonOptions; @@ -56,13 +55,10 @@ import org.apache.flink.table.api.TableResult; import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; -import java.io.InputStream; import java.lang.ref.WeakReference; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.net.HttpURLConnection; import java.net.URL; import java.net.URLClassLoader; import java.sql.SQLException; @@ -82,6 +78,7 @@ import cn.hutool.core.io.FileUtil; import cn.hutool.core.util.ArrayUtil; import cn.hutool.core.util.URLUtil; +import cn.hutool.http.HttpUtil; import lombok.SneakyThrows; /** @@ -126,8 +123,7 @@ public static void submit(AppParamConfig config) throws SQLException { executorConfig, new WeakReference<>(DinkyClassLoader.build()).get()); // 加载第三方jar //TODO 这里有问题,需要修一修 - // loadDep(appTask.getType(), - // config.getTaskId(),DBUtil.getSysConfig(Status.SYS_ENV_SETTINGS_DINKYADDR.getKey()), executorConfig); + loadDep(appTask.getType(), config.getTaskId(), executorConfig); log.info("The job configuration is as follows: {}", executorConfig); String[] statements = @@ -169,7 +165,8 @@ public static String buildSql(AppTask appTask) throws SQLException { return sb.toString(); } - private static void loadDep(String type, Integer taskId, String dinkyAddr, ExecutorConfig executorConfig) { + private static void loadDep(String type, Integer taskId, ExecutorConfig executorConfig) { + String dinkyAddr = SystemConfiguration.getInstances().getDinkyAddr().getValue(); if (StringUtils.isBlank(dinkyAddr)) { return; } @@ -187,32 +184,35 @@ private static void loadDep(String type, Integer taskId, String dinkyAddr, Execu if (exists) { String depPath = flinkHome + "/dep"; ZipUtils.unzip(depZip, depPath); + log.info( + "download dep success, include :{}", + Arrays.stream(FileUtil.file(depPath).listFiles()) + .map(File::getName) + .collect(Collectors.joining(",\n"))); // move all jar FileUtil.listFileNames(depPath + "/jar").forEach(f -> { - FileUtil.moveContent( - FileUtil.file(depPath + "/jar/" + f), FileUtil.file(usrlib + "/" + f), true); + FileUtil.move(FileUtil.file(depPath + "/jar/" + f), FileUtil.file(usrlib + "/" + f), true); }); - URL[] jarUrls = FileUtil.listFileNames(usrlib).stream() - .map(f -> URLUtil.getURL(FileUtil.file(usrlib, f))) - .toArray(URL[]::new); - URL[] pyUrls = FileUtil.listFileNames(depPath + "/py/").stream() - .map(f -> URLUtil.getURL(FileUtil.file(depPath + "/py/", f))) - .toArray(URL[]::new); - - addURLs(jarUrls); - executorConfig - .getConfig() - .put( - PipelineOptions.JARS.key(), - Arrays.stream(jarUrls).map(URL::toString).collect(Collectors.joining(";"))); - if (ArrayUtil.isNotEmpty(pyUrls)) { - executorConfig - .getConfig() - .put( - PythonOptions.PYTHON_FILES.key(), - Arrays.stream(jarUrls) - .map(URL::toString) - .collect(Collectors.joining(","))); + if (FileUtil.isDirectory(usrlib)) { + URL[] jarUrls = FileUtil.listFileNames(usrlib).stream() + .map(f -> URLUtil.getURL(FileUtil.file(usrlib, f))) + .toArray(URL[]::new); + addURLs(jarUrls); + executor.getCustomTableEnvironment() + .addJar(FileUtil.file(usrlib).listFiles()); + } + if (FileUtil.isDirectory(depPath + "/py/")) { + URL[] pyUrls = FileUtil.listFileNames(depPath + "/py/").stream() + .map(f -> URLUtil.getURL(FileUtil.file(depPath + "/py/", f))) + .toArray(URL[]::new); + if (ArrayUtil.isNotEmpty(pyUrls)) { + executor.getCustomTableEnvironment() + .addConfiguration( + PythonOptions.PYTHON_FILES, + Arrays.stream(pyUrls) + .map(URL::toString) + .collect(Collectors.joining(","))); + } } } } catch (IOException e) { @@ -220,11 +220,11 @@ private static void loadDep(String type, Integer taskId, String dinkyAddr, Execu throw new RuntimeException(e); } } - executorConfig.getConfig().put("python.files", "./python_udf.zip"); } private static void addURLs(URL[] jarUrls) { - URLClassLoader urlClassLoader = (URLClassLoader) ClassLoader.getSystemClassLoader(); + Thread.currentThread().setContextClassLoader(new DinkyClassLoader(new URL[] {})); + URLClassLoader urlClassLoader = (URLClassLoader) Thread.currentThread().getContextClassLoader(); try { Method add = URLClassLoader.class.getDeclaredMethod("addURL", URL.class); add.setAccessible(true); @@ -238,23 +238,10 @@ private static void addURLs(URL[] jarUrls) { public static boolean downloadFile(String url, String path) throws IOException { try { - HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection(); - // 设置超时间为3秒 - conn.setConnectTimeout(3 * 1000); - // 获取输入流 - try (InputStream inputStream = conn.getInputStream()) { - // 获取输出流 - try (FileOutputStream outputStream = new FileOutputStream(path)) { - // 每次下载1024位 - byte[] b = new byte[1024]; - int len = -1; - while ((len = inputStream.read(b)) != -1) { - outputStream.write(b, 0, len); - } - return true; - } - } + HttpUtil.downloadFile(url, path); + return true; } catch (Exception e) { + log.error("download failed, Reason:", e); return false; } } diff --git a/dinky-client/dinky-client-1.14/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java b/dinky-client/dinky-client-1.14/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java index 35c9bc7419..22974fa224 100644 --- a/dinky-client/dinky-client-1.14/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java +++ b/dinky-client/dinky-client-1.14/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java @@ -62,23 +62,17 @@ import org.apache.flink.table.operations.QueryOperation; import org.apache.flink.table.planner.delegation.DefaultExecutor; -import java.io.File; import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.net.URL; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.stream.Collectors; import com.fasterxml.jackson.databind.node.ObjectNode; -import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.ReflectUtil; -import cn.hutool.core.util.URLUtil; import lombok.extern.slf4j.Slf4j; /** @@ -230,23 +224,10 @@ public ObjectNode getStreamGraph(String statement) { } } - @Override - public void addJar(File... jarPath) { - Configuration configuration = - (Configuration) getStreamExecutionEnvironment().getConfiguration(); - List pathList = - Arrays.stream(URLUtil.getURLs(jarPath)).map(URL::toString).collect(Collectors.toList()); - List jars = configuration.get(PipelineOptions.JARS); - if (jars != null) { - CollUtil.addAll(jars, pathList); - } - Map flinkConfigurationMap = getFlinkConfigurationMap(); - flinkConfigurationMap.put(PipelineOptions.JARS.key(), jars); - } - @Override public void addConfiguration(ConfigOption option, T value) { Map flinkConfigurationMap = getFlinkConfigurationMap(); + getConfig().addConfiguration(new Configuration().set(option, value)); flinkConfigurationMap.put(option.key(), value); } diff --git a/dinky-client/dinky-client-1.15/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java b/dinky-client/dinky-client-1.15/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java index 62e55afa00..9ca5fca07c 100644 --- a/dinky-client/dinky-client-1.15/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java +++ b/dinky-client/dinky-client-1.15/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java @@ -62,22 +62,17 @@ import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.QueryOperation; -import java.io.File; import java.lang.reflect.Field; -import java.net.URL; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.stream.Collectors; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import cn.hutool.core.collection.CollUtil; -import cn.hutool.core.util.URLUtil; /** * CustomTableEnvironmentImpl @@ -210,32 +205,16 @@ public ObjectNode getStreamGraph(String statement) { } } - @Override - public void addJar(File... jarPath) { - Configuration configuration = - (Configuration) getStreamExecutionEnvironment().getConfiguration(); - List pathList = - Arrays.stream(URLUtil.getURLs(jarPath)).map(URL::toString).collect(Collectors.toList()); - List jars = configuration.get(PipelineOptions.JARS); - if (jars != null) { - CollUtil.addAll(jars, pathList); - } - Map flinkConfigurationMap = getFlinkConfigurationMap(); - flinkConfigurationMap.put(PipelineOptions.JARS.key(), jars); - } - @Override public void addConfiguration(ConfigOption option, T value) { Map flinkConfigurationMap = getFlinkConfigurationMap(); + getConfig().set(option, value); flinkConfigurationMap.put(option.key(), value); } private Map getFlinkConfigurationMap() { - Field configuration = null; try { - configuration = StreamExecutionEnvironment.class.getDeclaredField("configuration"); - configuration.setAccessible(true); - Configuration o = (Configuration) configuration.get(getStreamExecutionEnvironment()); + Configuration o = getRootConfiguration(); Field confData = Configuration.class.getDeclaredField("confData"); confData.setAccessible(true); Map temp = (Map) confData.get(o); diff --git a/dinky-client/dinky-client-base/src/main/java/org/dinky/executor/CustomTableEnvironment.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/executor/CustomTableEnvironment.java index da15dc053d..794375f761 100644 --- a/dinky-client/dinky-client-base/src/main/java/org/dinky/executor/CustomTableEnvironment.java +++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/executor/CustomTableEnvironment.java @@ -84,7 +84,7 @@ default void addJar(File... jarPath) { Arrays.stream(URLUtil.getURLs(jarPath)).map(URL::toString).collect(Collectors.toList()); List jars = configuration.get(PipelineOptions.JARS); if (jars == null) { - configuration.set(PipelineOptions.JARS, pathList); + addConfiguration(PipelineOptions.JARS, pathList); } else { CollUtil.addAll(jars, pathList); } diff --git a/dinky-core/src/main/java/org/dinky/executor/Executor.java b/dinky-core/src/main/java/org/dinky/executor/Executor.java index c75dbde5d1..edd6928773 100644 --- a/dinky-core/src/main/java/org/dinky/executor/Executor.java +++ b/dinky-core/src/main/java/org/dinky/executor/Executor.java @@ -60,13 +60,16 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.util.ReflectUtil; import cn.hutool.core.util.URLUtil; +import lombok.extern.slf4j.Slf4j; /** * Executor * * @since 2021/11/17 */ +@Slf4j public abstract class Executor { private static final Logger logger = LoggerFactory.getLogger(Executor.class); @@ -133,7 +136,22 @@ public String getTimeZone() { return getTableConfig().getLocalTimeZone().getId(); } + private void initClassloader(DinkyClassLoader classLoader) { + if (classLoader != null) { + try { + StreamExecutionEnvironment env = this.environment; + // Fix the Classloader in the env above to appClassLoader, causing ckp to fail to compile + ReflectUtil.setFieldValue(env, "userClassloader", classLoader); + env.configure(env.getConfiguration(), classLoader); + } catch (Throwable e) { + log.warn( + "The version of flink does not have a Classloader field and the classloader cannot be set.", e); + } + } + } + protected void init(DinkyClassLoader classLoader) { + initClassloader(classLoader); this.dinkyClassLoader = classLoader; if (executorConfig.isValidParallelism()) { environment.setParallelism(executorConfig.getParallelism()); diff --git a/dinky-core/src/main/java/org/dinky/job/JobManager.java b/dinky-core/src/main/java/org/dinky/job/JobManager.java index 7cf062433b..115e61000a 100644 --- a/dinky-core/src/main/java/org/dinky/job/JobManager.java +++ b/dinky-core/src/main/java/org/dinky/job/JobManager.java @@ -79,6 +79,7 @@ import org.apache.flink.yarn.configuration.YarnConfigOptions; import java.io.File; +import java.io.IOException; import java.lang.ref.WeakReference; import java.time.LocalDateTime; import java.util.ArrayList; @@ -235,6 +236,11 @@ private boolean failed() { public boolean close() { CustomTableEnvironmentContext.clear(); RowLevelPermissionsContext.clear(); + try { + getExecutor().getDinkyClassLoader().close(); + } catch (IOException e) { + throw new RuntimeException(e); + } return true; } diff --git a/dinky-function/src/main/java/org/dinky/function/pool/UdfCodePool.java b/dinky-function/src/main/java/org/dinky/function/pool/UdfCodePool.java index 33c88d9a10..fbcba93d9d 100644 --- a/dinky-function/src/main/java/org/dinky/function/pool/UdfCodePool.java +++ b/dinky-function/src/main/java/org/dinky/function/pool/UdfCodePool.java @@ -52,6 +52,10 @@ public static void addOrUpdate(UDF udf) { CODE_POOL.put(udf.getClassName(), udf); } + public static void remove(String className) { + CODE_POOL.remove(className); + } + public static UDF getUDF(String className) { UDF udf = CODE_POOL.get(className); if (udf == null) { diff --git a/dinky-function/src/main/java/org/dinky/function/util/UDFUtil.java b/dinky-function/src/main/java/org/dinky/function/util/UDFUtil.java index 9350b83aaf..b4bc0c9712 100644 --- a/dinky-function/src/main/java/org/dinky/function/util/UDFUtil.java +++ b/dinky-function/src/main/java/org/dinky/function/util/UDFUtil.java @@ -95,7 +95,7 @@ public class UDFUtil { public static final String FUNCTION_SQL_REGEX = - "^CREATE\\s+(?:(?:TEMPORARY|TEMPORARY\\s+SYSTEM)\\s+)?FUNCTION\\s+(?:IF\\s+NOT\\s+EXISTS\\s+)?(\\S+)\\s+AS\\s+'(\\S+)'\\s*(?:LANGUAGE\\s+(?:JAVA|SCALA|PYTHON)\\s+)?(?:USING\\s+JAR\\s+'(\\S+)'\\s*(?:,\\s*JAR\\s+'(\\S+)'\\s*)*)?"; + "^CREATE\\s+(?:(?:TEMPORARY|TEMPORARY\\s+SYSTEM)\\s+)?FUNCTION\\s+(?:IF\\s+NOT\\s+EXISTS\\s+)?(\\S+)\\s+AS\\s+'(\\S+)'\\s*(?:LANGUAGE\\s+(?:JAVA|SCALA|PYTHON)\\s*)?(?:USING\\s+JAR\\s+'(\\S+)'\\s*(?:,\\s*JAR\\s+'(\\S+)'\\s*)*)?"; public static final Pattern PATTERN = Pattern.compile(FUNCTION_SQL_REGEX, Pattern.CASE_INSENSITIVE); public static final String SESSION = "SESSION"; @@ -118,7 +118,7 @@ public class UDFUtil { */ protected static final Map UDF_MD5_MAP = new HashMap<>(); - public static final String PYTHON_UDF_ATTR = "(\\S)\\s+=\\s+ud(?:f|tf|af|taf)"; + public static final String PYTHON_UDF_ATTR = "(\\S+)\\s*=\\s*ud(?:f|tf|af|taf)"; public static final String PYTHON_UDF_DEF = "@ud(?:f|tf|af|taf).*\\n+def\\s+(.*)\\(.*\\):"; public static final String SCALA_UDF_CLASS = "class\\s+(\\w+)(\\s*\\(.*\\)){0,1}\\s+extends"; public static final String SCALA_UDF_PACKAGE = "package\\s+(.*);"; @@ -343,7 +343,7 @@ public static UDF toUDF(String statement, DinkyClassLoader classLoader) { String gitPackage = UdfCodePool.getGitPackage(className); if (StrUtil.isNotBlank(gitPackage) && FileUtil.exist(gitPackage)) { - if (FileUtil.getSuffix(gitPackage).equals("jar")) { + if ("jar".equals(FileUtil.getSuffix(gitPackage))) { udfPathContextHolder.addUdfPath(new File(gitPackage)); } else { udfPathContextHolder.addPyUdfPath(new File(gitPackage)); diff --git a/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/KubernetesGateway.java b/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/KubernetesGateway.java index 6b746caa37..8b68c3f506 100644 --- a/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/KubernetesGateway.java +++ b/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/KubernetesGateway.java @@ -40,6 +40,7 @@ import org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient; import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; import org.apache.flink.kubernetes.kubeclient.FlinkKubeClientFactory; +import org.apache.flink.python.PythonOptions; import org.apache.http.util.TextUtils; import java.lang.reflect.Method; @@ -101,6 +102,8 @@ private void initConfig() { preparPodTemplate(k8sConfig.getKubeConfig(), KubernetesConfigOptions.KUBE_CONFIG_FILE); if (getType().isApplicationMode()) { + // remove python file + configuration.removeConfig(PythonOptions.PYTHON_FILES); resetCheckpointInApplicationMode(flinkConfig.getJobName()); } }