Skip to content
Merged
16 changes: 13 additions & 3 deletions ticdc/ticdc-changefeed-config.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,20 @@ enable-table-across-nodes = false
# 注意:该参数只有当下游为消息队列时,才会生效。
# 注意:当下游 MQ 为 Pulsar 时,如果 partition 的路由规则未指定为 'ts', 'index-value', 'table', 'default' 中的任意一个,那么将会使用你设置的字符串作为每一条 Pulsar message 的 key 进行路由。例如,如果你指定的路由规则为 'code' 字符串,那么符合该 matcher 的所有 Pulsar message 都将会以 'code' 作为 key 进行路由。
# dispatchers = [
# {matcher = ['test1.*', 'test2.*'], topic = "Topic 表达式 1", partition = "ts" },
# {matcher = ['test3.*', 'test4.*'], topic = "Topic 表达式 2", partition = "index-value" },
# {matcher = ['test1.*', 'test2.*'], topic = "Topic 表达式 1", partition = "index-value"},
# {matcher = ['test3.*', 'test4.*'], topic = "Topic 表达式 2", partition = "index-value", index-name="index1"},
# {matcher = ['test1.*', 'test5.*'], topic = "Topic 表达式 3", partition = "table"},
# {matcher = ['test6.*'], partition = "ts"}
# {matcher = ['test6.*'], partition = "columns", columns = "['a', 'b']"}
# {matcher = ['test7.*'], partition = "ts"}
# ]

# column-selectors 从 v7.5.0 开始引入,仅对 Kafka Sink 生效。
# column-selectors 用于选择部分列进行同步。
# column-selectors = [
# {matcher = ['test.t1'], columns = ['a', 'b']},
# {matcher = ['test.*'], columns = ["*", "!b"]},
# {matcher = ['test1.t1'], columns = ['column*', '!column1']},
# {matcher = ['test3.t'], columns = ["column?", "!column1"]},
# ]

# protocol 用于指定编码消息时使用的格式协议
Expand Down
63 changes: 55 additions & 8 deletions ticdc/ticdc-sink-to-kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -224,12 +224,33 @@ Topic 表达式的基本规则为 `[prefix][{schema}][middle][{table}][suffix]`

### Partition 分发器

partition 分发器用 partition = "xxx" 来指定,支持 default、ts、index-valuetable 四种 partition 分发器,分发规则如下:
partition 分发器用 `partition = "xxx"` 来指定,支持 `default`、`index-value`、`columns`、`table` 和 `ts` 共五种 partition 分发器,分发规则如下:

- default:按照 table 分发
- ts:以行变更的 commitTs 做 Hash 计算并进行 event 分发
- index-value:以表的主键或者唯一索引的值做 Hash 计算并进行 event 分发
- table:以表的 schema 名和 table 名做 Hash 计算并进行 event 分发
- `default`:默认使用 table 分发规则。使用所属库名和表名计算 partition 编号,一张表的数据被发送到相同的 partition。单表数据只存在于一个 partition 中并保证有序,但是发送吞吐量有限,无法通过添加消费者的方式提升消费速度。
- `index-value`:使用事件所属表的主键、唯一索引或由 `index-name` 指定的索引的值计算 partition 编号,一张表的数据被发送到多个 partition。单表数据被发送到多个 partition 中,每个 partition 中的数据有序,可以通过添加消费者的方式提升消费速度。
- `columns`:使用由 `columns` 指定的列的值计算 partition 编号。一张表的数据被发送到多个 partition。单表数据被发送到多个 partition 中,每个 partition 中的数据有序,可以通过添加消费者的方式提升消费速度。
- `table`:使用事件所属的表的库名和表名计算 partition 编号。
- `ts`:使用事件的 commitTs 计算 partition 编号。一张表的数据被发送到多个 partition。单表数据被发送到多个 partition 中,每个 partition 中的数据有序,可以通过添加消费者的方式提升消费速度。一条数据的多次修改可能被发送到不同的 partition 中。消费者消费进度不同可能导致消费端数据不一致。因此,消费端需要对来自多个 partition 的数据按 commitTs 排序后再进行消费。

