From 21c1dec9d32f1f713cafe4323d494e2b708e4567 Mon Sep 17 00:00:00 2001 From: Licho Date: Sun, 7 May 2023 13:33:47 +0800 Subject: [PATCH] feat: add official `add jar` sql syntax, and rename custom `add jar` to `add customJar` (#1927) * feat: add official add jar sql syntax * Spotless Apply * refact Signed-off-by: Licho * refactor: add log add reduce code Signed-off-by: licho * Spotless Apply * fix: unit test Signed-off-by: Licho * feat: add configure combination initialize Signed-off-by: Licho * Spotless Apply * fix: adapter 1.13 * spotless * fix: configure import * Spotless Apply * fix: flink 1.14 adaptically * Spotless Apply --------- Signed-off-by: Licho Signed-off-by: licho Co-authored-by: leechor --- .../executor/CustomTableEnvironmentImpl.java | 17 ++++++++++++++++ .../executor/CustomTableEnvironmentImpl.java | 17 ++++++++++++++++ .../AbstractCustomTableEnvironment.java | 6 ++++++ .../AbstractCustomTableEnvironment.java | 6 ++++++ .../executor/CustomTableEnvironment.java | 3 +++ .../java/org/dinky/explainer/Explainer.java | 20 ++++++++++++++++++- .../main/java/org/dinky/parser/SqlType.java | 4 ++-- .../dinky/parser/check/AddJarSqlParser.java | 2 +- .../org/dinky/trans/ddl/AddJarOperation.java | 2 +- .../java/org/dinky/parser/SqlTypeTest.java | 3 ++- .../org/dinky/function/pool/UdfCodePool.java | 6 ++---- .../java/org/dinky/function/util/UDFUtil.java | 14 +++++++------ 12 files changed, 84 insertions(+), 16 deletions(-) diff --git a/dinky-client/dinky-client-1.13/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java b/dinky-client/dinky-client-1.13/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java index 6320a8d650..7dcada86c7 100644 --- a/dinky-client/dinky-client-1.13/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java +++ b/dinky-client/dinky-client-1.13/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java @@ -70,6 +70,7 @@ import org.apache.flink.table.planner.utils.ExecutorUtils; import org.apache.flink.table.typeutils.FieldInfoUtils; +import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.HashMap; @@ -81,6 +82,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; +import cn.hutool.core.util.ReflectUtil; + /** * 定制TableEnvironmentImpl * @@ -331,6 +334,20 @@ public boolean parseAndLoadConfiguration( return false; } + @Override + public Configuration getRootConfiguration() { + Method method = + ReflectUtil.getMethod( + this.getStreamExecutionEnvironment().getClass(), "getConfiguration"); + ReflectUtil.setAccessible(method); + try { + Object object = method.invoke(this.getStreamExecutionEnvironment()); + return (Configuration) object; + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + private void callSet( SetOperation setOperation, StreamExecutionEnvironment environment, diff --git a/dinky-client/dinky-client-1.14/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java b/dinky-client/dinky-client-1.14/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java index 06e0009fce..6c25874f68 100644 --- a/dinky-client/dinky-client-1.14/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java +++ b/dinky-client/dinky-client-1.14/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java @@ -69,6 +69,7 @@ import org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram; import org.apache.flink.table.typeutils.FieldInfoUtils; +import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.HashMap; @@ -80,6 +81,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; +import cn.hutool.core.util.ReflectUtil; + /** * 定制TableEnvironmentImpl * @@ -184,6 +187,20 @@ public static CustomTableEnvironmentImpl create( classLoader); } + @Override + public Configuration getRootConfiguration() { + Method method = + ReflectUtil.getMethod( + this.getStreamExecutionEnvironment().getClass(), "getConfiguration"); + ReflectUtil.setAccessible(method); + try { + Object object = method.invoke(this.getStreamExecutionEnvironment()); + return (Configuration) object; + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + private static Executor lookupExecutor( ClassLoader classLoader, String executorIdentifier, diff --git a/dinky-client/dinky-client-1.15/src/main/java/org/dinky/executor/AbstractCustomTableEnvironment.java b/dinky-client/dinky-client-1.15/src/main/java/org/dinky/executor/AbstractCustomTableEnvironment.java index 6376c80778..eadbeae390 100644 --- a/dinky-client/dinky-client-1.15/src/main/java/org/dinky/executor/AbstractCustomTableEnvironment.java +++ b/dinky-client/dinky-client-1.15/src/main/java/org/dinky/executor/AbstractCustomTableEnvironment.java @@ -19,6 +19,7 @@ package org.dinky.executor; +import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; @@ -51,4 +52,9 @@ public StreamExecutionEnvironment getStreamExecutionEnvironment() { public Planner getPlanner() { return ((StreamTableEnvironmentImpl) streamTableEnvironment).getPlanner(); } + + @Override + public Configuration getRootConfiguration() { + return (Configuration) this.getConfig().getRootConfiguration(); + } } diff --git a/dinky-client/dinky-client-1.16/src/main/java/org/dinky/executor/AbstractCustomTableEnvironment.java b/dinky-client/dinky-client-1.16/src/main/java/org/dinky/executor/AbstractCustomTableEnvironment.java index a6f7299b2f..e33732f05f 100644 --- a/dinky-client/dinky-client-1.16/src/main/java/org/dinky/executor/AbstractCustomTableEnvironment.java +++ b/dinky-client/dinky-client-1.16/src/main/java/org/dinky/executor/AbstractCustomTableEnvironment.java @@ -19,6 +19,7 @@ package org.dinky.executor; +import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; @@ -71,4 +72,9 @@ public void injectExtendedExecutor(CustomExtendedOperationExecutor extendedExecu ReflectUtil.setFieldValue( getPlanner(), "extendedOperationExecutor", extendedOperationExecutor); } + + @Override + public Configuration getRootConfiguration() { + return (Configuration) this.getConfig().getRootConfiguration(); + } } diff --git a/dinky-client/dinky-client-base/src/main/java/org/dinky/executor/CustomTableEnvironment.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/executor/CustomTableEnvironment.java index dc1abef729..a5b8f299dc 100644 --- a/dinky-client/dinky-client-base/src/main/java/org/dinky/executor/CustomTableEnvironment.java +++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/executor/CustomTableEnvironment.java @@ -22,6 +22,7 @@ import org.dinky.model.LineageRel; import org.dinky.result.SqlExplainResult; +import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.rest.messages.JobPlanInfo; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -61,6 +62,8 @@ boolean parseAndLoadConfiguration( Planner getPlanner(); + Configuration getRootConfiguration(); + default List getLineage(String statement) { return null; } diff --git a/dinky-core/src/main/java/org/dinky/explainer/Explainer.java b/dinky-core/src/main/java/org/dinky/explainer/Explainer.java index b9244115e4..cb200c209c 100644 --- a/dinky-core/src/main/java/org/dinky/explainer/Explainer.java +++ b/dinky-core/src/main/java/org/dinky/explainer/Explainer.java @@ -23,6 +23,7 @@ import org.dinky.constant.FlinkSQLConstant; import org.dinky.context.DinkyClassLoaderContextHolder; import org.dinky.context.JarPathContextHolder; +import org.dinky.executor.CustomTableEnvironment; import org.dinky.executor.Executor; import org.dinky.explainer.watchTable.WatchStatementExplainer; import org.dinky.function.data.model.UDF; @@ -45,6 +46,8 @@ import org.dinky.utils.SqlUtil; import org.dinky.utils.URLUtils; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; import org.apache.flink.runtime.rest.messages.JobPlanInfo; import java.time.LocalDateTime; @@ -113,6 +116,11 @@ public JobParam pretreatStatements(String[] statements) { .forEach(JarPathContextHolder::addOtherPlugins); DinkyClassLoaderContextHolder.get() .addURL(URLUtils.getURLs(JarPathContextHolder.getOtherPluginsFiles())); + } else if (operationType.equals(SqlType.ADD_JAR)) { + Configuration combinationConfig = getCombinationConfig(); + FileSystem.initialize(combinationConfig, null); + ddl.add(new StatementParam(statement, operationType)); + statementList.add(statement); } else if (operationType.equals(SqlType.INSERT) || operationType.equals(SqlType.SELECT) || operationType.equals(SqlType.SHOW) @@ -148,6 +156,16 @@ public JobParam pretreatStatements(String[] statements) { return new JobParam(statementList, ddl, trans, execute, CollUtil.removeNull(udfList)); } + private Configuration getCombinationConfig() { + CustomTableEnvironment cte = executor.getCustomTableEnvironment(); + Configuration rootConfig = cte.getRootConfiguration(); + Configuration config = cte.getConfig().getConfiguration(); + Configuration combinationConfig = new Configuration(); + combinationConfig.addAll(rootConfig); + combinationConfig.addAll(config); + return combinationConfig; + } + public List parseUDFFromStatements(String[] statements) { List udfList = new ArrayList<>(); for (String statement : statements) { @@ -156,7 +174,7 @@ public List parseUDFFromStatements(String[] statements) { } UDF udf = UDFUtil.toUDF(statement); if (Asserts.isNotNull(udf)) { - udfList.add(UDFUtil.toUDF(statement)); + udfList.add(udf); } } return udfList; diff --git a/dinky-executor/src/main/java/org/dinky/parser/SqlType.java b/dinky-executor/src/main/java/org/dinky/parser/SqlType.java index b3814c0c17..2e699e5f6e 100644 --- a/dinky-executor/src/main/java/org/dinky/parser/SqlType.java +++ b/dinky-executor/src/main/java/org/dinky/parser/SqlType.java @@ -56,8 +56,8 @@ public enum SqlType { RESET("RESET", "^RESET.*"), EXECUTE("EXECUTE", "^EXECUTE.*"), - - ADD("ADD", "^ADD.*"), + ADD_JAR("ADD_JAR", "^ADD\\s+JAR\\s+\\S+"), + ADD("ADD", "^ADD\\s+CUSTOMJAR\\s+\\S+"), WATCH("WATCH", "^WATCH.*"), diff --git a/dinky-executor/src/main/java/org/dinky/parser/check/AddJarSqlParser.java b/dinky-executor/src/main/java/org/dinky/parser/check/AddJarSqlParser.java index 0904c5fdc4..d7b27f39c1 100644 --- a/dinky-executor/src/main/java/org/dinky/parser/check/AddJarSqlParser.java +++ b/dinky-executor/src/main/java/org/dinky/parser/check/AddJarSqlParser.java @@ -36,7 +36,7 @@ /** @since 0.7.0 */ public class AddJarSqlParser { - private static final String ADD_JAR = "(add\\s+jar)\\s+'(.*.jar)'"; + private static final String ADD_JAR = "(add\\s+customjar)\\s+'(.*.jar)'"; private static final Pattern ADD_JAR_PATTERN = Pattern.compile(ADD_JAR, Pattern.CASE_INSENSITIVE); diff --git a/dinky-executor/src/main/java/org/dinky/trans/ddl/AddJarOperation.java b/dinky-executor/src/main/java/org/dinky/trans/ddl/AddJarOperation.java index 31b3723e0d..b7eb39d97d 100644 --- a/dinky-executor/src/main/java/org/dinky/trans/ddl/AddJarOperation.java +++ b/dinky-executor/src/main/java/org/dinky/trans/ddl/AddJarOperation.java @@ -30,7 +30,7 @@ /** @since 0.7.0 */ public class AddJarOperation extends AbstractOperation implements Operation { - private static final String KEY_WORD = "ADD JAR"; + private static final String KEY_WORD = "ADD CUSTOMJAR"; public AddJarOperation(String statement) { super(statement); diff --git a/dinky-executor/src/test/java/org/dinky/parser/SqlTypeTest.java b/dinky-executor/src/test/java/org/dinky/parser/SqlTypeTest.java index a298f695d0..b12c5ef09a 100644 --- a/dinky-executor/src/test/java/org/dinky/parser/SqlTypeTest.java +++ b/dinky-executor/src/test/java/org/dinky/parser/SqlTypeTest.java @@ -52,7 +52,8 @@ public void match() { test("SET...", SqlType.SET, true); test("RESET...", SqlType.RESET, true); test("EXECUTE...", SqlType.EXECUTE, true); - test("ADD...", SqlType.ADD, true); + test("ADD jar ...", SqlType.ADD_JAR, true); + test("ADD customjar ...", SqlType.ADD, true); test("WATCH...", SqlType.WATCH, true); String sql = diff --git a/dinky-function/src/main/java/org/dinky/function/pool/UdfCodePool.java b/dinky-function/src/main/java/org/dinky/function/pool/UdfCodePool.java index 6cb2ff3782..93aef5a850 100644 --- a/dinky-function/src/main/java/org/dinky/function/pool/UdfCodePool.java +++ b/dinky-function/src/main/java/org/dinky/function/pool/UdfCodePool.java @@ -20,7 +20,6 @@ package org.dinky.function.pool; import org.dinky.function.data.model.UDF; -import org.dinky.process.exception.DinkyException; import java.util.List; import java.util.Map; @@ -49,9 +48,8 @@ public static void addOrUpdate(UDF udf) { public static UDF getUDF(String className) { UDF udf = CODE_POOL.get(className); if (udf == null) { - String error = StrUtil.format("class: {} is not exists!", className); - log.error(error); - throw new DinkyException(error); + String error = StrUtil.format("class: {} is not exists!,maybe for add jar", className); + log.warn(error); } return udf; } diff --git a/dinky-function/src/main/java/org/dinky/function/util/UDFUtil.java b/dinky-function/src/main/java/org/dinky/function/util/UDFUtil.java index d5fc80675d..08488b6f2d 100644 --- a/dinky-function/src/main/java/org/dinky/function/util/UDFUtil.java +++ b/dinky-function/src/main/java/org/dinky/function/util/UDFUtil.java @@ -352,12 +352,14 @@ public static UDF toUDF(String statement) { } UDF udf = UdfCodePool.getUDF(className); - return UDF.builder() - .name(udfName) - .className(className) - .code(udf.getCode()) - .functionLanguage(udf.getFunctionLanguage()) - .build(); + if (udf != null) { + return UDF.builder() + .name(udfName) + .className(className) + .code(udf.getCode()) + .functionLanguage(udf.getFunctionLanguage()) + .build(); + } } return null; }