diff --git a/.gitignore b/.gitignore index a1c2a238a9..d233bae52e 100644 --- a/.gitignore +++ b/.gitignore @@ -21,3 +21,10 @@ # virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml hs_err_pid* +.idea/* +target/* +*.iml +*.lst +*/target/* +*/*/target/* +dlink-web/node_modules/* diff --git a/README.md b/README.md index 6a7ed1b453..55d678fe65 100644 --- a/README.md +++ b/README.md @@ -1 +1,9 @@ -# dlink \ No newline at end of file +# Dlink + +## 简介 + +Dlink 为 Apache Flink 而生。它是一个敏捷的 FlinkSQL 开发运维平台,可以在线开发、预览、提交作业。 + +与此同时,Dlink 也是 DataLink 数据中台生态的核心组件。 + +DataLink 开源项目及社区正在建设,希望本项目可以帮助你更快发展。 \ No newline at end of file diff --git a/dlink-admin/pom.xml b/dlink-admin/pom.xml new file mode 100644 index 0000000000..a53c9a9e00 --- /dev/null +++ b/dlink-admin/pom.xml @@ -0,0 +1,50 @@ + + + + dlink + com.dlink + 0.1-SNAPSHOT + + 4.0.0 + + dlink-admin + + + + com.alibaba + druid-spring-boot-starter + + + com.baomidou + mybatis-plus-boot-starter + + + org.springframework.boot + spring-boot-configuration-processor + true + + + org.springframework.boot + spring-boot-starter-test + test + + + org.projectlombok + lombok + + + com.fasterxml.jackson.core + jackson-annotations + + + com.fasterxml.jackson.core + jackson-databind + + + com.google.guava + guava + + + \ No newline at end of file diff --git a/dlink-admin/src/main/java/com/dlink/common/result/PageResult.java b/dlink-admin/src/main/java/com/dlink/common/result/PageResult.java new file mode 100644 index 0000000000..d14de23df6 --- /dev/null +++ b/dlink-admin/src/main/java/com/dlink/common/result/PageResult.java @@ -0,0 +1,35 @@ +package com.dlink.common.result; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; +import java.util.List; + +/** + * 分页结果 + * + * @author wenmo + * @since 2021/5/3 20:03 + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class PageResult implements Serializable { + private static final long serialVersionUID = -5143774412936881374L; + /** + * 总数 + */ + private Long count; + /** + * 是否成功:0 成功、1 失败 + */ + private int code; + /** + * 当前页结果集 + */ + private List data; +} diff --git a/dlink-admin/src/main/java/com/dlink/common/result/ProTableResult.java b/dlink-admin/src/main/java/com/dlink/common/result/ProTableResult.java new file mode 100644 index 0000000000..bb038de08e --- /dev/null +++ b/dlink-admin/src/main/java/com/dlink/common/result/ProTableResult.java @@ -0,0 +1,43 @@ +package com.dlink.common.result; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; +import java.util.List; + +/** + * Ant Design Pro ProTable Query Result + * + * @author wenmo + * @since 2021/5/18 21:54 + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class ProTableResult implements Serializable { + private static final long serialVersionUID = -6377431009117000655L; + /** + * 总数 + */ + private Long total; + /** + * 是否成功:true 成功、false 失败 + */ + private boolean success; + /** + * 当前页码 + */ + private Integer current; + /** + * 当前每页记录数 + */ + private Integer pageSize; + /** + * 当前页结果集 + */ + private List data; +} diff --git a/dlink-admin/src/main/java/com/dlink/common/result/Result.java b/dlink-admin/src/main/java/com/dlink/common/result/Result.java new file mode 100644 index 0000000000..e15264f0be --- /dev/null +++ b/dlink-admin/src/main/java/com/dlink/common/result/Result.java @@ -0,0 +1,48 @@ +package com.dlink.common.result; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; + +/** + * 返回对象 + * + * @author wenmo + * @since 2021/5/3 19:56 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class Result implements Serializable { + + private T datas; + private Integer code; + private String msg; + + public static Result succeed(String msg) { + return of(null, CodeEnum.SUCCESS.getCode(), msg); + } + + public static Result succeed(T model, String msg) { + return of(model, CodeEnum.SUCCESS.getCode(), msg); + } + + public static Result succeed(T model) { + return of(model, CodeEnum.SUCCESS.getCode(), ""); + } + + public static Result of(T datas, Integer code, String msg) { + return new Result<>(datas, code, msg); + } + + public static Result failed(String msg) { + return of(null, CodeEnum.ERROR.getCode(), msg); + } + + public static Result failed(T model, String msg) { + return of(model, CodeEnum.ERROR.getCode(), msg); + } +} + diff --git a/dlink-admin/src/main/java/com/dlink/db/config/MybatisPlusConfigure.java b/dlink-admin/src/main/java/com/dlink/db/config/MybatisPlusConfigure.java new file mode 100644 index 0000000000..8b9ebf5e75 --- /dev/null +++ b/dlink-admin/src/main/java/com/dlink/db/config/MybatisPlusConfigure.java @@ -0,0 +1,41 @@ +package com.dlink.db.config; + +import com.baomidou.mybatisplus.core.handlers.MetaObjectHandler; +import com.baomidou.mybatisplus.core.parser.ISqlParser; +import com.baomidou.mybatisplus.core.parser.ISqlParserFilter; +import com.baomidou.mybatisplus.extension.plugins.PaginationInterceptor; +import com.baomidou.mybatisplus.extension.plugins.tenant.TenantHandler; +import com.baomidou.mybatisplus.extension.plugins.tenant.TenantSqlParser; +import com.dlink.db.handler.DateMetaObjectHandler; +import com.dlink.db.properties.MybatisPlusFillProperties; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; + +/** + * MybatisPlusConfigure + * + * @author wenmo + * @since 2021/5/25 + **/ +@EnableConfigurationProperties(MybatisPlusFillProperties.class) +public class MybatisPlusConfigure { + + @Autowired + private MybatisPlusFillProperties autoFillProperties; + + @Bean + public PaginationInterceptor paginationInterceptor() { + PaginationInterceptor paginationInterceptor = new PaginationInterceptor(); + return paginationInterceptor; + } + + @Bean + @ConditionalOnMissingBean + @ConditionalOnProperty(prefix = "dlink.mybatis-plus.fill", name = "enabled", havingValue = "true", matchIfMissing = true) + public MetaObjectHandler metaObjectHandler() { + return new DateMetaObjectHandler(autoFillProperties); + } +} diff --git a/dlink-admin/src/main/java/com/dlink/db/handler/DateMetaObjectHandler.java b/dlink-admin/src/main/java/com/dlink/db/handler/DateMetaObjectHandler.java new file mode 100644 index 0000000000..64821425ec --- /dev/null +++ b/dlink-admin/src/main/java/com/dlink/db/handler/DateMetaObjectHandler.java @@ -0,0 +1,48 @@ +package com.dlink.db.handler; + +import com.baomidou.mybatisplus.core.handlers.MetaObjectHandler; +import com.dlink.db.properties.MybatisPlusFillProperties; +import org.apache.ibatis.reflection.MetaObject; + +import java.time.LocalDateTime; + +/** + * DateMetaObjectHandler + * + * @author wenmo + * @since 2021/5/25 + **/ +public class DateMetaObjectHandler implements MetaObjectHandler { + private MybatisPlusFillProperties mybatisPlusFillProperties; + + public DateMetaObjectHandler(MybatisPlusFillProperties mybatisPlusFillProperties) { + this.mybatisPlusFillProperties = mybatisPlusFillProperties; + } + + @Override + public boolean openInsertFill() { + return mybatisPlusFillProperties.getEnableInsertFill(); + } + + @Override + public boolean openUpdateFill() { + return mybatisPlusFillProperties.getEnableUpdateFill(); + } + + @Override + public void insertFill(MetaObject metaObject) { + Object createTime = getFieldValByName(mybatisPlusFillProperties.getCreateTimeField(), metaObject); + Object updateTime = getFieldValByName(mybatisPlusFillProperties.getUpdateTimeField(), metaObject); + if (createTime == null) { + setFieldValByName(mybatisPlusFillProperties.getCreateTimeField(), LocalDateTime.now(), metaObject); + } + if (updateTime == null) { + setFieldValByName(mybatisPlusFillProperties.getUpdateTimeField(), LocalDateTime.now(), metaObject); + } + } + + @Override + public void updateFill(MetaObject metaObject) { + setFieldValByName(mybatisPlusFillProperties.getUpdateTimeField(), LocalDateTime.now(), metaObject); + } +} \ No newline at end of file diff --git a/dlink-admin/src/main/java/com/dlink/db/mapper/SuperMapper.java b/dlink-admin/src/main/java/com/dlink/db/mapper/SuperMapper.java new file mode 100644 index 0000000000..b7e31e2da4 --- /dev/null +++ b/dlink-admin/src/main/java/com/dlink/db/mapper/SuperMapper.java @@ -0,0 +1,22 @@ +package com.dlink.db.mapper; + +import com.baomidou.mybatisplus.core.conditions.Wrapper; +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.baomidou.mybatisplus.core.toolkit.Constants; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import org.apache.ibatis.annotations.Param; + +import java.util.List; +import java.util.Map; + +/** + * SuperMapper + * + * @author wenmo + * @since 2021/5/25 + **/ +public interface SuperMapper extends BaseMapper { + + List selectForProTable(Page page, @Param(Constants.WRAPPER) Wrapper queryWrapper, @Param("param") Map param); + +} diff --git a/dlink-admin/src/main/java/com/dlink/db/properties/MybatisPlusFillProperties.java b/dlink-admin/src/main/java/com/dlink/db/properties/MybatisPlusFillProperties.java new file mode 100644 index 0000000000..12deaf710e --- /dev/null +++ b/dlink-admin/src/main/java/com/dlink/db/properties/MybatisPlusFillProperties.java @@ -0,0 +1,27 @@ +package com.dlink.db.properties; + +import lombok.Getter; +import lombok.Setter; +import org.springframework.boot.context.properties.ConfigurationProperties; + +/** + * MybatisPlusFillProperties + * + * @author wenmo + * @since 2021/5/25 + **/ +@Setter +@Getter +@ConfigurationProperties(prefix = "dlink.mybatis-plus.fill") +public class MybatisPlusFillProperties { + + private Boolean enabled = true; + + private Boolean enableInsertFill = true; + + private Boolean enableUpdateFill = true; + + private String createTimeField = "createTime"; + + private String updateTimeField = "updateTime"; +} diff --git a/dlink-admin/src/main/java/com/dlink/db/service/ISuperService.java b/dlink-admin/src/main/java/com/dlink/db/service/ISuperService.java new file mode 100644 index 0000000000..9a83f36f5f --- /dev/null +++ b/dlink-admin/src/main/java/com/dlink/db/service/ISuperService.java @@ -0,0 +1,18 @@ +package com.dlink.db.service; + +import com.baomidou.mybatisplus.extension.service.IService; +import com.dlink.common.result.ProTableResult; +import com.fasterxml.jackson.databind.JsonNode; + + +/** + * ISuperService + * + * @author wenmo + * @since 2021/5/25 + **/ +public interface ISuperService extends IService { + + ProTableResult selectForProTable(JsonNode para); + +} diff --git a/dlink-admin/src/main/java/com/dlink/db/service/impl/SuperServiceImpl.java b/dlink-admin/src/main/java/com/dlink/db/service/impl/SuperServiceImpl.java new file mode 100644 index 0000000000..767da3d63f --- /dev/null +++ b/dlink-admin/src/main/java/com/dlink/db/service/impl/SuperServiceImpl.java @@ -0,0 +1,37 @@ +package com.dlink.db.service.impl; + +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.dlink.common.result.ProTableResult; +import com.dlink.db.mapper.SuperMapper; +import com.dlink.db.service.ISuperService; +import com.dlink.db.util.ProTableUtil; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.util.List; +import java.util.Map; + +/** + * SuperServiceImpl + * + * @author wenmo + * @since 2021/5/25 + **/ +public class SuperServiceImpl, T> extends ServiceImpl implements ISuperService { + + @Override + public ProTableResult selectForProTable(JsonNode para) { + Integer current = para.has("current") ? para.get("current").asInt() : 1; + Integer pageSize = para.has("pageSize") ? para.get("pageSize").asInt() : 10; + QueryWrapper queryWrapper = new QueryWrapper<>(); + ProTableUtil.autoQueryDefalut(para, queryWrapper); + ObjectMapper mapper = new ObjectMapper(); + Map param = mapper.convertValue(para, Map.class); + Page page = new Page<>(current, pageSize); + List list = baseMapper.selectForProTable(page, queryWrapper, param); + return ProTableResult.builder().success(true).data(list).total(page.getTotal()).current(current).pageSize(pageSize).build(); + } + +} diff --git a/dlink-admin/src/main/java/com/dlink/db/util/ProTableUtil.java b/dlink-admin/src/main/java/com/dlink/db/util/ProTableUtil.java new file mode 100644 index 0000000000..3d835c7328 --- /dev/null +++ b/dlink-admin/src/main/java/com/dlink/db/util/ProTableUtil.java @@ -0,0 +1,194 @@ +package com.dlink.db.util; + +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.JsonNodeType; +import com.google.common.base.CaseFormat; + +import java.util.*; + +/** + * ProTableUtil + * + * @author wenmo + * @since 2021/5/25 + **/ +public class ProTableUtil { + + /** + * @Author wenmo + * @Description 自动装载表格分页排序参数 + * @Date 2021/5/18 + * @Param [para, wrapper, camelToUnderscore, isDelete] + **/ + public static void autoQuery(JsonNode para, QueryWrapper wrapper, boolean camelToUnderscore, boolean isDelete) { + buildDelete(wrapper,camelToUnderscore,isDelete); + JsonNode sortField = para.get("sorter"); + if(sortField!=null) { + Iterator> fields = sortField.fields(); + while (fields.hasNext()) { + Map.Entry entry = fields.next(); + buildSort(entry.getKey(), entry.getValue().asText(), wrapper, camelToUnderscore); + } + } + JsonNode filter = para.get("filter"); + if(filter!=null) { + Iterator> fields2 = filter.fields(); + while (fields2.hasNext()) { + Map.Entry entry = fields2.next(); + buildFilter(entry.getKey(), entry.getValue(), wrapper, camelToUnderscore); + } + } + } + + private static void buildDelete( QueryWrapper wrapper, boolean camelToUnderscore, boolean isDelete){ + if (isDelete) { + if (camelToUnderscore) { + wrapper.eq(CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, "isDelete"), 0); + } else { + wrapper.eq("isDelete", 0); + } + } + } + + private static void buildSort(String sortField,String sortValue,QueryWrapper wrapper, boolean camelToUnderscore){ + if (sortField != null && sortValue != null) { + if (camelToUnderscore) { + sortField = CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, sortField); + } + if (sortValue.equals("descend")) { + if(!sortField.contains(".")) { + wrapper.orderByDesc("a." + sortField); + } + } else { + if(!sortField.contains(".")) { + wrapper.orderByAsc("a." + sortField); + } + } + } + } + + private static void buildFilter(String searchField,JsonNode searchValue,QueryWrapper wrapper, boolean camelToUnderscore){ + if (searchField != null && !searchField.equals("") && searchValue != null) { + if (camelToUnderscore) { + searchField = CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, searchField); + } + final String field = searchField; + List searchValues = new ArrayList<>(); + String type ="String"; + if (searchValue.isArray()){ + for (final JsonNode objNode : searchValue){ + if(objNode.getNodeType()==JsonNodeType.NUMBER){ + type ="Number"; + } + searchValues.add(objNode.asText()); + } + } + if(searchValues.size()>0) { + if ("Number".equals(type)) { + wrapper.and(qw -> { + for (int i = 0; i < searchValues.size(); i++) { + Double itemField = Double.parseDouble(searchValues.get(i)); + if (i > 0) { + qw.or(); + } + qw.eq("a." + field, itemField); + } + }); + } else { + wrapper.and(qw -> { + for (int i = 0; i < searchValues.size(); i++) { + String itemField = searchValues.get(i); + if (i > 0) { + qw.or(); + } + qw.eq("a." + field, itemField); + } + }); + } + } + } + } + /** + * @return void + * @Author wenmo + * @Description 自动装载表单查询参数 + * @Date 2021/5/18 + * @Param [wrapper, para, blackarr, writearr, camelToUnderscore] + **/ + public static void autoSetFromPara(QueryWrapper wrapper, JsonNode para, String[] blackarr, String[] writearr, boolean camelToUnderscore) { + List blacklist = Arrays.asList(blackarr); + List writelist = Arrays.asList(writearr); + if (para.isObject()) + { + Iterator> it = para.fields(); + while (it.hasNext()) + { + Map.Entry entry = it.next(); + String mapKey = entry.getKey(); + if (blacklist.indexOf(mapKey) == -1 || writelist.indexOf(mapKey) > -1) { + if(entry.getValue().getNodeType()== JsonNodeType.NUMBER) { + Double mapValue = entry.getValue().asDouble(); + if (mapValue != null) { + if (camelToUnderscore) { + wrapper.eq(CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, mapKey), mapValue); + } else { + wrapper.eq(mapKey, mapValue); + } + } + }else{ + String mapValue = entry.getValue().asText(); + if (mapValue != null&&!"".equals(mapValue)) { + if (camelToUnderscore) { + wrapper.eq(CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, mapKey), mapValue); + } else { + wrapper.eq(mapKey, mapValue); + } + } + } + } + } + } + } + + /** + * @return void + * @Author wenmo + * @Description 默认表单黑白名单 + * @Date 2021/5/18 + * @Param [wrapper, para] + **/ + public static void autoSetFromParaDefault(QueryWrapper wrapper, JsonNode para) { + final String[] blackarr = {"current", "pageSize", "sorter", "filter"}; + final String[] writearr = {}; + autoSetFromPara(wrapper, para, blackarr, writearr, true); + } + + /** + * @return void + * @Author wenmo + * @Description 默认表格参数 + * @Date 2021/5/18 + * @Param [para, wrapper] + **/ + public static void autoQueryDefalut(JsonNode para, QueryWrapper wrapper) { + autoQuery(para, wrapper, true, false); + } + + /** + * @return void + * @Author wenmo + * @Description protable默认调用方法 + * @Date 2021/5/18 + * @Param [para, wrapper] + **/ + public static void autoQueryAndSetFormParaDefalut(JsonNode para, QueryWrapper wrapper) { + autoSetFromParaDefault(wrapper, para); + autoQueryDefalut(para, wrapper); + } + + public static void autoQueryAndSetFormParaCustom(JsonNode para, QueryWrapper wrapper) { + autoSetFromParaDefault(wrapper, para); + autoQuery(para, wrapper, true, false); + } +} diff --git a/dlink-admin/src/main/resources/application-dev.properties b/dlink-admin/src/main/resources/application-dev.properties new file mode 100644 index 0000000000..fe6dbc106a --- /dev/null +++ b/dlink-admin/src/main/resources/application-dev.properties @@ -0,0 +1,32 @@ +########################## 统一变量配置 ########################## +##### 数据库配置 +datalink.datasource.ip=192.168.24.1 +datalink.datasource.username=datalink +datalink.datasource.password=datalink + +##### redis配置 +spring.redis.host=192.168.123.156 +spring.redis.port=6379 +spring.redis.password=123456 +spring.redis.timeout=5000 + +##### Flink 集群配置 +datalink.flink.host=192.168.123.157 +datalink.flink.port=8081 + +##### elasticsearch配置 +#datalink.elasticsearch.uris=192.168.123.156:9200 +#datalink.elasticsearch.username=elastic +#datalink.elasticsearch.password=qEnNfKNujqNrOPD9q5kb + +##### sentinel配置 +#datalink.sentinel.dashboard=192.168.123.156:6999 + +##### 日志链路追踪 +#datalink.trace.enable=true + +##### 负载均衡隔离(version隔离,只适用于开发环境) +datalink.ribbon.isolation.enabled=false + +##### mybatis-plus打印完整sql(只适用于开发环境) +mybatis-plus.configuration.log-impl=org.apache.ibatis.logging.stdout.StdOutImpl \ No newline at end of file diff --git a/dlink-admin/src/main/resources/application.properties b/dlink-admin/src/main/resources/application.properties new file mode 100644 index 0000000000..a1333f837c --- /dev/null +++ b/dlink-admin/src/main/resources/application.properties @@ -0,0 +1,99 @@ +########################## application级别通用配置 ########################## +##### ribbon配置 +## 从注册中心刷新servelist的时间 默认30秒,单位ms +ribbon.ServerListRefreshInterval=15000 +## 请求连接的超时时间 默认1秒,单位ms +ribbon.ConnectTimeout=30000 +## 请求处理的超时时间 默认1秒,单位ms +ribbon.ReadTimeout=30000 +## 对所有操作请求都进行重试,不配置这个MaxAutoRetries不起作用 默认false +#ribbon.OkToRetryOnAllOperations=true +## 对当前实例的重试次数 默认0 +#ribbon.MaxAutoRetries=1 +## 切换实例的重试次数 默认1 +ribbon.MaxAutoRetriesNextServer=0 + + +##### feign配置 +feign.sentinel.enabled=true +feign.hystrix.enabled=false +feign.okhttp.enabled=true +feign.httpclient.enabled=false +feign.httpclient.max-connections=1000 +feign.httpclient.max-connections-per-route=100 +feign.client.config.feignName.connectTimeout=30000 +feign.client.config.feignName.readTimeout=30000 +## 开启Feign请求响应压缩 +feign.compression.request.enabled=true +feign.compression.response.enabled=true +## 配置压缩文档类型及最小压缩的文档大小 +feign.compression.request.mime-types=text/xml,application/xml,application/json +feign.compression.request.min-request-size=2048 + + +##### sentinel配置 +spring.cloud.sentinel.transport.dashboard=${datalink.sentinel.dashboard} +spring.cloud.sentinel.eager=true + + +##### druid配置 +#连接池配置(通常来说,只需要修改initialSize、minIdle、maxActive +spring.datasource.druid.initial-size=10 +spring.datasource.druid.max-active=500 +spring.datasource.druid.min-idle=10 +# 配置获取连接等待超时的时间 +spring.datasource.druid.max-wait=60000 +#打开PSCache,并且指定每个连接上PSCache的大小 +spring.datasource.druid.pool-prepared-statements=true +spring.datasource.druid.max-pool-prepared-statement-per-connection-size=20 +spring.datasource.druid.validation-query=SELECT 'x' +spring.datasource.druid.test-on-borrow=false +spring.datasource.druid.test-on-return=false +spring.datasource.druid.test-while-idle=true +#配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 +spring.datasource.druid.time-between-eviction-runs-millis=60000 +#配置一个连接在池中最小生存的时间,单位是毫秒 +spring.datasource.druid.min-evictable-idle-time-millis=300000 +spring.datasource.druid.filters=stat,wall +# WebStatFilter配置,说明请参考Druid Wiki,配置_配置WebStatFilter +#是否启用StatFilter默认值true +spring.datasource.druid.web-stat-filter.enabled=true +spring.datasource.druid.web-stat-filter.url-pattern=/* +spring.datasource.druid.web-stat-filter.exclusions="*.js , *.gif ,*.jpg ,*.png ,*.css ,*.ico , /druid/*" +spring.datasource.druid.web-stat-filter.session-stat-max-count=1000 +spring.datasource.druid.web-stat-filter.profile-enable=true +spring.datasource.druid.web-stat-filter.session-stat-enable=false +# StatViewServlet配置 +#展示Druid的统计信息,StatViewServlet的用途包括:1.提供监控信息展示的html页面2.提供监控信息的JSON API +#是否启用StatViewServlet默认值true +spring.datasource.druid.stat-view-servlet.enabled=true +#根据配置中的url-pattern来访问内置监控页面,如果是上面的配置,内置监控页面的首页是/druid/index.html例如:http://127.0.0.1:9000/druid/index.html +spring.datasource.druid.stat-view-servlet.url-pattern=/druid/* +#允许清空统计数据 +spring.datasource.druid.stat-view-servlet.reset-enable=true +spring.datasource.druid.stat-view-servlet.login-username=admin +spring.datasource.druid.stat-view-servlet.login-password=admin + + +##### mybatis-plus配置 +#字段策略 IGNORED:"忽略判断",NOT_NULL:"非 NULL 判断"),NOT_EMPTY:"非空判断" +mybatis-plus.global-config.db-config.field-strategy=NOT_NULL +#逻辑删除配置 +mybatis-plus.global-config.db-config.logic-delete-field=isDelete +mybatis-plus.global-config.db-config.logic-delete-value=1 +mybatis-plus.global-config.db-config.logic-not-delete-value=0 +# 原生配置 +mybatis-plus.configuration.map-underscore-to-camel-case=true +mybatis-plus.configuration.cache-enabled=false + + +##### redis配置 +# 连接池最大连接数(使用负值表示没有限制) +spring.redis.lettuce.pool.max-active=8 +# 连接池最大阻塞等待时间(使用负值表示没有限制) +spring.redis.lettuce.pool.max-wait=-1 +# 连接池中的最大空闲连接 +spring.redis.lettuce.pool.max-idle=8 +# 连接池中的最小空闲连接 +spring.redis.lettuce.pool.min-idle=0 + diff --git a/dlink-admin/src/main/resources/bootstrap.properties b/dlink-admin/src/main/resources/bootstrap.properties new file mode 100644 index 0000000000..2855c0b843 --- /dev/null +++ b/dlink-admin/src/main/resources/bootstrap.properties @@ -0,0 +1,18 @@ +########################## bootstrap级别通用配置 ########################## +# 默认开发环境 +spring.profiles.active=dev + +##### nacos(注册中心和配置中心)地址 +spring.cloud.nacos.server-addr=192.168.123.156:8848 +#spring.cloud.nacos.username=nacos +#spring.cloud.nacos.password=nacos +spring.cloud.nacos.config.file-extension=yml +spring.cloud.nacos.config.shared-dataids=common.yml +spring.cloud.nacos.config.refreshable-dataids=common.yml + +##### spring-boot-actuator配置 +management.endpoints.web.exposure.include=* +management.endpoint.health.show-details=always + +##### 允许bean覆盖 +spring.main.allow-bean-definition-overriding=true \ No newline at end of file diff --git a/dlink-core/pom.xml b/dlink-core/pom.xml new file mode 100644 index 0000000000..bfbf6261be --- /dev/null +++ b/dlink-core/pom.xml @@ -0,0 +1,43 @@ + + + + dlink + com.dlink + 0.1-SNAPSHOT + + 4.0.0 + jar + dlink-core + + 1.8 + UTF-8 + 1.12.2 + 2.11 + 1.8 + 1.8 + 1.12 + 4.12 + 2.7.7 + + + + org.apache.flink + flink-table-planner_${scala.binary.version} + ${flink.version} + + + + org.apache.flink + flink-table-planner-blink_${scala.binary.version} + ${flink.version} + + + + cn.hutool + hutool-all + + + + \ No newline at end of file diff --git a/dlink-core/src/main/java/com/dlink/cluster/FlinkCluster.java b/dlink-core/src/main/java/com/dlink/cluster/FlinkCluster.java new file mode 100644 index 0000000000..84a4d4f7df --- /dev/null +++ b/dlink-core/src/main/java/com/dlink/cluster/FlinkCluster.java @@ -0,0 +1,56 @@ +package com.dlink.cluster; + +import cn.hutool.http.HttpUtil; +import com.dlink.constant.FlinkConstant; +import com.dlink.constant.FlinkHistoryConstant; +import com.dlink.constant.NetConstant; + +import java.util.HashMap; +import java.util.Map; + +/** + * FlinkCluster + * + * @author wenmo + * @since 2021/5/25 15:08 + **/ +public class FlinkCluster { + + private static String flinkJobMangerHost; + + public static String getFlinkJobMangerHost() { + return flinkJobMangerHost; + } + + public static void setFlinkJobMangerHost(String flinkJobMangerHost) { + FlinkCluster.flinkJobMangerHost = flinkJobMangerHost; + } + + public static String getFlinkJobManagerIP(String flinkServers) { + String res = ""; + String flinkAddress = ""; + try { + flinkAddress = getFlinkJobMangerHost(); + res = HttpUtil.get(NetConstant.HTTP + flinkAddress + NetConstant.COLON + NetConstant.PORT + NetConstant.SLASH + FlinkHistoryConstant.JOBS, NetConstant.SERVER_TIME_OUT_ACTIVE); + if (!res.isEmpty()) { + return flinkAddress; + } + } catch (Exception e) { + } + String[] servers = flinkServers.split(","); + for (String server : servers) { + try { + String url = NetConstant.HTTP + server + NetConstant.COLON + NetConstant.PORT + NetConstant.SLASH + FlinkHistoryConstant.JOBS; + res = HttpUtil.get(url, NetConstant.SERVER_TIME_OUT_ACTIVE); + if (!res.isEmpty()) { + if(server.equalsIgnoreCase(flinkAddress)){ + setFlinkJobMangerHost(server); + } + return server; + } + } catch (Exception e) { + } + } + return ""; + } +} diff --git a/dlink-core/src/main/java/com/dlink/constant/FlinkConstant.java b/dlink-core/src/main/java/com/dlink/constant/FlinkConstant.java new file mode 100644 index 0000000000..3e96719dc5 --- /dev/null +++ b/dlink-core/src/main/java/com/dlink/constant/FlinkConstant.java @@ -0,0 +1,27 @@ +package com.dlink.constant; + +/** + * FlinkConstant + * + * @author wenmo + * @since 2021/5/25 14:39 + **/ +public interface FlinkConstant { + + /** + * flink端口 + */ + Integer PORT = 8081; + /** + * flink会话默认个数 + */ + Integer DEFAULT_SESSION_COUNT = 256; + /** + * flink加载因子 + */ + Double DEFAULT_FACTOR = 0.75; + /** + * flink运行节点 + */ + String FLINK_JOB_MANAGER_HOST = "flinkJobManagerHOST"; +} diff --git a/dlink-core/src/main/java/com/dlink/constant/FlinkFunctionConstant.java b/dlink-core/src/main/java/com/dlink/constant/FlinkFunctionConstant.java new file mode 100644 index 0000000000..b8389c7626 --- /dev/null +++ b/dlink-core/src/main/java/com/dlink/constant/FlinkFunctionConstant.java @@ -0,0 +1,12 @@ +package com.dlink.constant; + +public interface FlinkFunctionConstant { + /** + * TO_MAP 函数 + */ + String TO_MAP = "TO_MAP"; + /** + * GET_KEY 函数 + */ + String GET_KEY = "GET_KEY"; +} diff --git a/dlink-core/src/main/java/com/dlink/constant/FlinkHistoryConstant.java b/dlink-core/src/main/java/com/dlink/constant/FlinkHistoryConstant.java new file mode 100644 index 0000000000..bfc8ade6f0 --- /dev/null +++ b/dlink-core/src/main/java/com/dlink/constant/FlinkHistoryConstant.java @@ -0,0 +1,79 @@ +package com.dlink.constant; + +public interface FlinkHistoryConstant { + /** + * history端口 + */ + String PORT = "8082"; + + /** + * 逗号, + */ + String COMMA = ","; + /** + * 任务复数 jobs + */ + String JOBS = "jobs"; + /** + * 任务单数 job + */ + String JOB = "job"; + /** + * 总览 overview + */ + String OVERVIEW = "overview"; + /** + * 错误 error + */ + String ERROR = "error"; + /** + * 起始时间 start-time + */ + String START_TIME = "start-time"; + /** + * 任务名称 name + */ + String NAME = "name"; + /** + * 任务状态 state + */ + String STATE = "state"; + /** + * 异常 获取任务数据失败 + */ + String EXCEPTION_DATA_NOT_FOUND = "获取任务数据失败"; + /** + * 30天时间戳的大小 + */ + Long THIRTY_DAY = 30L * 24 * 60 * 60 * 1000; + /** + * 一天时间戳 + */ + Integer ONE_DAY = 24 * 60 * 60 * 1000; + /** + * 运行active + */ + String ACTIVE = "active"; + /** + * 查询记录的条数 + */ + String COUNT = "count"; + /** + * 当前页码 page + */ + String PAGE = "page"; + /** + * 每一页的大小 SIZE + */ + String SIZE = "size"; + /** + * 当前页的条数 pageCount + */ + String PAGE_COUNT = "pageCount"; + /** + * 返回数据集 resList + */ + String RES_LIST = "resList"; + + +} diff --git a/dlink-core/src/main/java/com/dlink/constant/FlinkJobConstant.java b/dlink-core/src/main/java/com/dlink/constant/FlinkJobConstant.java new file mode 100644 index 0000000000..52acd9c6fa --- /dev/null +++ b/dlink-core/src/main/java/com/dlink/constant/FlinkJobConstant.java @@ -0,0 +1,23 @@ +package com.dlink.constant; + +/** + * flink任务的常量 + */ +public interface FlinkJobConstant { + /** + * flink job id + */ + String FLINK_JOB_ID = "jobId"; + /** + * flink job error + */ + String FLINK_JOB_ERROR = "error"; + /** + * 默认空串 + */ + String DEFAULT_EMPTY = ""; + /** + * 默认端口 + */ + int DEFAULT_PORT = 8081; +} diff --git a/dlink-core/src/main/java/com/dlink/constant/FlinkSQLConstant.java b/dlink-core/src/main/java/com/dlink/constant/FlinkSQLConstant.java new file mode 100644 index 0000000000..97a5f3d608 --- /dev/null +++ b/dlink-core/src/main/java/com/dlink/constant/FlinkSQLConstant.java @@ -0,0 +1,50 @@ +package com.dlink.constant; + +/** + * FlinkSQLConstant + * + * @author wenmo + * @since 2021/5/25 15:51 + **/ +public interface FlinkSQLConstant { + /** + * 创建 + */ + String CREATE = "CREATE"; + /** + * 删除 + */ + String DROP = "DROP"; + /** + * 插入 + */ + String INSERT = "INSERT"; + /** + * 修改 + */ + String ALTER = "ALTER"; + /** + * 查询 + */ + String SELECT = "SELECT"; + /** + * show操作 + */ + String SHOW = "SHOW"; + /** + * 未知操作类型 + */ + String UNKNOWN_TYPE = "UNKNOWN TYPE"; + /** + * 查询时null对应的值 + */ + String NULL_COLUMNS = ""; + /** + * 创建聚合表 CREATEAGGTABLE + */ + String CREATE_AGG_TABLE = "CREATEAGGTABLE"; + /** + * 删除表语句的头部 DROP TABLE IF EXISTS + */ + String DROP_TABLE_HEAD = " DROP TABLE IF EXISTS "; +} diff --git a/dlink-core/src/main/java/com/dlink/constant/NetConstant.java b/dlink-core/src/main/java/com/dlink/constant/NetConstant.java new file mode 100644 index 0000000000..99f1421ce5 --- /dev/null +++ b/dlink-core/src/main/java/com/dlink/constant/NetConstant.java @@ -0,0 +1,28 @@ +package com.dlink.constant; + +public interface NetConstant { + /** + * http:// + */ + String HTTP = "http://"; + /** + * 冒号: + */ + String COLON = ":"; + /** + * 斜杠/ + */ + String SLASH = "/"; + /** + * Flink默认端口 + */ + String PORT = "8081"; + /** + * 连接运行服务器超时时间 1000 + */ + Integer SERVER_TIME_OUT_ACTIVE = 1000; + /** + * 连接FLINK历史服务器超时时间 2000 + */ + Integer SERVER_TIME_OUT_HISTORY = 3000; +} diff --git a/dlink-core/src/main/java/com/dlink/executor/EnvironmentSetting.java b/dlink-core/src/main/java/com/dlink/executor/EnvironmentSetting.java new file mode 100644 index 0000000000..5b4f04b593 --- /dev/null +++ b/dlink-core/src/main/java/com/dlink/executor/EnvironmentSetting.java @@ -0,0 +1,33 @@ +package com.dlink.executor; + +/** + * EnvironmentSetting + * + * @author wenmo + * @since 2021/5/25 13:45 + **/ +public class EnvironmentSetting { + private String host; + private int port; + + public EnvironmentSetting(String host, int port) { + this.host = host; + this.port = port; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } +} diff --git a/dlink-core/src/main/java/com/dlink/executor/Executor.java b/dlink-core/src/main/java/com/dlink/executor/Executor.java new file mode 100644 index 0000000000..7cb2e1a258 --- /dev/null +++ b/dlink-core/src/main/java/com/dlink/executor/Executor.java @@ -0,0 +1,63 @@ +package com.dlink.executor; + +import com.dlink.executor.custom.CustomTableEnvironmentImpl; +import com.dlink.result.SqlExplainResult; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.ExplainDetail; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.flink.table.functions.UserDefinedFunction; + +/** + * Executor + * @author wenmo + * @since 2021/5/25 13:39 + **/ +public abstract class Executor { + + public static final String LOCAL = "LOCAL"; + public static final String REMOTE = "REMOTE"; + + public static Executor build(){ + return new LocalStreamExecutor(new ExecutorSetting(LOCAL)); + } + + public static Executor build(EnvironmentSetting environmentSetting,ExecutorSetting executorSetting){ + if(LOCAL.equals(executorSetting.getType())){ + return new LocalStreamExecutor(executorSetting); + }else if(REMOTE.equals(executorSetting.getType())){ + return new RemoteStreamExecutor(environmentSetting,executorSetting); + }else{ + return new LocalStreamExecutor(executorSetting); + } + } + + public abstract StreamExecutionEnvironment getEnvironment(); + + public abstract CustomTableEnvironmentImpl getCustomTableEnvironmentImpl(); + + public abstract ExecutorSetting getExecutorSetting(); + + public abstract EnvironmentSetting getEnvironmentSetting(); + + public abstract JobExecutionResult execute(String statement) throws Exception; + + public abstract TableResult executeSql(String statement); + + public abstract Table sqlQuery(String statement); + + public abstract String explainSql(String statement, ExplainDetail... extraDetails); + + public abstract SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails); + + public abstract String getStreamGraphString(String statement); + + public abstract ObjectNode getStreamGraph(String statement); + + public abstract void registerFunction(String name, ScalarFunction function); + + public abstract void createTemporarySystemFunction(String name, Class var2); +} diff --git a/dlink-core/src/main/java/com/dlink/executor/ExecutorSetting.java b/dlink-core/src/main/java/com/dlink/executor/ExecutorSetting.java new file mode 100644 index 0000000000..fc86bf4ed8 --- /dev/null +++ b/dlink-core/src/main/java/com/dlink/executor/ExecutorSetting.java @@ -0,0 +1,52 @@ +package com.dlink.executor; + +/** + * ExecutorSetting + * + * @author wenmo + * @since 2021/5/25 13:43 + **/ +public class ExecutorSetting { + private String type = Executor.LOCAL; + private Long checkpoint; + private boolean useSqlFragment = true; + + public ExecutorSetting(String type) { + this.type = type; + } + + public ExecutorSetting(String type, Long checkpoint) { + this.type = type; + this.checkpoint = checkpoint; + } + + public ExecutorSetting(String type, Long checkpoint, boolean useSqlFragment) { + this.type = type; + this.checkpoint = checkpoint; + this.useSqlFragment = useSqlFragment; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public Long getCheckpoint() { + return checkpoint; + } + + public void setCheckpoint(Long checkpoint) { + this.checkpoint = checkpoint; + } + + public boolean isUseSqlFragment() { + return useSqlFragment; + } + + public void setUseSqlFragment(boolean useSqlFragment) { + this.useSqlFragment = useSqlFragment; + } +} diff --git a/dlink-core/src/main/java/com/dlink/executor/LocalStreamExecutor.java b/dlink-core/src/main/java/com/dlink/executor/LocalStreamExecutor.java new file mode 100644 index 0000000000..a418a6a01e --- /dev/null +++ b/dlink-core/src/main/java/com/dlink/executor/LocalStreamExecutor.java @@ -0,0 +1,102 @@ +package com.dlink.executor; + +import com.dlink.executor.custom.CustomTableEnvironmentImpl; +import com.dlink.result.SqlExplainResult; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.ExplainDetail; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.flink.table.functions.UserDefinedFunction; + +/** + * LocalStreamExecuter + * + * @author wenmo + * @since 2021/5/25 13:48 + **/ +public class LocalStreamExecutor extends Executor { + + private StreamExecutionEnvironment environment; + private CustomTableEnvironmentImpl stEnvironment; + private ExecutorSetting executorSetting; + + public LocalStreamExecutor(ExecutorSetting executorSetting) { + this.executorSetting = executorSetting; + this.environment = StreamExecutionEnvironment.createLocalEnvironment(); + stEnvironment = CustomTableEnvironmentImpl.create(environment); + if(executorSetting.isUseSqlFragment()){ + stEnvironment.useSqlFragment(); + }else{ + stEnvironment.unUseSqlFragment(); + } + } + + @Override + public StreamExecutionEnvironment getEnvironment() { + return this.environment; + } + + @Override + public CustomTableEnvironmentImpl getCustomTableEnvironmentImpl() { + return this.stEnvironment; + } + + @Override + public ExecutorSetting getExecutorSetting() { + return this.executorSetting; + } + + @Override + public EnvironmentSetting getEnvironmentSetting() { + return null; + } + + @Override + public JobExecutionResult execute(String statement) throws Exception { + return stEnvironment.execute(statement); + } + + @Override + public TableResult executeSql(String statement) { + return stEnvironment.executeSql(statement); + } + + @Override + public Table sqlQuery(String statement) { + return stEnvironment.sqlQuery(statement); + } + + @Override + public String explainSql(String statement, ExplainDetail... extraDetails) { + return stEnvironment.explainSql(statement,extraDetails); + } + + @Override + public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails) { + return stEnvironment.explainSqlRecord(statement,extraDetails); + } + + @Override + public String getStreamGraphString(String statement) { + return stEnvironment.getStreamGraphString(statement); + } + + @Override + public ObjectNode getStreamGraph(String statement) { + return stEnvironment.getStreamGraph(statement); + } + + @Override + public void registerFunction(String name, ScalarFunction function) { + stEnvironment.registerFunction(name,function); + } + + @Override + public void createTemporarySystemFunction(String name, Class var2) { + stEnvironment.createTemporarySystemFunction(name,var2); + } + +} diff --git a/dlink-core/src/main/java/com/dlink/executor/RemoteStreamExecutor.java b/dlink-core/src/main/java/com/dlink/executor/RemoteStreamExecutor.java new file mode 100644 index 0000000000..062567e64c --- /dev/null +++ b/dlink-core/src/main/java/com/dlink/executor/RemoteStreamExecutor.java @@ -0,0 +1,108 @@ +package com.dlink.executor; + +import com.dlink.executor.custom.CustomTableEnvironmentImpl; +import com.dlink.result.SqlExplainResult; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.ExplainDetail; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.flink.table.functions.UserDefinedFunction; + +/** + * RemoteStreamExecutor + * + * @author wenmo + * @since 2021/5/25 14:05 + **/ +public class RemoteStreamExecutor extends Executor { + + private StreamExecutionEnvironment environment; + private CustomTableEnvironmentImpl stEnvironment; + private EnvironmentSetting environmentSetting; + private ExecutorSetting executorSetting; + + + public RemoteStreamExecutor(EnvironmentSetting environmentSetting,ExecutorSetting executorSetting) { + this.environmentSetting = environmentSetting; + this.executorSetting = executorSetting; + synchronized (RemoteStreamExecutor.class){ + this.environment = StreamExecutionEnvironment.createRemoteEnvironment(environmentSetting.getHost(), environmentSetting.getPort()); + if(stEnvironment == null){ + stEnvironment = CustomTableEnvironmentImpl.create(environment); + } + if(executorSetting.isUseSqlFragment()){ + stEnvironment.useSqlFragment(); + }else{ + stEnvironment.unUseSqlFragment(); + } + } + } + + @Override + public StreamExecutionEnvironment getEnvironment() { + return this.environment; + } + + @Override + public CustomTableEnvironmentImpl getCustomTableEnvironmentImpl() { + return this.stEnvironment; + } + + @Override + public ExecutorSetting getExecutorSetting() { + return this.executorSetting; + } + + @Override + public EnvironmentSetting getEnvironmentSetting() { + return this.environmentSetting; + } + + @Override + public JobExecutionResult execute(String statement) throws Exception { + return stEnvironment.execute(statement); + } + + @Override + public TableResult executeSql(String statement){ + return stEnvironment.executeSql(statement); + } + + @Override + public Table sqlQuery(String statement){ + return stEnvironment.sqlQuery(statement); + } + + @Override + public String explainSql(String statement, ExplainDetail... extraDetails) { + return stEnvironment.explainSql(statement,extraDetails); + } + + @Override + public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails) { + return stEnvironment.explainSqlRecord(statement,extraDetails); + } + + @Override + public String getStreamGraphString(String statement) { + return stEnvironment.getStreamGraphString(statement); + } + + @Override + public ObjectNode getStreamGraph(String statement) { + return stEnvironment.getStreamGraph(statement); + } + + @Override + public void registerFunction(String name, ScalarFunction function) { + stEnvironment.registerFunction(name,function); + } + + @Override + public void createTemporarySystemFunction(String name, Class var2) { + stEnvironment.createTemporarySystemFunction(name,var2); + } +} diff --git a/dlink-core/src/main/java/com/dlink/executor/custom/CustomTableEnvironmentImpl.java b/dlink-core/src/main/java/com/dlink/executor/custom/CustomTableEnvironmentImpl.java new file mode 100644 index 0000000000..f8a8d8764a --- /dev/null +++ b/dlink-core/src/main/java/com/dlink/executor/custom/CustomTableEnvironmentImpl.java @@ -0,0 +1,270 @@ +package com.dlink.executor.custom; + +import com.dlink.result.SqlExplainResult; +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.graph.JSONGenerator; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.table.api.*; +import org.apache.flink.table.api.internal.TableEnvironmentImpl; +import org.apache.flink.table.catalog.CatalogManager; +import org.apache.flink.table.catalog.FunctionCatalog; +import org.apache.flink.table.catalog.GenericInMemoryCatalog; +import org.apache.flink.table.delegation.Executor; +import org.apache.flink.table.delegation.ExecutorFactory; +import org.apache.flink.table.delegation.Planner; +import org.apache.flink.table.delegation.PlannerFactory; +import org.apache.flink.table.factories.ComponentFactoryService; +import org.apache.flink.table.module.ModuleManager; +import org.apache.flink.table.operations.ExplainOperation; +import org.apache.flink.table.operations.ModifyOperation; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.QueryOperation; +import org.apache.flink.table.planner.delegation.ExecutorBase; +import org.apache.flink.table.planner.utils.ExecutorUtils; + +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * 定制TableEnvironmentImpl + * + * @author wenmo + * @since 2021/5/25 + **/ +public class CustomTableEnvironmentImpl extends TableEnvironmentImpl { + + private SqlManager sqlManager; + + private boolean useSqlFragment = true; + + protected CustomTableEnvironmentImpl(CatalogManager catalogManager, SqlManager sqlManager, ModuleManager moduleManager, TableConfig tableConfig, Executor executor, FunctionCatalog functionCatalog, Planner planner, boolean isStreamingMode, ClassLoader userClassLoader) { + super(catalogManager, moduleManager, tableConfig, executor, functionCatalog, planner, isStreamingMode, userClassLoader); + this.sqlManager = sqlManager; + } + + public static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment) { + return create(executionEnvironment, EnvironmentSettings.newInstance().build()); + } + + static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings) { + return create(executionEnvironment, settings, new TableConfig()); + } + + public static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings, TableConfig tableConfig) { + if (!settings.isStreamingMode()) { + throw new TableException("StreamTableEnvironment can not run in batch mode for now, please use TableEnvironment."); + } else { + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + ModuleManager moduleManager = new ModuleManager(); + SqlManager sqlManager = new SqlManager(); + CatalogManager catalogManager = CatalogManager.newBuilder().classLoader(classLoader).config(tableConfig.getConfiguration()).defaultCatalog(settings.getBuiltInCatalogName(), new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName())).executionConfig(executionEnvironment.getConfig()).build(); + FunctionCatalog functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager); + Map executorProperties = settings.toExecutorProperties(); + Executor executor = lookupExecutor(executorProperties, executionEnvironment); + Map plannerProperties = settings.toPlannerProperties(); + Planner planner = ((PlannerFactory) ComponentFactoryService.find(PlannerFactory.class, plannerProperties)).create(plannerProperties, executor, tableConfig, functionCatalog, catalogManager); + return new CustomTableEnvironmentImpl(catalogManager, sqlManager, moduleManager, tableConfig, executor, functionCatalog, planner, settings.isStreamingMode(), classLoader); + } + } + + private static Executor lookupExecutor(Map executorProperties, StreamExecutionEnvironment executionEnvironment) { + try { + ExecutorFactory executorFactory = (ExecutorFactory) ComponentFactoryService.find(ExecutorFactory.class, executorProperties); + Method createMethod = executorFactory.getClass().getMethod("create", Map.class, StreamExecutionEnvironment.class); + return (Executor) createMethod.invoke(executorFactory, executorProperties, executionEnvironment); + } catch (Exception var4) { + throw new TableException("Could not instantiate the executor. Make sure a planner module is on the classpath", var4); + } + } + + public void useSqlFragment() { + this.useSqlFragment = true; + } + + public void unUseSqlFragment() { + this.useSqlFragment = false; + } + + @Override + public String explainSql(String statement, ExplainDetail... extraDetails) { + if(useSqlFragment) { + statement = sqlManager.parseVariable(statement); + if (statement.length() == 0) { + return "This is a sql fragment."; + } + } + if (checkShowFragments(statement)) { + return "'SHOW FRAGMENTS' can't be explained."; + } else { + return super.explainSql(statement, extraDetails); + } + } + + public String getStreamGraphString(String statement) { + if(useSqlFragment) { + statement = sqlManager.parseVariable(statement); + if (statement.length() == 0) { + return "This is a sql fragment."; + } + } + if (checkShowFragments(statement)) { + return "'SHOW FRAGMENTS' can't be explained."; + } + List operations = super.parser.parse(statement); + if (operations.size() != 1) { + throw new TableException("Unsupported SQL query! explainSql() only accepts a single SQL query."); + } else { + List modifyOperations = new ArrayList<>(); + for (int i = 0; i < operations.size(); i++) { + if(operations.get(i) instanceof ModifyOperation){ + modifyOperations.add((ModifyOperation)operations.get(i)); + } + } + List> trans = super.planner.translate(modifyOperations); + if(execEnv instanceof ExecutorBase){ + return ExecutorUtils.generateStreamGraph(((ExecutorBase) execEnv).getExecutionEnvironment(), trans).getStreamingPlanAsJSON(); + }else{ + return "Unsupported SQL query! explainSql() need a single SQL to query."; + } + } + } + + public ObjectNode getStreamGraph(String statement) { + if(useSqlFragment) { + statement = sqlManager.parseVariable(statement); + if (statement.length() == 0) { + throw new TableException("This is a sql fragment."); + } + } + if (checkShowFragments(statement)) { + throw new TableException("'SHOW FRAGMENTS' can't be explained."); + } + List operations = super.parser.parse(statement); + if (operations.size() != 1) { + throw new TableException("Unsupported SQL query! explainSql() only accepts a single SQL query."); + } else { + List modifyOperations = new ArrayList<>(); + for (int i = 0; i < operations.size(); i++) { + if(operations.get(i) instanceof ModifyOperation){ + modifyOperations.add((ModifyOperation)operations.get(i)); + } + } + List> trans = super.planner.translate(modifyOperations); + if(execEnv instanceof ExecutorBase){ + StreamGraph streamGraph = ExecutorUtils.generateStreamGraph(((ExecutorBase) execEnv).getExecutionEnvironment(), trans); + JSONGenerator jsonGenerator = new JSONGenerator(streamGraph); + ObjectNode jsonNode = jsonGenerator.getJSONNode(); + return jsonNode; + }else{ + throw new TableException("Unsupported SQL query! explainSql() need a single SQL to query."); + } + } + } + + public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails) { + SqlExplainResult record = new SqlExplainResult(); + if(useSqlFragment) { + String orignSql = statement; + statement = sqlManager.parseVariable(statement); + if (statement.length() == 0) { + record.setParseTrue(true); + record.setType("Sql Fragment"); + record.setExplain(orignSql); + record.setExplainTrue(true); + return record; + } + } + List operations = parser.parse(statement); + record.setParseTrue(true); + if (operations.size() != 1) { + throw new TableException( + "Unsupported SQL query! explainSql() only accepts a single SQL query."); + } + List operationlist = new ArrayList<>(operations); + for (int i = 0; i < operationlist.size(); i++) { + Operation operation = operationlist.get(i); + if (operation instanceof ModifyOperation) { + record.setType("Modify DML"); + } else if (operation instanceof ExplainOperation) { + record.setType("Explain DML"); + } else if (operation instanceof QueryOperation) { + record.setType("Query DML"); + } else { + operationlist.remove(i); + record.setType("DDL"); + i=i-1; + } + } + if(operationlist.size()==0){ + //record.setExplain("DDL语句不进行解释。"); + return record; + } + record.setExplain(planner.explain(operationlist, extraDetails)); + record.setExplainTrue(true); + return record; + } + + @Override + public String[] getCompletionHints(String statement, int position) { + if(useSqlFragment) { + statement = sqlManager.parseVariable(statement); + if (statement.length() == 0) { + return new String[0]; + } + } + return super.getCompletionHints(statement, position); + } + + @Override + public Table sqlQuery(String query) { + if(useSqlFragment) { + query = sqlManager.parseVariable(query); + if (query.length() == 0) { + throw new TableException("Unsupported SQL query! The SQL query parsed is null.If it's a sql fragment, and please use executeSql()."); + } + if (checkShowFragments(query)) { + return sqlManager.getSqlFragmentsTable(this); + } else { + return super.sqlQuery(query); + } + }else { + return super.sqlQuery(query); + } + } + + @Override + public TableResult executeSql(String statement) { + if(useSqlFragment) { + statement = sqlManager.parseVariable(statement); + if (statement.length() == 0) { + return CustomTableResultImpl.TABLE_RESULT_OK; + } + if (checkShowFragments(statement)) { + return sqlManager.getSqlFragments(); + } else { + return super.executeSql(statement); + } + }else{ + return super.executeSql(statement); + } + } + + @Override + public void sqlUpdate(String stmt) { + if(useSqlFragment) { + stmt = sqlManager.parseVariable(stmt); + if (stmt.length() == 0) { + throw new TableException("Unsupported SQL update! The SQL update parsed is null.If it's a sql fragment, and please use executeSql()."); + } + } + super.sqlUpdate(stmt); + } + + public boolean checkShowFragments(String sql){ + return sqlManager.checkShowFragments(sql); + } +} diff --git a/dlink-core/src/main/java/com/dlink/executor/custom/CustomTableResultImpl.java b/dlink-core/src/main/java/com/dlink/executor/custom/CustomTableResultImpl.java new file mode 100644 index 0000000000..9ed4698043 --- /dev/null +++ b/dlink-core/src/main/java/com/dlink/executor/custom/CustomTableResultImpl.java @@ -0,0 +1,271 @@ +package com.dlink.executor.custom; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.table.api.*; +import org.apache.flink.table.utils.PrintUtils; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; +import java.io.PrintWriter; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.*; + +/** + * 定制TableResultImpl + * @author wenmo + * @since 2021/5/25 + **/ +@Internal +class CustomTableResultImpl implements TableResult { + public static final TableResult TABLE_RESULT_OK; + private final JobClient jobClient; + private final TableSchema tableSchema; + private final ResultKind resultKind; + private final CloseableRowIteratorWrapper data; + private final PrintStyle printStyle; + + private CustomTableResultImpl(@Nullable JobClient jobClient, TableSchema tableSchema, ResultKind resultKind, CloseableIterator data, PrintStyle printStyle) { + this.jobClient = jobClient; + this.tableSchema = (TableSchema) Preconditions.checkNotNull(tableSchema, "tableSchema should not be null"); + this.resultKind = (ResultKind)Preconditions.checkNotNull(resultKind, "resultKind should not be null"); + Preconditions.checkNotNull(data, "data should not be null"); + this.data = new CloseableRowIteratorWrapper(data); + this.printStyle = (PrintStyle)Preconditions.checkNotNull(printStyle, "printStyle should not be null"); + } + + public static TableResult buildTableResult(List fields,List rows){ + Builder builder = builder().resultKind(ResultKind.SUCCESS); + if(fields.size()>0) { + TableSchema.Builder tableSchemaBuild = TableSchema.builder(); + for (int i = 0; i < fields.size(); i++) { + tableSchemaBuild.field(fields.get(i).getName(),fields.get(i).getType()); + } + builder.tableSchema(tableSchemaBuild.build()).data(rows); + } + return builder.build(); + } + + public Optional getJobClient() { + return Optional.ofNullable(this.jobClient); + } + + public void await() throws InterruptedException, ExecutionException { + try { + this.awaitInternal(-1L, TimeUnit.MILLISECONDS); + } catch (TimeoutException var2) { + ; + } + + } + + public void await(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + this.awaitInternal(timeout, unit); + } + + private void awaitInternal(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + if (this.jobClient != null) { + ExecutorService executor = Executors.newFixedThreadPool(1, (r) -> { + return new Thread(r, "TableResult-await-thread"); + }); + + try { + CompletableFuture future = CompletableFuture.runAsync(() -> { + while(!this.data.isFirstRowReady()) { + try { + Thread.sleep(100L); + } catch (InterruptedException var2) { + throw new TableException("Thread is interrupted"); + } + } + + }, executor); + if (timeout >= 0L) { + future.get(timeout, unit); + } else { + future.get(); + } + } finally { + executor.shutdown(); + } + + } + } + + public TableSchema getTableSchema() { + return this.tableSchema; + } + + public ResultKind getResultKind() { + return this.resultKind; + } + + public CloseableIterator collect() { + return this.data; + } + + public void print() { + Iterator it = this.collect(); + if (this.printStyle instanceof TableauStyle) { + int maxColumnWidth = ((TableauStyle)this.printStyle).getMaxColumnWidth(); + String nullColumn = ((TableauStyle)this.printStyle).getNullColumn(); + boolean deriveColumnWidthByType = ((TableauStyle)this.printStyle).isDeriveColumnWidthByType(); + boolean printRowKind = ((TableauStyle)this.printStyle).isPrintRowKind(); + PrintUtils.printAsTableauForm(this.getTableSchema(), it, new PrintWriter(System.out), maxColumnWidth, nullColumn, deriveColumnWidthByType, printRowKind); + } else { + if (!(this.printStyle instanceof RawContentStyle)) { + throw new TableException("Unsupported print style: " + this.printStyle); + } + + while(it.hasNext()) { + System.out.println(String.join(",", PrintUtils.rowToString((Row)it.next()))); + } + } + + } + + public static Builder builder() { + return new Builder(); + } + + static { + TABLE_RESULT_OK = builder().resultKind(ResultKind.SUCCESS).tableSchema(TableSchema.builder().field("result", DataTypes.STRING()).build()).data(Collections.singletonList(Row.of(new Object[]{"OK"}))).build(); + } + + private static final class CloseableRowIteratorWrapper implements CloseableIterator { + private final CloseableIterator iterator; + private boolean isFirstRowReady; + + private CloseableRowIteratorWrapper(CloseableIterator iterator) { + this.isFirstRowReady = false; + this.iterator = iterator; + } + + public void close() throws Exception { + this.iterator.close(); + } + + public boolean hasNext() { + boolean hasNext = this.iterator.hasNext(); + this.isFirstRowReady = this.isFirstRowReady || hasNext; + return hasNext; + } + + public Row next() { + Row next = (Row)this.iterator.next(); + this.isFirstRowReady = true; + return next; + } + + public boolean isFirstRowReady() { + return this.isFirstRowReady || this.hasNext(); + } + } + + private static final class RawContentStyle implements PrintStyle { + private RawContentStyle() { + } + } + + private static final class TableauStyle implements PrintStyle { + private final boolean deriveColumnWidthByType; + private final int maxColumnWidth; + private final String nullColumn; + private final boolean printRowKind; + + private TableauStyle(int maxColumnWidth, String nullColumn, boolean deriveColumnWidthByType, boolean printRowKind) { + this.deriveColumnWidthByType = deriveColumnWidthByType; + this.maxColumnWidth = maxColumnWidth; + this.nullColumn = nullColumn; + this.printRowKind = printRowKind; + } + + public boolean isDeriveColumnWidthByType() { + return this.deriveColumnWidthByType; + } + + int getMaxColumnWidth() { + return this.maxColumnWidth; + } + + String getNullColumn() { + return this.nullColumn; + } + + public boolean isPrintRowKind() { + return this.printRowKind; + } + } + + public interface PrintStyle { + static PrintStyle tableau(int maxColumnWidth, String nullColumn, boolean deriveColumnWidthByType, boolean printRowKind) { + Preconditions.checkArgument(maxColumnWidth > 0, "maxColumnWidth should be greater than 0"); + Preconditions.checkNotNull(nullColumn, "nullColumn should not be null"); + return new TableauStyle(maxColumnWidth, nullColumn, deriveColumnWidthByType, printRowKind); + } + + static PrintStyle rawContent() { + return new RawContentStyle(); + } + } + + public static class Builder { + private JobClient jobClient; + private TableSchema tableSchema; + private ResultKind resultKind; + private CloseableIterator data; + private PrintStyle printStyle; + + private Builder() { + this.jobClient = null; + this.tableSchema = null; + this.resultKind = null; + this.data = null; + this.printStyle = PrintStyle.tableau(2147483647, "(NULL)", false, false); + } + + public Builder jobClient(JobClient jobClient) { + this.jobClient = jobClient; + return this; + } + + public Builder tableSchema(TableSchema tableSchema) { + Preconditions.checkNotNull(tableSchema, "tableSchema should not be null"); + this.tableSchema = tableSchema; + return this; + } + + public Builder resultKind(ResultKind resultKind) { + Preconditions.checkNotNull(resultKind, "resultKind should not be null"); + this.resultKind = resultKind; + return this; + } + + public Builder data(CloseableIterator rowIterator) { + Preconditions.checkNotNull(rowIterator, "rowIterator should not be null"); + this.data = rowIterator; + return this; + } + + public Builder data(List rowList) { + Preconditions.checkNotNull(rowList, "listRows should not be null"); + this.data = CloseableIterator.adapterForIterator(rowList.iterator()); + return this; + } + + public Builder setPrintStyle(PrintStyle printStyle) { + Preconditions.checkNotNull(printStyle, "printStyle should not be null"); + this.printStyle = printStyle; + return this; + } + + public TableResult build() { + return new CustomTableResultImpl(this.jobClient, this.tableSchema, this.resultKind, this.data, this.printStyle); + } + } +} diff --git a/dlink-core/src/main/java/com/dlink/executor/custom/SqlManager.java b/dlink-core/src/main/java/com/dlink/executor/custom/SqlManager.java new file mode 100644 index 0000000000..c08acb881a --- /dev/null +++ b/dlink-core/src/main/java/com/dlink/executor/custom/SqlManager.java @@ -0,0 +1,194 @@ +package com.dlink.executor.custom; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.ExpressionParserException; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.types.Row; +import org.apache.flink.util.StringUtils; + +import java.util.*; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static java.lang.String.format; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Flink Sql Fragment Manager + * @author wenmo + * @since 2021/5/25 + **/ +@Internal +public final class SqlManager { + + private Map sqlFragments; + static final String SHOW_FRAGMENTS = "SHOW FRAGMENTS"; + + public SqlManager() { + sqlFragments = new HashMap<>(); + } + + /** + * Get names of sql fragments loaded. + * + * @return a list of names of sql fragments loaded + */ + public List listSqlFragments() { + return new ArrayList<>(sqlFragments.keySet()); + } + + /** + * Registers a fragment of sql under the given name. The sql fragment name must be unique. + * + * @param sqlFragmentName name under which to register the given sql fragment + * @param sqlFragment a fragment of sql to register + * @throws CatalogException if the registration of the sql fragment under the given name failed. + * But at the moment, with CatalogException, not SqlException + */ + public void registerSqlFragment(String sqlFragmentName, String sqlFragment) { + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(sqlFragmentName), + "sql fragment name cannot be null or empty."); + checkNotNull(sqlFragment, "sql fragment cannot be null"); + + if (sqlFragments.containsKey(sqlFragmentName)) { + throw new CatalogException( + format("The fragment of sql %s already exists.", sqlFragmentName)); + } + + sqlFragments.put(sqlFragmentName, sqlFragment); + } + + /** + * Unregisters a fragment of sql under the given name. The sql fragment name must be existed. + * + * @param sqlFragmentName name under which to unregister the given sql fragment. + * @param ignoreIfNotExists If false exception will be thrown if the fragment of sql to be + * altered does not exist. + * @throws CatalogException if the unregistration of the sql fragment under the given name + * failed. But at the moment, with CatalogException, not SqlException + */ + public void unregisterSqlFragment(String sqlFragmentName, boolean ignoreIfNotExists) { + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(sqlFragmentName), + "sql fragmentName name cannot be null or empty."); + + if (sqlFragments.containsKey(sqlFragmentName)) { + sqlFragments.remove(sqlFragmentName); + } else if (!ignoreIfNotExists) { + throw new CatalogException( + format("The fragment of sql %s does not exist.", sqlFragmentName)); + } + } + + /** + * Get a fragment of sql under the given name. The sql fragment name must be existed. + * + * @param sqlFragmentName name under which to unregister the given sql fragment. + * @throws CatalogException if the unregistration of the sql fragment under the given name + * failed. But at the moment, with CatalogException, not SqlException + */ + public String getSqlFragment(String sqlFragmentName) { + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(sqlFragmentName), + "sql fragmentName name cannot be null or empty."); + + if (sqlFragments.containsKey(sqlFragmentName)) { + return sqlFragments.get(sqlFragmentName); + } else { + throw new CatalogException( + format("The fragment of sql %s does not exist.", sqlFragmentName)); + } + } + + /** + * Get a fragment of sql under the given name. The sql fragment name must be existed. + * + * @throws CatalogException if the unregistration of the sql fragment under the given name + * failed. But at the moment, with CatalogException, not SqlException + */ + public Map getSqlFragment() { + return sqlFragments; + } + + public TableResult getSqlFragments() { + List rows = new ArrayList<>(); + for (String key : sqlFragments.keySet()) { + rows.add(Row.of(key)); + } + return CustomTableResultImpl.buildTableResult(new ArrayList<>(Arrays.asList(new TableSchemaField("sql fragment name", DataTypes.STRING()))), rows); + } + + public Iterator getSqlFragmentsIterator() { + return sqlFragments.entrySet().iterator(); + } + + public Table getSqlFragmentsTable(CustomTableEnvironmentImpl environment) { + List keys = new ArrayList<>(); + for (String key : sqlFragments.keySet()) { + keys.add(key); + } + return environment.fromValues(keys); + } + + public boolean checkShowFragments(String sql){ + return SHOW_FRAGMENTS.equals(sql.trim().toUpperCase()); + } + /** + * Parse some variables under the given sql. + * + * @param statement A sql will be parsed. + * @throws ExpressionParserException if the name of the variable under the given sql failed. + */ + public String parseVariable(String statement) { + if (statement == null || "".equals(statement)) { + return statement; + } + String[] strs = statement.split(";"); + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < strs.length; i++) { + String str = strs[i].trim(); + if (str.length() == 0) { + continue; + } + if (str.contains(":=")) { + String[] strs2 = str.split(":="); + if (strs2.length >= 2) { + if (strs2[0].length() == 0) { + throw new ExpressionParserException("Illegal variable name."); + } + String valueString = str.substring(str.indexOf(":=") + 2); + this.registerSqlFragment(strs2[0], replaceVariable(valueString)); + } else { + throw new ExpressionParserException("Illegal variable definition."); + } + } else { + sb.append(replaceVariable(str)); + } + } + return sb.toString(); + } + + /** + * Replace some variables under the given sql. + * + * @param statement A sql will be replaced. + */ + private String replaceVariable(String statement) { + String pattern = "\\$\\{(.+?)\\}"; + Pattern p = Pattern.compile(pattern); + Matcher m = p.matcher(statement); + StringBuffer sb = new StringBuffer(); + while (m.find()) { + String key = m.group(1); + String value = this.getSqlFragment(key); + m.appendReplacement(sb, value == null ? "" : value); + } + m.appendTail(sb); + return sb.toString(); + } +} diff --git a/dlink-core/src/main/java/com/dlink/executor/custom/TableSchemaField.java b/dlink-core/src/main/java/com/dlink/executor/custom/TableSchemaField.java new file mode 100644 index 0000000000..899a54441c --- /dev/null +++ b/dlink-core/src/main/java/com/dlink/executor/custom/TableSchemaField.java @@ -0,0 +1,33 @@ +package com.dlink.executor.custom; + +import org.apache.flink.table.types.DataType; + +/** + * @author wenmo + * @since 2021/5/11 14:04 + **/ +public class TableSchemaField { + private String name; + private DataType type; + + public TableSchemaField(String name, DataType type) { + this.name = name; + this.type = type; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public DataType getType() { + return type; + } + + public void setType(DataType type) { + this.type = type; + } +} diff --git a/dlink-core/src/main/java/com/dlink/job/JobManager.java b/dlink-core/src/main/java/com/dlink/job/JobManager.java new file mode 100644 index 0000000000..0c87248b03 --- /dev/null +++ b/dlink-core/src/main/java/com/dlink/job/JobManager.java @@ -0,0 +1,133 @@ +package com.dlink.job; + +import com.dlink.constant.FlinkConstant; +import com.dlink.constant.FlinkSQLConstant; +import com.dlink.executor.EnvironmentSetting; +import com.dlink.executor.Executor; +import com.dlink.executor.ExecutorSetting; +import com.dlink.result.*; +import com.dlink.session.ExecutorEntity; +import com.dlink.session.SessionPool; +import com.dlink.trans.Operations; +import org.apache.flink.api.common.JobID; +import org.apache.flink.table.api.TableResult; + +import java.time.LocalDate; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * JobManager + * + * @author wenmo + * @since 2021/5/25 15:27 + **/ +public class JobManager { + + private String flinkHost; + private Integer port; + private String sessionId; + private Integer maxRowNum = 100; + + public JobManager(String flinkHost, Integer port) { + this.flinkHost = flinkHost; + this.port = port; + } + + public JobManager(String flinkHost, Integer port, String sessionId, Integer maxRowNum) { + this.flinkHost = flinkHost; + this.sessionId = sessionId; + this.maxRowNum = maxRowNum; + this.port = port; + } + + public RunResult execute(String statement) { + RunResult runResult = new RunResult(sessionId, statement, flinkHost); + Executor executor = null; + ExecutorEntity executorEntity = SessionPool.get(sessionId); + if (executorEntity != null) { + executor = executorEntity.getExecutor(); + } else { + executor = Executor.build(new EnvironmentSetting(flinkHost, FlinkConstant.PORT), new ExecutorSetting(Executor.REMOTE)); + SessionPool.push(new ExecutorEntity(sessionId, executor)); + } + String[] Statements = statement.split(";"); + int currentIndex = 0; + //当前只支持对 show select的操作的结果的数据查询 后期需要可添加 + try { + for (String item : Statements) { + currentIndex++; + if (item.trim().isEmpty()) { + continue; + } + String operationType = Operations.getOperationType(item); + long start = System.currentTimeMillis(); + TableResult tableResult = executor.executeSql(item); + long finish = System.currentTimeMillis(); + long timeElapsed = finish - start; + IResult result = ResultBuilder.build(operationType, maxRowNum, "", false).getResult(tableResult); + runResult.setResult(result); + runResult.setTime(timeElapsed); + runResult.setFinishDate(LocalDate.now()); + } + } catch (Exception e) { + e.printStackTrace(); + StackTraceElement[] trace = e.getStackTrace(); + StringBuffer resMsg = new StringBuffer(""); + for (StackTraceElement s : trace) { + resMsg.append("
" + s + " "); + } + runResult.setError(LocalDate.now().toString() + ":" + "运行第" + currentIndex + "行sql时出现异常:" + e.getMessage() + "
>>>堆栈信息<<<" + resMsg.toString()); + return runResult; + } + return runResult; + } + + public SubmitResult submit(List sqlList, ExecutorSetting executerSetting) { + SubmitResult result = new SubmitResult(sessionId,sqlList,flinkHost); + Map map = new HashMap<>(); + int currentIndex = 0; + try { + if (sqlList != null && sqlList.size() > 0) { + Executor executor = Executor.build(new EnvironmentSetting(flinkHost, port), executerSetting); + for (String sqlText : sqlList) { + currentIndex++; + String operationType = Operations.getOperationType(sqlText); + if (operationType.equalsIgnoreCase(FlinkSQLConstant.INSERT)) { + + long start = System.currentTimeMillis(); + TableResult tableResult = executor.executeSql(sqlText); + long finish = System.currentTimeMillis(); + long timeElapsed = finish - start; + JobID jobID = tableResult.getJobClient().get().getJobID(); + result.setSuccess(true); + result.setTime(timeElapsed); + result.setFinishDate(LocalDate.now()); + InsertResult insertResult = new InsertResult(sqlText,(jobID == null ? "" : jobID.toHexString()),true,timeElapsed,LocalDate.now()); + result.setResult(insertResult); + } else { + executor.executeSql(sqlText); + } + } + } else { + result.setSuccess(false); + result.setMsg(LocalDate.now().toString()+":执行sql语句为空。"); + return result; + } + } catch (Exception e) { + e.printStackTrace(); + StackTraceElement[] trace = e.getStackTrace(); + StringBuilder resMsg = new StringBuilder(); + for (StackTraceElement s : trace) { + resMsg.append("
" + s + " "); + } + result.setError(LocalDate.now().toString() + ":" + "运行第" + currentIndex + "行sql时出现异常:" + e.getMessage() + "
>>>堆栈信息<<<" + resMsg.toString()); + return result; + + } + result.setSuccess(true); + result.setMsg(LocalDate.now().toString() + ":任务提交成功!"); + return result; + } +} diff --git a/dlink-core/src/main/java/com/dlink/result/AbstractBuilder.java b/dlink-core/src/main/java/com/dlink/result/AbstractBuilder.java new file mode 100644 index 0000000000..2b98b7d582 --- /dev/null +++ b/dlink-core/src/main/java/com/dlink/result/AbstractBuilder.java @@ -0,0 +1,16 @@ +package com.dlink.result; + +/** + * AbstractBuilder + * + * @author wenmo + * @since 2021/5/25 16:11 + **/ +public class AbstractBuilder { + + protected String operationType; + protected Integer maxRowNum; + protected boolean printRowKind; + protected String nullColumn; + +} diff --git a/dlink-core/src/main/java/com/dlink/result/IResult.java b/dlink-core/src/main/java/com/dlink/result/IResult.java new file mode 100644 index 0000000000..eca0e62b6a --- /dev/null +++ b/dlink-core/src/main/java/com/dlink/result/IResult.java @@ -0,0 +1,10 @@ +package com.dlink.result; + +/** + * IResult + * + * @author wenmo + * @since 2021/5/25 16:22 + **/ +public interface IResult { +} diff --git a/dlink-core/src/main/java/com/dlink/result/InsertResult.java b/dlink-core/src/main/java/com/dlink/result/InsertResult.java new file mode 100644 index 0000000000..f664982d8c --- /dev/null +++ b/dlink-core/src/main/java/com/dlink/result/InsertResult.java @@ -0,0 +1,65 @@ +package com.dlink.result; + +import java.time.LocalDate; + +/** + * InsertResult + * + * @author wenmo + * @since 2021/5/25 19:08 + **/ +public class InsertResult implements IResult { + private String statement; + private String jobID; + private boolean success; + private long time; + private LocalDate finishDate; + + public InsertResult(String statement, String jobID, boolean success, long time, LocalDate finishDate) { + this.statement = statement; + this.jobID = jobID; + this.success = success; + this.time = time; + this.finishDate = finishDate; + } + + public String getStatement() { + return statement; + } + + public void setStatement(String statement) { + this.statement = statement; + } + + public String getJobID() { + return jobID; + } + + public void setJobID(String jobID) { + this.jobID = jobID; + } + + public boolean isSuccess() { + return success; + } + + public void setSuccess(boolean success) { + this.success = success; + } + + public long getTime() { + return time; + } + + public void setTime(long time) { + this.time = time; + } + + public LocalDate getFinishDate() { + return finishDate; + } + + public void setFinishDate(LocalDate finishDate) { + this.finishDate = finishDate; + } +} diff --git a/dlink-core/src/main/java/com/dlink/result/JobSubmitResult.java b/dlink-core/src/main/java/com/dlink/result/JobSubmitResult.java new file mode 100644 index 0000000000..e09aa3f1a6 --- /dev/null +++ b/dlink-core/src/main/java/com/dlink/result/JobSubmitResult.java @@ -0,0 +1,10 @@ +package com.dlink.result; + +/** + * JobSubmitRecord + * + * @author wenmo + * @since 2021/5/25 15:32 + **/ +public class JobSubmitResult { +} diff --git a/dlink-core/src/main/java/com/dlink/result/ResultBuilder.java b/dlink-core/src/main/java/com/dlink/result/ResultBuilder.java new file mode 100644 index 0000000000..81bccfaea4 --- /dev/null +++ b/dlink-core/src/main/java/com/dlink/result/ResultBuilder.java @@ -0,0 +1,23 @@ +package com.dlink.result; + +import org.apache.flink.table.api.TableResult; + +/** + * ResultBuilder + * + * @author wenmo + * @since 2021/5/25 15:59 + **/ +public interface ResultBuilder { + + static ResultBuilder build(String operationType, Integer maxRowNum, String nullColumn, boolean printRowKind){ + switch (operationType.toUpperCase()){ + case SelectBuilder.OPERATION_TYPE: + return new SelectBuilder(operationType,maxRowNum,nullColumn,printRowKind); + default: + return new SelectBuilder(operationType,maxRowNum,nullColumn,printRowKind); + } + } + + IResult getResult(TableResult tableResult); +} diff --git a/dlink-core/src/main/java/com/dlink/result/RunResult.java b/dlink-core/src/main/java/com/dlink/result/RunResult.java new file mode 100644 index 0000000000..401420e151 --- /dev/null +++ b/dlink-core/src/main/java/com/dlink/result/RunResult.java @@ -0,0 +1,102 @@ +package com.dlink.result; + +import java.time.LocalDate; + +/** + * RunResult + * + * @author wenmo + * @since 2021/5/25 16:46 + **/ +public class RunResult { + private String sessionId; + private String statement; + private String flinkHost; + private boolean success; + private long time; + private LocalDate finishDate; + private String msg; + private String error; + private IResult result; + + public RunResult() { + } + + public RunResult(String sessionId, String statement, String flinkHost) { + this.sessionId = sessionId; + this.statement = statement; + this.flinkHost = flinkHost; + } + + public String getSessionId() { + return sessionId; + } + + public void setSessionId(String sessionId) { + this.sessionId = sessionId; + } + + public String getStatement() { + return statement; + } + + public void setStatement(String statement) { + this.statement = statement; + } + + public boolean isSuccess() { + return success; + } + + public void setSuccess(boolean success) { + this.success = success; + } + + public String getError() { + return error; + } + + public void setError(String error) { + this.error = error; + } + + public IResult getResult() { + return result; + } + + public void setResult(IResult result) { + this.result = result; + } + + public String getFlinkHost() { + return flinkHost; + } + + public void setFlinkHost(String flinkHost) { + this.flinkHost = flinkHost; + } + + public long getTime() { + return time; + } + + public void setTime(long time) { + this.time = time; + } + + public LocalDate getFinishDate() { + return finishDate; + } + + public void setFinishDate(LocalDate finishDate) { + this.finishDate = finishDate; + } + + public String getMsg() { + return msg; + } + + public void setMsg(String msg) { + this.msg = msg; + } +} diff --git a/dlink-core/src/main/java/com/dlink/result/SelectBuilder.java b/dlink-core/src/main/java/com/dlink/result/SelectBuilder.java new file mode 100644 index 0000000000..9a6b50af47 --- /dev/null +++ b/dlink-core/src/main/java/com/dlink/result/SelectBuilder.java @@ -0,0 +1,83 @@ +package com.dlink.result; + +import org.apache.flink.table.api.TableColumn; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.types.Row; +import org.apache.flink.util.StringUtils; + +import java.util.*; +import java.util.stream.Stream; + +/** + * SelectBuilder + * + * @author wenmo + * @since 2021/5/25 16:03 + **/ +public class SelectBuilder extends AbstractBuilder implements ResultBuilder { + + public static final String OPERATION_TYPE = "SELECT"; + + public SelectBuilder(String operationType, Integer maxRowNum,String nullColumn,boolean printRowKind) { + this.operationType = operationType; + this.maxRowNum = maxRowNum; + this.printRowKind = printRowKind; + this.nullColumn = nullColumn; + } + + @Override + public IResult getResult(TableResult tableResult) { + List columns = tableResult.getTableSchema().getTableColumns(); + int totalCount = 0; + Set column = new LinkedHashSet(); + String[] columnNames = (String[]) columns.stream().map(TableColumn::getName).map(s -> s.replace(" ","")).toArray((x$0) -> { + return (new String[x$0]); + }); + if (printRowKind) { + columnNames = (String[]) Stream.concat(Stream.of("op"), Arrays.stream(columnNames)).toArray((x$0) -> { + return new String[x$0]; + }); + } + long numRows; + List> rows = new ArrayList<>(); + Iterator it = tableResult.collect(); + for (numRows = 0L; it.hasNext() ; ++numRows) { + if (numRows < maxRowNum) { + String[] cols = rowToString((Row) it.next()); + Map row = new HashMap<>(); + for (int i = 0; i < cols.length; i++) { + if (i > columnNames.length) { + column.add("UKN" + i); + row.put("UKN" + i, cols[i]); + } else { + column.add(columnNames[i]); + row.put(columnNames[i], cols[i]); + } + } + rows.add(row); + }else { + it.next(); + } + totalCount++; + } + return new SelectResult(rows,totalCount,rows.size(),column); + } + + public String[] rowToString(Row row) { + int len = printRowKind ? row.getArity() + 1 : row.getArity(); + List fields = new ArrayList(len); + if (printRowKind) { + fields.add(row.getKind().shortString()); + } + for (int i = 0; i < row.getArity(); ++i) { + Object field = row.getField(i); + if (field == null) { + fields.add(nullColumn); + } else { + fields.add(StringUtils.arrayAwareToString(field)); + } + } + return (String[]) fields.toArray(new String[0]); + } + +} diff --git a/dlink-core/src/main/java/com/dlink/result/SelectResult.java b/dlink-core/src/main/java/com/dlink/result/SelectResult.java new file mode 100644 index 0000000000..94692e7743 --- /dev/null +++ b/dlink-core/src/main/java/com/dlink/result/SelectResult.java @@ -0,0 +1,25 @@ +package com.dlink.result; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * SelectResult + * + * @author wenmo + * @since 2021/5/25 16:01 + **/ +public class SelectResult implements IResult{ + private List> rowData; + private Integer total; + private Integer currentCount; + private Set columns; + + public SelectResult(List> rowData, Integer total, Integer currentCount, Set columns) { + this.rowData = rowData; + this.total = total; + this.currentCount = currentCount; + this.columns = columns; + } +} diff --git a/dlink-core/src/main/java/com/dlink/result/SqlExplainResult.java b/dlink-core/src/main/java/com/dlink/result/SqlExplainResult.java new file mode 100644 index 0000000000..07b8623727 --- /dev/null +++ b/dlink-core/src/main/java/com/dlink/result/SqlExplainResult.java @@ -0,0 +1,108 @@ +package com.dlink.result; + +import java.util.Date; + +/** + * 解释结果 + * + * @author wenmo + * @since 2021/5/25 11:41 + **/ +public class SqlExplainResult { + private Integer index; + private String type; + private String sql; + private String parse; + private String explain; + private String error; + private boolean parseTrue; + private boolean explainTrue; + private Date explainTime; + + public Integer getIndex() { + return index; + } + + public void setIndex(Integer index) { + this.index = index; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public String getSql() { + return sql; + } + + public void setSql(String sql) { + this.sql = sql; + } + + public String getParse() { + return parse; + } + + public void setParse(String parse) { + this.parse = parse; + } + + public String getExplain() { + return explain; + } + + public void setExplain(String explain) { + this.explain = explain; + } + + public String getError() { + return error; + } + + public void setError(String error) { + this.error = error; + } + + public boolean isParseTrue() { + return parseTrue; + } + + public void setParseTrue(boolean parseTrue) { + this.parseTrue = parseTrue; + } + + public boolean isExplainTrue() { + return explainTrue; + } + + public void setExplainTrue(boolean explainTrue) { + this.explainTrue = explainTrue; + } + + public Date getExplainTime() { + return explainTime; + } + + public void setExplainTime(Date explainTime) { + this.explainTime = explainTime; + } + + @Override + public String toString() { + return "SqlExplainRecord{" + + "index=" + index + + ", type='" + type + '\'' + + ", sql='" + sql + '\'' + + ", parse='" + parse + '\'' + + ", explain='" + explain + '\'' + + ", error='" + error + '\'' + + ", parseTrue=" + parseTrue + + ", explainTrue=" + explainTrue + + ", explainTime=" + explainTime + + '}'; + } +} diff --git a/dlink-core/src/main/java/com/dlink/result/SubmitResult.java b/dlink-core/src/main/java/com/dlink/result/SubmitResult.java new file mode 100644 index 0000000000..4d6941377a --- /dev/null +++ b/dlink-core/src/main/java/com/dlink/result/SubmitResult.java @@ -0,0 +1,103 @@ +package com.dlink.result; + +import java.time.LocalDate; +import java.util.List; + +/** + * SubmitResult + * + * @author wenmo + * @since 2021/5/25 19:04 + **/ +public class SubmitResult { + private String sessionId; + private List statements; + private String flinkHost; + private boolean success; + private long time; + private LocalDate finishDate; + private String msg; + private String error; + private IResult result; + + public SubmitResult() { + } + + public SubmitResult(String sessionId, List statements, String flinkHost) { + this.sessionId = sessionId; + this.statements = statements; + this.flinkHost = flinkHost; + } + + public String getSessionId() { + return sessionId; + } + + public void setSessionId(String sessionId) { + this.sessionId = sessionId; + } + + public List getStatements() { + return statements; + } + + public void setStatements(List statements) { + this.statements = statements; + } + + public String getFlinkHost() { + return flinkHost; + } + + public void setFlinkHost(String flinkHost) { + this.flinkHost = flinkHost; + } + + public boolean isSuccess() { + return success; + } + + public void setSuccess(boolean success) { + this.success = success; + } + + public long getTime() { + return time; + } + + public void setTime(long time) { + this.time = time; + } + + public LocalDate getFinishDate() { + return finishDate; + } + + public void setFinishDate(LocalDate finishDate) { + this.finishDate = finishDate; + } + + public String getMsg() { + return msg; + } + + public void setMsg(String msg) { + this.msg = msg; + } + + public String getError() { + return error; + } + + public void setError(String error) { + this.error = error; + } + + public IResult getResult() { + return result; + } + + public void setResult(IResult result) { + this.result = result; + } +} diff --git a/dlink-core/src/main/java/com/dlink/session/ExecutorEntity.java b/dlink-core/src/main/java/com/dlink/session/ExecutorEntity.java new file mode 100644 index 0000000000..bfa951de69 --- /dev/null +++ b/dlink-core/src/main/java/com/dlink/session/ExecutorEntity.java @@ -0,0 +1,35 @@ +package com.dlink.session; + +import com.dlink.executor.Executor; + +/** + * FlinkEntity + * + * @author wenmo + * @since 2021/5/25 14:45 + **/ +public class ExecutorEntity { + private String sessionId; + private Executor executor; + + public ExecutorEntity(String sessionId, Executor executor) { + this.sessionId = sessionId; + this.executor = executor; + } + + public String getSessionId() { + return sessionId; + } + + public void setSessionId(String sessionId) { + this.sessionId = sessionId; + } + + public Executor getExecutor() { + return executor; + } + + public void setExecutor(Executor executor) { + this.executor = executor; + } +} diff --git a/dlink-core/src/main/java/com/dlink/session/SessionPool.java b/dlink-core/src/main/java/com/dlink/session/SessionPool.java new file mode 100644 index 0000000000..0c164755b3 --- /dev/null +++ b/dlink-core/src/main/java/com/dlink/session/SessionPool.java @@ -0,0 +1,47 @@ +package com.dlink.session; + +import com.dlink.constant.FlinkConstant; + +import java.util.List; +import java.util.Vector; + +/** + * SessionPool + * + * @author wenmo + * @since 2021/5/25 14:32 + **/ +public class SessionPool { + + private static volatile List executorList = new Vector<>(FlinkConstant.DEFAULT_SESSION_COUNT); + + public static Integer push(ExecutorEntity executorEntity){ + if (executorList.size() >= FlinkConstant.DEFAULT_SESSION_COUNT * FlinkConstant.DEFAULT_FACTOR) { + executorList.remove(0); + }else if(executorList.size() >= FlinkConstant.DEFAULT_SESSION_COUNT){ + executorList.clear(); + } + executorList.add(executorEntity); + return executorList.size(); + } + + public static Integer remove(String sessionId) { + int count = executorList.size(); + for (int i = 0; i < executorList.size(); i++) { + if (sessionId.equals(executorList.get(i).getSessionId())) { + executorList.remove(i); + break; + } + } + return count - executorList.size(); + } + + public static ExecutorEntity get(String sessionId) { + for (ExecutorEntity executorEntity : executorList) { + if (executorEntity.getSessionId().equals(sessionId)) { + return executorEntity; + } + } + return null; + } +} diff --git a/dlink-core/src/main/java/com/dlink/trans/Operations.java b/dlink-core/src/main/java/com/dlink/trans/Operations.java new file mode 100644 index 0000000000..eb2e96764b --- /dev/null +++ b/dlink-core/src/main/java/com/dlink/trans/Operations.java @@ -0,0 +1,40 @@ +package com.dlink.trans; + +import com.dlink.constant.FlinkSQLConstant; + +/** + * SqlUtil + * + * @author wenmo + * @since 2021/5/25 15:50 + **/ +public class Operations { + /** + * 获取操作类型 + * + * @param sql + * @return + */ + public static String getOperationType(String sql) { + String sqlTrim = sql.replaceAll("[\\s\\t\\n\\r]", "").toUpperCase(); + if (sqlTrim.startsWith(FlinkSQLConstant.CREATE)) { + return FlinkSQLConstant.CREATE; + } + if (sqlTrim.startsWith(FlinkSQLConstant.ALTER)) { + return FlinkSQLConstant.ALTER; + } + if (sqlTrim.startsWith(FlinkSQLConstant.INSERT)) { + return FlinkSQLConstant.INSERT; + } + if (sqlTrim.startsWith(FlinkSQLConstant.DROP)) { + return FlinkSQLConstant.INSERT; + } + if (sqlTrim.startsWith(FlinkSQLConstant.SELECT)) { + return FlinkSQLConstant.SELECT; + } + if (sqlTrim.startsWith(FlinkSQLConstant.SHOW)) { + return FlinkSQLConstant.SHOW; + } + return FlinkSQLConstant.UNKNOWN_TYPE; + } +} diff --git a/dlink-core/src/main/java/com/dlink/ud/udf/GetKey.java b/dlink-core/src/main/java/com/dlink/ud/udf/GetKey.java new file mode 100644 index 0000000000..b832863dfe --- /dev/null +++ b/dlink-core/src/main/java/com/dlink/ud/udf/GetKey.java @@ -0,0 +1,28 @@ +package com.dlink.ud.udf; + +import org.apache.flink.table.functions.ScalarFunction; + +/** + * GetKey + * + * @author wenmo + * @since 2021/5/25 15:50 + **/ +public class GetKey extends ScalarFunction { + + public String eval(String map, String key, String defaultValue) { + if (map == null || !map.contains(key)) { + return defaultValue; + } + String[] maps = map.replaceAll("\\{", "").replaceAll("\\}", "").split(","); + for (int i = 0; i < maps.length; i++) { + String[] items = maps[i].split("="); + if (items.length >= 2) { + if (key.equals(items[0].trim())) { + return items[1]; + } + } + } + return defaultValue; + } +} \ No newline at end of file diff --git a/dlink-core/src/main/java/com/dlink/ud/udtaf/RowsToMap.java b/dlink-core/src/main/java/com/dlink/ud/udtaf/RowsToMap.java new file mode 100644 index 0000000000..952ec5881f --- /dev/null +++ b/dlink-core/src/main/java/com/dlink/ud/udtaf/RowsToMap.java @@ -0,0 +1,51 @@ +package com.dlink.ud.udtaf; + + +import org.apache.flink.table.functions.TableAggregateFunction; +import org.apache.flink.util.Collector; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +/** + * RowsToMap + * + * @author wenmo + * @since 2021/5/25 15:50 + **/ +public class RowsToMap extends TableAggregateFunction { + @Override + public Map createAccumulator() { + return new HashMap(); + } + + + public void accumulate(Map acc, String cls, Object v, String key) { + String[] keys = key.split(","); + for (int i = 0; i < keys.length; i++) { + if (keys[i].equals(cls)) { + acc.put(cls, v); + } + } + } + + public void accumulate(Map acc, String cls, Object v) { + acc.put(cls, v); + } + + public void merge(Map acc, Iterable iterable) { + for (Map otherAcc : iterable) { + Iterator iter = otherAcc.entrySet().iterator(); + while (iter.hasNext()) { + Map.Entry entry = (Map.Entry) iter.next(); + accumulate(acc, entry.getKey().toString(), entry.getValue()); + } + } + } + + public void emitValue(Map acc, Collector out) { + out.collect(acc.toString()); + + } +} \ No newline at end of file diff --git a/dlink-core/src/main/java/com/dlink/utils/DateFormatUtil.java b/dlink-core/src/main/java/com/dlink/utils/DateFormatUtil.java new file mode 100644 index 0000000000..30620a780d --- /dev/null +++ b/dlink-core/src/main/java/com/dlink/utils/DateFormatUtil.java @@ -0,0 +1,37 @@ +package com.dlink.utils; + +import com.dlink.constant.FlinkHistoryConstant; + +import java.util.Date; +import java.util.TimeZone; + +public class DateFormatUtil { + /** + * 获取一个日期的0:00:00 时间戳 日期必须大于00:00:00否则返回上一天 + * + * @param date + * @return + */ + public static long getZeroTimeStamp(Date date) { + return getZeroTimeStamp(date.getTime()); + } + + public static long getZeroTimeStamp(Long timestamp) { + timestamp += TimeZone.getDefault().getRawOffset(); + return timestamp / FlinkHistoryConstant.ONE_DAY * FlinkHistoryConstant.ONE_DAY - TimeZone.getDefault().getRawOffset(); + } + + /** + * 获取指定时间 当天的最后一秒 23:59:59 日期必须大于00:00:00 否则返回上一天 + * @param date + * @return + */ + public static long getLastTimeStampOfOneday(Date date) { + return getLastTimeStampOfOneday(date.getTime()); + } + + public static long getLastTimeStampOfOneday(Long timestamp) { + timestamp += TimeZone.getDefault().getRawOffset(); + return ( timestamp / FlinkHistoryConstant.ONE_DAY * FlinkHistoryConstant.ONE_DAY + FlinkHistoryConstant.ONE_DAY - 100)- TimeZone.getDefault().getRawOffset(); + } +} diff --git a/dlink-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java b/dlink-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java new file mode 100644 index 0000000000..ad9c936b99 --- /dev/null +++ b/dlink-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.graph; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; + +import java.util.*; + +/** Helper class for generating a JSON representation from a {@link StreamGraph}. */ +@Internal +public class JSONGenerator { + + public static final String STEPS = "step_function"; + public static final String ID = "id"; + public static final String SIDE = "side"; + public static final String SHIP_STRATEGY = "ship_strategy"; + public static final String PREDECESSORS = "predecessors"; + public static final String TYPE = "type"; + public static final String PACT = "pact"; + public static final String CONTENTS = "contents"; + public static final String PARALLELISM = "parallelism"; + + private StreamGraph streamGraph; + private final ObjectMapper mapper = new ObjectMapper(); + + public JSONGenerator(StreamGraph streamGraph) { + this.streamGraph = streamGraph; + } + + public String getJSON() { + return getJSONNode().toPrettyString(); + } + + public ObjectNode getJSONNode() { + ObjectNode json = mapper.createObjectNode(); + ArrayNode nodes = mapper.createArrayNode(); + json.put("nodes", nodes); + + List operatorIDs = new ArrayList<>(streamGraph.getVertexIDs()); + Comparator operatorIDComparator = + Comparator.comparingInt( + (Integer id) -> streamGraph.getSinkIDs().contains(id) ? 1 : 0) + .thenComparingInt(id -> id); + operatorIDs.sort(operatorIDComparator); + + visit(nodes, operatorIDs, new HashMap<>()); + + return json; + } + + private void visit( + ArrayNode jsonArray, List toVisit, Map edgeRemapings) { + + Integer vertexID = toVisit.get(0); + StreamNode vertex = streamGraph.getStreamNode(vertexID); + + if (streamGraph.getSourceIDs().contains(vertexID) + || Collections.disjoint(vertex.getInEdges(), toVisit)) { + + ObjectNode node = mapper.createObjectNode(); + decorateNode(vertexID, node); + + if (!streamGraph.getSourceIDs().contains(vertexID)) { + ArrayNode inputs = mapper.createArrayNode(); + node.put(PREDECESSORS, inputs); + + for (StreamEdge inEdge : vertex.getInEdges()) { + int inputID = inEdge.getSourceId(); + + Integer mappedID = + (edgeRemapings.keySet().contains(inputID)) + ? edgeRemapings.get(inputID) + : inputID; + decorateEdge(inputs, inEdge, mappedID); + } + } + jsonArray.add(node); + toVisit.remove(vertexID); + } else { + Integer iterationHead = -1; + for (StreamEdge inEdge : vertex.getInEdges()) { + int operator = inEdge.getSourceId(); + + if (streamGraph.vertexIDtoLoopTimeout.containsKey(operator)) { + iterationHead = operator; + } + } + + ObjectNode obj = mapper.createObjectNode(); + ArrayNode iterationSteps = mapper.createArrayNode(); + obj.put(STEPS, iterationSteps); + obj.put(ID, iterationHead); + obj.put(PACT, "IterativeDataStream"); + obj.put(PARALLELISM, streamGraph.getStreamNode(iterationHead).getParallelism()); + obj.put(CONTENTS, "Stream Iteration"); + ArrayNode iterationInputs = mapper.createArrayNode(); + obj.put(PREDECESSORS, iterationInputs); + toVisit.remove(iterationHead); + visitIteration(iterationSteps, toVisit, iterationHead, edgeRemapings, iterationInputs); + jsonArray.add(obj); + } + + if (!toVisit.isEmpty()) { + visit(jsonArray, toVisit, edgeRemapings); + } + } + + private void visitIteration( + ArrayNode jsonArray, + List toVisit, + int headId, + Map edgeRemapings, + ArrayNode iterationInEdges) { + + Integer vertexID = toVisit.get(0); + StreamNode vertex = streamGraph.getStreamNode(vertexID); + toVisit.remove(vertexID); + + // Ignoring head and tail to avoid redundancy + if (!streamGraph.vertexIDtoLoopTimeout.containsKey(vertexID)) { + ObjectNode obj = mapper.createObjectNode(); + jsonArray.add(obj); + decorateNode(vertexID, obj); + ArrayNode inEdges = mapper.createArrayNode(); + obj.put(PREDECESSORS, inEdges); + + for (StreamEdge inEdge : vertex.getInEdges()) { + int inputID = inEdge.getSourceId(); + + if (edgeRemapings.keySet().contains(inputID)) { + decorateEdge(inEdges, inEdge, inputID); + } else if (!streamGraph.vertexIDtoLoopTimeout.containsKey(inputID)) { + decorateEdge(iterationInEdges, inEdge, inputID); + } + } + + edgeRemapings.put(vertexID, headId); + visitIteration(jsonArray, toVisit, headId, edgeRemapings, iterationInEdges); + } + } + + private void decorateEdge(ArrayNode inputArray, StreamEdge inEdge, int mappedInputID) { + ObjectNode input = mapper.createObjectNode(); + inputArray.add(input); + input.put(ID, mappedInputID); + input.put(SHIP_STRATEGY, inEdge.getPartitioner().toString()); + input.put(SIDE, (inputArray.size() == 0) ? "first" : "second"); + } + + private void decorateNode(Integer vertexID, ObjectNode node) { + + StreamNode vertex = streamGraph.getStreamNode(vertexID); + + node.put(ID, vertexID); + node.put(TYPE, vertex.getOperatorName()); + + if (streamGraph.getSourceIDs().contains(vertexID)) { + node.put(PACT, "Data Source"); + } else if (streamGraph.getSinkIDs().contains(vertexID)) { + node.put(PACT, "Data Sink"); + } else { + node.put(PACT, "Operator"); + } + + node.put(CONTENTS, vertex.getOperatorName()); + + node.put(PARALLELISM, streamGraph.getStreamNode(vertexID).getParallelism()); + } +} diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000000..9fa419c716 --- /dev/null +++ b/pom.xml @@ -0,0 +1,95 @@ + + + 4.0.0 + + com.dlink + dlink + pom + 0.1-SNAPSHOT + + dlink-core + dlink-admin + + + + 1.8 + 1.8 + UTF-8 + UTF-8 + 2.3.8.RELEASE + 5.1.4 + 1.1.22 + 3.4.0 + 1.18.16 + 2.11.4 + 21.0 + + + + + + + cn.hutool + hutool-all + ${hutool.version} + + + + + + com.alibaba + druid-spring-boot-starter + ${druid-starter} + + + + com.baomidou + mybatis-plus-boot-starter + ${mybatis-plus-boot-starter.version} + + + org.springframework.boot + spring-boot-dependencies + ${spring-boot-dependencies.version} + pom + import + + + org.projectlombok + lombok + ${lombok.version} + + + com.fasterxml.jackson.core + jackson-annotations + ${jackjson.version} + + + com.fasterxml.jackson.core + jackson-databind + ${jackjson.version} + + + com.google.guava + guava + ${guava.version} + + + + \ No newline at end of file