Skip to content

Commit

Permalink
Merge pull request DataLinkDC#90 from DataLinkDC/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
aiwenmo authored Jan 14, 2022
2 parents 0a22a92 + 5c6f8d2 commit 3b0948b
Show file tree
Hide file tree
Showing 47 changed files with 5,319 additions and 217 deletions.
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,23 +49,27 @@ Dinky(原 Dlink):
| | | 新增 选中片段执行 | 0.4.0 |
| | | 新增 布局拖拽 | 0.4.0 |
| | | 新增 SQL导出 | 0.5.0 |
| | | 新增 快捷键保存、校验、美化 | 0.5.0 |
| | | 支持 local 模式下 FlinkSQL 提交 | 0.4.0 |
| | | 支持 standalone 模式下 FlinkSQL 提交 | 0.4.0 |
| | | 支持 yarn session 模式下 FlinkSQL 提交 | 0.4.0 |
| | | 支持 yarn per-job 模式下 FlinkSQL 提交 | 0.4.0 |
| | | 支持 yarn application 模式下 FlinkSQL 提交 | 0.4.0 |
| | | 支持 kubernetes session 模式下 FlinkSQL 提交 | 0.5.0 |
| | | 支持 kubernetes application 模式下 FlinkSQL 提交 | 0.5.0 |
| | | 支持 UDF Java 方言Local模式在线编写、调试、动态加载 | 0.5.0 |
| | Flink 作业 | 支持 yarn application 模式下 Jar 提交 | 0.4.0 |
| | | 支持 k8s application 模式下 Jar 提交 | 0.5.0 |
| | | 支持 作业 Cancel | 0.4.0 |
| | | 支持 作业 SavePoint 的 Cancel、Stop、Trigger | 0.4.0 |
| | | 新增 作业自动从 SavePoint 恢复机制(包含最近、最早、指定一次) | 0.4.0 |
| | | 新增 UDF java方言代码的开发 | 0.5.0 |
| | Flink 集群 | 支持 查看已注册集群的作业列表与运维 | 0.4.0 |
| | | 新增 自动注册 Yarn 创建的集群 | 0.4.0 |
| | SQL | 新增 外部数据源的 SQL 校验 | 0.5.0 |
| | | 新增 外部数据源的 SQL 执行与预览 | 0.5.0 |
| | BI | 新增 折线图的渲染 | 0.5.0 |
| | | 新增 条形图的渲染 | 0.5.0 |
| | | 新增 饼图的渲染 | 0.5.0 |
| | 元数据 | 新增 查询外部数据源的元数据信息 | 0.4.0 |
| | 归档 | 新增 执行与提交历史 | 0.4.0 |
| 运维中心 | 暂无 | 暂无 | 0.4.0 |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public Result createTask(@RequestBody CatalogueTaskDTO catalogueTaskDTO) throws
}

