From e9ea98f23f597a85433299aac51cd184411b1327 Mon Sep 17 00:00:00 2001 From: wenmo <32723967+wenmo@users.noreply.github.com> Date: Mon, 16 May 2022 22:28:46 +0800 Subject: [PATCH] [Feature-506][client] The CDCSOURCE table parameter supports line feed and column sort by primary key --- .../com/dlink/cdc/AbstractCDCBuilder.java | 1 + .../com/dlink/cdc/mysql/MysqlCDCBuilder.java | 1 + .../com/dlink/cdc/AbstractCDCBuilder.java | 1 + .../com/dlink/cdc/mysql/MysqlCDCBuilder.java | 1 + .../com/dlink/cdc/AbstractCDCBuilder.java | 1 + .../com/dlink/cdc/mysql/MysqlCDCBuilder.java | 1 + .../com/dlink/cdc/AbstractCDCBuilder.java | 1 + .../com/dlink/cdc/mysql/MysqlCDCBuilder.java | 1 + .../com/dlink/cdc/AbstractCDCBuilder.java | 1 + .../com/dlink/cdc/mysql/MysqlCDCBuilder.java | 1 + .../java/com/dlink/core/SqlParserTest.java | 35 +++++++++++++------ .../trans/ddl/CreateCDCSourceOperation.java | 6 ++-- .../metadata/driver/AbstractJdbcDriver.java | 17 +++++++++ .../com/dlink/metadata/driver/Driver.java | 2 ++ 14 files changed, 56 insertions(+), 14 deletions(-) diff --git a/dlink-client/dlink-client-1.11/src/main/java/com/dlink/cdc/AbstractCDCBuilder.java b/dlink-client/dlink-client-1.11/src/main/java/com/dlink/cdc/AbstractCDCBuilder.java index d76f2e6934..d352691b68 100644 --- a/dlink-client/dlink-client-1.11/src/main/java/com/dlink/cdc/AbstractCDCBuilder.java +++ b/dlink-client/dlink-client-1.11/src/main/java/com/dlink/cdc/AbstractCDCBuilder.java @@ -42,6 +42,7 @@ public List getSchemaList() { } List tableList = getTableList(); for (String tableName : tableList) { + tableName = tableName.trim(); if (Asserts.isNotNullString(tableName) && tableName.contains(".")) { String[] names = tableName.split("\\\\."); if (!schemaList.contains(names[0])) { diff --git a/dlink-client/dlink-client-1.11/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java b/dlink-client/dlink-client-1.11/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java index 62eb8054b0..3099c254f0 100644 --- a/dlink-client/dlink-client-1.11/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java +++ b/dlink-client/dlink-client-1.11/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java @@ -87,6 +87,7 @@ public List getSchemaList() { } List tableList = getTableList(); for (String tableName : tableList) { + tableName = tableName.trim(); if (Asserts.isNotNullString(tableName) && tableName.contains(".")) { String[] names = tableName.split("\\\\."); if (!schemaList.contains(names[0])) { diff --git a/dlink-client/dlink-client-1.12/src/main/java/com/dlink/cdc/AbstractCDCBuilder.java b/dlink-client/dlink-client-1.12/src/main/java/com/dlink/cdc/AbstractCDCBuilder.java index d76f2e6934..d352691b68 100644 --- a/dlink-client/dlink-client-1.12/src/main/java/com/dlink/cdc/AbstractCDCBuilder.java +++ b/dlink-client/dlink-client-1.12/src/main/java/com/dlink/cdc/AbstractCDCBuilder.java @@ -42,6 +42,7 @@ public List getSchemaList() { } List tableList = getTableList(); for (String tableName : tableList) { + tableName = tableName.trim(); if (Asserts.isNotNullString(tableName) && tableName.contains(".")) { String[] names = tableName.split("\\\\."); if (!schemaList.contains(names[0])) { diff --git a/dlink-client/dlink-client-1.12/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java b/dlink-client/dlink-client-1.12/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java index 4fff18cab9..56facd1219 100644 --- a/dlink-client/dlink-client-1.12/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java +++ b/dlink-client/dlink-client-1.12/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java @@ -100,6 +100,7 @@ public List getSchemaList() { } List tableList = getTableList(); for (String tableName : tableList) { + tableName = tableName.trim(); if (Asserts.isNotNullString(tableName) && tableName.contains(".")) { String[] names = tableName.split("\\\\."); if (!schemaList.contains(names[0])) { diff --git a/dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/AbstractCDCBuilder.java b/dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/AbstractCDCBuilder.java index d76f2e6934..d352691b68 100644 --- a/dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/AbstractCDCBuilder.java +++ b/dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/AbstractCDCBuilder.java @@ -42,6 +42,7 @@ public List getSchemaList() { } List tableList = getTableList(); for (String tableName : tableList) { + tableName = tableName.trim(); if (Asserts.isNotNullString(tableName) && tableName.contains(".")) { String[] names = tableName.split("\\\\."); if (!schemaList.contains(names[0])) { diff --git a/dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java b/dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java index 621342f1cb..7a501d0c0b 100644 --- a/dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java +++ b/dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java @@ -145,6 +145,7 @@ public List getSchemaList() { } List tableList = getTableList(); for (String tableName : tableList) { + tableName = tableName.trim(); if (Asserts.isNotNullString(tableName) && tableName.contains(".")) { String[] names = tableName.split("\\\\."); if (!schemaList.contains(names[0])) { diff --git a/dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/AbstractCDCBuilder.java b/dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/AbstractCDCBuilder.java index d76f2e6934..d352691b68 100644 --- a/dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/AbstractCDCBuilder.java +++ b/dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/AbstractCDCBuilder.java @@ -42,6 +42,7 @@ public List getSchemaList() { } List tableList = getTableList(); for (String tableName : tableList) { + tableName = tableName.trim(); if (Asserts.isNotNullString(tableName) && tableName.contains(".")) { String[] names = tableName.split("\\\\."); if (!schemaList.contains(names[0])) { diff --git a/dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java b/dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java index 621342f1cb..7a501d0c0b 100644 --- a/dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java +++ b/dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java @@ -145,6 +145,7 @@ public List getSchemaList() { } List tableList = getTableList(); for (String tableName : tableList) { + tableName = tableName.trim(); if (Asserts.isNotNullString(tableName) && tableName.contains(".")) { String[] names = tableName.split("\\\\."); if (!schemaList.contains(names[0])) { diff --git a/dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/AbstractCDCBuilder.java b/dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/AbstractCDCBuilder.java index d76f2e6934..d352691b68 100644 --- a/dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/AbstractCDCBuilder.java +++ b/dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/AbstractCDCBuilder.java @@ -42,6 +42,7 @@ public List getSchemaList() { } List tableList = getTableList(); for (String tableName : tableList) { + tableName = tableName.trim(); if (Asserts.isNotNullString(tableName) && tableName.contains(".")) { String[] names = tableName.split("\\\\."); if (!schemaList.contains(names[0])) { diff --git a/dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java b/dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java index 621342f1cb..7a501d0c0b 100644 --- a/dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java +++ b/dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java @@ -145,6 +145,7 @@ public List getSchemaList() { } List tableList = getTableList(); for (String tableName : tableList) { + tableName = tableName.trim(); if (Asserts.isNotNullString(tableName) && tableName.contains(".")) { String[] names = tableName.split("\\\\."); if (!schemaList.contains(names[0])) { diff --git a/dlink-core/src/test/java/com/dlink/core/SqlParserTest.java b/dlink-core/src/test/java/com/dlink/core/SqlParserTest.java index 24a98e548f..6c0ff12410 100644 --- a/dlink-core/src/test/java/com/dlink/core/SqlParserTest.java +++ b/dlink-core/src/test/java/com/dlink/core/SqlParserTest.java @@ -66,17 +66,30 @@ public void regTest() { @Test public void createCDCSourceTest() { String sql = "EXECUTE CDCSOURCE demo WITH (\n" + - " 'type'='mysql-cdc',\n" + - " 'hostname'='127.0.0.1',\n" + - " 'port'='3306',\n" + - " 'password'='dlink',\n" + - " 'hostname'='dlink',\n" + - " 'checkpoint'='3000',\n" + - " 'parallelism'='1',\n" + - " 'database'='dlink,test',\n" + - " 'table'='',\n" + - " 'topic'='dlinkcdc',\n" + - " 'brokers'='127.0.0.1:9092'\n" + + " 'connector' = 'mysql-cdc',\n" + + " 'hostname' = '10.1.51.25',\n" + + " 'port' = '3306',\n" + + " 'username' = 'dfly',\n" + + " 'password' = 'Dareway@2020',\n" + + " 'checkpoint' = '3000',\n" + + " 'scan.startup.mode' = 'initial',\n" + + " 'parallelism' = '1',\n" + + " -- 'database-name'='test',\n" + + " 'table-name' = 'test\\.student,\n" + + " test\\.score',\n" + + " -- 'sink.connector'='datastream-doris',\n" + + " 'sink.connector' = 'doris',\n" + + " 'sink.fenodes' = '10.1.51.26:8030',\n" + + " 'sink.username' = 'root',\n" + + " 'sink.password' = 'dw123456',\n" + + " 'sink.sink.batch.size' = '1',\n" + + " 'sink.sink.max-retries' = '1',\n" + + " 'sink.sink.batch.interval' = '60000',\n" + + " 'sink.sink.db' = 'test',\n" + + " 'sink.table.prefix' = 'ODS_',\n" + + " 'sink.table.upper' = 'true',\n" + + " 'sink.table.identifier' = '${schemaName}.${tableName}',\n" + + " 'sink.sink.enable-delete' = 'true'\n" + ");"; Map> lists = SingleSqlParserFactory.generateParser(sql); System.out.println(lists.toString()); diff --git a/dlink-executor/src/main/java/com/dlink/trans/ddl/CreateCDCSourceOperation.java b/dlink-executor/src/main/java/com/dlink/trans/ddl/CreateCDCSourceOperation.java index 9b9e26a9d6..7003bd9227 100644 --- a/dlink-executor/src/main/java/com/dlink/trans/ddl/CreateCDCSourceOperation.java +++ b/dlink-executor/src/main/java/com/dlink/trans/ddl/CreateCDCSourceOperation.java @@ -74,15 +74,15 @@ public TableResult build(Executor executor) { if (!Asserts.isEquals(table.getType(), "VIEW")) { if (Asserts.isNotNullCollection(tableRegList)) { for (String tableReg : tableRegList) { - if (table.getSchemaTableName().matches(tableReg) && !schema.getTables().contains(Table.build(table.getName()))) { - table.setColumns(driver.listColumns(schemaName, table.getName())); + if (table.getSchemaTableName().matches(tableReg.trim()) && !schema.getTables().contains(Table.build(table.getName()))) { + table.setColumns(driver.listColumnsSortByPK(schemaName, table.getName())); schema.getTables().add(table); schemaTableNameList.add(table.getSchemaTableName()); break; } } } else { - table.setColumns(driver.listColumns(schemaName, table.getName())); + table.setColumns(driver.listColumnsSortByPK(schemaName, table.getName())); schemaTableNameList.add(table.getSchemaTableName()); schema.getTables().add(table); } diff --git a/dlink-metadata/dlink-metadata-base/src/main/java/com/dlink/metadata/driver/AbstractJdbcDriver.java b/dlink-metadata/dlink-metadata-base/src/main/java/com/dlink/metadata/driver/AbstractJdbcDriver.java index b8bba84102..5268ac8eb8 100644 --- a/dlink-metadata/dlink-metadata-base/src/main/java/com/dlink/metadata/driver/AbstractJdbcDriver.java +++ b/dlink-metadata/dlink-metadata-base/src/main/java/com/dlink/metadata/driver/AbstractJdbcDriver.java @@ -235,6 +235,23 @@ public List listColumns(String schemaName, String tableName) { return columns; } + @Override + public List listColumnsSortByPK(String schemaName, String tableName) { + List columnList = listColumns(schemaName, tableName); + List columnListSortByPK = new ArrayList<>(); + for(Column column: columnList){ + if(column.isKeyFlag()){ + columnListSortByPK.add(column); + } + } + for(Column column: columnList){ + if(!column.isKeyFlag()){ + columnListSortByPK.add(column); + } + } + return columnListSortByPK; + } + @Override public boolean createTable(Table table) throws Exception { String sql = getCreateTableSql(table).replaceAll("\r\n", " "); diff --git a/dlink-metadata/dlink-metadata-base/src/main/java/com/dlink/metadata/driver/Driver.java b/dlink-metadata/dlink-metadata-base/src/main/java/com/dlink/metadata/driver/Driver.java index 80c0f3e9f3..f65c3a08e2 100644 --- a/dlink-metadata/dlink-metadata-base/src/main/java/com/dlink/metadata/driver/Driver.java +++ b/dlink-metadata/dlink-metadata-base/src/main/java/com/dlink/metadata/driver/Driver.java @@ -71,6 +71,8 @@ static Driver build(DriverConfig config) { List listColumns(String schemaName, String tableName); + List listColumnsSortByPK(String schemaName, String tableName); + List getSchemasAndTables(); List getTablesAndColumns(String schemaName);