diff --git a/dlink-admin/pom.xml b/dlink-admin/pom.xml index 91e6b6e01e..f136c82920 100644 --- a/dlink-admin/pom.xml +++ b/dlink-admin/pom.xml @@ -76,11 +76,11 @@ - + mysql mysql-connector-java @@ -97,10 +97,6 @@ org.hibernate hibernate-validator - junit junit @@ -130,6 +126,12 @@ com.dlink dlink-gateway + + com.dlink + dlink-metadata-clickhouse + 0.5.0-SNAPSHOT + provided + @@ -210,4 +212,4 @@ ${project.artifactId}-${project.version} - \ No newline at end of file + diff --git a/dlink-admin/src/main/java/com/dlink/controller/StudioController.java b/dlink-admin/src/main/java/com/dlink/controller/StudioController.java index 40bf316515..75988d50e3 100644 --- a/dlink-admin/src/main/java/com/dlink/controller/StudioController.java +++ b/dlink-admin/src/main/java/com/dlink/controller/StudioController.java @@ -48,7 +48,7 @@ public Result explainSql(@RequestBody StudioExecuteDTO studioExecuteDTO) { } /** - * 解释Sql + * 获取执行图 */ @PostMapping("/getStreamGraph") public Result getStreamGraph(@RequestBody StudioExecuteDTO studioExecuteDTO) { diff --git a/dlink-admin/src/main/java/com/dlink/dto/CatalogueTaskDTO.java b/dlink-admin/src/main/java/com/dlink/dto/CatalogueTaskDTO.java index afc8e8387b..eb60efafa1 100644 --- a/dlink-admin/src/main/java/com/dlink/dto/CatalogueTaskDTO.java +++ b/dlink-admin/src/main/java/com/dlink/dto/CatalogueTaskDTO.java @@ -1,5 +1,6 @@ package com.dlink.dto; +import com.dlink.config.Dialect; import lombok.Getter; import lombok.Setter; @@ -17,4 +18,5 @@ public class CatalogueTaskDTO { private boolean isLeaf; private String name; private String alias; + private String dialect = Dialect.DEFAULT.getValue(); } diff --git a/dlink-admin/src/main/java/com/dlink/dto/StudioExecuteDTO.java b/dlink-admin/src/main/java/com/dlink/dto/StudioExecuteDTO.java index ced4739154..62ffed2b4c 100644 --- a/dlink-admin/src/main/java/com/dlink/dto/StudioExecuteDTO.java +++ b/dlink-admin/src/main/java/com/dlink/dto/StudioExecuteDTO.java @@ -22,6 +22,7 @@ public class 
StudioExecuteDTO { // RUN_MODE private String type; + private String dialect; private boolean useResult; private boolean statementSet; private boolean useSession; @@ -29,6 +30,7 @@ public class StudioExecuteDTO { private boolean useRemote; private Integer clusterId; private Integer clusterConfigurationId; + private Integer databaseId; private Integer jarId; private boolean fragment; private String statement; diff --git a/dlink-admin/src/main/java/com/dlink/model/Task.java b/dlink-admin/src/main/java/com/dlink/model/Task.java index de985d1cb9..45db7cb74e 100644 --- a/dlink-admin/src/main/java/com/dlink/model/Task.java +++ b/dlink-admin/src/main/java/com/dlink/model/Task.java @@ -32,6 +32,8 @@ public class Task extends SuperEntity{ @TableField(fill = FieldFill.INSERT) private String alias; + private String dialect; + private String type; private Integer checkPoint; @@ -50,6 +52,8 @@ public class Task extends SuperEntity{ private Integer clusterConfigurationId; + private Integer databaseId; + private Integer jarId; private String configJson; diff --git a/dlink-admin/src/main/java/com/dlink/service/impl/CatalogueServiceImpl.java b/dlink-admin/src/main/java/com/dlink/service/impl/CatalogueServiceImpl.java index 28af781139..9ce84ea48b 100644 --- a/dlink-admin/src/main/java/com/dlink/service/impl/CatalogueServiceImpl.java +++ b/dlink-admin/src/main/java/com/dlink/service/impl/CatalogueServiceImpl.java @@ -1,6 +1,7 @@ package com.dlink.service.impl; import com.dlink.assertion.Assert; +import com.dlink.assertion.Asserts; import com.dlink.db.service.impl.SuperServiceImpl; import com.dlink.dto.CatalogueTaskDTO; import com.dlink.mapper.CatalogueMapper; @@ -40,11 +41,13 @@ public Catalogue createCatalogueAndTask(CatalogueTaskDTO catalogueTaskDTO) { Task task = new Task(); task.setName(catalogueTaskDTO.getName()); task.setAlias(catalogueTaskDTO.getAlias()); + task.setDialect(catalogueTaskDTO.getDialect()); taskService.saveOrUpdateTask(task); Catalogue catalogue = new 
Catalogue(); catalogue.setName(catalogueTaskDTO.getAlias()); catalogue.setIsLeaf(true); catalogue.setTaskId(task.getId()); + catalogue.setType(catalogueTaskDTO.getDialect()); catalogue.setParentId(catalogueTaskDTO.getParentId()); this.save(catalogue); return catalogue; diff --git a/dlink-admin/src/main/java/com/dlink/service/impl/StudioServiceImpl.java b/dlink-admin/src/main/java/com/dlink/service/impl/StudioServiceImpl.java index 51dc0f3882..07d47510a5 100644 --- a/dlink-admin/src/main/java/com/dlink/service/impl/StudioServiceImpl.java +++ b/dlink-admin/src/main/java/com/dlink/service/impl/StudioServiceImpl.java @@ -2,6 +2,7 @@ import com.dlink.api.FlinkAPI; import com.dlink.assertion.Asserts; +import com.dlink.config.Dialect; import com.dlink.dto.SessionDTO; import com.dlink.dto.StudioDDLDTO; import com.dlink.dto.StudioExecuteDTO; @@ -14,16 +15,15 @@ import com.dlink.job.JobConfig; import com.dlink.job.JobManager; import com.dlink.job.JobResult; +import com.dlink.metadata.driver.Driver; +import com.dlink.metadata.result.JdbcSelectResult; import com.dlink.model.Cluster; +import com.dlink.model.DataBase; import com.dlink.model.Savepoints; -import com.dlink.model.SystemConfiguration; import com.dlink.result.IResult; import com.dlink.result.SelectResult; import com.dlink.result.SqlExplainResult; -import com.dlink.service.ClusterConfigurationService; -import com.dlink.service.ClusterService; -import com.dlink.service.SavepointsService; -import com.dlink.service.StudioService; +import com.dlink.service.*; import com.dlink.session.SessionConfig; import com.dlink.session.SessionInfo; import com.dlink.session.SessionPool; @@ -59,9 +59,19 @@ public class StudioServiceImpl implements StudioService { private ClusterConfigurationService clusterConfigurationService; @Autowired private SavepointsService savepointsService; + @Autowired + private DataBaseService dataBaseService; @Override public JobResult executeSql(StudioExecuteDTO studioExecuteDTO) { + 
if(Dialect.SQL.equalsVal(studioExecuteDTO.getDialect())){ + return executeCommonSql(studioExecuteDTO); + }else{ + return executeFlinkSql(studioExecuteDTO); + } + } + + private JobResult executeFlinkSql(StudioExecuteDTO studioExecuteDTO) { JobConfig config = studioExecuteDTO.getJobConfig(); // If you are using a shared session, configure the current jobmanager address if(!config.isUseSession()) { @@ -73,6 +83,38 @@ public JobResult executeSql(StudioExecuteDTO studioExecuteDTO) { return jobResult; } + private JobResult executeCommonSql(StudioExecuteDTO studioExecuteDTO) { + JobResult result = new JobResult(); + result.setStatement(studioExecuteDTO.getStatement()); + result.setStartTime(LocalDateTime.now()); + if(Asserts.isNull(studioExecuteDTO.getDatabaseId())){ + result.setSuccess(false); + result.setError("请指定数据源"); + result.setEndTime(LocalDateTime.now()); + return result; + }else{ + DataBase dataBase = dataBaseService.getById(studioExecuteDTO.getDatabaseId()); + if(Asserts.isNull(dataBase)){ + result.setSuccess(false); + result.setError("数据源不存在"); + result.setEndTime(LocalDateTime.now()); + return result; + } + Driver driver = Driver.build(dataBase.getDriverConfig()).connect(); + JdbcSelectResult selectResult = driver.query(studioExecuteDTO.getStatement(),studioExecuteDTO.getMaxRowNum()); + driver.close(); + result.setResult(selectResult); + if(selectResult.isSuccess()){ + result.setSuccess(true); + }else{ + result.setSuccess(false); + result.setError(selectResult.getError()); + } + result.setEndTime(LocalDateTime.now()); + return result; + } + } + @Override public IResult executeDDL(StudioDDLDTO studioDDLDTO) { JobConfig config = studioDDLDTO.getJobConfig(); @@ -85,6 +127,14 @@ public IResult executeDDL(StudioDDLDTO studioDDLDTO) { @Override public List explainSql(StudioExecuteDTO studioExecuteDTO) { + if( Dialect.SQL.equalsVal(studioExecuteDTO.getDialect())){ + return explainCommonSql(studioExecuteDTO); + }else{ + return explainFlinkSql(studioExecuteDTO); + } + 
} + + private List explainFlinkSql(StudioExecuteDTO studioExecuteDTO) { JobConfig config = studioExecuteDTO.getJobConfig(); if(!config.isUseSession()) { config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), studioExecuteDTO.getClusterId())); @@ -93,6 +143,25 @@ public List explainSql(StudioExecuteDTO studioExecuteDTO) { return jobManager.explainSql(studioExecuteDTO.getStatement()).getSqlExplainResults(); } + private List explainCommonSql(StudioExecuteDTO studioExecuteDTO) { + if(Asserts.isNull(studioExecuteDTO.getDatabaseId())){ + return new ArrayList(){{ + add(SqlExplainResult.fail(studioExecuteDTO.getStatement(),"请指定数据源")); + }}; + }else{ + DataBase dataBase = dataBaseService.getById(studioExecuteDTO.getDatabaseId()); + if(Asserts.isNull(dataBase)){ + return new ArrayList(){{ + add(SqlExplainResult.fail(studioExecuteDTO.getStatement(),"数据源不存在")); + }}; + } + Driver driver = Driver.build(dataBase.getDriverConfig()).connect(); + List sqlExplainResults = driver.explain(studioExecuteDTO.getStatement()); + driver.close(); + return sqlExplainResults; + } + } + @Override public ObjectNode getStreamGraph(StudioExecuteDTO studioExecuteDTO) { JobConfig config = studioExecuteDTO.getJobConfig(); diff --git a/dlink-admin/src/main/resources/mapper/TaskMapper.xml b/dlink-admin/src/main/resources/mapper/TaskMapper.xml index f2c7b391dd..ebacf8f91f 100644 --- a/dlink-admin/src/main/resources/mapper/TaskMapper.xml +++ b/dlink-admin/src/main/resources/mapper/TaskMapper.xml @@ -7,6 +7,7 @@ + @@ -16,6 +17,7 @@ + @@ -26,7 +28,7 @@ - id, name, alias, type,check_point,save_point_strategy,save_point_path, parallelism,fragment,statement_set,cluster_id,cluster_configuration_id,jar_id,config_json,note, enabled, create_time, update_time + id, name, alias,dialect, type,check_point,save_point_strategy,save_point_path, parallelism,fragment,statement_set,cluster_id,cluster_configuration_id,database_id,jar_id,config_json,note, enabled, create_time, update_time diff --git 
a/dlink-common/src/main/java/com/dlink/result/AbstractResult.java b/dlink-common/src/main/java/com/dlink/result/AbstractResult.java new file mode 100644 index 0000000000..427764d8a0 --- /dev/null +++ b/dlink-common/src/main/java/com/dlink/result/AbstractResult.java @@ -0,0 +1,49 @@ +package com.dlink.result; + +import java.time.LocalDateTime; + +/** + * AbstractResult + * + * @author wenmo + * @since 2021/6/29 22:49 + */ +public class AbstractResult { + + protected boolean success; + protected LocalDateTime startTime; + protected LocalDateTime endTime; + protected String error; + + public void setStartTime(LocalDateTime startTime){ + this.startTime = startTime; + } + + public boolean isSuccess() { + return success; + } + + public void setSuccess(boolean success) { + this.success = success; + } + + public LocalDateTime getStartTime() { + return startTime; + } + + public LocalDateTime getEndTime() { + return endTime; + } + + public void setEndTime(LocalDateTime endTime) { + this.endTime = endTime; + } + + public String getError() { + return error; + } + + public void setError(String error) { + this.error = error; + } +} diff --git a/dlink-core/src/main/java/com/dlink/result/IResult.java b/dlink-common/src/main/java/com/dlink/result/IResult.java similarity index 100% rename from dlink-core/src/main/java/com/dlink/result/IResult.java rename to dlink-common/src/main/java/com/dlink/result/IResult.java diff --git a/dlink-common/src/main/java/com/dlink/result/SqlExplainResult.java b/dlink-common/src/main/java/com/dlink/result/SqlExplainResult.java index 882e2c561b..7b53018a77 100644 --- a/dlink-common/src/main/java/com/dlink/result/SqlExplainResult.java +++ b/dlink-common/src/main/java/com/dlink/result/SqlExplainResult.java @@ -20,6 +20,29 @@ public class SqlExplainResult { private boolean explainTrue; private LocalDateTime explainTime; + public SqlExplainResult() { + } + + public SqlExplainResult(Integer index, String type, String sql, String parse, String explain, String 
error, boolean parseTrue, boolean explainTrue, LocalDateTime explainTime) { + this.index = index; + this.type = type; + this.sql = sql; + this.parse = parse; + this.explain = explain; + this.error = error; + this.parseTrue = parseTrue; + this.explainTrue = explainTrue; + this.explainTime = explainTime; + } + + public static SqlExplainResult success(String type,String sql,String explain){ + return new SqlExplainResult(1,type,sql,null,explain,null,true,true,LocalDateTime.now()); + } + + public static SqlExplainResult fail(String sql,String error){ + return new SqlExplainResult(1,null,sql,null,null,error,false,false,LocalDateTime.now()); + } + public Integer getIndex() { return index; } diff --git a/dlink-core/src/main/java/com/dlink/config/Dialect.java b/dlink-core/src/main/java/com/dlink/config/Dialect.java new file mode 100644 index 0000000000..4fba563baf --- /dev/null +++ b/dlink-core/src/main/java/com/dlink/config/Dialect.java @@ -0,0 +1,40 @@ +package com.dlink.config; + +import com.dlink.assertion.Asserts; + +/** + * Dialect + * + * @author wenmo + * @since 2021/12/13 + **/ +public enum Dialect { + + FLINKSQL("FlinkSql"),SQL("Sql"),JAVA("Java"); + + private String value; + + public static final Dialect DEFAULT = Dialect.FLINKSQL; + + Dialect(String value) { + this.value = value; + } + + public String getValue() { + return value; + } + + public boolean equalsVal(String valueText){ + return Asserts.isEqualsIgnoreCase(value,valueText); + } + + public static Dialect get(String value){ + for (Dialect type : Dialect.values()) { + if(Asserts.isEqualsIgnoreCase(type.getValue(),value)){ + return type; + } + } + return Dialect.FLINKSQL; + } +} + diff --git a/dlink-core/src/main/java/com/dlink/job/JobResult.java b/dlink-core/src/main/java/com/dlink/job/JobResult.java index 4d91a54ff4..527031e3e2 100644 --- a/dlink-core/src/main/java/com/dlink/job/JobResult.java +++ b/dlink-core/src/main/java/com/dlink/job/JobResult.java @@ -29,7 +29,10 @@ public class JobResult { private 
LocalDateTime startTime; private LocalDateTime endTime; - public JobResult(Integer id, JobConfig jobConfig, String jobManagerAddress, Job.JobStatus status, String statement, String jobId, String error, IResult result, LocalDateTime startTime, LocalDateTime endTime) { + public JobResult() { + } + + public JobResult(Integer id, JobConfig jobConfig, String jobManagerAddress, Job.JobStatus status, String statement, String jobId, String error, IResult result, LocalDateTime startTime, LocalDateTime endTime) { this.id = id; this.jobConfig = jobConfig; this.jobManagerAddress = jobManagerAddress; diff --git a/dlink-core/src/main/java/com/dlink/result/AbstractResult.java b/dlink-core/src/main/java/com/dlink/result/AbstractResult.java deleted file mode 100644 index 5b375340d6..0000000000 --- a/dlink-core/src/main/java/com/dlink/result/AbstractResult.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.dlink.result; - -import java.time.LocalDateTime; - -/** - * AbstractResult - * - * @author wenmo - * @since 2021/6/29 22:49 - */ -public class AbstractResult { - - protected boolean success; - protected LocalDateTime startTime; - protected LocalDateTime endTime; - - public void setStartTime(LocalDateTime startTime){ - this.startTime = startTime; - } -} diff --git a/dlink-doc/bin/auto.sh b/dlink-doc/bin/auto.sh index ba53679e80..07241f6f92 100644 --- a/dlink-doc/bin/auto.sh +++ b/dlink-doc/bin/auto.sh @@ -3,7 +3,7 @@ # 要运行的jar包路径,加不加引号都行。 注意:等号两边 不能 有空格,否则会提示command找不到 JAR_NAME="./dlink-admin-*.jar" #java -Djava.ext.dirs=$JAVA_HOME/jre/lib/ext:$JAVA_HOME/jre/lib:./lib -classpath ."/lib/*.jar" -jar dlink-admin-*.jar -SETTING="-Dloader.path=./lib,./plugins" +SETTING="-Dloader.path=./lib,./plugins -Ddruid.mysql.usePingMethod=false" # 如果输入格式不对,给出提示! 
tips() { echo "" diff --git "a/dlink-doc/doc/Dlink\345\234\250Flink-mysql-cdc\345\210\260Doris\347\232\204\345\256\236\350\267\265.md" "b/dlink-doc/doc/Dlink\345\234\250Flink-mysql-cdc\345\210\260Doris\347\232\204\345\256\236\350\267\265.md" new file mode 100644 index 0000000000..b985b3ae5e --- /dev/null +++ "b/dlink-doc/doc/Dlink\345\234\250Flink-mysql-cdc\345\210\260Doris\347\232\204\345\256\236\350\267\265.md" @@ -0,0 +1,202 @@ +# Dlink 在 Flink-mysql-cdc 到 Doris 的实践 + +## 背景 + +Apache Doris是一个现代化的MPP分析型数据库产品。仅需亚秒级响应时间即可获得查询结果,有效地支持实时数据分析。例如固定历史报表,实时数据分析,交互式数据分析和探索式数据分析等。 + +目前 Doris 的生态正在建设中,本文将分享如何基于 Dlink 实现 Mysql 变动数据通过 Flink 实时入库 Doris。 + +## 准备 + +老规矩,列清各组件版本: + +| 组件 | 版本 | +| :-------------: | :----------: | +| Flink | 1.13.3 | +| Flink-mysql-cdc | 2.1.0 | +| Doris | 0.15.1-rc09 | +| doris-flink | 1.0-SNAPSHOT | +| Mysql | 8.0.13 | +| Dlink | 0.4.0 | + +需要注意的是,本文的 Doris 是基于 OracleJDK1.8 和 Scala 2.11 通过源码进行编译打包的,所以所有组件的 scala 均为 2.11,此处和 Doris 社区略有不同。 + +## 部署 + +本文的 Flink 和 Doris 的部署不做描述,详情请查阅官网。 + +[Doris]: https://doris.apache.org/master/zh-CN/extending-doris/flink-doris-connector.html#%E4%BD%BF%E7%94%A8%E6%96%B9%E6%B3%95 "Doris" + +本文在 Dlink 部署成功的基础上进行,如需查看具体部署步骤,请阅读《flink sql 知其所以然(十六):flink sql 开发企业级利器之 Dlink》。 + +Dlink 的 plugins 下添加 `doris-flink-1.0-SNAPSHOT.jar` 和 `flink-sql-connector-mysql-cdc-2.1.0.jar` 。重启 Dlink。 + +```java +plugins/ -- Flink 相关扩展 +|- doris-flink-1.0-SNAPSHOT.jar +|- flink-csv-1.13.3.jar +|- flink-dist_2.11-1.13.3.jar +|- flink-format-changelog-json-2.1.0.jar +|- flink-json-1.13.3.jar +|- flink-shaded-zookeeper-3.4.14.jar +|- flink-sql-connector-mysql-cdc-2.1.0.jar +|- flink-table_2.11-1.13.3.jar +|- flink-table-blink_2.11-1.13.3.jar +``` + +当然,如果您想直接使用 FLINK_HOME 的话,可以在 `auto.sh` 文件中 `SETTING` 变量添加`$FLINK_HOME/lib` 。 + +## 数据表 + +### 学生表 (student) + +```sql +-- Mysql +DROP TABLE IF EXISTS `student`; +CREATE TABLE `student` ( + `sid` int(11) NOT NULL, + `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT 
NULL, + PRIMARY KEY (`sid`) USING BTREE +) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic; + +INSERT INTO `student` VALUES (1, '小红'); +INSERT INTO `student` VALUES (2, '小黑'); +INSERT INTO `student` VALUES (3, '小黄'); +``` + +### 成绩表(score) + +```sql +-- Mysql +DROP TABLE IF EXISTS `score`; +CREATE TABLE `score` ( + `cid` int(11) NOT NULL, + `sid` int(11) NULL DEFAULT NULL, + `cls` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, + `score` int(11) NULL DEFAULT NULL, + PRIMARY KEY (`cid`) USING BTREE +) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic; + +INSERT INTO `score` VALUES (1, 1, 'chinese', 90); +INSERT INTO `score` VALUES (2, 1, 'math', 95); +INSERT INTO `score` VALUES (3, 1, 'english', 93); +INSERT INTO `score` VALUES (4, 2, 'chinese', 92); +INSERT INTO `score` VALUES (5, 2, 'math', 75); +INSERT INTO `score` VALUES (6, 2, 'english', 80); +INSERT INTO `score` VALUES (7, 3, 'chinese', 100); +INSERT INTO `score` VALUES (8, 3, 'math', 60); +``` + +### 学生成绩宽表(scoreinfo) + +```sql +-- Doris +CREATE TABLE scoreinfo +( + cid INT, + sid INT, + name VARCHAR(32), + cls VARCHAR(32), + score INT +) +UNIQUE KEY(cid) +DISTRIBUTED BY HASH(cid) BUCKETS 10 +PROPERTIES("replication_num" = "1"); +``` + + + +## FlinkSQL + +```sql +CREATE TABLE student ( + sid INT, + name STRING, + PRIMARY KEY (sid) NOT ENFORCED +) WITH ( +'connector' = 'mysql-cdc', +'hostname' = '127.0.0.1', +'port' = '3306', +'username' = 'test', +'password' = '123456', +'database-name' = 'test', +'table-name' = 'student'); +CREATE TABLE score ( + cid INT, + sid INT, + cls STRING, + score INT, + PRIMARY KEY (cid) NOT ENFORCED +) WITH ( +'connector' = 'mysql-cdc', +'hostname' = '127.0.0.1', +'port' = '3306', +'username' = 'test', +'password' = '123456', +'database-name' = 'test', +'table-name' = 'score'); +CREATE TABLE scoreinfo ( + cid INT, + sid INT, + name STRING, + cls STRING, + score INT, + PRIMARY KEY (cid) 
NOT ENFORCED +) WITH ( +'connector' = 'doris', +'fenodes' = '127.0.0.1:8030' , +'table.identifier' = 'test.scoreinfo', +'username' = 'root', +'password'='' +); +insert into scoreinfo +select +a.cid,a.sid,b.name,a.cls,a.score +from score a +left join student b on a.sid = b.sid +``` + +## 调试 + +### 在 Dlink 中提交 + +本示例采用了 yarn-session 的方式进行提交。 + +![image-20211218134246511](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTp0RDozicic8lrYycianYsUN9ibG3QjNqVbMk7L41wHykKnkV0YxDCVSYj68HlFWylpYckkXicgnTDU7uQ/0?wx_fmt=png) + +### FlinkWebUI + +![image-20211218134439699](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTp0RDozicic8lrYycianYsUN9ibn1jXbvKznaF8Tm4AxxvYYDI0fEtXbGm0XUeXhGp44KMlPdoOzjvtHQ/0?wx_fmt=png) + +上图可见,流任务已经成功被 Dlink 提交的远程集群了。 + +### Doris 查询 + +![image-20211218135404787](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTp0RDozicic8lrYycianYsUN9iblKTY6o9fWZxFDQYC19wKVFRGDuUBgNOZxm14sWjyr8tUY7RDeUiaEUw/0?wx_fmt=png) + +上图可见,Doris 已经被写入了历史全量数据。 + +### 增量测试 + +在 Mysql 中执行新增语句: + +```sql +INSERT INTO `score` VALUES (9, 3, 'english', 100); +``` + +Doris 成功被追加: + +![image-20211218135545742](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTp0RDozicic8lrYycianYsUN9ibvE4qyQ9ttf2kNZ3raEgabvh442HfiaIfm2l5dhdFmWoGiaHMlvcQmocw/0?wx_fmt=png) + +### 变动测试 + +在 Mysql 中执行新增语句: + +```sql +update score set score = 100 where cid = 1 +``` + +Doris 成功被修改: + +![image-20211218135949764](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTp0RDozicic8lrYycianYsUN9ib3liaIvXQcCSboO4IoeJhtTRa38ukNogtFzwg31mNEFwRcJ1wGNIhQkQ/0?wx_fmt=png) + diff --git a/dlink-doc/sql/dlink.sql b/dlink-doc/sql/dlink.sql index 7bc1564c24..5d2c9af559 100644 --- a/dlink-doc/sql/dlink.sql +++ b/dlink-doc/sql/dlink.sql @@ -221,6 +221,7 @@ CREATE TABLE `dlink_task` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'ID', `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '名称', `alias` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '别名', + `dialect` varchar(50) CHARACTER SET utf8 COLLATE 
utf8_general_ci NULL DEFAULT NULL COMMENT '方言', `type` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '类型', `check_point` int(11) NULL DEFAULT NULL COMMENT 'CheckPoint ', `save_point_strategy` int(1) UNSIGNED ZEROFILL NULL DEFAULT NULL COMMENT 'SavePoint策略', @@ -230,6 +231,7 @@ CREATE TABLE `dlink_task` ( `statement_set` tinyint(1) NULL DEFAULT NULL COMMENT '启用语句集', `cluster_id` int(11) NULL DEFAULT NULL COMMENT 'Flink集群ID', `cluster_configuration_id` int(11) NULL DEFAULT NULL COMMENT '集群配置ID', + `database_id` int(11) NULL DEFAULT NULL COMMENT '数据源ID', `jar_id` int(11) NULL DEFAULT NULL COMMENT 'jarID', `config_json` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '配置JSON', `note` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '注释', diff --git a/dlink-doc/sql/dlink_history.sql b/dlink-doc/sql/dlink_history.sql index 8b69865cab..0ec942de72 100644 --- a/dlink-doc/sql/dlink_history.sql +++ b/dlink-doc/sql/dlink_history.sql @@ -471,4 +471,12 @@ INSERT INTO `dlink_user`(`id`, `username`, `password`, `nickname`, `worknum`, `a ALTER TABLE `dlink_task` CHANGE COLUMN `config` `config_json` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '配置JSON' AFTER `jar_id`; +-- ---------------------------- +-- 0.5.0-SNAPSHOT 2021-12-13 +-- ---------------------------- +ALTER TABLE `dlink_task` +ADD COLUMN `dialect` varchar(50) NULL COMMENT '方言' AFTER `alias`; +ALTER TABLE `dlink_task` +ADD COLUMN `database_id` int(11) NULL COMMENT '数据源ID' AFTER `cluster_configuration_id`; + SET FOREIGN_KEY_CHECKS = 1; diff --git a/dlink-function/src/main/java/com/dlink/ud/udtaf/Top2WithRetract.java b/dlink-function/src/main/java/com/dlink/ud/udtaf/Top2WithRetract.java new file mode 100644 index 0000000000..32799ae546 --- /dev/null +++ b/dlink-function/src/main/java/com/dlink/ud/udtaf/Top2WithRetract.java @@ -0,0 +1,86 @@ +package com.dlink.ud.udtaf; + +import org.apache.flink.api.java.tuple.Tuple2; +import 
org.apache.flink.table.functions.TableAggregateFunction; +import org.apache.flink.util.Collector; + +/** + * Top2WithRetract + * + * @author wenmo + * @since 2021/12/17 18:55 + */ + +public class Top2WithRetract extends TableAggregateFunction, Top2WithRetract.Top2WithRetractAccumulator> { + + public static class Top2WithRetractAccumulator { + public Integer first; + public Integer second; + public Integer oldFirst; + public Integer oldSecond; + } + + @Override + public Top2WithRetractAccumulator createAccumulator() { + Top2WithRetractAccumulator acc = new Top2WithRetractAccumulator(); + acc.first = Integer.MIN_VALUE; + acc.second = Integer.MIN_VALUE; + acc.oldFirst = Integer.MIN_VALUE; + acc.oldSecond = Integer.MIN_VALUE; + return acc; + } + + public void accumulate(Top2WithRetractAccumulator acc, Integer v) { + if (v > acc.first) { + acc.second = acc.first; + acc.first = v; + } else if (v > acc.second) { + acc.second = v; + } + } + + public void retract(Top2WithRetractAccumulator acc, Integer v){ + if (v == acc.first) { + acc.oldFirst = acc.first; + acc.oldSecond = acc.second; + acc.first = acc.second; + acc.second = Integer.MIN_VALUE; + } else if (v == acc.second) { + acc.oldSecond = acc.second; + acc.second = Integer.MIN_VALUE; + } + } + + public void emitValue(Top2WithRetractAccumulator acc, Collector> out) { + // emit the value and rank + if (acc.first != Integer.MIN_VALUE) { + out.collect(Tuple2.of(acc.first, 1)); + } + if (acc.second != Integer.MIN_VALUE) { + out.collect(Tuple2.of(acc.second, 2)); + } + } + + public void emitUpdateWithRetract( + Top2WithRetractAccumulator acc, + RetractableCollector> out) { + if (!acc.first.equals(acc.oldFirst)) { + // if there is an update, retract the old value then emit a new value + if (acc.oldFirst != Integer.MIN_VALUE) { + out.retract(Tuple2.of(acc.oldFirst, 1)); + } + out.collect(Tuple2.of(acc.first, 1)); + acc.oldFirst = acc.first; + } + if (!acc.second.equals(acc.oldSecond)) { + // if there is an update, retract the 
old value then emit a new value + if (acc.oldSecond != Integer.MIN_VALUE) { + out.retract(Tuple2.of(acc.oldSecond, 2)); + } + out.collect(Tuple2.of(acc.second, 2)); + acc.oldSecond = acc.second; + } + } +} + + diff --git a/dlink-gateway/src/main/java/com/dlink/gateway/config/SavePointType.java b/dlink-gateway/src/main/java/com/dlink/gateway/config/SavePointType.java index bf3dee0002..adea66b05d 100644 --- a/dlink-gateway/src/main/java/com/dlink/gateway/config/SavePointType.java +++ b/dlink-gateway/src/main/java/com/dlink/gateway/config/SavePointType.java @@ -23,7 +23,7 @@ public String getValue() { public static SavePointType get(String value){ for (SavePointType type : SavePointType.values()) { - if(Asserts.isEquals(type.getValue(),value)){ + if(Asserts.isEqualsIgnoreCase(type.getValue(),value)){ return type; } } diff --git a/dlink-metadata/dlink-metadata-base/pom.xml b/dlink-metadata/dlink-metadata-base/pom.xml index 001f761211..e425c0640c 100644 --- a/dlink-metadata/dlink-metadata-base/pom.xml +++ b/dlink-metadata/dlink-metadata-base/pom.xml @@ -28,6 +28,11 @@ org.slf4j slf4j-api + + com.alibaba + druid-spring-boot-starter + provided + - \ No newline at end of file + diff --git a/dlink-metadata/dlink-metadata-base/src/main/java/com/dlink/metadata/driver/AbstractJdbcDriver.java b/dlink-metadata/dlink-metadata-base/src/main/java/com/dlink/metadata/driver/AbstractJdbcDriver.java index 9ed82a8d18..7f76cfd82e 100644 --- a/dlink-metadata/dlink-metadata-base/src/main/java/com/dlink/metadata/driver/AbstractJdbcDriver.java +++ b/dlink-metadata/dlink-metadata-base/src/main/java/com/dlink/metadata/driver/AbstractJdbcDriver.java @@ -1,10 +1,17 @@ package com.dlink.metadata.driver; +import com.alibaba.druid.sql.SQLUtils; +import com.alibaba.druid.sql.ast.SQLStatement; +import com.alibaba.druid.sql.parser.ParserException; +import com.alibaba.druid.sql.parser.SQLStatementParser; +import com.alibaba.druid.sql.parser.Token; import com.dlink.assertion.Asserts; import 
com.dlink.constant.CommonConstant; +import com.dlink.metadata.result.JdbcSelectResult; import com.dlink.model.Column; import com.dlink.model.Schema; import com.dlink.model.Table; +import com.dlink.result.SqlExplainResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -240,34 +247,70 @@ public boolean execute(String sql) { } @Override - public List> query(String sql) { + public JdbcSelectResult query(String sql, Integer limit) { + JdbcSelectResult result = new JdbcSelectResult(); List> datas = new ArrayList<>(); List columns = new ArrayList<>(); + List columnNameList = new ArrayList<>(); PreparedStatement preparedStatement = null; ResultSet results = null; + int count = 0; try { preparedStatement = conn.prepareStatement(sql); results = preparedStatement.executeQuery(); + if(Asserts.isNull(results)){ + result.setSuccess(true); + close(preparedStatement, results); + return result; + } ResultSetMetaData metaData = results.getMetaData(); for (int i = 1; i <= metaData.getColumnCount(); i++) { + columnNameList.add(metaData.getColumnLabel(i)); Column column = new Column(); - column.setName(metaData.getColumnName(i)); + column.setName(metaData.getColumnLabel(i)); column.setType(metaData.getColumnTypeName(i)); column.setJavaType(getTypeConvert().convert(metaData.getColumnTypeName(i)).getType()); columns.add(column); } + result.setColumns(columnNameList); while (results.next()) { HashMap data = new HashMap<>(); for (int i = 0; i < columns.size(); i++) { data.put(columns.get(i).getName(), getTypeConvert().convertValue(results, columns.get(i).getName(), columns.get(i).getType())); } datas.add(data); + count ++; + if(count >= limit){ + break; + } } - } catch (SQLException e) { - e.printStackTrace(); + result.setSuccess(true); + } catch (Exception e) { + result.setError(e.getMessage()); + result.setSuccess(false); } finally { close(preparedStatement, results); + result.setRowData(datas); + return result; + } + } + + @Override + public List explain(String sql){ + 
/**
 * ExplainResult
 *
 * Simple holder for a single SQL statement submitted for explanation.
 * The original stub declared a private {@code sql} field with no accessors,
 * making the field unreachable; standard getter/setter added so the class
 * is actually usable by callers (backward-compatible — only members added).
 *
 * @author qiwenkai
 * @since 2021/12/13 19:14
 **/
public class ExplainResult {

    /** The raw SQL text this result describes. */
    private String sql;

    public ExplainResult() {
    }

    public ExplainResult(String sql) {
        this.sql = sql;
    }

    public String getSql() {
        return sql;
    }

    public void setSql(String sql) {
        this.sql = sql;
    }
}
0000000000..55dfec9111 --- /dev/null +++ b/dlink-metadata/dlink-metadata-clickhouse/src/main/java/com/dlink/metadata/ast/Clickhouse20CreateTableStatement.java @@ -0,0 +1,93 @@ +package com.dlink.metadata.ast; + +import com.alibaba.druid.DbType; +import com.alibaba.druid.sql.ast.SQLExpr; +import com.alibaba.druid.sql.ast.SQLOrderBy; +import com.alibaba.druid.sql.ast.statement.SQLAssignItem; +import com.alibaba.druid.sql.ast.statement.SQLCreateTableStatement; +import com.alibaba.druid.sql.dialect.clickhouse.visitor.ClickhouseVisitor; +import com.alibaba.druid.sql.visitor.SQLASTVisitor; + +import java.util.ArrayList; +import java.util.List; + +public class Clickhouse20CreateTableStatement extends SQLCreateTableStatement { + protected final List settings = new ArrayList(); + private SQLOrderBy orderBy; + private SQLExpr partitionBy; + private SQLExpr primaryKey; + private SQLExpr sampleBy; + + public Clickhouse20CreateTableStatement() { + super(DbType.clickhouse); + } + + public SQLOrderBy getOrderBy() { + return orderBy; + } + + public void setOrderBy(SQLOrderBy x) { + if (x != null) { + x.setParent(this); + } + + this.orderBy = x; + } + + public SQLExpr getPartitionBy() { + return partitionBy; + } + + public void setPartitionBy(SQLExpr x) { + if (x != null) { + x.setParent(this); + } + + this.partitionBy = x; + } + + public SQLExpr getPrimaryKey() { + return primaryKey; + } + + public void setPrimaryKey(SQLExpr x) { + if (x != null) { + x.setParent(this); + } + + this.primaryKey = x; + } + + public SQLExpr getSampleBy() { + return sampleBy; + } + + public void setSampleBy(SQLExpr x) { + if (x != null) { + x.setParent(this); + } + + this.sampleBy = x; + } + + public List getSettings() { + return settings; + } + + @Override + protected void accept0(SQLASTVisitor v) { + if (v instanceof ClickhouseVisitor) { + ClickhouseVisitor vv = (ClickhouseVisitor) v; + if (vv.visit(this)) { + acceptChild(vv); + } + vv.endVisit(this); + return; + } + + if (v.visit(this)) { + 
acceptChild(v); + } + v.endVisit(this); + } +} diff --git a/dlink-metadata/dlink-metadata-clickhouse/src/main/java/com/dlink/metadata/driver/ClickHouseDriver.java b/dlink-metadata/dlink-metadata-clickhouse/src/main/java/com/dlink/metadata/driver/ClickHouseDriver.java index 3bff4ce3cf..944c1e984d 100644 --- a/dlink-metadata/dlink-metadata-clickhouse/src/main/java/com/dlink/metadata/driver/ClickHouseDriver.java +++ b/dlink-metadata/dlink-metadata-clickhouse/src/main/java/com/dlink/metadata/driver/ClickHouseDriver.java @@ -1,10 +1,28 @@ package com.dlink.metadata.driver; +import com.alibaba.druid.sql.ast.SQLStatement; +import com.alibaba.druid.sql.ast.statement.SQLDropTableStatement; +import com.alibaba.druid.sql.ast.statement.SQLExprTableSource; +import com.alibaba.druid.sql.ast.statement.SQLSelectStatement; +import com.alibaba.druid.sql.parser.ParserException; +import com.alibaba.druid.sql.parser.Token; +import com.dlink.assertion.Asserts; +import com.dlink.metadata.ast.Clickhouse20CreateTableStatement; import com.dlink.metadata.convert.ClickHouseTypeConvert; import com.dlink.metadata.convert.ITypeConvert; +import com.dlink.metadata.parser.Clickhouse20StatementParser; import com.dlink.metadata.query.ClickHouseQuery; import com.dlink.metadata.query.IDBQuery; import com.dlink.model.Table; +import com.dlink.result.SqlExplainResult; + +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; /** * ClickHouseDriver @@ -38,8 +56,104 @@ public String getName() { return "ClickHouse OLAP 数据库"; } + @Override + public List listTables(String schemaName) { + List
tableList = new ArrayList<>(); + PreparedStatement preparedStatement = null; + ResultSet results = null; + String sql = getDBQuery().tablesSql(schemaName); + try { + preparedStatement = conn.prepareStatement(sql); + results = preparedStatement.executeQuery(); + while (results.next()) { + String tableName = results.getString(getDBQuery().tableName()); + if (Asserts.isNotNullString(tableName)) { + Table tableInfo = new Table(); + tableInfo.setName(tableName); + tableInfo.setSchema(schemaName); + tableList.add(tableInfo); + } + } + } catch (SQLException e) { + e.printStackTrace(); + } finally { + close(preparedStatement, results); + } + return tableList; + } + @Override public String getCreateTableSql(Table table) { return null; } + + @Override + public List explain(String sql){ + List sqlExplainResults = new ArrayList<>(); + StringBuilder explain = new StringBuilder(); + PreparedStatement preparedStatement = null; + ResultSet results = null; + String current = null; + try { + Clickhouse20StatementParser parser = new Clickhouse20StatementParser(sql); + List stmtList = new ArrayList<>(); + parser.parseStatementList(stmtList, -1, null); + if (parser.getLexer().token() != Token.EOF) { + throw new ParserException("syntax error : " + sql); + } + for(SQLStatement item : stmtList){ + current = item.toString(); + String type = item.getClass().getSimpleName(); + if(!(item instanceof SQLSelectStatement)){ + if(item instanceof Clickhouse20CreateTableStatement){ + Matcher m = Pattern.compile(",\\s*\\)").matcher(sql); + if (m.find()) { + sqlExplainResults.add(SqlExplainResult.fail(sql, "No comma can be added to the last field of Table! 
")); + break; + } + sqlExplainResults.add(checkCreateTable((Clickhouse20CreateTableStatement)item)); + } else if(item instanceof SQLDropTableStatement){ + sqlExplainResults.add(checkDropTable((SQLDropTableStatement)item)); + } else { + sqlExplainResults.add(SqlExplainResult.success(type, current, explain.toString())); + } + continue; + } + preparedStatement = conn.prepareStatement("explain "+current); + results = preparedStatement.executeQuery(); + while(results.next()){ + explain.append(getTypeConvert().convertValue(results,"explain", "string")+"\r\n"); + } + sqlExplainResults.add(SqlExplainResult.success(type, current, explain.toString())); + } + } catch (Exception e) { + e.printStackTrace(); + sqlExplainResults.add(SqlExplainResult.fail(current, e.getMessage())); + } finally { + close(preparedStatement, results); + return sqlExplainResults; + } + } + + private SqlExplainResult checkCreateTable(Clickhouse20CreateTableStatement sqlStatement){ + if(existTable(Table.build(sqlStatement.getTableName()))){ + if(sqlStatement.isIfNotExists()){ + return SqlExplainResult.success(sqlStatement.getClass().getSimpleName(), sqlStatement.toString(), null); + }else{ + String schema = null == sqlStatement.getSchema() ? 
"" : sqlStatement.getSchema()+"."; + return SqlExplainResult.fail(sqlStatement.toString(), "Table "+schema+sqlStatement.getTableName()+" already exists."); + } + }else{ + return SqlExplainResult.success(sqlStatement.getClass().getSimpleName(), sqlStatement.toString(), null); + } + } + + private SqlExplainResult checkDropTable(SQLDropTableStatement sqlStatement){ + SQLExprTableSource sqlExprTableSource = sqlStatement.getTableSources().get(0); + if(!existTable(Table.build(sqlExprTableSource.getTableName()))){ + return SqlExplainResult.fail(sqlStatement.toString(), "Table "+sqlExprTableSource.getSchema()+"."+sqlExprTableSource.getTableName()+" not exists."); + }else{ + return SqlExplainResult.success(sqlStatement.getClass().getSimpleName(), sqlStatement.toString(), null); + } + } } diff --git a/dlink-metadata/dlink-metadata-clickhouse/src/main/java/com/dlink/metadata/parser/Clickhouse20CreateTableParser.java b/dlink-metadata/dlink-metadata-clickhouse/src/main/java/com/dlink/metadata/parser/Clickhouse20CreateTableParser.java new file mode 100644 index 0000000000..e07b10fe1c --- /dev/null +++ b/dlink-metadata/dlink-metadata-clickhouse/src/main/java/com/dlink/metadata/parser/Clickhouse20CreateTableParser.java @@ -0,0 +1,76 @@ +package com.dlink.metadata.parser; + +import com.alibaba.druid.sql.ast.SQLExpr; +import com.alibaba.druid.sql.ast.SQLOrderBy; +import com.alibaba.druid.sql.ast.statement.SQLAssignItem; +import com.alibaba.druid.sql.ast.statement.SQLCreateTableStatement; +import com.alibaba.druid.sql.parser.SQLCreateTableParser; +import com.alibaba.druid.sql.parser.SQLExprParser; +import com.alibaba.druid.sql.parser.Token; +import com.alibaba.druid.util.FnvHash; +import com.dlink.metadata.ast.Clickhouse20CreateTableStatement; + +public class Clickhouse20CreateTableParser extends SQLCreateTableParser { + public Clickhouse20CreateTableParser(SQLExprParser exprParser) { + super(exprParser); + } + + protected SQLCreateTableStatement newCreateStatement() { + return new 
Clickhouse20CreateTableStatement(); + } + + protected void parseCreateTableRest(SQLCreateTableStatement stmt) { + Clickhouse20CreateTableStatement ckStmt = (Clickhouse20CreateTableStatement) stmt; + if (lexer.identifierEquals(FnvHash.Constants.ENGINE)) { + lexer.nextToken(); + if (lexer.token() == Token.EQ) { + lexer.nextToken(); + } + stmt.setEngine( + this.exprParser.expr() + ); + } + + if (lexer.identifierEquals("PARTITION")) { + lexer.nextToken(); + accept(Token.BY); + SQLExpr expr = this.exprParser.expr(); + ckStmt.setPartitionBy(expr); + } + + if (lexer.token() == Token.PRIMARY) { + lexer.nextToken(); + accept(Token.KEY); + SQLExpr expr = this.exprParser.expr(); + ckStmt.setPrimaryKey(expr); + } + + if (lexer.token() == Token.ORDER) { + SQLOrderBy orderBy = this.exprParser.parseOrderBy(); + ckStmt.setOrderBy(orderBy); + } + + if (lexer.identifierEquals("SAMPLE")) { + lexer.nextToken(); + accept(Token.BY); + SQLExpr expr = this.exprParser.expr(); + ckStmt.setSampleBy(expr); + } + + if (lexer.identifierEquals("SETTINGS")) { + lexer.nextToken(); + for (;;) { + SQLAssignItem item = this.exprParser.parseAssignItem(); + item.setParent(ckStmt); + ckStmt.getSettings().add(item); + + if (lexer.token() == Token.COMMA) { + lexer.nextToken(); + continue; + } + + break; + } + } + } +} diff --git a/dlink-metadata/dlink-metadata-clickhouse/src/main/java/com/dlink/metadata/parser/Clickhouse20ExprParser.java b/dlink-metadata/dlink-metadata-clickhouse/src/main/java/com/dlink/metadata/parser/Clickhouse20ExprParser.java new file mode 100644 index 0000000000..50b1aaccaf --- /dev/null +++ b/dlink-metadata/dlink-metadata-clickhouse/src/main/java/com/dlink/metadata/parser/Clickhouse20ExprParser.java @@ -0,0 +1,78 @@ +/* + * Copyright 1999-2017 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.dlink.metadata.parser; + +import com.alibaba.druid.sql.ast.SQLExpr; +import com.alibaba.druid.sql.ast.expr.SQLArrayExpr; +import com.alibaba.druid.sql.ast.expr.SQLCharExpr; +import com.alibaba.druid.sql.parser.Lexer; +import com.alibaba.druid.sql.parser.SQLExprParser; +import com.alibaba.druid.sql.parser.SQLParserFeature; +import com.alibaba.druid.sql.parser.Token; +import com.alibaba.druid.util.FnvHash; + +import java.util.Arrays; + +public class Clickhouse20ExprParser extends SQLExprParser { + private final static String[] AGGREGATE_FUNCTIONS; + private final static long[] AGGREGATE_FUNCTIONS_CODES; + + static { + String[] strings = { "AVG", "COUNT", "MAX", "MIN", "STDDEV", "SUM", "ROW_NUMBER", + "ROWNUMBER" }; + AGGREGATE_FUNCTIONS_CODES = FnvHash.fnv1a_64_lower(strings, true); + AGGREGATE_FUNCTIONS = new String[AGGREGATE_FUNCTIONS_CODES.length]; + for (String str : strings) { + long hash = FnvHash.fnv1a_64_lower(str); + int index = Arrays.binarySearch(AGGREGATE_FUNCTIONS_CODES, hash); + AGGREGATE_FUNCTIONS[index] = str; + } + } + + public Clickhouse20ExprParser(String sql){ + this(new Clickhouse20Lexer(sql)); + this.lexer.nextToken(); + } + + public Clickhouse20ExprParser(String sql, SQLParserFeature... 
features){ + this(new Clickhouse20Lexer(sql, features)); + this.lexer.nextToken(); + } + + public Clickhouse20ExprParser(Lexer lexer){ + super(lexer); + this.aggregateFunctions = AGGREGATE_FUNCTIONS; + this.aggregateFunctionHashCodes = AGGREGATE_FUNCTIONS_CODES; + } + + protected SQLExpr parseAliasExpr(String alias) { + String chars = alias.substring(1, alias.length() - 1); + return new SQLCharExpr(chars); + } + + public SQLExpr primaryRest(SQLExpr expr) { + if (lexer.token() == Token.LBRACKET) { + SQLArrayExpr array = new SQLArrayExpr(); + array.setExpr(expr); + lexer.nextToken(); + this.exprList(array.getValues(), array); + accept(Token.RBRACKET); + return primaryRest(array); + } + + return super.primaryRest(expr); + } +} diff --git a/dlink-metadata/dlink-metadata-clickhouse/src/main/java/com/dlink/metadata/parser/Clickhouse20Lexer.java b/dlink-metadata/dlink-metadata-clickhouse/src/main/java/com/dlink/metadata/parser/Clickhouse20Lexer.java new file mode 100644 index 0000000000..52b4389866 --- /dev/null +++ b/dlink-metadata/dlink-metadata-clickhouse/src/main/java/com/dlink/metadata/parser/Clickhouse20Lexer.java @@ -0,0 +1,48 @@ +package com.dlink.metadata.parser; + +import com.alibaba.druid.DbType; +import com.alibaba.druid.sql.parser.Keywords; +import com.alibaba.druid.sql.parser.Lexer; +import com.alibaba.druid.sql.parser.SQLParserFeature; +import com.alibaba.druid.sql.parser.Token; + +import java.util.HashMap; +import java.util.Map; + +public class Clickhouse20Lexer extends Lexer { + public final static Keywords DEFAULT_KEYWORDS; + + static { + Map map = new HashMap(); + + map.putAll(Keywords.DEFAULT_KEYWORDS.getKeywords()); + + map.put("OF", Token.OF); + map.put("CONCAT", Token.CONCAT); + map.put("CONTINUE", Token.CONTINUE); + map.put("MERGE", Token.MERGE); + map.put("USING", Token.USING); + + map.put("ROW", Token.ROW); + map.put("LIMIT", Token.LIMIT); + map.put("SHOW", Token.SHOW); + map.put("ALL", Token.ALL); + map.put("GLOBAL", Token.GLOBAL); + + 
DEFAULT_KEYWORDS = new Keywords(map); + } + + public Clickhouse20Lexer(String input) { + super(input); + dbType = DbType.clickhouse; + super.keywords = DEFAULT_KEYWORDS; + } + + public Clickhouse20Lexer(String input, SQLParserFeature... features){ + super(input); + super.keywords = DEFAULT_KEYWORDS; + for (SQLParserFeature feature : features) { + config(feature, true); + } + } +} diff --git a/dlink-metadata/dlink-metadata-clickhouse/src/main/java/com/dlink/metadata/parser/Clickhouse20StatementParser.java b/dlink-metadata/dlink-metadata-clickhouse/src/main/java/com/dlink/metadata/parser/Clickhouse20StatementParser.java new file mode 100644 index 0000000000..6324197aac --- /dev/null +++ b/dlink-metadata/dlink-metadata-clickhouse/src/main/java/com/dlink/metadata/parser/Clickhouse20StatementParser.java @@ -0,0 +1,77 @@ +package com.dlink.metadata.parser; + +import com.alibaba.druid.sql.ast.statement.SQLWithSubqueryClause; +import com.alibaba.druid.sql.parser.Lexer; +import com.alibaba.druid.sql.parser.SQLCreateTableParser; +import com.alibaba.druid.sql.parser.SQLParserFeature; +import com.alibaba.druid.sql.parser.SQLStatementParser; +import com.alibaba.druid.sql.parser.Token; + +public class Clickhouse20StatementParser extends SQLStatementParser { + public Clickhouse20StatementParser(String sql) { + super (new Clickhouse20ExprParser(sql)); + } + + public Clickhouse20StatementParser(String sql, SQLParserFeature... 
features) { + super (new Clickhouse20ExprParser(sql, features)); + } + + public Clickhouse20StatementParser(Lexer lexer){ + super(new Clickhouse20ExprParser(lexer)); + } + + + @Override + public SQLWithSubqueryClause parseWithQuery() { + SQLWithSubqueryClause withQueryClause = new SQLWithSubqueryClause(); + if (lexer.hasComment() && lexer.isKeepComments()) { + withQueryClause.addBeforeComment(lexer.readAndResetComments()); + } + + accept(Token.WITH); + + for (; ; ) { + SQLWithSubqueryClause.Entry entry = new SQLWithSubqueryClause.Entry(); + entry.setParent(withQueryClause); + + if (lexer.token() == Token.LPAREN) { + lexer.nextToken(); + switch (lexer.token()) { + case VALUES: + case WITH: + case SELECT: + entry.setSubQuery( + this.createSQLSelectParser() + .select()); + break; + default: + break; + } + accept(Token.RPAREN); + + } else { + entry.setExpr(exprParser.expr()); + } + + accept(Token.AS); + String alias = this.lexer.stringVal(); + lexer.nextToken(); + entry.setAlias(alias); + + withQueryClause.addEntry(entry); + + if (lexer.token() == Token.COMMA) { + lexer.nextToken(); + continue; + } + + break; + } + + return withQueryClause; + } + + public SQLCreateTableParser getSQLCreateTableParser() { + return new Clickhouse20CreateTableParser(this.exprParser); + } +} diff --git a/dlink-metadata/dlink-metadata-clickhouse/src/main/java/com/dlink/metadata/query/ClickHouseQuery.java b/dlink-metadata/dlink-metadata-clickhouse/src/main/java/com/dlink/metadata/query/ClickHouseQuery.java index 28fd434946..38c5b99f2a 100644 --- a/dlink-metadata/dlink-metadata-clickhouse/src/main/java/com/dlink/metadata/query/ClickHouseQuery.java +++ b/dlink-metadata/dlink-metadata-clickhouse/src/main/java/com/dlink/metadata/query/ClickHouseQuery.java @@ -17,7 +17,7 @@ public String schemaAllSql() { @Override public String tablesSql(String schemaName) { - return "show table status WHERE 1=1 "; + return "show tables"; } diff --git 
a/dlink-metadata/dlink-metadata-clickhouse/src/main/java/com/dlink/metadata/visitor/Click20SchemaStatVisitor.java b/dlink-metadata/dlink-metadata-clickhouse/src/main/java/com/dlink/metadata/visitor/Click20SchemaStatVisitor.java new file mode 100644 index 0000000000..61c2db73b3 --- /dev/null +++ b/dlink-metadata/dlink-metadata-clickhouse/src/main/java/com/dlink/metadata/visitor/Click20SchemaStatVisitor.java @@ -0,0 +1,19 @@ +package com.dlink.metadata.visitor; + +import com.alibaba.druid.DbType; +import com.alibaba.druid.sql.repository.SchemaRepository; +import com.alibaba.druid.sql.visitor.SchemaStatVisitor; + +public class Click20SchemaStatVisitor extends SchemaStatVisitor implements Clickhouse20Visitor { + { + dbType = DbType.antspark; + } + + public Click20SchemaStatVisitor() { + super(DbType.antspark); + } + + public Click20SchemaStatVisitor(SchemaRepository repository) { + super(repository); + } +} diff --git a/dlink-metadata/dlink-metadata-clickhouse/src/main/java/com/dlink/metadata/visitor/Clickhouse20ExportParameterVisitor.java b/dlink-metadata/dlink-metadata-clickhouse/src/main/java/com/dlink/metadata/visitor/Clickhouse20ExportParameterVisitor.java new file mode 100644 index 0000000000..65224a3e38 --- /dev/null +++ b/dlink-metadata/dlink-metadata-clickhouse/src/main/java/com/dlink/metadata/visitor/Clickhouse20ExportParameterVisitor.java @@ -0,0 +1,121 @@ +/* + * Copyright 1999-2017 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.dlink.metadata.visitor; + +import com.alibaba.druid.sql.ast.SQLOrderBy; +import com.alibaba.druid.sql.ast.expr.SQLBetweenExpr; +import com.alibaba.druid.sql.ast.expr.SQLBinaryOpExpr; +import com.alibaba.druid.sql.ast.expr.SQLInListExpr; +import com.alibaba.druid.sql.ast.expr.SQLMethodInvokeExpr; +import com.alibaba.druid.sql.ast.statement.SQLSelectGroupByClause; +import com.alibaba.druid.sql.ast.statement.SQLSelectItem; +import com.alibaba.druid.sql.visitor.ExportParameterVisitor; +import com.alibaba.druid.sql.visitor.ExportParameterVisitorUtils; + +import java.util.ArrayList; +import java.util.List; + +public class Clickhouse20ExportParameterVisitor extends Clickhouse20OutputVisitor implements ExportParameterVisitor { + + /** + * true= if require parameterized sql output + */ + private final boolean requireParameterizedOutput; + + public Clickhouse20ExportParameterVisitor(final List parameters, final Appendable appender, final boolean wantParameterizedOutput){ + super(appender, true); + this.parameters = parameters; + this.requireParameterizedOutput = wantParameterizedOutput; + } + + public Clickhouse20ExportParameterVisitor() { + this(new ArrayList()); + } + + public Clickhouse20ExportParameterVisitor(final List parameters){ + this(parameters,new StringBuilder(),false); + } + + public Clickhouse20ExportParameterVisitor(final Appendable appender) { + this(new ArrayList(),appender,true); + } + + public List getParameters() { + return parameters; + } + + @Override + public boolean visit(SQLSelectItem x) { + if(requireParameterizedOutput){ + return super.visit(x); + } + return false; + } + + @Override + public boolean visit(SQLOrderBy x) { + if(requireParameterizedOutput){ + return super.visit(x); + } + return false; + } + + @Override + public boolean visit(SQLSelectGroupByClause x) { + if(requireParameterizedOutput){ + return super.visit(x); + 
} + return false; + } + + @Override + public boolean visit(SQLMethodInvokeExpr x) { + if(requireParameterizedOutput){ + return super.visit(x); + } + ExportParameterVisitorUtils.exportParamterAndAccept(this.parameters, x.getArguments()); + + return true; + } + + @Override + public boolean visit(SQLInListExpr x) { + if(requireParameterizedOutput){ + return super.visit(x); + } + ExportParameterVisitorUtils.exportParamterAndAccept(this.parameters, x.getTargetList()); + + return true; + } + + @Override + public boolean visit(SQLBetweenExpr x) { + if(requireParameterizedOutput){ + return super.visit(x); + } + ExportParameterVisitorUtils.exportParameter(this.parameters, x); + return true; + } + + public boolean visit(SQLBinaryOpExpr x) { + if(requireParameterizedOutput){ + return super.visit(x); + } + ExportParameterVisitorUtils.exportParameter(this.parameters, x); + return true; + } + +} diff --git a/dlink-metadata/dlink-metadata-clickhouse/src/main/java/com/dlink/metadata/visitor/Clickhouse20OutputVisitor.java b/dlink-metadata/dlink-metadata-clickhouse/src/main/java/com/dlink/metadata/visitor/Clickhouse20OutputVisitor.java new file mode 100644 index 0000000000..ea8692cb04 --- /dev/null +++ b/dlink-metadata/dlink-metadata-clickhouse/src/main/java/com/dlink/metadata/visitor/Clickhouse20OutputVisitor.java @@ -0,0 +1,120 @@ +package com.dlink.metadata.visitor; + +import com.alibaba.druid.DbType; +import com.alibaba.druid.sql.ast.SQLDataType; +import com.alibaba.druid.sql.ast.SQLExpr; +import com.alibaba.druid.sql.ast.SQLName; +import com.alibaba.druid.sql.ast.SQLOrderBy; +import com.alibaba.druid.sql.ast.SQLStructDataType; +import com.alibaba.druid.sql.ast.statement.SQLAlterTableAddColumn; +import com.alibaba.druid.sql.ast.statement.SQLAssignItem; +import com.alibaba.druid.sql.ast.statement.SQLCreateTableStatement; +import com.alibaba.druid.sql.ast.statement.SQLSelect; +import com.alibaba.druid.sql.ast.statement.SQLWithSubqueryClause; +import 
com.alibaba.druid.sql.visitor.SQLASTOutputVisitor; +import com.dlink.metadata.ast.Clickhouse20CreateTableStatement; + +import java.util.List; + +public class Clickhouse20OutputVisitor extends SQLASTOutputVisitor implements Clickhouse20Visitor { + public Clickhouse20OutputVisitor(Appendable appender) { + super(appender, DbType.clickhouse); + } + + public Clickhouse20OutputVisitor(Appendable appender, DbType dbType) { + super(appender, dbType); + } + + public Clickhouse20OutputVisitor(Appendable appender, boolean parameterized) { + super(appender, parameterized); + } + + @Override + public boolean visit(SQLWithSubqueryClause.Entry x) { + if (x.getExpr() != null) { + x.getExpr().accept(this); + } else if (x.getSubQuery() != null) { + print('('); + println(); + SQLSelect query = x.getSubQuery(); + if (query != null) { + query.accept(this); + } else { + x.getReturningStatement().accept(this); + } + println(); + print(')'); + } + print(' '); + print0(ucase ? "AS " : "as "); + print0(x.getAlias()); + + return false; + } + + public boolean visit(SQLStructDataType x) { + print0(ucase ? "NESTED (" : "nested ("); + incrementIndent(); + println(); + printlnAndAccept(x.getFields(), ","); + decrementIndent(); + println(); + print(')'); + return false; + } + + @Override + public boolean visit(SQLStructDataType.Field x) { + SQLName name = x.getName(); + if (name != null) { + name.accept(this); + } + SQLDataType dataType = x.getDataType(); + + if (dataType != null) { + print(' '); + dataType.accept(this); + } + + return false; + } + + @Override + public boolean visit(Clickhouse20CreateTableStatement x) { + super.visit((SQLCreateTableStatement) x); + + SQLExpr partitionBy = x.getPartitionBy(); + if (partitionBy != null) { + println(); + print0(ucase ? 
"PARTITION BY " : "partition by "); + partitionBy.accept(this); + } + + SQLOrderBy orderBy = x.getOrderBy(); + if (orderBy != null) { + println(); + orderBy.accept(this); + } + + SQLExpr sampleBy = x.getSampleBy(); + if (sampleBy != null) { + println(); + print0(ucase ? "SAMPLE BY " : "sample by "); + sampleBy.accept(this); + } + + List settings = x.getSettings(); + if (!settings.isEmpty()) { + println(); + print0(ucase ? "SETTINGS " : "settings "); + printAndAccept(settings, ", "); + } + return false; + } + + public boolean visit(SQLAlterTableAddColumn x) { + print0(ucase ? "ADD COLUMN " : "add column "); + printAndAccept(x.getColumns(), ", "); + return false; + } +} diff --git a/dlink-metadata/dlink-metadata-clickhouse/src/main/java/com/dlink/metadata/visitor/Clickhouse20Visitor.java b/dlink-metadata/dlink-metadata-clickhouse/src/main/java/com/dlink/metadata/visitor/Clickhouse20Visitor.java new file mode 100644 index 0000000000..d8706de1e0 --- /dev/null +++ b/dlink-metadata/dlink-metadata-clickhouse/src/main/java/com/dlink/metadata/visitor/Clickhouse20Visitor.java @@ -0,0 +1,13 @@ +package com.dlink.metadata.visitor; + +import com.alibaba.druid.sql.visitor.SQLASTVisitor; +import com.dlink.metadata.ast.Clickhouse20CreateTableStatement; + +public interface Clickhouse20Visitor extends SQLASTVisitor { + default boolean visit(Clickhouse20CreateTableStatement x) { + return true; + } + + default void endVisit(Clickhouse20CreateTableStatement x) { + } +} diff --git a/dlink-metadata/dlink-metadata-mysql/pom.xml b/dlink-metadata/dlink-metadata-mysql/pom.xml index 911a802b9e..eb2e0ec8c3 100644 --- a/dlink-metadata/dlink-metadata-mysql/pom.xml +++ b/dlink-metadata/dlink-metadata-mysql/pom.xml @@ -24,7 +24,7 @@ mysql mysql-connector-java - test + - \ No newline at end of file + diff --git a/dlink-metadata/dlink-metadata-mysql/src/main/java/com/dlink/metadata/driver/MySqlDriver.java b/dlink-metadata/dlink-metadata-mysql/src/main/java/com/dlink/metadata/driver/MySqlDriver.java 
index c1ea22fc67..565e75defe 100644 --- a/dlink-metadata/dlink-metadata-mysql/src/main/java/com/dlink/metadata/driver/MySqlDriver.java +++ b/dlink-metadata/dlink-metadata-mysql/src/main/java/com/dlink/metadata/driver/MySqlDriver.java @@ -5,10 +5,8 @@ import com.dlink.metadata.convert.MySqlTypeConvert; import com.dlink.metadata.query.IDBQuery; import com.dlink.metadata.query.MySqlQuery; -import com.dlink.metadata.result.SelectResult; import com.dlink.model.Column; import com.dlink.model.Table; -import com.fasterxml.jackson.databind.JsonNode; import java.util.ArrayList; import java.util.List; diff --git a/dlink-metadata/dlink-metadata-mysql/src/test/java/com/dlink/metadata/MysqlTest.java b/dlink-metadata/dlink-metadata-mysql/src/test/java/com/dlink/metadata/MysqlTest.java index 1802f2bc23..c0a71e602c 100644 --- a/dlink-metadata/dlink-metadata-mysql/src/test/java/com/dlink/metadata/MysqlTest.java +++ b/dlink-metadata/dlink-metadata-mysql/src/test/java/com/dlink/metadata/MysqlTest.java @@ -2,6 +2,7 @@ import com.dlink.metadata.driver.Driver; import com.dlink.metadata.driver.DriverConfig; +import com.dlink.metadata.result.JdbcSelectResult; import com.dlink.model.Column; import com.dlink.model.Schema; import org.junit.Test; @@ -16,14 +17,15 @@ **/ public class MysqlTest { + private static final String IP = "127.0.0.1"; public Driver getDriver(){ DriverConfig config = new DriverConfig(); config.setType("Mysql"); - config.setIp("10.1.51.25"); + config.setIp(IP); config.setPort(3306); config.setUsername("dca"); config.setPassword("dca"); - config.setUrl("jdbc:mysql://10.1.51.25:3306/dca?zeroDateTimeBehavior=convertToNull&useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&autoReconnect=true"); + config.setUrl("jdbc:mysql://"+IP+":3306/dca?zeroDateTimeBehavior=convertToNull&useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&autoReconnect=true"); return Driver.build(config).connect(); } @@ -31,11 +33,11 @@ public Driver getDriver(){ public void connectTest(){ 
DriverConfig config = new DriverConfig(); config.setType("Mysql"); - config.setIp("10.1.51.25"); + config.setIp(IP); config.setPort(3306); config.setUsername("dca"); config.setPassword("dca"); - config.setUrl("jdbc:mysql://10.1.51.25:3306/dca?zeroDateTimeBehavior=convertToNull&useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&autoReconnect=true"); + config.setUrl("jdbc:mysql://"+IP+":3306/dca?zeroDateTimeBehavior=convertToNull&useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&autoReconnect=true"); String test = Driver.build(config).test(); System.out.println(test); System.out.println("end..."); @@ -58,7 +60,7 @@ public void columnTest(){ @Test public void queryTest(){ Driver driver = getDriver(); - List query = driver.query("select * from MENU"); + JdbcSelectResult query = driver.query("select * from MENU",10); System.out.println("end..."); } } diff --git a/dlink-metadata/dlink-metadata-oracle/src/test/java/com/dlink/metadata/OracleTest.java b/dlink-metadata/dlink-metadata-oracle/src/test/java/com/dlink/metadata/OracleTest.java index 4f203ecdd5..fc52a73791 100644 --- a/dlink-metadata/dlink-metadata-oracle/src/test/java/com/dlink/metadata/OracleTest.java +++ b/dlink-metadata/dlink-metadata-oracle/src/test/java/com/dlink/metadata/OracleTest.java @@ -2,6 +2,7 @@ import com.dlink.metadata.driver.Driver; import com.dlink.metadata.driver.DriverConfig; +import com.dlink.metadata.result.JdbcSelectResult; import com.dlink.model.Column; import com.dlink.model.Schema; import org.junit.Test; @@ -16,14 +17,16 @@ **/ public class OracleTest { + private static final String IP = "127.0.0.1"; + public Driver getDriver(){ DriverConfig config = new DriverConfig(); config.setType("Oracle"); - config.setIp("10.1.51.25"); + config.setIp(IP); config.setPort(1521); config.setUsername("cdr"); config.setPassword("cdr"); - config.setUrl("jdbc:oracle:thin:@10.1.51.25:1521:orcl"); + config.setUrl("jdbc:oracle:thin:@"+IP+":1521:orcl"); return Driver.build(config).connect(); } 
@@ -31,11 +34,11 @@ public Driver getDriver(){ public void connectTest(){ DriverConfig config = new DriverConfig(); config.setType("Oracle"); - config.setIp("10.1.51.25"); + config.setIp(IP); config.setPort(1521); config.setUsername("cdr"); config.setPassword("cdr"); - config.setUrl("jdbc:oracle:thin:@10.1.51.25:1521:orcl"); + config.setUrl("jdbc:oracle:thin:@"+IP+":1521:orcl"); String test = Driver.build(config).test(); System.out.println(test); System.out.println("end..."); @@ -58,7 +61,7 @@ public void columnTest(){ @Test public void queryTest(){ Driver driver = getDriver(); - List query = driver.query("select * from CDR.PAT_INFO where ROWNUM<10"); + JdbcSelectResult selectResult = driver.query("select * from CDR.PAT_INFO where ROWNUM<10", 10); System.out.println("end..."); } } diff --git a/dlink-web/src/components/Studio/StudioConsole/StudioMsg/index.tsx b/dlink-web/src/components/Studio/StudioConsole/StudioMsg/index.tsx index 83aa2a1553..e04da30a44 100644 --- a/dlink-web/src/components/Studio/StudioConsole/StudioMsg/index.tsx +++ b/dlink-web/src/components/Studio/StudioConsole/StudioMsg/index.tsx @@ -1,7 +1,9 @@ import {Typography, Divider, Badge, Empty,Tag} from "antd"; import {StateType} from "@/pages/FlinkSqlStudio/model"; import {connect} from "umi"; -import { FireOutlined } from '@ant-design/icons'; +import {FireOutlined, ScheduleOutlined} from '@ant-design/icons'; +import StudioSqlConfig from "@/components/Studio/StudioRightTool/StudioSqlConfig"; +import {DIALECT} from "@/components/Studio/conf"; const { Title, Paragraph, Text, Link } = Typography; @@ -9,29 +11,53 @@ const StudioMsg = (props:any) => { const {current} = props; - return ( - - {current.console.result.jobConfig?( -
- [{current.console.result.jobConfig.session}:{current.console.result.jobConfig.address}] + const renderCommonSqlContent = () => { + return (<> + +
{current.console.result.startTime} + {current.console.result.endTime} + + {!(current.console.result.success) ? <>Error : + <>Success} + +
+ {current.console.result.statement && (
{current.console.result.statement}
)} + {current.console.result.error && (
{current.console.result.error}
)} +
+ ) + }; + + const renderFlinkSqlContent = () => { + return (<> + +
+ [{current.console.result.jobConfig?.session}:{current.console.result.jobConfig?.address}] {current.console.result.startTime} {current.console.result.endTime} - {!(current.console.result.status=='SUCCESS') ? <>Error : + {!(current.console.result.status==='SUCCESS') ? <>Error : <>Success} - {current.console.result.jobConfig.jobName&&{current.console.result.jobConfig.jobName}} + {current.console.result.jobConfig?.jobName&&{current.console.result.jobConfig?.jobName}} {current.console.result.jobId&& - (<> - - - {current.console.result.jobId} - + (<> + + + {current.console.result.jobId} + )}
{current.console.result.statement && (
{current.console.result.statement}
)} {current.console.result.error && (
{current.console.result.error}
)} -
): + + ) + }; + + + return ( + + {current.console.result.startTime?(current.task.dialect === DIALECT.SQL ? renderCommonSqlContent(): + renderFlinkSqlContent() ): } ); diff --git a/dlink-web/src/components/Studio/StudioConsole/StudioTable/index.tsx b/dlink-web/src/components/Studio/StudioConsole/StudioTable/index.tsx index b7aa29bdd4..365765f6fa 100644 --- a/dlink-web/src/components/Studio/StudioConsole/StudioTable/index.tsx +++ b/dlink-web/src/components/Studio/StudioConsole/StudioTable/index.tsx @@ -6,6 +6,7 @@ import {useState} from "react"; import { SearchOutlined } from '@ant-design/icons'; import {showJobData} from "@/components/Studio/StudioEvent/DQL"; import ProTable from '@ant-design/pro-table'; +import {DIALECT} from "@/components/Studio/conf"; const { Option } = Select; const { Title, Paragraph, Text, Link } = Typography; @@ -101,11 +102,13 @@ const StudioTable = (props:any) => { }; return (
- {current.console&¤t.console.result.jobId? + {current.console&¤t.console.result.success? (<> - + {current.task.dialect === DIALECT.FLINKSQL ? + ():undefined + } {result.rowData&&result.columns? @@ -82,6 +83,12 @@ const StudioMenu = (props: any) => { type: "Studio/saveTabs", payload: newTabs, }); + if(current.task.dialect === DIALECT.SQL){ + dispatch && dispatch({ + type: "Studio/saveResult", + payload: res.datas.result, + }); + } useSession && showTables(currentSession.session, dispatch); }) }; diff --git a/dlink-web/src/components/Studio/StudioRightTool/StudioSetting/index.tsx b/dlink-web/src/components/Studio/StudioRightTool/StudioSetting/index.tsx index 1b01428478..14fe3629d1 100644 --- a/dlink-web/src/components/Studio/StudioRightTool/StudioSetting/index.tsx +++ b/dlink-web/src/components/Studio/StudioRightTool/StudioSetting/index.tsx @@ -7,7 +7,7 @@ import {useEffect} from "react"; import {showTables} from "@/components/Studio/StudioEvent/DDL"; import {JarStateType} from "@/pages/Jar/model"; import {Scrollbars} from "react-custom-scrollbars"; -import {RUN_MODE} from "@/components/Studio/StudioRightTool/StudioSetting/conf"; +import {RUN_MODE} from "@/components/Studio/conf"; const {Option} = Select; const {Text} = Typography; @@ -124,7 +124,6 @@ const StudioSetting = (props: any) => { ) : ( {getClusterConfigurationOptions()} diff --git a/dlink-web/src/components/Studio/StudioRightTool/StudioSqlConfig/index.less b/dlink-web/src/components/Studio/StudioRightTool/StudioSqlConfig/index.less new file mode 100644 index 0000000000..58a55f65b3 --- /dev/null +++ b/dlink-web/src/components/Studio/StudioRightTool/StudioSqlConfig/index.less @@ -0,0 +1,9 @@ +@import '~antd/es/style/themes/default.less'; + +.form_setting{ + padding-left: 10px; +} + +.form_item{ + margin-bottom: 5px; +} diff --git a/dlink-web/src/components/Studio/StudioRightTool/StudioSqlConfig/index.tsx b/dlink-web/src/components/Studio/StudioRightTool/StudioSqlConfig/index.tsx new file mode 100644 index 
0000000000..059b3d33ad --- /dev/null +++ b/dlink-web/src/components/Studio/StudioRightTool/StudioSqlConfig/index.tsx @@ -0,0 +1,104 @@ +import {connect} from "umi"; +import {StateType} from "@/pages/FlinkSqlStudio/model"; +import { + Form, InputNumber, Select, Tag, Row, Col, Tooltip, Button, +} from "antd"; +import {MinusSquareOutlined} from "@ant-design/icons"; +import styles from "./index.less"; +import {useEffect, useState} from "react"; +import { Scrollbars } from 'react-custom-scrollbars'; + +const { Option } = Select; + +const StudioSqlConfig = (props: any) => { + + const {current,form,dispatch,tabs,database,toolHeight} = props; + + form.setFieldsValue(current.task); + + + const onValuesChange = (change:any,all:any)=>{ + let newTabs = tabs; + for(let i=0;i { + const itemList = []; + for (const item of database) { + const tag = (<>{item.type}{item.alias}); + itemList.push() + } + return itemList; + }; + + return ( + <> + +
+
+ +
+ + + +
+ +
+ + + + + + + + + + + + + + ); +}; + +export default connect(({Studio}: { Studio: StateType }) => ({ + database: Studio.database, + current: Studio.current, + tabs: Studio.tabs, + toolHeight: Studio.toolHeight, +}))(StudioSqlConfig); diff --git a/dlink-web/src/components/Studio/StudioRightTool/index.tsx b/dlink-web/src/components/Studio/StudioRightTool/index.tsx index 30a77d7ecf..8144f1e221 100644 --- a/dlink-web/src/components/Studio/StudioRightTool/index.tsx +++ b/dlink-web/src/components/Studio/StudioRightTool/index.tsx @@ -6,19 +6,29 @@ import styles from "./index.less"; import StudioConfig from "./StudioConfig"; import StudioSetting from "./StudioSetting"; import StudioSavePoint from "./StudioSavePoint"; +import StudioSqlConfig from "./StudioSqlConfig"; +import {DIALECT} from "@/components/Studio/conf"; const { TabPane } = Tabs; const StudioRightTool = (props:any) => { - // const [form] = Form.useForm(); - const {form,toolHeight} = props; - return ( - - 作业配置} key="StudioSetting" > - + + const {current,form,toolHeight} = props; + + const renderSqlContent = () => { + return (<> + 执行配置} key="StudioConfig" > + + ) + }; + + const renderFlinkSqlContent = () => { + return (<> 作业配置} key="StudioSetting" > + + 执行配置} key="StudioConfig" > @@ -27,7 +37,12 @@ const StudioRightTool = (props:any) => { 审计} key="4" > - + ) + }; + + return ( + + {current.task.dialect === DIALECT.SQL ? 
renderSqlContent(): renderFlinkSqlContent()} ); }; @@ -35,4 +50,5 @@ const StudioRightTool = (props:any) => { export default connect(({ Studio }: { Studio: StateType }) => ({ sql: Studio.sql, toolHeight: Studio.toolHeight, + current: Studio.current, }))(StudioRightTool); diff --git a/dlink-web/src/components/Studio/StudioTabs/index.tsx b/dlink-web/src/components/Studio/StudioTabs/index.tsx index 12df19c840..1557d59bff 100644 --- a/dlink-web/src/components/Studio/StudioTabs/index.tsx +++ b/dlink-web/src/components/Studio/StudioTabs/index.tsx @@ -12,6 +12,10 @@ const EditorTabs = (props: any) => { const {tabs, dispatch, current, toolHeight,width} = props; const onChange = (activeKey: any) => { + dispatch&&dispatch({ + type: "Studio/saveToolHeight", + payload: toolHeight-0.0001, + }); dispatch({ type: "Studio/changeActiveKey", payload: activeKey, @@ -22,6 +26,10 @@ const EditorTabs = (props: any) => { if (action == 'add') { add(); } else if (action == 'remove') { + dispatch&&dispatch({ + type: "Studio/saveToolHeight", + payload: toolHeight-0.0001, + }); if (current.isModified) { saveTask(current, dispatch); } diff --git a/dlink-web/src/components/Studio/StudioTree/components/UpdateTaskForm.tsx b/dlink-web/src/components/Studio/StudioTree/components/SimpleTaskForm.tsx similarity index 74% rename from dlink-web/src/components/Studio/StudioTree/components/UpdateTaskForm.tsx rename to dlink-web/src/components/Studio/StudioTree/components/SimpleTaskForm.tsx index 296064904a..093915f242 100644 --- a/dlink-web/src/components/Studio/StudioTree/components/UpdateTaskForm.tsx +++ b/dlink-web/src/components/Studio/StudioTree/components/SimpleTaskForm.tsx @@ -1,7 +1,10 @@ import React, {useEffect, useState} from 'react'; -import {Form, Button, Input, Modal} from 'antd'; +import {Form, Button, Input, Modal,Select} from 'antd'; import type {TaskTableListItem} from '../data.d'; +import {DIALECT} from "@/components/Studio/conf"; + +const {Option} = Select; export type UpdateFormProps 
= { onCancel: (flag?: boolean, formVals?: Partial) => void; @@ -11,14 +14,12 @@ export type UpdateFormProps = { values: Partial; }; -const FormItem = Form.Item; - const formLayout = { labelCol: {span: 7}, wrapperCol: {span: 13}, }; -const UpdateTaskForm: React.FC = (props) => { +const SimpleTaskForm: React.FC = (props) => { const [formVals, setFormVals] = useState>({ id: props.values.id, name: props.values.name, @@ -45,18 +46,28 @@ const UpdateTaskForm: React.FC = (props) => { const renderContent = () => { return ( <> - + + ):undefined} + - - + - + ); }; @@ -89,6 +100,7 @@ const UpdateTaskForm: React.FC = (props) => { id: formVals.id, name: formVals.name, alias: formVals.alias, + dialect: formVals.dialect, parentId: formVals.parentId, }} > @@ -98,4 +110,4 @@ const UpdateTaskForm: React.FC = (props) => { ); }; -export default UpdateTaskForm; +export default SimpleTaskForm; diff --git a/dlink-web/src/components/Studio/StudioTree/data.d.ts b/dlink-web/src/components/Studio/StudioTree/data.d.ts index e819afdf12..0d49afa3a3 100644 --- a/dlink-web/src/components/Studio/StudioTree/data.d.ts +++ b/dlink-web/src/components/Studio/StudioTree/data.d.ts @@ -9,5 +9,6 @@ export type TaskTableListItem = { id: number, name: string, alias: string, + dialect: string, parentId: number, }; diff --git a/dlink-web/src/components/Studio/StudioTree/index.tsx b/dlink-web/src/components/Studio/StudioTree/index.tsx index d3ee591040..7526b6d1c8 100644 --- a/dlink-web/src/components/Studio/StudioTree/index.tsx +++ b/dlink-web/src/components/Studio/StudioTree/index.tsx @@ -12,7 +12,7 @@ import { } from "@/components/Common/crud"; import UpdateCatalogueForm from './components/UpdateCatalogueForm'; import {ActionType} from "@ant-design/pro-table"; -import UpdateTaskForm from "@/components/Studio/StudioTree/components/UpdateTaskForm"; +import SimpleTaskForm from "@/components/Studio/StudioTree/components/SimpleTaskForm"; import { Scrollbars } from 'react-custom-scrollbars'; const { DirectoryTree } 
= Tree; const {Search} = Input; @@ -392,7 +392,7 @@ const StudioTree: React.FC = (props) => { /> ) : null} {updateTaskModalVisible? ( - { const datas = await handleAddOrUpdateWithResult('/api/catalogue/createTask',value); if (datas) { diff --git a/dlink-web/src/components/Studio/StudioRightTool/StudioSetting/conf.ts b/dlink-web/src/components/Studio/conf.ts similarity index 68% rename from dlink-web/src/components/Studio/StudioRightTool/StudioSetting/conf.ts rename to dlink-web/src/components/Studio/conf.ts index 5a346c34ac..f722e69ff2 100644 --- a/dlink-web/src/components/Studio/StudioRightTool/StudioSetting/conf.ts +++ b/dlink-web/src/components/Studio/conf.ts @@ -5,3 +5,9 @@ export const RUN_MODE = { YARN_PER_JOB:'yarn-per-job', YARN_APPLICATION:'yarn-application', }; + +export const DIALECT = { + FLINKSQL:'FlinkSql', + SQL:'Sql', + JAVA:'Java', +}; diff --git a/dlink-web/src/pages/DataBase/components/ClickHouseForm.tsx b/dlink-web/src/pages/DataBase/components/ClickHouseForm.tsx new file mode 100644 index 0000000000..9e4ae17f74 --- /dev/null +++ b/dlink-web/src/pages/DataBase/components/ClickHouseForm.tsx @@ -0,0 +1,172 @@ +import React, {useEffect, useState} from 'react'; +import {Form, Button, Input, Space, Select} from 'antd'; + +import Switch from "antd/es/switch"; +import TextArea from "antd/es/input/TextArea"; +import {DataBaseItem} from "@/pages/DataBase/data"; + + +export type ClickHouseFormProps = { + onCancel: (flag?: boolean, formVals?: Partial) => void; + onSubmit: (values: Partial) => void; + onTest: (values: Partial) => void; + modalVisible: boolean; + values: Partial; +}; +const Option = Select.Option; + +const formLayout = { + labelCol: {span: 7}, + wrapperCol: {span: 13}, +}; + +const ClickHouseForm: React.FC = (props) => { + const [formVals, setFormVals] = useState>({ + id: props.values.id, + name: props.values.name, + alias: props.values.alias, + type: "ClickHouse", + groupName: props.values.groupName, + url: props.values.url, + username: 
props.values.username, + password: props.values.password, + dbVersion: props.values.dbVersion, + note: props.values.note, + enabled: props.values.enabled, + }); + + const [form] = Form.useForm(); + const { + onSubmit: handleUpdate, + onTest: handleTest, + onCancel: handleModalVisible, + modalVisible, + values, + } = props; + + const submitForm = async () => { + const fieldsValue = await form.validateFields(); + setFormVals({...formVals, ...fieldsValue}); + handleUpdate({...formVals, ...fieldsValue}); + }; + + const testForm = async () => { + const fieldsValue = await form.validateFields(); + setFormVals({...formVals, ...fieldsValue}); + handleTest({...formVals, ...fieldsValue}); + }; + + const onReset = () => { + form.resetFields(); + }; + + const renderContent = (formVals) => { + return ( + <> + + + + + + + + + + +