以如下示例配置文件中的 `dispatchers` 配置项为例:

```toml
[sink]
dispatchers = [
{matcher = ['test.*'], partition = "index-value"},
{matcher = ['test1.*'], partition = "index-value", index-name = "index1"},
{matcher = ['test2.*'], partition = "columns", columns = ["id", "a"]},
{matcher = ['test3.*'], partition = "table"},
]
```

- 任何属于库 `test` 的表均使用 `index-value` 分发规则,即使用主键或者唯一索引的值计算 partition 编号。如果有主键则使用主键,否则使用最短的唯一索引。
- 任何属于库 `test1` 的表均使用 `index-value` 分发规则,并且使用名为 `index1` 的索引的所有列的值计算 partition 编号。如果指定的索引不存在,则报错。注意,`index-name` 指定的索引必须是唯一索引。
- 任何属于库 `test2` 的表均使用 `columns` 分发规则,并且使用列 `id` 和 `a` 的值计算 partition 编号。如果任一列不存在,则报错。
- 任何属于库 `test3` 的表均使用 `table` 分发规则。
- 对于属于库 `test4` 的表,因为不匹配上述任何一个规则,所以使用默认的 `default`,即 `table` 分发规则。

如果一张表,匹配了多个分发规则,以第一个匹配的规则为准。

> **注意:**
>
Expand All @@ -238,17 +259,43 @@ partition 分发器用 partition = "xxx" 来指定,支持 default、ts、index
> ```
> [sink]
> dispatchers = [
> {matcher = ['*.*'], dispatcher = "ts"},
> {matcher = ['*.*'], partition = "ts"},
> {matcher = ['*.*'], dispatcher = "index-value"},
> {matcher = ['*.*'], partition = "index-value"},
> ]
> ```
>
> 但是 `dispatcher` 与 `partition` 不能出现在同一条规则中。例如,以下规则非法:
>
> ```
> {matcher = ['*.*'], dispatcher = "ts", partition = "table"},
> {matcher = ['*.*'], dispatcher = "index-value", partition = "table"},
> ```

## 列选择功能

列选择功能支持对事件中的列进行选择,只将指定的列的数据变更事件发送到下游。

以如下示例配置文件中的 `column-selectors` 配置项为例:

```toml
[sink]
column-selectors = [
{matcher = ['test.t1'], columns = ['a', 'b']},
{matcher = ['test.*'], columns = ["*", "!b"]},
{matcher = ['test1.t1'], columns = ['column*', '!column1']},
{matcher = ['test3.t'], columns = ["column?", "!column1"]},
]
```

- 对于表 `test.t1`,只发送 `a` 和 `b` 两列的数据。
- 对于属于库 `test` 的表(除 `t1` 外),发送除 `b` 列之外的所有列的数据。
- 对于表 `test1.t1`,发送所有以 `column` 开头的列,但是不发送 `column1` 列的数据。
- 对于表 `test3.t`,发送所有以 `column` 开头且列名长度为 7 的列,但是不发送 `column1` 列的数据。
- 不匹配任何规则的表将不进行列过滤,发送所有列的数据。

> **注意:**
>
> 经过 `column-selectors` 规则过滤后,表中的数据必须要有主键或者唯一键被同步,否则在 changefeed 创建或运行时会报错。

## 横向扩展大单表的负载到多个 TiCDC 节点

该功能可以按照大单表的数据量和每分钟的修改行数将表的同步范围切分为多个,并使各个范围之间所同步的数据量和修改行数基本相同。该功能将这些范围分布到多个 TiCDC 节点进行同步,使得多个 TiCDC 节点可以同时同步大单表。该功能可以解决以下两个问题:
Expand Down