[Feature-728][admin,web] Add meta store feature (DataLinkDC#729)
Co-authored-by: wenmo <32723967+wenmo@users.noreply.github.com>
aiwenmo and aiwenmo authored Jul 17, 2022
1 parent 92f97a6 commit 56a8db9
Showing 16 changed files with 659 additions and 93 deletions.
dlink-admin/src/main/java/com/dlink/controller/StudioController.java
@@ -1,25 +1,31 @@
package com.dlink.controller;

import com.dlink.assertion.Asserts;
import java.util.ArrayList;
import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import com.dlink.common.result.Result;
import com.dlink.dto.SessionDTO;
import com.dlink.dto.StudioCADTO;
import com.dlink.dto.StudioDDLDTO;
import com.dlink.dto.StudioExecuteDTO;
import com.dlink.explainer.lineage.LineageResult;
import com.dlink.dto.StudioMetaStoreDTO;
import com.dlink.job.JobResult;
import com.dlink.result.IResult;
import com.dlink.service.StudioService;
import com.fasterxml.jackson.databind.JsonNode;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.util.ArrayList;
import java.util.List;

/**
* StudioController
*
@@ -161,4 +167,20 @@ public Result savepoint(@RequestParam Integer clusterId, @RequestParam String jo
@RequestParam String savePointType, @RequestParam String name, @RequestParam Integer taskId) {
return Result.succeed(studioService.savepoint(taskId, clusterId, jobId, savePointType, name), "savepoint 成功");
}

/**
* Get Meta Store catalogs and databases
*/
@PostMapping("/getMSCatalogs")
public Result getMSCatalogs(@RequestBody StudioMetaStoreDTO studioMetaStoreDTO) {
return Result.succeed(studioService.getMSCatalogs(studioMetaStoreDTO), "获取成功");
}

/**
* Get Meta Store tables
*/
@PostMapping("/getMSTables")
public Result getMSTables(@RequestBody StudioMetaStoreDTO studioMetaStoreDTO) {
return Result.succeed(studioService.getMSTables(studioMetaStoreDTO), "获取成功");
}
}
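
For reference, invoking the two new endpoints from a client could look like the sketch below. This is only an illustration: the /api/studio prefix, the host/port, and the dialect value are assumptions (the controller's @RequestMapping is not shown in this hunk); the request body simply mirrors the fields of StudioMetaStoreDTO introduced in the next file.

import java.util.HashMap;
import java.util.Map;

import org.springframework.web.client.RestTemplate;

public class MetaStoreClientExample {

    public static void main(String[] args) {
        RestTemplate restTemplate = new RestTemplate();

        // Mirrors the fields of StudioMetaStoreDTO; the dialect value is a placeholder.
        Map<String, Object> body = new HashMap<>();
        body.put("dialect", "FlinkSql");
        body.put("catalog", "default_catalog");
        body.put("database", "default_database");

        // Host, port and the /api/studio prefix are assumptions for illustration only.
        String base = "http://127.0.0.1:8888/api/studio";
        Object catalogs = restTemplate.postForObject(base + "/getMSCatalogs", body, Object.class);
        Object tables = restTemplate.postForObject(base + "/getMSTables", body, Object.class);

        System.out.println(catalogs);
        System.out.println(tables);
    }
}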
34 changes: 34 additions & 0 deletions dlink-admin/src/main/java/com/dlink/dto/StudioMetaStoreDTO.java
@@ -0,0 +1,34 @@
package com.dlink.dto;

import lombok.Getter;
import lombok.Setter;

import java.util.HashMap;
import java.util.Map;

import com.dlink.assertion.Asserts;
import com.dlink.gateway.GatewayType;
import com.dlink.job.JobConfig;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;

/**
* StudioMetaStoreDTO
*
* @author wenmo
* @since 2022/7/16 23:18
*/
@Getter
@Setter
public class StudioMetaStoreDTO extends AbstractStatementDTO {
private String catalog;
private String database;
private String dialect;
private Integer databaseId;

public JobConfig getJobConfig() {
return new JobConfig(
GatewayType.LOCAL.getLongValue(), true, false, false, false,
null, null, null, null, null,
null, isFragment(), false, false, 0,
null, null, null, null, null);
}
}
17 changes: 14 additions & 3 deletions dlink-admin/src/main/java/com/dlink/service/StudioService.java
@@ -1,17 +1,24 @@
package com.dlink.service;

import com.dlink.dto.*;
import java.util.List;

import com.dlink.dto.SessionDTO;
import com.dlink.dto.SqlDTO;
import com.dlink.dto.StudioCADTO;
import com.dlink.dto.StudioDDLDTO;
import com.dlink.dto.StudioExecuteDTO;
import com.dlink.dto.StudioMetaStoreDTO;
import com.dlink.explainer.lineage.LineageResult;
import com.dlink.job.JobResult;
import com.dlink.model.Catalog;
import com.dlink.model.Table;
import com.dlink.result.IResult;
import com.dlink.result.SelectResult;
import com.dlink.result.SqlExplainResult;
import com.dlink.session.SessionInfo;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

import java.util.List;

/**
* StudioService
*
@@ -47,4 +54,8 @@ public interface StudioService {
boolean cancel(Integer clusterId, String jobId);

boolean savepoint(Integer taskId, Integer clusterId, String jobId, String savePointType, String name);

List<Catalog> getMSCatalogs(StudioMetaStoreDTO studioMetaStoreDTO);

List<Table> getMSTables(StudioMetaStoreDTO studioMetaStoreDTO);
}
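
A minimal sketch of how another Spring component might consume the two new service methods; MetaStoreBrowserExample is hypothetical, not part of this commit, and only illustrates the call pattern.

import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.dlink.dto.StudioMetaStoreDTO;
import com.dlink.model.Catalog;
import com.dlink.model.Table;
import com.dlink.service.StudioService;

// Hypothetical consumer, for illustration only.
@Component
public class MetaStoreBrowserExample {

    @Autowired
    private StudioService studioService;

    public void printMetaStore(StudioMetaStoreDTO dto) {
        // Catalogs, each populated with its schemas/databases by the implementation.
        List<Catalog> catalogs = studioService.getMSCatalogs(dto);
        for (Catalog catalog : catalogs) {
            System.out.println(catalog.getName());
        }
        // Tables of the catalog/database selected in the DTO.
        List<Table> tables = studioService.getMSTables(dto);
        for (Table table : tables) {
            System.out.println(table);
        }
    }
}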
dlink-admin/src/main/java/com/dlink/service/impl/StudioServiceImpl.java
@@ -2,6 +2,7 @@

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

@@ -19,6 +20,7 @@
import com.dlink.dto.StudioCADTO;
import com.dlink.dto.StudioDDLDTO;
import com.dlink.dto.StudioExecuteDTO;
import com.dlink.dto.StudioMetaStoreDTO;
import com.dlink.explainer.lineage.LineageBuilder;
import com.dlink.explainer.lineage.LineageResult;
import com.dlink.gateway.GatewayType;
@@ -29,10 +31,14 @@
import com.dlink.job.JobResult;
import com.dlink.metadata.driver.Driver;
import com.dlink.metadata.result.JdbcSelectResult;
import com.dlink.model.Catalog;
import com.dlink.model.Cluster;
import com.dlink.model.DataBase;
import com.dlink.model.Savepoints;
import com.dlink.model.Schema;
import com.dlink.model.Table;
import com.dlink.model.Task;
import com.dlink.result.DDLResult;
import com.dlink.result.IResult;
import com.dlink.result.SelectResult;
import com.dlink.result.SqlExplainResult;
@@ -115,6 +121,15 @@ private JobResult executeFlinkSql(StudioExecuteDTO studioExecuteDTO) {
return jobResult;
}

private IResult executeMSFlinkSql(StudioMetaStoreDTO studioMetaStoreDTO) {
addFlinkSQLEnv(studioMetaStoreDTO);
JobConfig config = studioMetaStoreDTO.getJobConfig();
JobManager jobManager = JobManager.build(config);
IResult jobResult = jobManager.executeDDL(studioMetaStoreDTO.getStatement());
RunTimeUtil.recovery(jobManager);
return jobResult;
}

public JobResult executeCommonSql(SqlDTO sqlDTO) {
JobResult result = new JobResult();
result.setStatement(sqlDTO.getStatement());
@@ -348,6 +363,84 @@ public boolean savepoint(Integer taskId, Integer clusterId, String jobId, String
return false;
}

@Override
public List<Catalog> getMSCatalogs(StudioMetaStoreDTO studioMetaStoreDTO) {
List<Catalog> catalogs = new ArrayList<>();
if (Dialect.isSql(studioMetaStoreDTO.getDialect())) {
DataBase dataBase = dataBaseService.getById(studioMetaStoreDTO.getDatabaseId());
if (!Asserts.isNull(dataBase)) {
Catalog defaultCatalog = Catalog.build("default_catalog");
Driver driver = Driver.build(dataBase.getDriverConfig());
defaultCatalog.setSchemas(driver.listSchemas());
catalogs.add(defaultCatalog);
}
} else {
studioMetaStoreDTO.setStatement("SHOW CATALOGS");
IResult result = executeMSFlinkSql(studioMetaStoreDTO);
if (result instanceof DDLResult) {
DDLResult ddlResult = (DDLResult) result;
Iterator<String> iterator = ddlResult.getColumns().iterator();
if (iterator.hasNext()) {
String key = iterator.next();
List<Map<String, Object>> rowData = ddlResult.getRowData();
for (Map<String, Object> item : rowData) {
catalogs.add(Catalog.build(item.get(key).toString()));
}
}
for (Catalog catalog : catalogs) {
String statement = "USE CATALOG " + catalog.getName() + ";\r\nSHOW DATABASES";
studioMetaStoreDTO.setStatement(statement);
IResult tableResult = executeMSFlinkSql(studioMetaStoreDTO);
if (tableResult instanceof DDLResult) {
DDLResult tableDDLResult = (DDLResult) tableResult;
Iterator<String> tableIterator = tableDDLResult.getColumns().iterator();
if (tableIterator.hasNext()) {
String key = tableIterator.next();
List<Map<String, Object>> rowData = tableDDLResult.getRowData();
List<Schema> schemas = new ArrayList<>();
for (Map<String, Object> item : rowData) {
schemas.add(Schema.build(item.get(key).toString()));
}
catalog.setSchemas(schemas);
}
}
}
}
}
return catalogs;
}

@Override
public List<Table> getMSTables(StudioMetaStoreDTO studioMetaStoreDTO) {
List<Table> tables = new ArrayList<>();
if (Dialect.isSql(studioMetaStoreDTO.getDialect())) {
DataBase dataBase = dataBaseService.getById(studioMetaStoreDTO.getDatabaseId());
if (Asserts.isNotNull(dataBase)) {
Driver driver = Driver.build(dataBase.getDriverConfig());
tables.addAll(driver.listTables(studioMetaStoreDTO.getDatabase()));
}
} else {
String statement = "USE CATALOG " + studioMetaStoreDTO.getCatalog() + ";\r\n" +
"USE " + studioMetaStoreDTO.getDatabase() + ";\r\nSHOW TABLES";
studioMetaStoreDTO.setStatement(statement);
IResult result = executeMSFlinkSql(studioMetaStoreDTO);
if (result instanceof DDLResult) {
DDLResult ddlResult = (DDLResult) result;
Iterator<String> iterator = ddlResult.getColumns().iterator();
if (iterator.hasNext()) {
String key = iterator.next();
List<Map<String, Object>> rowData = ddlResult.getRowData();
for (Map<String, Object> item : rowData) {
Table table = Table.build(item.get(key).toString(), studioMetaStoreDTO.getDatabase());
table.setCatalog(studioMetaStoreDTO.getCatalog());
tables.add(table);
}
}
}
}
return tables;
}

private void initUDF(JobConfig config, String statement) {
if (!GatewayType.LOCAL.equalsValue(config.getType())) {
return;
dlink-admin/src/main/java/com/dlink/service/impl/TaskServiceImpl.java
@@ -1,6 +1,27 @@
package com.dlink.service.impl;

import cn.hutool.core.bean.BeanUtil;
import org.apache.commons.lang3.StringUtils;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import javax.annotation.Resource;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.dlink.alert.Alert;
@@ -70,25 +91,8 @@
import com.dlink.utils.CustomStringJavaCompiler;
import com.dlink.utils.JSONUtil;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import cn.hutool.core.bean.BeanUtil;

/**
* Task service implementation class
@@ -185,9 +189,9 @@ public JobResult restartTask(Integer id, String savePointPath) {
return executeCommonSql(SqlDTO.build(task.getStatement(),
task.getDatabaseId(), null));
}
if (StringUtils.isBlank(savePointPath)){
if (StringUtils.isBlank(savePointPath)) {
task.setSavePointStrategy(SavePointStrategy.LATEST.getValue());
}else {
} else {
task.setSavePointStrategy(SavePointStrategy.CUSTOM.getValue());
task.setSavePointPath(savePointPath);
updateById(task);
@@ -202,7 +206,6 @@ public JobResult restartTask(Integer id, String savePointPath) {
}



private JobResult executeCommonSql(SqlDTO sqlDTO) {
JobResult result = new JobResult();
result.setStatement(sqlDTO.getStatement());
@@ -359,7 +362,7 @@ public Task initDefaultFlinkSQLEnv() {
defaultFlinkSQLEnvTask.setAlias("DefaultCatalog");
defaultFlinkSQLEnvTask.setDialect(Dialect.FLINKSQLENV.getValue());
StringBuilder sb = new StringBuilder();
sb.append("create catalog myCatalog with(\n");
sb.append("create catalog my_catalog with(\n");
sb.append(" 'type' = 'dlink_mysql',\n");
sb.append(" 'username' = '");
sb.append(username);
@@ -372,7 +375,7 @@ public Task initDefaultFlinkSQLEnv() {
sb.append("'\n");
sb.append(")");
sb.append(separator);
sb.append("use catalog myCatalog");
sb.append("use catalog my_catalog");
sb.append(separator);
defaultFlinkSQLEnvTask.setStatement(sb.toString());
defaultFlinkSQLEnvTask.setFragment(true);
@@ -545,11 +548,11 @@ public Result reOnLineTask(Integer id, String savePointPath) {
if (Asserts.isNotNull(task.getJobInstanceId()) && task.getJobInstanceId() != 0) {
savepointJobInstance(task.getJobInstanceId(), SavePointType.CANCEL.getValue());
}
if (StringUtils.isNotBlank(savePointPath)){
if (StringUtils.isNotBlank(savePointPath)) {
task.setSavePointStrategy(SavePointStrategy.CUSTOM.getValue());
task.setSavePointPath(savePointPath);
}
final JobResult jobResult = submitTaskToOnline(task, id);
final JobResult jobResult = submitTaskToOnline(task, id);
if (Job.JobStatus.SUCCESS == jobResult.getStatus()) {
task.setStep(JobLifeCycle.ONLINE.getValue());
task.setJobInstanceId(jobResult.getJobInstanceId());