Merge pull request DataLinkDC#55 from DataLinkDC/dev
数据源优化和官网文档 (data source optimization and official website documentation)
aiwenmo authored Dec 18, 2021
2 parents e59307d + 490f7ff commit 2eca50c
Showing 71 changed files with 2,034 additions and 119 deletions.
16 changes: 9 additions & 7 deletions dlink-admin/pom.xml
@@ -76,11 +76,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<!--<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.30</version>
</dependency>
</dependency>-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
@@ -97,10 +97,6 @@
<groupId>org.hibernate</groupId>
<artifactId>hibernate-validator</artifactId>
</dependency>
<!--<dependency>
<groupId>org.flywaydb</groupId>
<artifactId>flyway-core</artifactId>
</dependency>-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
@@ -130,6 +126,12 @@
<groupId>com.dlink</groupId>
<artifactId>dlink-gateway</artifactId>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-metadata-clickhouse</artifactId>
<version>0.5.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
@@ -210,4 +212,4 @@
</plugins>
<finalName>${project.artifactId}-${project.version}</finalName>
</build>
</project>
</project>
@@ -48,7 +48,7 @@ public Result explainSql(@RequestBody StudioExecuteDTO studioExecuteDTO) {
}

/**
* 解释Sql
* 获取执行图
*/
@PostMapping("/getStreamGraph")
public Result getStreamGraph(@RequestBody StudioExecuteDTO studioExecuteDTO) {
2 changes: 2 additions & 0 deletions dlink-admin/src/main/java/com/dlink/dto/CatalogueTaskDTO.java
@@ -1,5 +1,6 @@
package com.dlink.dto;

import com.dlink.config.Dialect;
import lombok.Getter;
import lombok.Setter;

@@ -17,4 +18,5 @@ public class CatalogueTaskDTO {
private boolean isLeaf;
private String name;
private String alias;
private String dialect = Dialect.DEFAULT.getValue();
}
2 changes: 2 additions & 0 deletions dlink-admin/src/main/java/com/dlink/dto/StudioExecuteDTO.java
@@ -22,13 +22,15 @@
public class StudioExecuteDTO {
// RUN_MODE
private String type;
private String dialect;
private boolean useResult;
private boolean statementSet;
private boolean useSession;
private String session;
private boolean useRemote;
private Integer clusterId;
private Integer clusterConfigurationId;
private Integer databaseId;
private Integer jarId;
private boolean fragment;
private String statement;
4 changes: 4 additions & 0 deletions dlink-admin/src/main/java/com/dlink/model/Task.java
@@ -32,6 +32,8 @@ public class Task extends SuperEntity{
@TableField(fill = FieldFill.INSERT)
private String alias;

private String dialect;

private String type;

private Integer checkPoint;
@@ -50,6 +52,8 @@

private Integer clusterConfigurationId;

private Integer databaseId;

private Integer jarId;

private String configJson;
@@ -1,6 +1,7 @@
package com.dlink.service.impl;

import com.dlink.assertion.Assert;
import com.dlink.assertion.Asserts;
import com.dlink.db.service.impl.SuperServiceImpl;
import com.dlink.dto.CatalogueTaskDTO;
import com.dlink.mapper.CatalogueMapper;
@@ -40,11 +41,13 @@ public Catalogue createCatalogueAndTask(CatalogueTaskDTO catalogueTaskDTO) {
Task task = new Task();
task.setName(catalogueTaskDTO.getName());
task.setAlias(catalogueTaskDTO.getAlias());
task.setDialect(catalogueTaskDTO.getDialect());
taskService.saveOrUpdateTask(task);
Catalogue catalogue = new Catalogue();
catalogue.setName(catalogueTaskDTO.getAlias());
catalogue.setIsLeaf(true);
catalogue.setTaskId(task.getId());
catalogue.setType(catalogueTaskDTO.getDialect());
catalogue.setParentId(catalogueTaskDTO.getParentId());
this.save(catalogue);
return catalogue;
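
For orientation, the dialect added to CatalogueTaskDTO flows through createCatalogueAndTask above into both the Task and the Catalogue records. A minimal sketch of how a caller might populate the DTO for a plain-SQL task follows; the setters are assumed from the Lombok imports shown on the DTO, the Integer type of parentId is an assumption, and the wrapping class is illustrative only.

import com.dlink.config.Dialect;
import com.dlink.dto.CatalogueTaskDTO;

// Illustrative sketch, not part of this commit: builds a DTO whose dialect routes
// the resulting task to the common-SQL execution path added elsewhere in this PR.
public class CatalogueTaskDtoSketch {

    public static CatalogueTaskDTO newSqlTask(Integer parentId, String name, String alias) {
        CatalogueTaskDTO dto = new CatalogueTaskDTO();   // dialect defaults to Dialect.DEFAULT ("FlinkSql")
        dto.setParentId(parentId);                       // setter assumed from Lombok @Setter
        dto.setName(name);
        dto.setAlias(alias);
        dto.setDialect(Dialect.SQL.getValue());          // override the default so the task is treated as plain SQL
        return dto;
    }
}
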
@@ -2,6 +2,7 @@

import com.dlink.api.FlinkAPI;
import com.dlink.assertion.Asserts;
import com.dlink.config.Dialect;
import com.dlink.dto.SessionDTO;
import com.dlink.dto.StudioDDLDTO;
import com.dlink.dto.StudioExecuteDTO;
@@ -14,16 +15,15 @@
import com.dlink.job.JobConfig;
import com.dlink.job.JobManager;
import com.dlink.job.JobResult;
import com.dlink.metadata.driver.Driver;
import com.dlink.metadata.result.JdbcSelectResult;
import com.dlink.model.Cluster;
import com.dlink.model.DataBase;
import com.dlink.model.Savepoints;
import com.dlink.model.SystemConfiguration;
import com.dlink.result.IResult;
import com.dlink.result.SelectResult;
import com.dlink.result.SqlExplainResult;
import com.dlink.service.ClusterConfigurationService;
import com.dlink.service.ClusterService;
import com.dlink.service.SavepointsService;
import com.dlink.service.StudioService;
import com.dlink.service.*;
import com.dlink.session.SessionConfig;
import com.dlink.session.SessionInfo;
import com.dlink.session.SessionPool;
@@ -59,9 +59,19 @@ public class StudioServiceImpl implements StudioService {
private ClusterConfigurationService clusterConfigurationService;
@Autowired
private SavepointsService savepointsService;
@Autowired
private DataBaseService dataBaseService;

@Override
public JobResult executeSql(StudioExecuteDTO studioExecuteDTO) {
if(Dialect.SQL.equalsVal(studioExecuteDTO.getDialect())){
return executeCommonSql(studioExecuteDTO);
}else{
return executeFlinkSql(studioExecuteDTO);
}
}

private JobResult executeFlinkSql(StudioExecuteDTO studioExecuteDTO) {
JobConfig config = studioExecuteDTO.getJobConfig();
// If you are using a shared session, configure the current jobmanager address
if(!config.isUseSession()) {
@@ -73,6 +83,38 @@ public JobResult executeSql(StudioExecuteDTO studioExecuteDTO) {
return jobResult;
}

private JobResult executeCommonSql(StudioExecuteDTO studioExecuteDTO) {
JobResult result = new JobResult();
result.setStatement(studioExecuteDTO.getStatement());
result.setStartTime(LocalDateTime.now());
if(Asserts.isNull(studioExecuteDTO.getDatabaseId())){
result.setSuccess(false);
result.setError("请指定数据源");
result.setEndTime(LocalDateTime.now());
return result;
}else{
DataBase dataBase = dataBaseService.getById(studioExecuteDTO.getDatabaseId());
if(Asserts.isNull(dataBase)){
result.setSuccess(false);
result.setError("数据源不存在");
result.setEndTime(LocalDateTime.now());
return result;
}
Driver driver = Driver.build(dataBase.getDriverConfig()).connect();
JdbcSelectResult selectResult = driver.query(studioExecuteDTO.getStatement(),studioExecuteDTO.getMaxRowNum());
driver.close();
result.setResult(selectResult);
if(selectResult.isSuccess()){
result.setSuccess(true);
}else{
result.setSuccess(false);
result.setError(selectResult.getError());
}
result.setEndTime(LocalDateTime.now());
return result;
}
}

@Override
public IResult executeDDL(StudioDDLDTO studioDDLDTO) {
JobConfig config = studioDDLDTO.getJobConfig();
@@ -85,6 +127,14 @@ public IResult executeDDL(StudioDDLDTO studioDDLDTO) {

@Override
public List<SqlExplainResult> explainSql(StudioExecuteDTO studioExecuteDTO) {
if( Dialect.SQL.equalsVal(studioExecuteDTO.getDialect())){
return explainCommonSql(studioExecuteDTO);
}else{
return explainFlinkSql(studioExecuteDTO);
}
}

private List<SqlExplainResult> explainFlinkSql(StudioExecuteDTO studioExecuteDTO) {
JobConfig config = studioExecuteDTO.getJobConfig();
if(!config.isUseSession()) {
config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), studioExecuteDTO.getClusterId()));
@@ -93,6 +143,25 @@ public List<SqlExplainResult> explainSql(StudioExecuteDTO studioExecuteDTO) {
return jobManager.explainSql(studioExecuteDTO.getStatement()).getSqlExplainResults();
}

private List<SqlExplainResult> explainCommonSql(StudioExecuteDTO studioExecuteDTO) {
if(Asserts.isNull(studioExecuteDTO.getDatabaseId())){
return new ArrayList<SqlExplainResult>(){{
add(SqlExplainResult.fail(studioExecuteDTO.getStatement(),"请指定数据源"));
}};
}else{
DataBase dataBase = dataBaseService.getById(studioExecuteDTO.getDatabaseId());
if(Asserts.isNull(dataBase)){
return new ArrayList<SqlExplainResult>(){{
add(SqlExplainResult.fail(studioExecuteDTO.getStatement(),"数据源不存在"));
}};
}
Driver driver = Driver.build(dataBase.getDriverConfig()).connect();
List<SqlExplainResult> sqlExplainResults = driver.explain(studioExecuteDTO.getStatement());
driver.close();
return sqlExplainResults;
}
}

@Override
public ObjectNode getStreamGraph(StudioExecuteDTO studioExecuteDTO) {
JobConfig config = studioExecuteDTO.getJobConfig();
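
For context, the executeCommonSql and explainCommonSql methods added above share one pattern: build a metadata Driver from the stored DataBase's driver config, connect, run the statement, and close. The standalone sketch below isolates that pattern; the Driver, DataBase and JdbcSelectResult calls are exactly those visible in the diff, while the wrapping class and the try/finally guard (so the connection is released even when the query throws) are illustrative additions.

import com.dlink.metadata.driver.Driver;
import com.dlink.metadata.result.JdbcSelectResult;
import com.dlink.model.DataBase;

// Illustrative sketch, not part of this commit: the query flow used by
// executeCommonSql, with a try/finally added for cleanup on failure.
public class CommonSqlQuerySketch {

    public static JdbcSelectResult query(DataBase dataBase, String statement, Integer maxRowNum) {
        Driver driver = Driver.build(dataBase.getDriverConfig()).connect();
        try {
            return driver.query(statement, maxRowNum);
        } finally {
            driver.close();
        }
    }
}
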
4 changes: 3 additions & 1 deletion dlink-admin/src/main/resources/mapper/TaskMapper.xml
@@ -7,6 +7,7 @@
<id column="id" property="id" />
<result column="name" property="name" />
<result column="alias" property="alias" />
<result column="dialect" property="dialect" />
<result column="type" property="type" />
<result column="check_point" property="checkPoint" />
<result column="save_point_strategy" property="savePointStrategy" />
@@ -16,6 +17,7 @@
<result column="statement_set" property="statementSet" />
<result column="cluster_id" property="clusterId" />
<result column="cluster_configuration_id" property="clusterConfigurationId" />
<result column="database_id" property="databaseId" />
<result column="jar_id" property="jarId" />
<result column="config_json" property="configJson" />
<result column="note" property="note" />
@@ -26,7 +28,7 @@

<!-- 通用查询结果列 -->
<sql id="Base_Column_List">
id, name, alias, type,check_point,save_point_strategy,save_point_path, parallelism,fragment,statement_set,cluster_id,cluster_configuration_id,jar_id,config_json,note, enabled, create_time, update_time
id, name, alias,dialect, type,check_point,save_point_strategy,save_point_path, parallelism,fragment,statement_set,cluster_id,cluster_configuration_id,database_id,jar_id,config_json,note, enabled, create_time, update_time
</sql>


49 changes: 49 additions & 0 deletions dlink-common/src/main/java/com/dlink/result/AbstractResult.java
@@ -0,0 +1,49 @@
package com.dlink.result;

import java.time.LocalDateTime;

/**
* AbstractResult
*
* @author wenmo
* @since 2021/6/29 22:49
*/
public class AbstractResult {

protected boolean success;
protected LocalDateTime startTime;
protected LocalDateTime endTime;
protected String error;

public void setStartTime(LocalDateTime startTime){
this.startTime = startTime;
}

public boolean isSuccess() {
return success;
}

public void setSuccess(boolean success) {
this.success = success;
}

public LocalDateTime getStartTime() {
return startTime;
}

public LocalDateTime getEndTime() {
return endTime;
}

public void setEndTime(LocalDateTime endTime) {
this.endTime = endTime;
}

public String getError() {
return error;
}

public void setError(String error) {
this.error = error;
}
}
23 changes: 23 additions & 0 deletions dlink-common/src/main/java/com/dlink/result/SqlExplainResult.java
@@ -20,6 +20,29 @@ public class SqlExplainResult {
private boolean explainTrue;
private LocalDateTime explainTime;

public SqlExplainResult() {
}

public SqlExplainResult(Integer index, String type, String sql, String parse, String explain, String error, boolean parseTrue, boolean explainTrue, LocalDateTime explainTime) {
this.index = index;
this.type = type;
this.sql = sql;
this.parse = parse;
this.explain = explain;
this.error = error;
this.parseTrue = parseTrue;
this.explainTrue = explainTrue;
this.explainTime = explainTime;
}

public static SqlExplainResult success(String type,String sql,String explain){
return new SqlExplainResult(1,type,sql,null,explain,null,true,true,LocalDateTime.now());
}

public static SqlExplainResult fail(String sql,String error){
return new SqlExplainResult(1,null,sql,null,null,error,false,false,LocalDateTime.now());
}

public Integer getIndex() {
return index;
}
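
The two static factories added above let the common-SQL explain path report a single pass/fail entry without calling the nine-argument constructor. A minimal hedged usage sketch follows; the wrapping class is illustrative, and getIndex is the only getter relied on, as shown in the diff.

import com.dlink.result.SqlExplainResult;

// Illustrative sketch, not part of this commit: one success entry and one
// failure entry built with the new factory methods.
public class SqlExplainResultSketch {

    public static void main(String[] args) {
        SqlExplainResult ok = SqlExplainResult.success("Sql", "select 1", "single constant query");
        SqlExplainResult bad = SqlExplainResult.fail("select * from missing_table", "no data source specified");
        System.out.println(ok.getIndex());   // 1 -- both factories fix the index at 1
        System.out.println(bad.getIndex());  // 1
    }
}
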
40 changes: 40 additions & 0 deletions dlink-core/src/main/java/com/dlink/config/Dialect.java
@@ -0,0 +1,40 @@
package com.dlink.config;

import com.dlink.assertion.Asserts;

/**
* Dialect
*
* @author wenmo
* @since 2021/12/13
**/
public enum Dialect {

FLINKSQL("FlinkSql"),SQL("Sql"),JAVA("Java");

private String value;

public static final Dialect DEFAULT = Dialect.FLINKSQL;

Dialect(String value) {
this.value = value;
}

public String getValue() {
return value;
}

public boolean equalsVal(String valueText){
return Asserts.isEqualsIgnoreCase(value,valueText);
}

public static Dialect get(String value){
for (Dialect type : Dialect.values()) {
if(Asserts.isEqualsIgnoreCase(type.getValue(),value)){
return type;
}
}
return Dialect.FLINKSQL;
}
}
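
Because the routing added in StudioServiceImpl keys off this enum, a small sketch of its expected matching behaviour follows; it assumes Asserts.isEqualsIgnoreCase performs a case-insensitive string comparison, which is not shown in this excerpt.

import com.dlink.config.Dialect;

// Illustrative sketch, not part of this commit: how dialect values read
// from a task record are expected to resolve.
public class DialectSketch {

    public static void main(String[] args) {
        System.out.println(Dialect.SQL.equalsVal("sql"));      // true, assuming case-insensitive comparison
        System.out.println(Dialect.get("FlinkSql"));           // FLINKSQL
        System.out.println(Dialect.get("unrecognized"));       // FLINKSQL -- get() falls back to the default
        System.out.println(Dialect.DEFAULT.getValue());        // FlinkSql
    }
}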
