Skip to content

Commit

Permalink
config(ticdc): No longer set open-protocol as default value for kafka…
Browse files Browse the repository at this point in the history
… sink protocol (#4002)
  • Loading branch information
Rustin170506 authored Dec 22, 2021
1 parent bd9494d commit e2214aa
Show file tree
Hide file tree
Showing 53 changed files with 112 additions and 112 deletions.
2 changes: 1 addition & 1 deletion cdc/owner/feed_state_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func (s *feedStateManagerSuite) TestChangefeedStatusNotExist(c *check.C) {
state := orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID)
tester := orchestrator.NewReactorStateTester(c, state, map[string]string{
"/tidb/cdc/capture/d563bfc0-f406-4f34-bc7d-6dc2e35a44e5": `{"id":"d563bfc0-f406-4f34-bc7d-6dc2e35a44e5","address":"172.16.6.147:8300","version":"v5.0.0-master-dirty"}`,
"/tidb/cdc/changefeed/info/" + ctx.ChangefeedVars().ID: `{"sink-uri":"blackhole:///","opts":{},"create-time":"2021-06-05T00:44:15.065939487+08:00","start-ts":425381670108266496,"target-ts":0,"admin-job-type":1,"sort-engine":"unified","config":{"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null,"protocol":"default"},"cyclic-replication":{"enable":false,"replica-id":0,"filter-replica-ids":null,"id-buckets":0,"sync-ddl":false},"scheduler":{"type":"table-number","polling-time":-1}},"state":"failed","history":[],"error":{"addr":"172.16.6.147:8300","code":"CDC:ErrSnapshotLostByGC","message":"[CDC:ErrSnapshotLostByGC]fail to create or maintain changefeed due to snapshot loss caused by GC. checkpoint-ts 425381670108266496 is earlier than GC safepoint at 0"},"sync-point-enabled":false,"sync-point-interval":600000000000,"creator-version":"v5.0.0-master-dirty"}`,
"/tidb/cdc/changefeed/info/" + ctx.ChangefeedVars().ID: `{"sink-uri":"blackhole:///","opts":{},"create-time":"2021-06-05T00:44:15.065939487+08:00","start-ts":425381670108266496,"target-ts":0,"admin-job-type":1,"sort-engine":"unified","config":{"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null,"protocol":"open-protocol"},"cyclic-replication":{"enable":false,"replica-id":0,"filter-replica-ids":null,"id-buckets":0,"sync-ddl":false},"scheduler":{"type":"table-number","polling-time":-1}},"state":"failed","history":[],"error":{"addr":"172.16.6.147:8300","code":"CDC:ErrSnapshotLostByGC","message":"[CDC:ErrSnapshotLostByGC]fail to create or maintain changefeed due to snapshot loss caused by GC. checkpoint-ts 425381670108266496 is earlier than GC safepoint at 0"},"sync-point-enabled":false,"sync-point-interval":600000000000,"creator-version":"v5.0.0-master-dirty"}`,
"/tidb/cdc/owner/156579d017f84a68": "d563bfc0-f406-4f34-bc7d-6dc2e35a44e5",
})
manager.Tick(state)
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func initProcessor4Test(ctx cdcContext.Context, c *check.C) (*processor, *orches
p.changefeed = orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID)
return p, orchestrator.NewReactorStateTester(c, p.changefeed, map[string]string{
"/tidb/cdc/capture/" + ctx.GlobalVars().CaptureInfo.ID: `{"id":"` + ctx.GlobalVars().CaptureInfo.ID + `","address":"127.0.0.1:8300"}`,
"/tidb/cdc/changefeed/info/" + ctx.ChangefeedVars().ID: `{"sink-uri":"blackhole://","opts":{},"create-time":"2020-02-02T00:00:00.000000+00:00","start-ts":0,"target-ts":0,"admin-job-type":0,"sort-engine":"memory","sort-dir":".","config":{"case-sensitive":true,"enable-old-value":false,"force-replicate":false,"check-gc-safe-point":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"ddl-allow-list":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null,"protocol":"default"},"cyclic-replication":{"enable":false,"replica-id":0,"filter-replica-ids":null,"id-buckets":0,"sync-ddl":false},"scheduler":{"type":"table-number","polling-time":-1}},"state":"normal","history":null,"error":null,"sync-point-enabled":false,"sync-point-interval":600000000000}`,
"/tidb/cdc/changefeed/info/" + ctx.ChangefeedVars().ID: `{"sink-uri":"blackhole://","opts":{},"create-time":"2020-02-02T00:00:00.000000+00:00","start-ts":0,"target-ts":0,"admin-job-type":0,"sort-engine":"memory","sort-dir":".","config":{"case-sensitive":true,"enable-old-value":false,"force-replicate":false,"check-gc-safe-point":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"ddl-allow-list":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null,"protocol":"open-protocol"},"cyclic-replication":{"enable":false,"replica-id":0,"filter-replica-ids":null,"id-buckets":0,"sync-ddl":false},"scheduler":{"type":"table-number","polling-time":-1}},"state":"normal","history":null,"error":null,"sync-point-enabled":false,"sync-point-interval":600000000000}`,
"/tidb/cdc/job/" + ctx.ChangefeedVars().ID: `{"resolved-ts":0,"checkpoint-ts":0,"admin-job-type":0}`,
"/tidb/cdc/task/status/" + ctx.GlobalVars().CaptureInfo.ID + "/" + ctx.ChangefeedVars().ID: `{"tables":{},"operation":null,"admin-job-type":0}`,
})
Expand Down
6 changes: 3 additions & 3 deletions cdc/sink/mq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (s mqSinkSuite) TestKafkaSink(c *check.C) {

uriTemplate := "kafka://%s/%s?kafka-version=0.9.0.0&max-batch-size=1" +
"&max-message-bytes=1048576&partition-num=1" +
"&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip&protocol=default"
"&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip&protocol=open-protocol"
uri := fmt.Sprintf(uriTemplate, leader.Addr(), topic)
sinkURI, err := url.Parse(uri)
c.Assert(err, check.IsNil)
Expand Down Expand Up @@ -158,7 +158,7 @@ func (s mqSinkSuite) TestKafkaSinkFilter(c *check.C) {
prodSuccess := new(sarama.ProduceResponse)
prodSuccess.AddTopicPartition(topic, 0, sarama.ErrNoError)

uriTemplate := "kafka://%s/%s?kafka-version=0.9.0.0&auto-create-topic=false&protocol=default"
uriTemplate := "kafka://%s/%s?kafka-version=0.9.0.0&auto-create-topic=false&protocol=open-protocol"
uri := fmt.Sprintf(uriTemplate, leader.Addr(), topic)
sinkURI, err := url.Parse(uri)
c.Assert(err, check.IsNil)
Expand Down Expand Up @@ -257,7 +257,7 @@ func (s mqSinkSuite) TestFlushRowChangedEvents(c *check.C) {

uriTemplate := "kafka://%s/%s?kafka-version=0.9.0.0&max-batch-size=1" +
"&max-message-bytes=1048576&partition-num=1" +
"&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip&protocol=default"
"&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip&protocol=open-protocol"
uri := fmt.Sprintf(uriTemplate, leader.Addr(), topic)
sinkURI, err := url.Parse(uri)
c.Assert(err, check.IsNil)
Expand Down
2 changes: 1 addition & 1 deletion docs/design/2021-10-13-ticdc-mq-sink-column-selector.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ dispatchers = [
{matcher = ['test3.*', 'test4.*'], dispatcher = "rowid"},
]

protocol = "default"
protocol = "open-protocol"

column-selectors = [
{matcher = ['test1.*', 'test2.*'], columns = ["Column selector expression"]},
Expand Down
6 changes: 3 additions & 3 deletions pkg/cmd/util/changefeed.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ column-selectors = [
{ matcher = ['test3.*', 'test4.*'], columns = ["!a", "column3"] },
]
# 对于 MQ 类的 Sink,可以指定消息的协议格式
# 协议目前支持 default, canal, avro 和 maxwell 四种,default 为 ticdc-open-protocol
# 协议目前支持 open-protocol, canal, canal-json, avro 和 maxwell 五种。
# For MQ Sinks, you can configure the protocol of the messages sending to MQ
# Currently the protocol support default, canal, avro and maxwell. Default is ticdc-open-protocol
protocol = "default"
# Currently the protocol support open-protocol, canal, canal-json, avro and maxwell.
protocol = "open-protocol"

[cyclic-replication]
# 是否开启环形复制
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/util/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (s *utilsSuite) TestAndWriteExampleReplicaTOML(c *check.C) {
{Matcher: []string{"test1.*", "test2.*"}, Columns: []string{"column1", "column2"}},
{Matcher: []string{"test3.*", "test4.*"}, Columns: []string{"!a", "column3"}},
},
Protocol: "default",
Protocol: "open-protocol",
})
c.Assert(cfg.Cyclic, check.DeepEquals, &config.CyclicConfig{
Enable: false,
Expand Down
4 changes: 1 addition & 3 deletions pkg/config/replica_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,7 @@ var defaultReplicaConfig = &ReplicaConfig{
Mounter: &MounterConfig{
WorkerNum: 16,
},
Sink: &SinkConfig{
Protocol: ProtocolOpen.String(),
},
Sink: &SinkConfig{},
Cyclic: &CyclicConfig{
Enable: false,
},
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/replica_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func TestReplicaConfigMarshal(t *testing.T) {
conf.ForceReplicate = true
conf.Filter.Rules = []string{"1.1"}
conf.Mounter.WorkerNum = 3
conf.Sink.Protocol = "open-protocol"
conf.Sink.ColumnSelectors = []*ColumnSelector{
{
Matcher: []string{"1.1"},
Expand Down Expand Up @@ -74,6 +75,7 @@ func TestReplicaConfigOutDated(t *testing.T) {
conf.ForceReplicate = true
conf.Filter.Rules = []string{"1.1"}
conf.Mounter.WorkerNum = 3
conf.Sink.Protocol = "open-protocol"
conf.Sink.DispatchRules = []*DispatchRule{
{Matcher: []string{"a.b"}, Dispatcher: "r1"},
{Matcher: []string{"a.c"}, Dispatcher: "r2"},
Expand Down
14 changes: 7 additions & 7 deletions pkg/orchestrator/reactor_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (s *stateSuite) TestChangefeedStateUpdate(c *check.C) {
"/tidb/cdc/capture/6bbc01c8-0605-4f86-a0f9-b3119109b225",
},
updateValue: []string{
`{"sink-uri":"blackhole://","opts":{},"create-time":"2020-02-02T00:00:00.000000+00:00","start-ts":421980685886554116,"target-ts":0,"admin-job-type":0,"sort-engine":"memory","sort-dir":"","config":{"case-sensitive":true,"enable-old-value":false,"force-replicate":false,"check-gc-safe-point":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"ddl-allow-list":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null,"protocol":"default"},"cyclic-replication":{"enable":false,"replica-id":0,"filter-replica-ids":null,"id-buckets":0,"sync-ddl":false},"scheduler":{"type":"table-number","polling-time":-1},"consistent":{"level":"normal","storage":"local"}},"state":"normal","history":null,"error":null,"sync-point-enabled":false,"sync-point-interval":600000000000}`,
`{"sink-uri":"blackhole://","opts":{},"create-time":"2020-02-02T00:00:00.000000+00:00","start-ts":421980685886554116,"target-ts":0,"admin-job-type":0,"sort-engine":"memory","sort-dir":"","config":{"case-sensitive":true,"enable-old-value":false,"force-replicate":false,"check-gc-safe-point":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"ddl-allow-list":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null,"protocol":"open-protocol"},"cyclic-replication":{"enable":false,"replica-id":0,"filter-replica-ids":null,"id-buckets":0,"sync-ddl":false},"scheduler":{"type":"table-number","polling-time":-1},"consistent":{"level":"normal","storage":"local"}},"state":"normal","history":null,"error":null,"sync-point-enabled":false,"sync-point-interval":600000000000}`,
`{"resolved-ts":421980720003809281,"checkpoint-ts":421980719742451713,"admin-job-type":0}`,
`{"checkpoint-ts":421980720003809281,"resolved-ts":421980720003809281,"count":0,"error":null}`,
`{"tables":{"45":{"start-ts":421980685886554116,"mark-table-id":0}},"operation":null,"admin-job-type":0}`,
Expand All @@ -86,7 +86,7 @@ func (s *stateSuite) TestChangefeedStateUpdate(c *check.C) {
CheckGCSafePoint: true,
Filter: &config.FilterConfig{Rules: []string{"*.*"}},
Mounter: &config.MounterConfig{WorkerNum: 16},
Sink: &config.SinkConfig{Protocol: "default"},
Sink: &config.SinkConfig{Protocol: "open-protocol"},
Cyclic: &config.CyclicConfig{},
Scheduler: &config.SchedulerConfig{Tp: "table-number", PollingTime: -1},
Consistent: &config.ConsistentConfig{Level: "normal", Storage: "local"},
Expand Down Expand Up @@ -121,7 +121,7 @@ func (s *stateSuite) TestChangefeedStateUpdate(c *check.C) {
"/tidb/cdc/capture/666777888",
},
updateValue: []string{
`{"sink-uri":"blackhole://","opts":{},"create-time":"2020-02-02T00:00:00.000000+00:00","start-ts":421980685886554116,"target-ts":0,"admin-job-type":0,"sort-engine":"memory","sort-dir":"","config":{"case-sensitive":true,"enable-old-value":false,"force-replicate":false,"check-gc-safe-point":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"ddl-allow-list":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null,"protocol":"default"},"cyclic-replication":{"enable":false,"replica-id":0,"filter-replica-ids":null,"id-buckets":0,"sync-ddl":false},"scheduler":{"type":"table-number","polling-time":-1},"consistent":{"level":"normal","storage":"local"}},"state":"normal","history":null,"error":null,"sync-point-enabled":false,"sync-point-interval":600000000000}`,
`{"sink-uri":"blackhole://","opts":{},"create-time":"2020-02-02T00:00:00.000000+00:00","start-ts":421980685886554116,"target-ts":0,"admin-job-type":0,"sort-engine":"memory","sort-dir":"","config":{"case-sensitive":true,"enable-old-value":false,"force-replicate":false,"check-gc-safe-point":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"ddl-allow-list":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null,"protocol":"open-protocol"},"cyclic-replication":{"enable":false,"replica-id":0,"filter-replica-ids":null,"id-buckets":0,"sync-ddl":false},"scheduler":{"type":"table-number","polling-time":-1},"consistent":{"level":"normal","storage":"local"}},"state":"normal","history":null,"error":null,"sync-point-enabled":false,"sync-point-interval":600000000000}`,
`{"resolved-ts":421980720003809281,"checkpoint-ts":421980719742451713,"admin-job-type":0}`,
`{"checkpoint-ts":421980720003809281,"resolved-ts":421980720003809281,"count":0,"error":null}`,
`{"tables":{"45":{"start-ts":421980685886554116,"mark-table-id":0}},"operation":null,"admin-job-type":0}`,
Expand All @@ -147,7 +147,7 @@ func (s *stateSuite) TestChangefeedStateUpdate(c *check.C) {
CheckGCSafePoint: true,
Filter: &config.FilterConfig{Rules: []string{"*.*"}},
Mounter: &config.MounterConfig{WorkerNum: 16},
Sink: &config.SinkConfig{Protocol: "default"},
Sink: &config.SinkConfig{Protocol: "open-protocol"},
Cyclic: &config.CyclicConfig{},
Scheduler: &config.SchedulerConfig{Tp: "table-number", PollingTime: -1},
Consistent: &config.ConsistentConfig{Level: "normal", Storage: "local"},
Expand Down Expand Up @@ -188,7 +188,7 @@ func (s *stateSuite) TestChangefeedStateUpdate(c *check.C) {
"/tidb/cdc/task/workload/6bbc01c8-0605-4f86-a0f9-b3119109b225/test-fake",
},
updateValue: []string{
`{"sink-uri":"blackhole://","opts":{},"create-time":"2020-02-02T00:00:00.000000+00:00","start-ts":421980685886554116,"target-ts":0,"admin-job-type":0,"sort-engine":"memory","sort-dir":"","config":{"case-sensitive":true,"enable-old-value":false,"force-replicate":false,"check-gc-safe-point":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"ddl-allow-list":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null,"protocol":"default"},"cyclic-replication":{"enable":false,"replica-id":0,"filter-replica-ids":null,"id-buckets":0,"sync-ddl":false},"scheduler":{"type":"table-number","polling-time":-1},"consistent":{"level":"normal","storage":"local"}},"state":"normal","history":null,"error":null,"sync-point-enabled":false,"sync-point-interval":600000000000}`,
`{"sink-uri":"blackhole://","opts":{},"create-time":"2020-02-02T00:00:00.000000+00:00","start-ts":421980685886554116,"target-ts":0,"admin-job-type":0,"sort-engine":"memory","sort-dir":"","config":{"case-sensitive":true,"enable-old-value":false,"force-replicate":false,"check-gc-safe-point":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"ddl-allow-list":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null,"protocol":"open-protocol"},"cyclic-replication":{"enable":false,"replica-id":0,"filter-replica-ids":null,"id-buckets":0,"sync-ddl":false},"scheduler":{"type":"table-number","polling-time":-1},"consistent":{"level":"normal","storage":"local"}},"state":"normal","history":null,"error":null,"sync-point-enabled":false,"sync-point-interval":600000000000}`,
`{"resolved-ts":421980720003809281,"checkpoint-ts":421980719742451713,"admin-job-type":0}`,
`{"checkpoint-ts":421980720003809281,"resolved-ts":421980720003809281,"count":0,"error":null}`,
`{"tables":{"45":{"start-ts":421980685886554116,"mark-table-id":0}},"operation":null,"admin-job-type":0}`,
Expand All @@ -215,7 +215,7 @@ func (s *stateSuite) TestChangefeedStateUpdate(c *check.C) {
CheckGCSafePoint: true,
Filter: &config.FilterConfig{Rules: []string{"*.*"}},
Mounter: &config.MounterConfig{WorkerNum: 16},
Sink: &config.SinkConfig{Protocol: "default"},
Sink: &config.SinkConfig{Protocol: "open-protocol"},
Cyclic: &config.CyclicConfig{},
Scheduler: &config.SchedulerConfig{Tp: "table-number", PollingTime: -1},
Consistent: &config.ConsistentConfig{Level: "normal", Storage: "local"},
Expand Down Expand Up @@ -257,7 +257,7 @@ func (s *stateSuite) TestChangefeedStateUpdate(c *check.C) {
"/tidb/cdc/task/status/666777888/test1",
},
updateValue: []string{
`{"sink-uri":"blackhole://","opts":{},"create-time":"2020-02-02T00:00:00.000000+00:00","start-ts":421980685886554116,"target-ts":0,"admin-job-type":0,"sort-engine":"memory","sort-dir":"","config":{"case-sensitive":true,"enable-old-value":false,"force-replicate":false,"check-gc-safe-point":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"ddl-allow-list":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null,"protocol":"default"},"cyclic-replication":{"enable":false,"replica-id":0,"filter-replica-ids":null,"id-buckets":0,"sync-ddl":false},"scheduler":{"type":"table-number","polling-time":-1},"consistent":{"level":"normal","storage":"local"}},"state":"normal","history":null,"error":null,"sync-point-enabled":false,"sync-point-interval":600000000000}`,
`{"sink-uri":"blackhole://","opts":{},"create-time":"2020-02-02T00:00:00.000000+00:00","start-ts":421980685886554116,"target-ts":0,"admin-job-type":0,"sort-engine":"memory","sort-dir":"","config":{"case-sensitive":true,"enable-old-value":false,"force-replicate":false,"check-gc-safe-point":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"ddl-allow-list":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null,"protocol":"open-protocol"},"cyclic-replication":{"enable":false,"replica-id":0,"filter-replica-ids":null,"id-buckets":0,"sync-ddl":false},"scheduler":{"type":"table-number","polling-time":-1},"consistent":{"level":"normal","storage":"local"}},"state":"normal","history":null,"error":null,"sync-point-enabled":false,"sync-point-interval":600000000000}`,
`{"resolved-ts":421980720003809281,"checkpoint-ts":421980719742451713,"admin-job-type":0}`,
`{"checkpoint-ts":421980720003809281,"resolved-ts":421980720003809281,"count":0,"error":null}`,
`{"tables":{"45":{"start-ts":421980685886554116,"mark-table-id":0}},"operation":null,"admin-job-type":0}`,
Expand Down
4 changes: 2 additions & 2 deletions tests/integration_tests/autorandom/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ function run() {

TOPIC_NAME="ticdc-autorandom-test-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;;
esac
cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
fi
run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
# sync_diff can't check non-exist table, so we check expected tables are created in downstream first
Expand Down
Loading

0 comments on commit e2214aa

Please sign in to comment.