Skip to content

Commit

Permalink
[Feature-733][admin,web,client] Add Flink meta store info and column …
Browse files Browse the repository at this point in the history
…details (DataLinkDC#734)

Co-authored-by: wenmo <32723967+wenmo@users.noreply.github.com>
  • Loading branch information
aiwenmo and aiwenmo authored Jul 18, 2022
1 parent 56a8db9 commit e8a87da
Show file tree
Hide file tree
Showing 16 changed files with 505 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,23 @@ public Result getMSCatalogs(@RequestBody StudioMetaStoreDTO studioMetaStoreDTO)
}

/**
* 获取 Meta Store Table
* 获取 Meta Store Schema/Database 信息
*/
@PostMapping("/getMSTables")
public Result getMSTables(@RequestBody StudioMetaStoreDTO studioMetaStoreDTO) {
return Result.succeed(studioService.getMSTables(studioMetaStoreDTO), "获取成功");
@PostMapping("/getMSSchemaInfo")
public Result getMSSchemaInfo(@RequestBody StudioMetaStoreDTO studioMetaStoreDTO) {
return Result.succeed(studioService.getMSSchemaInfo(studioMetaStoreDTO), "获取成功");
}

/**
* 获取 Meta Store Flink Column 信息
*/
@GetMapping("/getMSFlinkColumns")
public Result getMSFlinkColumns(@RequestParam Integer envId, @RequestParam String catalog, @RequestParam String database, @RequestParam String table) {
StudioMetaStoreDTO studioMetaStoreDTO = new StudioMetaStoreDTO();
studioMetaStoreDTO.setEnvId(envId);
studioMetaStoreDTO.setCatalog(catalog);
studioMetaStoreDTO.setDatabase(database);
studioMetaStoreDTO.setTable(table);
return Result.succeed(studioService.getMSFlinkColumns(studioMetaStoreDTO), "获取成功");
}
}
13 changes: 5 additions & 8 deletions dlink-admin/src/main/java/com/dlink/dto/StudioMetaStoreDTO.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
package com.dlink.dto;

import lombok.Getter;

import java.util.HashMap;
import java.util.Map;

import com.dlink.assertion.Asserts;
import com.dlink.gateway.GatewayType;
import com.dlink.job.JobConfig;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;

import lombok.Getter;
import lombok.Setter;

