Skip to content

Commit

Permalink
[Improvement] Optimize expression variable replacement methods (DataL…
Browse files Browse the repository at this point in the history
…inkDC#3259)

Co-authored-by: Zzm0809 <Zzm0809@users.noreply.github.com>
  • Loading branch information
Zzm0809 and Zzm0809 authored Mar 8, 2024
1 parent 1dd5e07 commit cb81b4d
Show file tree
Hide file tree
Showing 14 changed files with 176 additions and 55 deletions.
1 change: 1 addition & 0 deletions dinky-admin/src/main/java/org/dinky/init/SystemInit.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ public void run(ApplicationArguments args) {
initResources();
List<Tenant> tenants = tenantService.list();
sysConfigService.initSysConfig();
sysConfigService.initExpressionVariables();

for (Tenant tenant : tenants) {
taskService.initDefaultFlinkSQLEnv(tenant.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ public interface SysConfigService extends ISuperService<SysConfig> {
*/
void initSysConfig();

/**
* Initialize expression variables.
*/
void initExpressionVariables();

/**
* Update system configurations by key-value pairs.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.dinky.service.impl;

import org.dinky.context.EngineContextHolder;
import org.dinky.data.model.Configuration;
import org.dinky.data.model.SysConfig;
import org.dinky.data.model.SystemConfiguration;
Expand All @@ -33,18 +34,20 @@

import org.springframework.stereotype.Service;

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.activerecord.Model;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.convert.Convert;
import lombok.extern.slf4j.Slf4j;

/**
* SysConfigServiceImpl
*
* @since 2021/11/18
*/
@Service
@Slf4j
public class SysConfigServiceImpl extends SuperServiceImpl<SysConfigMapper, SysConfig> implements SysConfigService {

@Override
Expand Down Expand Up @@ -92,13 +95,30 @@ public void initSysConfig() {
systemConfiguration.initSetConfiguration(configMap);
}

/**
* Initialize expression variables.
*/
@Override
public void initExpressionVariables() {
SystemConfiguration systemConfiguration = SystemConfiguration.getInstances();
// to initialize expression variable class and load it into the engine context
EngineContextHolder.loadExpressionVariableClass(
systemConfiguration.getExpressionVariable().getValue());
}

@Override
public void updateSysConfigByKv(String key, String value) {
SysConfig config = getOne(new QueryWrapper<SysConfig>().eq("name", key));
SysConfig config = getOne(new LambdaQueryWrapper<>(SysConfig.class).eq(SysConfig::getName, key));
config.setValue(value);
SystemConfiguration systemConfiguration = SystemConfiguration.getInstances();

systemConfiguration.setConfiguration(key, value);
config.updateById();
// if the expression variable is modified, reinitialize the expression variable
if (key.equals(systemConfiguration.getExpressionVariable().getKey())) {
log.info(
"The expression variable is modified, reinitialize the expression variable to the engine context.");
initExpressionVariables();
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ private static void initSystemConfiguration() throws SQLException {
Map<String, String> configMap =
CollUtil.toMap(sysConfigList, new HashMap<>(), SysConfig::getName, SysConfig::getValue);
systemConfiguration.initSetConfiguration(configMap);
systemConfiguration.initExpressionVariableList(configMap);
}

public static void submit(AppParamConfig config) throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.context;

import org.dinky.utils.StringUtil;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.lang.Dict;
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;

/**
* The EngineContextHolder
*/
@Slf4j
public class EngineContextHolder {
private static final Dict ENGINE_CONTEXT = Dict.create();

/**
* Get the engine contextload class
* @param variables the variables
* @return the class loader variable jexl class
*/
private static List<String> getClassLoaderVariableJexlClass(String variables) {
if (StrUtil.isBlank(variables)) {
log.warn("The variable is empty, please check the configuration.");
return Collections.emptyList();
}
return Arrays.stream(variables.split(","))
.map(String::trim)
.filter(s -> !s.isEmpty())
.collect(Collectors.toList());
}

/**
* Load expression variable class to the engine context
* @param variables the variables
*/
public static void loadExpressionVariableClass(String variables) {
getClassLoaderVariableJexlClass(variables).forEach(className -> {
try {
String classSimpleName =
BeanUtil.getBeanDesc(Class.forName(className)).getSimpleName();
String snakeCaseClassName = StringUtil.toSnakeCase(true, classSimpleName);
ENGINE_CONTEXT.set(snakeCaseClassName, Class.forName(className));
log.info("load class : {}", className);
} catch (ClassNotFoundException e) {
log.error(
"The class [{}] that needs to be loaded may not be loaded by dinky or there is no jar file of this class under dinky's lib/plugins/extends. Please check, and try again. {}",
className,
e.getMessage(),
e);
}
});
}

/**
* Get the engine context
* @return the engine context
*/
public static Dict getEngineContext() {
return ENGINE_CONTEXT;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

package org.dinky.data.constant;

import java.util.Arrays;
import java.util.regex.Pattern;

/**
* CommonConstant
*
Expand All @@ -31,4 +34,14 @@ public final class CommonConstant {

public static final String DINKY_APP_MAIN_CLASS = "org.dinky.app.MainApp";
public static final String LineSep = System.getProperty("line.separator");

public static final Pattern GLOBAL_VARIABLE_PATTERN = Pattern.compile("\\$\\{(.+?)}");

public static final String DEFAULT_EXPRESSION_VARIABLES = String.join(
",",
Arrays.asList(
"cn.hutool.core.date.DateUtil",
"cn.hutool.core.util.IdUtil",
"cn.hutool.core.util.RandomUtil",
"cn.hutool.core.util.StrUtil"));
}
2 changes: 2 additions & 0 deletions dinky-common/src/main/java/org/dinky/data/enums/Status.java
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,8 @@ public enum Status {
SYS_ENV_SETTINGS_MAX_RETAIN_DAYS_NOTE(1172, "sys.env.settings.maxRetainDays.note"),
SYS_ENV_SETTINGS_MAX_RETAIN_COUNT(1173, "sys.env.settings.maxRetainCount"),
SYS_ENV_SETTINGS_MAX_RETAIN_COUNT_NOTE(1174, "sys.env.settings.maxRetainCount.note"),
SYS_ENV_SETTINGS_EXPRESSION_VARIABLE(1175, "sys.env.settings.expressionVariable"),
SYS_ENV_SETTINGS_EXPRESSION_VARIABLE_NOTE(1176, "sys.env.settings.expressionVariable.note"),

SYS_DOLPHINSCHEDULER_SETTINGS_ENABLE(118, "sys.dolphinscheduler.settings.enable"),
SYS_DOLPHINSCHEDULER_SETTINGS_ENABLE_NOTE(119, "sys.dolphinscheduler.settings.enable.note"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.dinky.data.model;

import org.dinky.context.EngineContextHolder;
import org.dinky.data.constant.CommonConstant;
import org.dinky.data.enums.Status;
import org.dinky.data.properties.OssProperties;

Expand Down Expand Up @@ -124,6 +126,12 @@ public static Configuration.OptionBuilder key(Status status) {
.defaultValue(30)
.note(Status.SYS_ENV_SETTINGS_MAX_RETAIN_DAYS_NOTE);

// the default value is the same as the default value of the expressionVariable
private final Configuration<String> expressionVariable = key(Status.SYS_ENV_SETTINGS_EXPRESSION_VARIABLE)
.stringType()
.defaultValue(CommonConstant.DEFAULT_EXPRESSION_VARIABLES)
.note(Status.SYS_ENV_SETTINGS_EXPRESSION_VARIABLE_NOTE);

private final Configuration<Boolean> dolphinschedulerEnable = key(Status.SYS_DOLPHINSCHEDULER_SETTINGS_ENABLE)
.booleanType()
.defaultValue(false)
Expand Down Expand Up @@ -315,6 +323,14 @@ public void initSetConfiguration(Map<String, String> configMap) {
CONFIGURATION_LIST.stream().peek(Configuration::runParameterCheck).forEach(Configuration::runChangeEvent);
}

public void initExpressionVariableList(Map<String, String> configMap) {
CONFIGURATION_LIST.forEach(item -> {
if (item.getKey().equals(expressionVariable.getKey())) {
EngineContextHolder.loadExpressionVariableClass(configMap.get(item.getKey()));
}
});
}

public Map<String, List<Configuration<?>>> getAllConfiguration() {
Map<String, List<Configuration<?>>> data = new TreeMap<>();
for (Configuration<?> item : CONFIGURATION_LIST) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,8 @@ sys.env.settings.maxRetainDays=Job history max retained days
sys.env.settings.maxRetainDays.note=The maximum number of days for the history of submitted jobs and auto-registered cluster records to be retained will be automatically deleted when they expire
sys.env.settings.maxRetainCount=Job history max retained counts
sys.env.settings.maxRetainCount.note=The maximum number of submitted job histories and auto-registered cluster records will not be deleted if they are less than that number, even if the retention days have passed

sys.env.settings.expressionVariable=Expression variable list
sys.env.settings.expressionVariable.note= Used to use expression variables in task configuration, use , to separate multiple variables, need to use the fully qualified name of the class, for example: com.dinky.common.utils.DateUtils, please ensure that the class is in the classpath of Dinky
sys.dolphinscheduler.settings.enable=Whether to enable DolphinScheduler
sys.dolphinscheduler.settings.enable.note=Whether to enable DolphinScheduler. Only after enabling it can you use the related functions of DolphinScheduler. Please fill in the following configuration items first, and then enable this configuration after completion. Also: Please ensure that the related configurations of DolphinScheduler are correct.
sys.dolphinscheduler.settings.url=DolphinScheduler address
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,8 @@ sys.env.settings.maxRetainDays=作业历史最大保留天数
sys.env.settings.maxRetainDays.note=提交的作业历史与自动注册的集群记录最大保留天数,过期会被自动删除
sys.env.settings.maxRetainCount=作业历史最大保留数量
sys.env.settings.maxRetainCount.note=提交的作业历史与自动注册的集群记录最大保留数量,如果不足该数量,则不会被删除,即使已经过了最大保留天数

sys.env.settings.expressionVariable=表达式变量列表
sys.env.settings.expressionVariable.note=用于在任务配置中使用表达式变量,逗号分割,需要使用类的全限定名,例如: com.dinky.common.utils.DateUtils,请确保类在Dinky的classpath中
sys.dolphinscheduler.settings.enable=是否启用 DolphinScheduler
sys.dolphinscheduler.settings.enable.note=是否启用 DolphinScheduler ,启用后才能使用 DolphinScheduler 的相关功能,请先填写下列配置项,完成后再开启此项配置, 另:请确保 DolphinScheduler 的相关配置正确
sys.dolphinscheduler.settings.url=DolphinScheduler 地址
Expand Down
63 changes: 20 additions & 43 deletions dinky-core/src/main/java/org/dinky/executor/VariableManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@

import org.dinky.assertion.Asserts;
import org.dinky.constant.FlinkSQLConstant;
import org.dinky.context.EngineContextHolder;
import org.dinky.data.constant.CommonConstant;
import org.dinky.data.exception.DinkyException;
import org.dinky.utils.StringUtil;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
Expand All @@ -36,18 +37,14 @@
import org.apache.flink.util.StringUtils;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.io.resource.ResourceUtil;
import cn.hutool.core.lang.Dict;
import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.expression.engine.jexl.JexlEngine;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -64,41 +61,12 @@ public final class VariableManager {

public static final JexlEngine ENGINE = new JexlEngine();

public static final Dict ENGINE_CONTEXT = Dict.create();

/**
* load expression variable class
*/
private static void loadExpressionVariableClass() {
List<String> classLoaderVariableJexlClass = getClassLoaderVariableJexlClass();
if (CollUtil.isEmpty(classLoaderVariableJexlClass)) {
return;
}
classLoaderVariableJexlClass.forEach(fullClassName -> {
try {
String classSimpleName =
BeanUtil.getBeanDesc(Class.forName(fullClassName)).getSimpleName();
String snakeCaseClassName = StringUtil.toSnakeCase(true, classSimpleName);
ENGINE_CONTEXT.set(snakeCaseClassName, Class.forName(fullClassName));
log.info("load class : {}", fullClassName);
} catch (ClassNotFoundException e) {
log.error(
"The class [{}] that needs to be loaded may not be loaded by dinky or there is no jar file of this class under dinky's lib/plugins/extends. Please check, and try again. {}",
fullClassName,
e.getMessage(),
e);
}
});
}

public VariableManager() {
variables = new HashMap<>();
}

public static List<String> getClassLoaderVariableJexlClass() {
return Arrays.asList(ResourceUtil.readUtf8Str("dinky-loader/ExpressionVariableClass")
.replace("\r", "")
.split("\n"));
public VariableManager(Dict context) {
variables = new HashMap<>();
}

/**
Expand Down Expand Up @@ -171,14 +139,24 @@ public Object getVariable(String variableName) {
return variables.get(variableName);
}
// load expression variable class
loadExpressionVariableClass();
// use jexl to parse variable value
return ENGINE.eval(variableName, ENGINE_CONTEXT, null);
if (parseAndMatchExpressionVariable(variableName)) {
return ENGINE.eval(variableName, EngineContextHolder.getEngineContext(), null);
}
return null;
} catch (Exception e) {
throw new DinkyException(format("The variable name or jexl key of sql %s does not exist.", variableName));
}
}

public boolean parseAndMatchExpressionVariable(String variableName) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(variableName),
"sql variable name or jexl key cannot be null or empty.");
// key 格式是 dateUtil.getVariable("key") 按照这个格式解析 出 dateUtil
String substring = variableName.substring(0, variableName.indexOf("."));
return StrUtil.isNotBlank(EngineContextHolder.getEngineContext().getStr(substring));
}

/**
* Get a table result of sql under the given name. The sql variable name must be existed.
*
Expand Down Expand Up @@ -258,9 +236,8 @@ public String parseVariable(String statement) {
*
* @param statement A sql will be replaced.
*/
private String replaceVariable(String statement) {
Pattern p = Pattern.compile("\\$\\{(.+?)}");
Matcher m = p.matcher(statement);
public String replaceVariable(String statement) {
Matcher m = CommonConstant.GLOBAL_VARIABLE_PATTERN.matcher(statement);
StringBuffer sb = new StringBuffer();
while (m.find()) {
String key = m.group(1);
Expand Down
Loading

0 comments on commit cb81b4d

Please sign in to comment.