Skip to content

Commit

Permalink
[Feature-506][client] The CDCSOURCE table parameter supports line fee…
Browse files Browse the repository at this point in the history
…d and column sort by primary key
  • Loading branch information
aiwenmo committed May 16, 2022
1 parent e7b6f50 commit e9ea98f
Show file tree
Hide file tree
Showing 14 changed files with 56 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public List<String> getSchemaList() {
}
List<String> tableList = getTableList();
for (String tableName : tableList) {
tableName = tableName.trim();
if (Asserts.isNotNullString(tableName) && tableName.contains(".")) {
String[] names = tableName.split("\\\\.");
if (!schemaList.contains(names[0])) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public List<String> getSchemaList() {
}
List<String> tableList = getTableList();
for (String tableName : tableList) {
tableName = tableName.trim();
if (Asserts.isNotNullString(tableName) && tableName.contains(".")) {
String[] names = tableName.split("\\\\.");
if (!schemaList.contains(names[0])) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public List<String> getSchemaList() {
}
List<String> tableList = getTableList();
for (String tableName : tableList) {
tableName = tableName.trim();
if (Asserts.isNotNullString(tableName) && tableName.contains(".")) {
String[] names = tableName.split("\\\\.");
if (!schemaList.contains(names[0])) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public List<String> getSchemaList() {
}
List<String> tableList = getTableList();
for (String tableName : tableList) {
tableName = tableName.trim();
if (Asserts.isNotNullString(tableName) && tableName.contains(".")) {
String[] names = tableName.split("\\\\.");
if (!schemaList.contains(names[0])) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public List<String> getSchemaList() {
}
List<String> tableList = getTableList();
for (String tableName : tableList) {
tableName = tableName.trim();
if (Asserts.isNotNullString(tableName) && tableName.contains(".")) {
String[] names = tableName.split("\\\\.");
if (!schemaList.contains(names[0])) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ public List<String> getSchemaList() {
}
List<String> tableList = getTableList();
for (String tableName : tableList) {
tableName = tableName.trim();
if (Asserts.isNotNullString(tableName) && tableName.contains(".")) {
String[] names = tableName.split("\\\\.");
if (!schemaList.contains(names[0])) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public List<String> getSchemaList() {
}
List<String> tableList = getTableList();
for (String tableName : tableList) {
tableName = tableName.trim();
if (Asserts.isNotNullString(tableName) && tableName.contains(".")) {
String[] names = tableName.split("\\\\.");
if (!schemaList.contains(names[0])) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ public List<String> getSchemaList() {
}
List<String> tableList = getTableList();
for (String tableName : tableList) {
tableName = tableName.trim();
if (Asserts.isNotNullString(tableName) && tableName.contains(".")) {
String[] names = tableName.split("\\\\.");
if (!schemaList.contains(names[0])) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public List<String> getSchemaList() {
}
List<String> tableList = getTableList();
for (String tableName : tableList) {
tableName = tableName.trim();
if (Asserts.isNotNullString(tableName) && tableName.contains(".")) {
String[] names = tableName.split("\\\\.");
if (!schemaList.contains(names[0])) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ public List<String> getSchemaList() {
}
List<String> tableList = getTableList();
for (String tableName : tableList) {
tableName = tableName.trim();
if (Asserts.isNotNullString(tableName) && tableName.contains(".")) {
String[] names = tableName.split("\\\\.");
if (!schemaList.contains(names[0])) {
Expand Down
35 changes: 24 additions & 11 deletions dlink-core/src/test/java/com/dlink/core/SqlParserTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,30 @@ public void regTest() {
@Test
public void createCDCSourceTest() {
String sql = "EXECUTE CDCSOURCE demo WITH (\n" +
" 'type'='mysql-cdc',\n" +
" 'hostname'='127.0.0.1',\n" +
" 'port'='3306',\n" +
" 'password'='dlink',\n" +
" 'hostname'='dlink',\n" +
" 'checkpoint'='3000',\n" +
" 'parallelism'='1',\n" +
" 'database'='dlink,test',\n" +
" 'table'='',\n" +
" 'topic'='dlinkcdc',\n" +
" 'brokers'='127.0.0.1:9092'\n" +
" 'connector' = 'mysql-cdc',\n" +
" 'hostname' = '10.1.51.25',\n" +
" 'port' = '3306',\n" +
" 'username' = 'dfly',\n" +
" 'password' = 'Dareway@2020',\n" +
" 'checkpoint' = '3000',\n" +
" 'scan.startup.mode' = 'initial',\n" +
" 'parallelism' = '1',\n" +
" -- 'database-name'='test',\n" +
" 'table-name' = 'test\\.student,\n" +
" test\\.score',\n" +
" -- 'sink.connector'='datastream-doris',\n" +
" 'sink.connector' = 'doris',\n" +
" 'sink.fenodes' = '10.1.51.26:8030',\n" +
" 'sink.username' = 'root',\n" +
" 'sink.password' = 'dw123456',\n" +
" 'sink.sink.batch.size' = '1',\n" +
" 'sink.sink.max-retries' = '1',\n" +
" 'sink.sink.batch.interval' = '60000',\n" +
" 'sink.sink.db' = 'test',\n" +
" 'sink.table.prefix' = 'ODS_',\n" +
" 'sink.table.upper' = 'true',\n" +
" 'sink.table.identifier' = '${schemaName}.${tableName}',\n" +
" 'sink.sink.enable-delete' = 'true'\n" +
");";
Map<String, List<String>> lists = SingleSqlParserFactory.generateParser(sql);
System.out.println(lists.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,15 @@ public TableResult build(Executor executor) {
if (!Asserts.isEquals(table.getType(), "VIEW")) {
if (Asserts.isNotNullCollection(tableRegList)) {
for (String tableReg : tableRegList) {
if (table.getSchemaTableName().matches(tableReg) && !schema.getTables().contains(Table.build(table.getName()))) {
table.setColumns(driver.listColumns(schemaName, table.getName()));
if (table.getSchemaTableName().matches(tableReg.trim()) && !schema.getTables().contains(Table.build(table.getName()))) {
table.setColumns(driver.listColumnsSortByPK(schemaName, table.getName()));
schema.getTables().add(table);
schemaTableNameList.add(table.getSchemaTableName());
break;
}
}
} else {
table.setColumns(driver.listColumns(schemaName, table.getName()));
table.setColumns(driver.listColumnsSortByPK(schemaName, table.getName()));
schemaTableNameList.add(table.getSchemaTableName());
schema.getTables().add(table);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,23 @@ public List<Column> listColumns(String schemaName, String tableName) {
return columns;
}

@Override
public List<Column> listColumnsSortByPK(String schemaName, String tableName) {
List<Column> columnList = listColumns(schemaName, tableName);
List<Column> columnListSortByPK = new ArrayList<>();
for(Column column: columnList){
if(column.isKeyFlag()){
columnListSortByPK.add(column);
}
}
for(Column column: columnList){
if(!column.isKeyFlag()){
columnListSortByPK.add(column);
}
}
return columnListSortByPK;
}

@Override
public boolean createTable(Table table) throws Exception {
String sql = getCreateTableSql(table).replaceAll("\r\n", " ");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ static Driver build(DriverConfig config) {

List<Column> listColumns(String schemaName, String tableName);

List<Column> listColumnsSortByPK(String schemaName, String tableName);

List<Schema> getSchemasAndTables();

List<Table> getTablesAndColumns(String schemaName);
Expand Down

0 comments on commit e9ea98f

Please sign in to comment.