23 changes: 22 additions & 1 deletion docs/content.zh/docs/core-concept/route.md
@@ -85,4 +85,25 @@ route:
description: route all tables in source_db to sink_db
```

Then, all tables under the `source_db` database will be synchronized into the `sink_db` database.

## Advanced: Replacement Rules Based on RegExp Capturing Groups

You can define regular-expression capturing groups in the `source-table` field:

```yaml
route:
- source-table: db_(\.*).(\.*)_tbl
sink-table: sink_db_$1.sink_table_$2
```

Here, we create two capturing groups: one matches the part of the database name after `db_`, and the other matches the part of the table name before `_tbl`.

Taking the upstream table `db_foo.bar_tbl` as an example, `foo` and `bar` will be extracted by the capturing groups and bound to the `$1` and `$2` variables in turn.
This table will therefore be routed to the downstream table `sink_db_foo.sink_table_bar`.

{{< hint info >}}

Note: replacement rules based on RegExp capturing groups cannot be used together with the `replace-symbol` option.

{{< /hint >}}
23 changes: 22 additions & 1 deletion docs/content/docs/core-concept/route.md
@@ -86,4 +86,25 @@ route:
description: route all tables in source_db to sink_db
```

Then, all tables matching `source_db.XXX` will be routed to `sink_db.XXX` without hassle.

## Advanced: RegExp Capturing & Replacement Rules

It is also possible to create capturing groups in the `source-table` field like this:

```yaml
route:
- source-table: db_(\.*).(\.*)_tbl
sink-table: sink_db_$1.sink_table_$2
```

Here, we create two capturing groups: one matches the part of the database name after `db_`, and the other matches the part of the table name before `_tbl`.

For the upstream table `db_foo.bar_tbl`, the capturing groups extract `foo` and `bar` and bind them to `$1` and `$2`, respectively.
As a result, this table will be routed to the downstream table `sink_db_foo.sink_table_bar`.
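
Under the hood, this is plain `java.util.regex` capture-and-replace. Below is a minimal sketch of the equivalent standard-RegExp substitution, after Flink CDC rewrites the escaped and unescaped dots as described in `TableIdRouter`:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class CaptureGroupRoutingSketch {
    public static void main(String[] args) {
        // Standard-RegExp form of `db_(\.*).(\.*)_tbl`: the escaped `\.` turns into
        // the `.` meta-character, and the database/table separator `.` into a literal `\.`.
        Pattern pattern = Pattern.compile("db_(.*)\\.(.*)_tbl");
        Matcher matcher = pattern.matcher("db_foo.bar_tbl");
        if (matcher.matches()) {
            // $1 = "foo", $2 = "bar"; prints "sink_db_foo.sink_table_bar"
            System.out.println(matcher.replaceAll("sink_db_$1.sink_table_$2"));
        }
    }
}
```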

{{< hint info >}}

Note: RegExp capturing-group replacement rules cannot be combined with the `replace-symbol` option; when `replace-symbol` is set, the rule takes the symbol-replacement path and any capturing groups are ignored.

{{< /hint >}}
@@ -15,44 +15,105 @@
* limitations under the License.
*/

package org.apache.flink.cdc.runtime.operators.schema.common;
package org.apache.flink.cdc.common.route;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.cdc.common.annotation.PublicEvolving;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.schema.Selectors;

import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
import org.apache.flink.shaded.guava31.com.google.common.cache.CacheLoader;
import org.apache.flink.shaded.guava31.com.google.common.cache.LoadingCache;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import java.util.stream.Collectors;

/**
* Calculates how upstream data change events should be dispatched to downstream tables. Returns one
* or many destination Table IDs based on provided routing rules.
*/
@PublicEvolving
public class TableIdRouter {

private final List<Tuple3<Selectors, String, String>> routes;
private final LoadingCache<TableId, List<TableId>> routingCache;
private static final Logger LOG = LoggerFactory.getLogger(TableIdRouter.class);
private static final Duration CACHE_EXPIRE_DURATION = Duration.ofDays(1);

private final List<Tuple3<Pattern, String, String>> routes;
private final LoadingCache<TableId, List<TableId>> routingCache;

private static final String DOT_PLACEHOLDER = "_dot_placeholder_";

/**
* Currently, the supported regular-expression syntax is not exactly the same as in {@link
* Selectors}.
*
* <p>The main discrepancies are:
*
* <p>1) {@link Selectors} uses {@code ,} to split table names instead of {@code |}.
*
* <p>2) If there is a need to use a dot ({@code .}) in a regular expression to match any
* character, it is necessary to escape the dot with a backslash.
*
* <p>3) The unescaped {@code .} is used as the separator between database and table names. When
* converting to standard RegExp style, it is expected to be escaped so that it matches the dot
* ({@code .}) literally instead of acting as the meta-character.
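*
* <p>A sketch of the rewrite, applied to the capture list from the routing docs:
*
* <pre>{@code
* convertTableListToRegExpPattern("db_(\\.*).(\\.*)_tbl");
* // returns "db_(.*)\\.(.*)_tbl": the escaped `\.` becomes the `.` meta-character,
* // and the unescaped database/table separator `.` becomes a literal `\.`
* }</pre>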
*/
public static String convertTableListToRegExpPattern(String tables) {
LOG.info("Rewriting CDC style table capture list: {}", tables);

// In CDC-style table matching, table names could be separated by `,` character.
// Convert it to `|` as it's standard RegEx syntax.
tables =
Arrays.stream(tables.split(",")).map(String::trim).collect(Collectors.joining("|"));
LOG.info("Expression after replacing comma with vert separator: {}", tables);

// Essentially, we're just trying to swap escaped `\\.` and unescaped `.`.
// In our table matching syntax, `\\.` means RegEx token matcher and `.` means database &
// table name separator.
// On the contrary, while we're matching TableId string, `\\.` means matching the "dot"
// literal and `.` is the meta-character.

// Step 1: escape the dot with a backslash, but keep it as a placeholder (like `$`).
// For example, `db\.*.tbl\.*` => `db$*.tbl$*`
String unescapedTables = tables.replace("\\.", DOT_PLACEHOLDER);
LOG.info("Expression after un-escaping dots as RegEx meta-character: {}", unescapedTables);

// Step 2: replace all remaining dots (`.`) to quoted version (`\.`), as a separator between
// database and table names.
// For example, `db$*.tbl$*` => `db$*\.tbl$*`
String unescapedTablesWithDbTblSeparator = unescapedTables.replace(".", "\\.");
LOG.info("Re-escaping dots as TableId delimiter: {}", unescapedTablesWithDbTblSeparator);

// Step 3: restore placeholder to normal RegEx matcher (`.`)
// For example, `db$*\.tbl$*` => `db.*\.tbl.*`
String standardRegExpTableCaptureList =
unescapedTablesWithDbTblSeparator.replace(DOT_PLACEHOLDER, ".");
LOG.info("Final standard RegExp table capture list: {}", standardRegExpTableCaptureList);

return standardRegExpTableCaptureList;
}

public TableIdRouter(List<RouteRule> routingRules) {
this.routes = new ArrayList<>();
for (RouteRule rule : routingRules) {
try {
String tableInclusions = rule.sourceTable;
Selectors selectors =
new Selectors.SelectorsBuilder().includeTables(tableInclusions).build();
routes.add(new Tuple3<>(selectors, rule.sinkTable, rule.replaceSymbol));
routes.add(
new Tuple3<>(
Pattern.compile(convertTableListToRegExpPattern(rule.sourceTable)),
rule.sinkTable,
rule.replaceSymbol));
} catch (PatternSyntaxException e) {
throw new IllegalArgumentException(
String.format(
@@ -80,7 +141,7 @@ public List<TableId> route(TableId sourceTableId) {
private List<TableId> calculateRoute(TableId sourceTableId) {
List<TableId> routedTableIds =
routes.stream()
.filter(route -> route.f0.isMatch(sourceTableId))
.filter(route -> matches(route.f0, sourceTableId))
.map(route -> resolveReplacement(sourceTableId, route))
.collect(Collectors.toList());
if (routedTableIds.isEmpty()) {
@@ -90,9 +151,14 @@ private List<TableId> calculateRoute(TableId sourceTableId) {
}

private TableId resolveReplacement(
TableId originalTable, Tuple3<Selectors, String, String> route) {
TableId originalTable, Tuple3<Pattern, String, String> route) {
if (route.f2 != null) {
return TableId.parse(route.f1.replace(route.f2, originalTable.getTableName()));
} else {
Matcher matcher = route.f0.matcher(originalTable.toString());
if (matcher.find()) {
return TableId.parse(matcher.replaceAll(route.f1));
}
}
return TableId.parse(route.f1);
}
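
    // Illustrating the capturing branch above with the docs example: the rule
    // `db_(\.*).(\.*)_tbl` -> `sink_db_$1.sink_table_$2` compiles into the pattern
    // `db_(.*)\.(.*)_tbl`, so for source table `db_foo.bar_tbl`, matcher.replaceAll(route.f1)
    // yields "sink_db_foo.sink_table_bar", which TableId.parse splits back into a TableId.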
@@ -111,18 +177,16 @@ public List<Set<TableId>> groupSourceTablesByRouteRule(Set<TableId> tableIdSet)
if (routes.isEmpty()) {
return new ArrayList<>();
}
List<Set<TableId>> routedTableIds =
routes.stream()
.map(
route -> {
return tableIdSet.stream()
.filter(
tableId -> {
return route.f0.isMatch(tableId);
})
.collect(Collectors.toSet());
})
.collect(Collectors.toList());
return routedTableIds;
return routes.stream()
.map(
route ->
tableIdSet.stream()
.filter(tableId -> matches(route.f0, tableId))
.collect(Collectors.toSet()))
.collect(Collectors.toList());
}

private static boolean matches(Pattern pattern, TableId tableId) {
return pattern.matcher(tableId.toString()).matches();
}
}
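
For reference, a minimal usage sketch of the relocated router. It assumes `RouteRule` exposes a three-argument `(sourceTable, sinkTable, replaceSymbol)` constructor; everything else is taken from the code above:

```java
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.route.TableIdRouter;

import java.util.Collections;

public class TableIdRouterDemo {
    public static void main(String[] args) {
        // A single capturing-group rule, with no replace-symbol configured.
        TableIdRouter router =
                new TableIdRouter(
                        Collections.singletonList(
                                new RouteRule(
                                        "db_(\\.*).(\\.*)_tbl",
                                        "sink_db_$1.sink_table_$2",
                                        null)));
        // Expected output: [sink_db_foo.sink_table_bar]
        System.out.println(router.route(TableId.parse("db_foo.bar_tbl")));
    }
}
```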
@@ -30,7 +30,6 @@
import org.apache.flink.cdc.common.source.DataSource;
import org.apache.flink.cdc.common.utils.StringUtils;
import org.apache.flink.cdc.connectors.mysql.source.MySqlDataSource;
import org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
import org.apache.flink.cdc.connectors.mysql.source.config.ServerIdRange;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
@@ -45,7 +44,6 @@

import com.mysql.cj.conf.PropertyKey;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.Tables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -63,6 +61,7 @@
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.flink.cdc.common.route.TableIdRouter.convertTableListToRegExpPattern;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.CHUNK_META_GROUP_SIZE;
@@ -231,7 +230,7 @@ public DataSource createDataSource(Context context) {
}

if (scanBinlogNewlyAddedTableEnabled) {
String newTables = validateTableAndReturnDebeziumStyle(tables);
String newTables = convertTableListToRegExpPattern(tables);
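            // A hypothetical example of the rewrite above: tables = "app_db.orders,app_db.order\\.*"
            // would yield newTables = "app_db\\.orders|app_db\\.order.*" (standard RegExp syntax).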
configFactory.tableList(newTables);
configFactory.excludeTableList(tablesExclude);

@@ -516,58 +515,6 @@ && doubleCompare(distributionFactorLower, 1.0d) <= 0,

private static final String DOT_PLACEHOLDER = "_$dot_placeholder$_";

/**
* Currently, The supported regular syntax is not exactly the same in {@link Selectors} and
* {@link Tables.TableFilter}.
*
* <p>The main distinction are :
*
* <p>1) {@link Selectors} use {@code ,} to split table names and {@link Tables.TableFilter} use
* `|` to split table names.
*
* <p>2) If there is a need to use a dot ({@code .}) in a regular expression to match any
* character, it is necessary to escape the dot with a backslash, refer to {@link
* MySqlDataSourceOptions#TABLES}.
*
* <p>3) The unescaped {@code .} is used as the separator of database and table name. When
* converting to Debezium style, it is expected to be escaped to match the dot ({@code .})
* literally instead of the meta-character.
*/
private String validateTableAndReturnDebeziumStyle(String tables) {
LOG.info("Rewriting CDC style table capture list: {}", tables);

// In CDC-style table matching, table names could be separated by `,` character.
// Convert it to `|` as it's standard RegEx syntax.
tables =
Arrays.stream(tables.split(",")).map(String::trim).collect(Collectors.joining("|"));
LOG.info("Expression after replacing comma with vert separator: {}", tables);

// Essentially, we're just trying to swap escaped `\\.` and unescaped `.`.
// In our table matching syntax, `\\.` means RegEx token matcher and `.` means database &
// table name separator.
// On the contrary, while we're matching TableId string, `\\.` means matching the "dot"
// literal and `.` is the meta-character.

// Step 1: escape the dot with a backslash, but keep it as a placeholder (like `$`).
// For example, `db\.*.tbl\.*` => `db$*.tbl$*`
String unescapedTables = tables.replace("\\.", DOT_PLACEHOLDER);
LOG.info("Expression after unescaping dots as RegEx meta-character: {}", unescapedTables);

// Step 2: replace all remaining dots (`.`) to quoted version (`\.`), as a separator between
// database and table names.
// For example, `db$*.tbl$*` => `db$*\.tbl$*`
String unescapedTablesWithDbTblSeparator = unescapedTables.replace(".", "\\.");
LOG.info("Re-escaping dots as TableId delimiter: {}", unescapedTablesWithDbTblSeparator);

// Step 3: restore placeholder to normal RegEx matcher (`.`)
// For example, `db$*\.tbl$*` => `db.*\.tbl.*`
String debeziumStyleTableCaptureList =
unescapedTablesWithDbTblSeparator.replace(DOT_PLACEHOLDER, ".");
LOG.info("Final Debezium-style table capture list: {}", debeziumStyleTableCaptureList);

return debeziumStyleTableCaptureList;
}

/** Replaces the default timezone placeholder with session timezone, if applicable. */
private static ZoneId getServerTimeZone(Configuration config) {
final String serverTimeZone = config.get(SERVER_TIME_ZONE);
@@ -28,6 +28,7 @@
import org.apache.flink.cdc.common.event.SchemaChangeEventWithPreSchema;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.TableIdRouter;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.MetadataApplier;
@@ -22,6 +22,7 @@
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.route.TableIdRouter;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.runtime.operators.schema.common.event.FlushSuccessEvent;
@@ -26,11 +26,11 @@
import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.route.TableIdRouter;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.runtime.operators.schema.common.CoordinationResponseUtils;
import org.apache.flink.cdc.runtime.operators.schema.common.SchemaDerivator;
import org.apache.flink.cdc.runtime.operators.schema.common.TableIdRouter;
import org.apache.flink.cdc.runtime.operators.schema.common.metrics.SchemaOperatorMetrics;
import org.apache.flink.cdc.runtime.operators.schema.distributed.event.SchemaChangeRequest;
import org.apache.flink.cdc.runtime.operators.schema.distributed.event.SchemaChangeResponse;
@@ -25,11 +25,11 @@
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.route.TableIdRouter;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.runtime.operators.schema.common.SchemaDerivator;
import org.apache.flink.cdc.runtime.operators.schema.common.SchemaManager;
import org.apache.flink.cdc.runtime.operators.schema.common.TableIdRouter;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
@@ -26,11 +26,11 @@
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.route.TableIdRouter;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.runtime.operators.schema.common.CoordinationResponseUtils;
import org.apache.flink.cdc.runtime.operators.schema.common.SchemaDerivator;
import org.apache.flink.cdc.runtime.operators.schema.common.TableIdRouter;
import org.apache.flink.cdc.runtime.operators.schema.common.metrics.SchemaOperatorMetrics;
import org.apache.flink.cdc.runtime.operators.schema.regular.event.SchemaChangeRequest;
import org.apache.flink.cdc.runtime.operators.schema.regular.event.SchemaChangeResponse;