/**
* 创建节点和作业
* 重命名节点和作业
*/
@PutMapping("/toRename")
public Result toRename(@RequestBody Catalogue catalogue) throws Exception {
Expand Down
2 changes: 2 additions & 0 deletions dlink-admin/src/main/java/com/dlink/service/TaskService.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,6 @@ public interface TaskService extends ISuperService<Task> {
// Lists tasks that act as FlinkSQL environment definitions — presumably those
// with a FlinkSQLEnv dialect; confirm against the implementation's query.
List<Task> listFlinkSQLEnv();

// Exports the SQL content of the task with the given id (exact export format
// not visible here — see TaskServiceImpl.exportSql).
String exportSql(Integer id);

// Finds the enabled Java-dialect UDF task whose compiled full class name
// matches className (the class name is stored in the task's save_point_path
// column — see TaskServiceImpl.getUDFByClassName and saveOrUpdateTask).
Task getUDFByClassName(String className);
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public boolean toRename(Catalogue catalogue) {
}else{
Task task = new Task();
task.setId(oldCatalogue.getTaskId());
task.setName(catalogue.getName());
task.setAlias(catalogue.getName());
taskService.updateById(task);
this.updateById(catalogue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.dlink.session.SessionInfo;
import com.dlink.session.SessionPool;
import com.dlink.utils.RunTimeUtil;
import com.dlink.utils.UDFUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -89,6 +90,7 @@ private JobResult executeFlinkSql(StudioExecuteDTO studioExecuteDTO) {
if(!config.isUseSession()) {
config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), studioExecuteDTO.getClusterId()));
}
initUDF(config,studioExecuteDTO.getStatement());
JobManager jobManager = JobManager.build(config);
JobResult jobResult = jobManager.executeSql(studioExecuteDTO.getStatement());
RunTimeUtil.recovery(jobManager);
Expand Down Expand Up @@ -152,6 +154,7 @@ private List<SqlExplainResult> explainFlinkSql(StudioExecuteDTO studioExecuteDTO
if(!config.isUseSession()) {
config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), studioExecuteDTO.getClusterId()));
}
initUDF(config,studioExecuteDTO.getStatement());
JobManager jobManager = JobManager.buildPlanMode(config);
return jobManager.explainSql(studioExecuteDTO.getStatement()).getSqlExplainResults();
}
Expand Down Expand Up @@ -317,4 +320,15 @@ public boolean savepoint(Integer clusterId, String jobId, String savePointType,S
}
return false;
}

/**
 * Registers every UDF referenced by the statement before the job is built.
 * Only applies in LOCAL gateway mode; for any remote gateway type this is a
 * no-op, since remote clusters cannot load locally compiled UDF classes.
 *
 * @param config    job configuration whose gateway type decides applicability
 * @param statement the SQL text scanned for UDF class names
 */
private void initUDF(JobConfig config,String statement){
    // Skip everything unless the job runs with the LOCAL gateway.
    if (GatewayType.LOCAL.equalsValue(config.getType())) {
        for (String className : JobManager.getUDFClassName(statement)) {
            // Each UDF's source is stored as a task; compile/register it.
            Task udfTask = taskService.getUDFByClassName(className);
            JobManager.initUDF(className, udfTask.getStatement());
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.dlink.mapper.TaskMapper;
import com.dlink.model.*;
import com.dlink.service.*;
import com.dlink.utils.CustomStringJavaCompiler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
Expand Down Expand Up @@ -100,6 +101,11 @@ public Task getTaskInfoById(Integer id) {

@Override
public boolean saveOrUpdateTask(Task task) {
if(Asserts.isNotNullString(task.getDialect()) && Dialect.JAVA.equalsVal(task.getDialect())
&& Asserts.isNotNullString(task.getStatement()) ){
CustomStringJavaCompiler compiler = new CustomStringJavaCompiler(task.getStatement());
task.setSavePointPath(compiler.getFullClassName());
}
if (task.getId() != null) {
this.updateById(task);
if (task.getStatement() != null) {
Expand Down Expand Up @@ -151,6 +157,14 @@ public String exportSql(Integer id) {
}
}

/**
 * Finds the enabled Java-dialect UDF task whose compiled full class name
 * (persisted in the save_point_path column — see saveOrUpdateTask) equals
 * the given class name, and loads its statement body onto the returned task.
 *
 * @param className fully qualified Java class name of the UDF
 * @return the matching task with its statement populated
 */
@Override
public Task getUDFByClassName(String className) {
    QueryWrapper<Task> wrapper = new QueryWrapper<Task>()
            .eq("dialect", "Java")
            .eq("enabled", 1)
            .eq("save_point_path", className);
    Task udfTask = getOne(wrapper);
    // Fails fast when no matching UDF task exists.
    Assert.check(udfTask);
    // The statement body lives in a separate table; attach it to the task.
    String body = statementService.getById(udfTask.getId()).getStatement();
    udfTask.setStatement(body);
    return udfTask;
}

private JobConfig buildJobConfig(Task task){
boolean isJarTask = isJarTask(task);
if(!isJarTask&&Asserts.isNotNull(task.getEnvId())){
Expand Down
Loading

0 comments on commit 3b0948b

Please sign in to comment.