Skip to content

Commit

Permalink
优化SET和新增作业配置其他配置低优先级实现
Browse files Browse the repository at this point in the history
  • Loading branch information
aiwenmo committed Nov 29, 2021
1 parent 439a8ea commit 692d55f
Show file tree
Hide file tree
Showing 14 changed files with 107 additions and 31 deletions.
29 changes: 27 additions & 2 deletions dlink-admin/src/main/java/com/dlink/model/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,18 @@
import com.baomidou.mybatisplus.annotation.FieldFill;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import com.dlink.assertion.Asserts;
import com.dlink.db.model.SuperEntity;
import com.dlink.job.JobConfig;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Data;
import lombok.EqualsAndHashCode;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* 任务
Expand Down Expand Up @@ -46,7 +52,7 @@ public class Task extends SuperEntity{

private Integer jarId;

private String config;
private String configJson;

private String note;

Expand All @@ -59,6 +65,21 @@ public class Task extends SuperEntity{
@TableField(exist = false)
private List<Savepoints> savepoints;

@TableField(exist = false)
private List<Map<String,String>> config = new ArrayList<>();


public List<Map<String,String>> parseConfig(){
ObjectMapper objectMapper = new ObjectMapper();
try {
if(Asserts.isNotNullString(configJson)) {
config = objectMapper.readValue(configJson, ArrayList.class);
}
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return config;
}
/*public ExecutorSetting buildExecutorSetting(){
HashMap configMap = new HashMap();
if(config!=null&&!"".equals(clusterName)) {
Expand All @@ -72,7 +93,11 @@ public JobConfig buildSubmitConfig(){
if(clusterId==null||clusterId==0){
useRemote = false;
}
return new JobConfig(type,false,false,useRemote,clusterId,clusterConfigurationId,jarId,getId(),alias,fragment,statementSet,checkPoint,parallelism,savePointStrategy,savePointPath);
Map<String,String> map = new HashMap<>();
for(Map<String,String> item : config){
map.put(item.get("key"),item.get("value"));
}
return new JobConfig(type,false,false,useRemote,clusterId,clusterConfigurationId,jarId,getId(),alias,fragment,statementSet,checkPoint,parallelism,savePointStrategy,savePointPath,map);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,14 @@ private String buildParas(Integer id) {

@Override
public JobResult submitByTaskId(Integer id) {
Task task = this.getById(id);
Task task = this.getTaskInfoById(id);
Assert.check(task);
boolean isJarTask = isJarTask(task);
Statement statement = null;
/*Statement statement = null;
if(!isJarTask){
statement = statementService.getById(id);
Assert.check(statement);
}
}*/
JobConfig config = task.buildSubmitConfig();
if (!JobManager.useGateway(config.getType())) {
config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), task.getClusterId()));
Expand Down Expand Up @@ -100,7 +100,7 @@ public JobResult submitByTaskId(Integer id) {
}
JobManager jobManager = JobManager.build(config);
if(!isJarTask) {
return jobManager.executeSql(statement.getStatement());
return jobManager.executeSql(task.getStatement());
}else{
return jobManager.executeJar();
}
Expand All @@ -114,6 +114,7 @@ private boolean isJarTask(Task task){
public Task getTaskInfoById(Integer id) {
Task task = this.getById(id);
if (task != null) {
task.parseConfig();
Statement statement = statementService.getById(id);
if (task.getClusterId() != null) {
Cluster cluster = clusterService.getById(task.getClusterId());
Expand Down
4 changes: 2 additions & 2 deletions dlink-admin/src/main/resources/mapper/TaskMapper.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
<result column="cluster_id" property="clusterId" />
<result column="cluster_configuration_id" property="clusterConfigurationId" />
<result column="jar_id" property="jarId" />
<result column="config" property="config" />
<result column="config_json" property="configJson" />
<result column="note" property="note" />
<result column="enabled" property="enabled" />
<result column="create_time" property="createTime" />
Expand All @@ -26,7 +26,7 @@

<!-- 通用查询结果列 -->
<sql id="Base_Column_List">
id, name, alias, type,check_point,save_point_strategy,save_point_path, parallelism,fragment,statement_set,cluster_id,cluster_configuration_id,jar_id,config,note, enabled, create_time, update_time
id, name, alias, type,check_point,save_point_strategy,save_point_path, parallelism,fragment,statement_set,cluster_id,cluster_configuration_id,jar_id,config_json,note, enabled, create_time, update_time
</sql>


Expand Down
6 changes: 4 additions & 2 deletions dlink-core/src/main/java/com/dlink/job/JobConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ public JobConfig(String type,boolean useResult, boolean useSession, String sessi

public JobConfig(String type,boolean useResult, boolean useSession, boolean useRemote, Integer clusterId,
Integer clusterConfigurationId, Integer jarId, Integer taskId, String jobName, boolean useSqlFragment,
boolean useStatementSet,Integer checkpoint, Integer parallelism, Integer savePointStrategyValue, String savePointPath) {
boolean useStatementSet,Integer checkpoint, Integer parallelism, Integer savePointStrategyValue,
String savePointPath,Map<String,String> config) {
this.type = type;
this.useResult = useResult;
this.useSession = useSession;
Expand All @@ -99,10 +100,11 @@ public JobConfig(String type,boolean useResult, boolean useSession, boolean useR
this.parallelism = parallelism;
this.savePointStrategy = SavePointStrategy.get(savePointStrategyValue);
this.savePointPath = savePointPath;
this.config = config;
}

public ExecutorSetting getExecutorSetting(){
return new ExecutorSetting(checkpoint,parallelism,useSqlFragment,savePointPath,jobName);
return new ExecutorSetting(checkpoint,parallelism,useSqlFragment,savePointPath,jobName,config);
}

public void setSessionConfig(SessionConfig sessionConfig){
Expand Down
2 changes: 1 addition & 1 deletion dlink-doc/sql/dlink.sql
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ CREATE TABLE `dlink_task` (
`cluster_id` int(11) NULL DEFAULT NULL COMMENT 'Flink集群ID',
`cluster_configuration_id` int(11) NULL DEFAULT NULL COMMENT '集群配置ID',
`jar_id` int(11) NULL DEFAULT NULL COMMENT 'jarID',
`config` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '配置',
`config_json` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '配置JSON',
`note` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '注释',
`enabled` tinyint(1) NOT NULL DEFAULT 1 COMMENT '是否启用',
`create_time` datetime(0) NULL DEFAULT NULL COMMENT '创建时间',
Expand Down
6 changes: 6 additions & 0 deletions dlink-doc/sql/dlink_history.sql
Original file line number Diff line number Diff line change
Expand Up @@ -465,4 +465,10 @@ CREATE TABLE `dlink_user` (
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
INSERT INTO `dlink_user`(`id`, `username`, `password`, `nickname`, `worknum`, `avatar`, `mobile`, `enabled`, `is_delete`, `create_time`, `update_time`) VALUES (1, 'admin', '21232f297a57a5a743894a0e4a801fc3', 'Admin', NULL, NULL, NULL, 1, 0, '2021-11-28 17:19:27', '2021-11-28 17:19:31');

-- ----------------------------
-- 0.4.0 2021-11-29
-- ----------------------------
ALTER TABLE `dlink_task`
CHANGE COLUMN `config` `config_json` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '配置JSON' AFTER `jar_id`;

SET FOREIGN_KEY_CHECKS = 1;
43 changes: 43 additions & 0 deletions dlink-executor/src/main/java/com/dlink/executor/Executor.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.dlink.interceptor.FlinkInterceptor;
import com.dlink.result.SqlExplainResult;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.ExplainDetail;
Expand All @@ -14,7 +15,11 @@
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.command.ResetOperation;
import org.apache.flink.table.operations.command.SetOperation;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -99,6 +104,10 @@ private void initEnvironment(){
if(executorSetting.getParallelism()!=null&&executorSetting.getParallelism()>0){
environment.setParallelism(executorSetting.getParallelism());
}
if(executorSetting.getConfig()!=null) {
Configuration configuration = Configuration.fromMap(executorSetting.getConfig());
environment.getConfig().configure(configuration, null);
}
}

private void updateEnvironment(ExecutorSetting executorSetting){
Expand All @@ -108,6 +117,10 @@ private void updateEnvironment(ExecutorSetting executorSetting){
if(executorSetting.getParallelism()!=null&&executorSetting.getParallelism()>0){
environment.setParallelism(executorSetting.getParallelism());
}
if(executorSetting.getConfig()!=null) {
Configuration configuration = Configuration.fromMap(executorSetting.getConfig());
environment.getConfig().configure(configuration, null);
}
}

private void initStreamExecutionEnvironment(){
Expand Down Expand Up @@ -227,4 +240,34 @@ public void submitSql(String statements){
public void submitStatementSet(List<String> statements){
executeStatementSet(statements);
}

public boolean parseAndLoadConfiguration(String statement){
List<Operation> operations = stEnvironment.getParser().parse(statement);
for(Operation operation : operations){
if(operation instanceof SetOperation){
callSet((SetOperation)operation);
return true;
} else if (operation instanceof ResetOperation){
callReset((ResetOperation)operation);
return true;
}
}
return false;
}

private void callSet(SetOperation setOperation){
if (setOperation.getKey().isPresent() && setOperation.getValue().isPresent()) {
String key = setOperation.getKey().get().trim();
String value = setOperation.getValue().get().trim();
Map<String,String> confMap = new HashMap<>();
confMap.put(key,value);
Configuration configuration = Configuration.fromMap(confMap);
environment.getConfig().configure(configuration,null);
stEnvironment.getConfig().addConfiguration(configuration);
}
}

private void callReset(ResetOperation resetOperation) {
// to do nothing
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@ public static String pretreatStatement(Executor executor, String statement) {
return statement.trim();
}

// return false to continue with executeSql
public static boolean build(Executor executor, String statement) {
if(executor.parseAndLoadConfiguration(statement)){
return true;
}
Operation operation = Operations.buildOperation(statement);
if (Asserts.isNotNull(operation)) {
operation.build(executor.getCustomTableEnvironmentImpl());
Expand Down
4 changes: 2 additions & 2 deletions dlink-executor/src/main/java/com/dlink/trans/Operations.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
public class Operations {

private static Operation[] operations = {
new CreateAggTableOperation(),
new SetOperation()
new CreateAggTableOperation()
// , new SetOperation()
};

public static SqlType getSqlTypeFromStatements(String statement){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
* @author wenmo
* @since 2021/10/21 19:56
**/
@Deprecated
public class SetOperation extends AbstractOperation implements Operation {

private String KEY_WORD = "SET";
Expand Down
17 changes: 2 additions & 15 deletions dlink-web/src/components/Studio/StudioMenu/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,9 @@ const StudioMenu = (props: any) => {
let param = {
useSession: useSession,
session: currentSession.session,
useRemote: current.task.useRemote,
clusterId: current.task.clusterId,
useResult: current.task.useResult,
maxRowNum: current.task.maxRowNum,
statement: selectsql,
fragment: current.task.fragment,
jobName: current.task.jobName,
parallelism: current.task.parallelism,
checkPoint: current.task.checkPoint,
savePointStrategy: current.task.savePointStrategy,
savePointPath: current.task.savePointPath,
configJson: JSON.stringify(current.task.config),
...current.task,
};
const key = current.key;
const taskKey = (Math.random() * 1000) + '';
Expand All @@ -82,11 +74,6 @@ const StudioMenu = (props: any) => {
let newTabs = tabs;
for (let i = 0; i < newTabs.panes.length; i++) {
if (newTabs.panes[i].key == key) {
/*let newResult = newTabs.panes[i].console.result;
newResult.unshift(res.datas);
newTabs.panes[i].console={
result:newResult,
};*/
newTabs.panes[i].console.result = res.datas;
break;
}
Expand Down
3 changes: 1 addition & 2 deletions dlink-web/src/components/Studio/StudioTree/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,10 @@ const StudioTree: React.FC<StudioTreeProps> = (props) => {
session:'',
maxRowNum: 100,
jobName:node.name,
config: [],
useResult:false,
useSession:false,
useRemote:true,
...result.datas
...result.datas,
},
console:{
result:[],
Expand Down
4 changes: 3 additions & 1 deletion dlink-web/src/pages/FlinkSqlStudio/model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,9 @@ const Model: ModelType = {

effects: {
* saveTask({payload}, {call, put}) {
yield call(handleAddOrUpdate, 'api/task', payload);
let para = payload;
para.configJson = JSON.stringify(payload.config);
yield call(handleAddOrUpdate, 'api/task', para);
yield put({
type: 'saveTaskData',
payload,
Expand Down
6 changes: 6 additions & 0 deletions dlink-web/src/pages/Welcome.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,12 @@ export default (): React.ReactNode => {
<li>
<Link>新增用户管理模块</Link>
</li>
<li>
<Link>优化 SET 语法</Link>
</li>
<li>
<Link>新增作业配置的其他配置低优先级加载实现</Link>
</li>
</ul>
</Paragraph>
</Timeline.Item>
Expand Down

0 comments on commit 692d55f

Please sign in to comment.