/**
* StudioMetaStoreDTO
Expand All @@ -18,9 +13,11 @@
* @since 2022/7/16 23:18
*/
@Getter
@Setter
public class StudioMetaStoreDTO extends AbstractStatementDTO {
private String catalog;
private String database;
private String table;
private String dialect;
private Integer databaseId;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
import com.dlink.explainer.lineage.LineageResult;
import com.dlink.job.JobResult;
import com.dlink.model.Catalog;
import com.dlink.model.Table;
import com.dlink.model.FlinkColumn;
import com.dlink.model.Schema;
import com.dlink.result.IResult;
import com.dlink.result.SelectResult;
import com.dlink.result.SqlExplainResult;
Expand Down Expand Up @@ -57,5 +58,7 @@ public interface StudioService {

List<Catalog> getMSCatalogs(StudioMetaStoreDTO studioMetaStoreDTO);

List<Table> getMSTables(StudioMetaStoreDTO studioMetaStoreDTO);
Schema getMSSchemaInfo(StudioMetaStoreDTO studioMetaStoreDTO);

List<FlinkColumn> getMSFlinkColumns(StudioMetaStoreDTO studioMetaStoreDTO);
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.dlink.model.Catalog;
import com.dlink.model.Cluster;
import com.dlink.model.DataBase;
import com.dlink.model.FlinkColumn;
import com.dlink.model.Savepoints;
import com.dlink.model.Schema;
import com.dlink.model.Table;
Expand All @@ -51,6 +52,7 @@
import com.dlink.session.SessionConfig;
import com.dlink.session.SessionInfo;
import com.dlink.session.SessionPool;
import com.dlink.sql.FlinkQuery;
import com.dlink.utils.RunTimeUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
Expand Down Expand Up @@ -369,13 +371,13 @@ public List<Catalog> getMSCatalogs(StudioMetaStoreDTO studioMetaStoreDTO) {
if (Dialect.isSql(studioMetaStoreDTO.getDialect())) {
DataBase dataBase = dataBaseService.getById(studioMetaStoreDTO.getDatabaseId());
if (!Asserts.isNull(dataBase)) {
Catalog defaultCatalog = Catalog.build("default_catalog");
Catalog defaultCatalog = Catalog.build(FlinkQuery.defaultCatalog());
Driver driver = Driver.build(dataBase.getDriverConfig());
defaultCatalog.setSchemas(driver.listSchemas());
catalogs.add(defaultCatalog);
}
} else {
studioMetaStoreDTO.setStatement("SHOW CATALOGS");
studioMetaStoreDTO.setStatement(FlinkQuery.showCatalogs());
IResult result = executeMSFlinkSql(studioMetaStoreDTO);
if (result instanceof DDLResult) {
DDLResult ddlResult = (DDLResult) result;
Expand All @@ -388,7 +390,7 @@ public List<Catalog> getMSCatalogs(StudioMetaStoreDTO studioMetaStoreDTO) {
}
}
for (Catalog catalog : catalogs) {
String statement = "USE CATALOG " + catalog.getName() + ";\r\nSHOW DATABASES";
String statement = FlinkQuery.useCatalog(catalog.getName()) + FlinkQuery.separator() + FlinkQuery.showDatabases();
studioMetaStoreDTO.setStatement(statement);
IResult tableResult = executeMSFlinkSql(studioMetaStoreDTO);
if (result instanceof DDLResult) {
Expand All @@ -411,7 +413,8 @@ public List<Catalog> getMSCatalogs(StudioMetaStoreDTO studioMetaStoreDTO) {
}

@Override
public List<Table> getMSTables(StudioMetaStoreDTO studioMetaStoreDTO) {
public Schema getMSSchemaInfo(StudioMetaStoreDTO studioMetaStoreDTO) {
Schema schema = Schema.build(studioMetaStoreDTO.getDatabase());
List<Table> tables = new ArrayList<>();
if (Dialect.isSql(studioMetaStoreDTO.getDialect())) {
DataBase dataBase = dataBaseService.getById(studioMetaStoreDTO.getDatabaseId());
Expand All @@ -420,9 +423,11 @@ public List<Table> getMSTables(StudioMetaStoreDTO studioMetaStoreDTO) {
tables.addAll(driver.listTables(studioMetaStoreDTO.getDatabase()));
}
} else {
String statement = "USE CATALOG " + studioMetaStoreDTO.getCatalog() + ";\r\n" +
"USE " + studioMetaStoreDTO.getDatabase() + ";\r\nSHOW TABLES";
studioMetaStoreDTO.setStatement(statement);
String baseStatement = FlinkQuery.useCatalog(studioMetaStoreDTO.getCatalog()) + FlinkQuery.separator() +
FlinkQuery.useDatabase(studioMetaStoreDTO.getDatabase()) + FlinkQuery.separator();
// show tables
String tableStatement = baseStatement + FlinkQuery.showTables();
studioMetaStoreDTO.setStatement(tableStatement);
IResult result = executeMSFlinkSql(studioMetaStoreDTO);
if (result instanceof DDLResult) {
DDLResult ddlResult = (DDLResult) result;
Expand All @@ -437,8 +442,69 @@ public List<Table> getMSTables(StudioMetaStoreDTO studioMetaStoreDTO) {
}
}
}
// show views
schema.setViews(showInfo(studioMetaStoreDTO, baseStatement, FlinkQuery.showViews()));
// show functions
schema.setFunctions(showInfo(studioMetaStoreDTO, baseStatement, FlinkQuery.showFunctions()));
// show user functions
schema.setUserFunctions(showInfo(studioMetaStoreDTO, baseStatement, FlinkQuery.showUserFunctions()));
// show modules
schema.setModules(showInfo(studioMetaStoreDTO, baseStatement, FlinkQuery.showModules()));
}
return tables;
schema.setTables(tables);
return schema;
}

@Override
public List<FlinkColumn> getMSFlinkColumns(StudioMetaStoreDTO studioMetaStoreDTO) {
List<FlinkColumn> columns = new ArrayList<>();
if (Dialect.isSql(studioMetaStoreDTO.getDialect())) {
// nothing to do
} else {
String baseStatement = FlinkQuery.useCatalog(studioMetaStoreDTO.getCatalog()) + FlinkQuery.separator() +
FlinkQuery.useDatabase(studioMetaStoreDTO.getDatabase()) + FlinkQuery.separator();
// desc tables
String tableStatement = baseStatement + FlinkQuery.descTable(studioMetaStoreDTO.getTable());
studioMetaStoreDTO.setStatement(tableStatement);
IResult result = executeMSFlinkSql(studioMetaStoreDTO);
if (result instanceof DDLResult) {
DDLResult ddlResult = (DDLResult) result;
List<Map<String, Object>> rowData = ddlResult.getRowData();
int i = 1;
for (Map<String, Object> item : rowData) {
FlinkColumn column = FlinkColumn.build(i,
item.get(FlinkQuery.columnName()).toString(),
item.get(FlinkQuery.columnType()).toString(),
item.get(FlinkQuery.columnKey()).toString(),
item.get(FlinkQuery.columnNull()).toString(),
item.get(FlinkQuery.columnExtras()).toString(),
item.get(FlinkQuery.columnWatermark()).toString()
);
columns.add(column);
i++;
}
}
}
return columns;
}

private List<String> showInfo(StudioMetaStoreDTO studioMetaStoreDTO, String baseStatement, String statement) {
List<String> infos = new ArrayList<>();
String tableStatement = baseStatement + statement;
studioMetaStoreDTO.setStatement(tableStatement);
IResult result = executeMSFlinkSql(studioMetaStoreDTO);
if (result instanceof DDLResult) {
DDLResult ddlResult = (DDLResult) result;
Iterator<String> iterator = ddlResult.getColumns().iterator();
if (iterator.hasNext()) {
String key = iterator.next();
List<Map<String, Object>> rowData = ddlResult.getRowData();
for (Map<String, Object> item : rowData) {
infos.add(item.get(key).toString());
}
}
}
return infos;
}

private void initUDF(JobConfig config, String statement) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.dlink.sql;

/**
* FlinkQuery
*
* @author wenmo
* @since 2022/7/18 19:10
**/
public class FlinkQuery extends AbstractFlinkQuery {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.dlink.sql;

/**
* FlinkQuery
*
* @author wenmo
* @since 2022/7/18 19:10
**/
public class FlinkQuery extends AbstractFlinkQuery {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.dlink.sql;

/**
* FlinkQuery
*
* @author wenmo
* @since 2022/7/18 19:10
**/
public class FlinkQuery extends AbstractFlinkQuery {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.dlink.sql;

/**
* FlinkQuery
*
* @author wenmo
* @since 2022/7/18 19:10
**/
public class FlinkQuery extends AbstractFlinkQuery {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.dlink.sql;

/**
* FlinkQuery
*
* @author wenmo
* @since 2022/7/18 19:10
**/
public class FlinkQuery extends AbstractFlinkQuery {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package com.dlink.sql;

/**
* AbstractFlinkQuery
*
* @author wenmo
* @since 2022/7/18 18:43
**/
public abstract class AbstractFlinkQuery {

public static String separator() {
return ";\r\n";
}

public static String defaultCatalog() {
return "default_catalog";
}

public static String defaultDatabase() {
return "default_database";
}

public static String showCatalogs() {
return "SHOW CATALOGS";
}

public static String useCatalog(String catalog) {
return String.format("USE CATALOG %s", catalog);
}

public static String showDatabases() {
return "SHOW DATABASES";
}

public static String useDatabase(String database) {
return String.format("USE %s", database);
}

public static String showTables() {
return "SHOW TABLES";
}

public static String showViews() {
return "SHOW VIEWS";
}

public static String showFunctions() {
return "SHOW FUNCTIONS";
}

public static String showUserFunctions() {
return "SHOW USER FUNCTIONS";
}

public static String showModules() {
return "SHOW MODULES";
}

public static String descTable(String table) {
return String.format("DESC %s", table);
}

public static String columnName() {
return "name";
}

public static String columnType() {
return "type";
}

public static String columnNull() {
return "null";
}

public static String columnKey() {
return "key";
}

public static String columnExtras() {
return "extras";
}

public static String columnWatermark() {
return "watermark";
}
}
Loading

0 comments on commit e8a87da

Please sign in to comment.