Skip to content

Commit

Permalink
feat: add official add jar sql syntax, and rename custom add jar
Browse files Browse the repository at this point in the history
…to `add customJar` (DataLinkDC#1927)

* feat: add official add jar sql syntax

* Spotless Apply

* refact

Signed-off-by: Licho <lecho.sun@gmail.com>

* refactor: add log and reduce code

Signed-off-by: licho <lecho.sun@gmail.com>

* Spotless Apply

* fix: unit test

Signed-off-by: Licho <lecho.sun@gmail.com>

* feat: add configure combination initialize

Signed-off-by: Licho <lecho.sun@gmail.com>

* Spotless Apply

* fix: adapter 1.13

* spotless

* fix: configure import

* Spotless Apply

* fix: adapt to flink 1.14

* Spotless Apply

---------

Signed-off-by: Licho <lecho.sun@gmail.com>
Signed-off-by: licho <lecho.sun@gmail.com>
Co-authored-by: leechor <leechor@users.noreply.github.com>
  • Loading branch information
leechor and leechor authored May 7, 2023
1 parent c9f0ba4 commit 21c1dec
Show file tree
Hide file tree
Showing 12 changed files with 84 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.apache.flink.table.planner.utils.ExecutorUtils;
import org.apache.flink.table.typeutils.FieldInfoUtils;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -81,6 +82,8 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

import cn.hutool.core.util.ReflectUtil;

/**
* 定制TableEnvironmentImpl
*
Expand Down Expand Up @@ -331,6 +334,20 @@ public boolean parseAndLoadConfiguration(
return false;
}

@Override
public Configuration getRootConfiguration() {
Method method =
ReflectUtil.getMethod(
this.getStreamExecutionEnvironment().getClass(), "getConfiguration");
ReflectUtil.setAccessible(method);
try {
Object object = method.invoke(this.getStreamExecutionEnvironment());
return (Configuration) object;
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}

private void callSet(
SetOperation setOperation,
StreamExecutionEnvironment environment,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram;
import org.apache.flink.table.typeutils.FieldInfoUtils;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -80,6 +81,8 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

import cn.hutool.core.util.ReflectUtil;

/**
* 定制TableEnvironmentImpl
*
Expand Down Expand Up @@ -184,6 +187,20 @@ public static CustomTableEnvironmentImpl create(
classLoader);
}

@Override
public Configuration getRootConfiguration() {
Method method =
ReflectUtil.getMethod(
this.getStreamExecutionEnvironment().getClass(), "getConfiguration");
ReflectUtil.setAccessible(method);
try {
Object object = method.invoke(this.getStreamExecutionEnvironment());
return (Configuration) object;
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}

private static Executor lookupExecutor(
ClassLoader classLoader,
String executorIdentifier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.dinky.executor;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
Expand Down Expand Up @@ -51,4 +52,9 @@ public StreamExecutionEnvironment getStreamExecutionEnvironment() {
public Planner getPlanner() {
return ((StreamTableEnvironmentImpl) streamTableEnvironment).getPlanner();
}

@Override
public Configuration getRootConfiguration() {
return (Configuration) this.getConfig().getRootConfiguration();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.dinky.executor;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
Expand Down Expand Up @@ -71,4 +72,9 @@ public void injectExtendedExecutor(CustomExtendedOperationExecutor extendedExecu
ReflectUtil.setFieldValue(
getPlanner(), "extendedOperationExecutor", extendedOperationExecutor);
}

@Override
public Configuration getRootConfiguration() {
return (Configuration) this.getConfig().getRootConfiguration();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.dinky.model.LineageRel;
import org.dinky.result.SqlExplainResult;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
Expand Down Expand Up @@ -61,6 +62,8 @@ boolean parseAndLoadConfiguration(

Planner getPlanner();

Configuration getRootConfiguration();

default List<LineageRel> getLineage(String statement) {
return null;
}
Expand Down
20 changes: 19 additions & 1 deletion dinky-core/src/main/java/org/dinky/explainer/Explainer.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.dinky.constant.FlinkSQLConstant;
import org.dinky.context.DinkyClassLoaderContextHolder;
import org.dinky.context.JarPathContextHolder;
import org.dinky.executor.CustomTableEnvironment;
import org.dinky.executor.Executor;
import org.dinky.explainer.watchTable.WatchStatementExplainer;
import org.dinky.function.data.model.UDF;
Expand All @@ -45,6 +46,8 @@
import org.dinky.utils.SqlUtil;
import org.dinky.utils.URLUtils;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.rest.messages.JobPlanInfo;

import java.time.LocalDateTime;
Expand Down Expand Up @@ -113,6 +116,11 @@ public JobParam pretreatStatements(String[] statements) {
.forEach(JarPathContextHolder::addOtherPlugins);
DinkyClassLoaderContextHolder.get()
.addURL(URLUtils.getURLs(JarPathContextHolder.getOtherPluginsFiles()));
} else if (operationType.equals(SqlType.ADD_JAR)) {
Configuration combinationConfig = getCombinationConfig();
FileSystem.initialize(combinationConfig, null);
ddl.add(new StatementParam(statement, operationType));
statementList.add(statement);
} else if (operationType.equals(SqlType.INSERT)
|| operationType.equals(SqlType.SELECT)
|| operationType.equals(SqlType.SHOW)
Expand Down Expand Up @@ -148,6 +156,16 @@ public JobParam pretreatStatements(String[] statements) {
return new JobParam(statementList, ddl, trans, execute, CollUtil.removeNull(udfList));
}

private Configuration getCombinationConfig() {
CustomTableEnvironment cte = executor.getCustomTableEnvironment();
Configuration rootConfig = cte.getRootConfiguration();
Configuration config = cte.getConfig().getConfiguration();
Configuration combinationConfig = new Configuration();
combinationConfig.addAll(rootConfig);
combinationConfig.addAll(config);
return combinationConfig;
}

public List<UDF> parseUDFFromStatements(String[] statements) {
List<UDF> udfList = new ArrayList<>();
for (String statement : statements) {
Expand All @@ -156,7 +174,7 @@ public List<UDF> parseUDFFromStatements(String[] statements) {
}
UDF udf = UDFUtil.toUDF(statement);
if (Asserts.isNotNull(udf)) {
udfList.add(UDFUtil.toUDF(statement));
udfList.add(udf);
}
}
return udfList;
Expand Down
4 changes: 2 additions & 2 deletions dinky-executor/src/main/java/org/dinky/parser/SqlType.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ public enum SqlType {
RESET("RESET", "^RESET.*"),

EXECUTE("EXECUTE", "^EXECUTE.*"),

ADD("ADD", "^ADD.*"),
ADD_JAR("ADD_JAR", "^ADD\\s+JAR\\s+\\S+"),
ADD("ADD", "^ADD\\s+CUSTOMJAR\\s+\\S+"),

WATCH("WATCH", "^WATCH.*"),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
/** @since 0.7.0 */
public class AddJarSqlParser {

private static final String ADD_JAR = "(add\\s+jar)\\s+'(.*.jar)'";
private static final String ADD_JAR = "(add\\s+customjar)\\s+'(.*.jar)'";
private static final Pattern ADD_JAR_PATTERN =
Pattern.compile(ADD_JAR, Pattern.CASE_INSENSITIVE);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
/** @since 0.7.0 */
public class AddJarOperation extends AbstractOperation implements Operation {

private static final String KEY_WORD = "ADD JAR";
private static final String KEY_WORD = "ADD CUSTOMJAR";

public AddJarOperation(String statement) {
super(statement);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ public void match() {
test("SET...", SqlType.SET, true);
test("RESET...", SqlType.RESET, true);
test("EXECUTE...", SqlType.EXECUTE, true);
test("ADD...", SqlType.ADD, true);
test("ADD jar ...", SqlType.ADD_JAR, true);
test("ADD customjar ...", SqlType.ADD, true);
test("WATCH...", SqlType.WATCH, true);

String sql =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.dinky.function.pool;

import org.dinky.function.data.model.UDF;
import org.dinky.process.exception.DinkyException;

import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -49,9 +48,8 @@ public static void addOrUpdate(UDF udf) {
public static UDF getUDF(String className) {
UDF udf = CODE_POOL.get(className);
if (udf == null) {
String error = StrUtil.format("class: {} is not exists!", className);
log.error(error);
throw new DinkyException(error);
String error = StrUtil.format("class: {} is not exists!,maybe for add jar", className);
log.warn(error);
}
return udf;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,12 +352,14 @@ public static UDF toUDF(String statement) {
}

UDF udf = UdfCodePool.getUDF(className);
return UDF.builder()
.name(udfName)
.className(className)
.code(udf.getCode())
.functionLanguage(udf.getFunctionLanguage())
.build();
if (udf != null) {
return UDF.builder()
.name(udfName)
.className(className)
.code(udf.getCode())
.functionLanguage(udf.getFunctionLanguage())
.build();
}
}
return null;
}
Expand Down

0 comments on commit 21c1dec

Please sign in to comment.