diff --git a/cdc/owner/feed_state_manager_test.go b/cdc/owner/feed_state_manager_test.go
index 6bb962938e7..3b6ea94a83b 100644
--- a/cdc/owner/feed_state_manager_test.go
+++ b/cdc/owner/feed_state_manager_test.go
@@ -233,7 +233,7 @@ func (s *feedStateManagerSuite) TestChangefeedStatusNotExist(c *check.C) {
 	state := orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID)
 	tester := orchestrator.NewReactorStateTester(c, state, map[string]string{
 		"/tidb/cdc/capture/d563bfc0-f406-4f34-bc7d-6dc2e35a44e5": `{"id":"d563bfc0-f406-4f34-bc7d-6dc2e35a44e5","address":"172.16.6.147:8300","version":"v5.0.0-master-dirty"}`,
-		"/tidb/cdc/changefeed/info/" + ctx.ChangefeedVars().ID: `{"sink-uri":"blackhole:///","opts":{},"create-time":"2021-06-05T00:44:15.065939487+08:00","start-ts":425381670108266496,"target-ts":0,"admin-job-type":1,"sort-engine":"unified","config":{"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null,"protocol":"default"},"cyclic-replication":{"enable":false,"replica-id":0,"filter-replica-ids":null,"id-buckets":0,"sync-ddl":false},"scheduler":{"type":"table-number","polling-time":-1}},"state":"failed","history":[],"error":{"addr":"172.16.6.147:8300","code":"CDC:ErrSnapshotLostByGC","message":"[CDC:ErrSnapshotLostByGC]fail to create or maintain changefeed due to snapshot loss caused by GC. checkpoint-ts 425381670108266496 is earlier than GC safepoint at 0"},"sync-point-enabled":false,"sync-point-interval":600000000000,"creator-version":"v5.0.0-master-dirty"}`,
+		"/tidb/cdc/changefeed/info/" + ctx.ChangefeedVars().ID: `{"sink-uri":"blackhole:///","opts":{},"create-time":"2021-06-05T00:44:15.065939487+08:00","start-ts":425381670108266496,"target-ts":0,"admin-job-type":1,"sort-engine":"unified","config":{"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null,"protocol":"open-protocol"},"cyclic-replication":{"enable":false,"replica-id":0,"filter-replica-ids":null,"id-buckets":0,"sync-ddl":false},"scheduler":{"type":"table-number","polling-time":-1}},"state":"failed","history":[],"error":{"addr":"172.16.6.147:8300","code":"CDC:ErrSnapshotLostByGC","message":"[CDC:ErrSnapshotLostByGC]fail to create or maintain changefeed due to snapshot loss caused by GC. checkpoint-ts 425381670108266496 is earlier than GC safepoint at 0"},"sync-point-enabled":false,"sync-point-interval":600000000000,"creator-version":"v5.0.0-master-dirty"}`,
 		"/tidb/cdc/owner/156579d017f84a68": "d563bfc0-f406-4f34-bc7d-6dc2e35a44e5",
 	})
 	manager.Tick(state)
diff --git a/cdc/processor/processor_test.go b/cdc/processor/processor_test.go
index d7c89bd135f..b171473d167 100644
--- a/cdc/processor/processor_test.go
+++ b/cdc/processor/processor_test.go
@@ -76,7 +76,7 @@ func initProcessor4Test(ctx cdcContext.Context, c *check.C) (*processor, *orches
 	p.changefeed = orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID)
 	return p, orchestrator.NewReactorStateTester(c, p.changefeed, map[string]string{
 		"/tidb/cdc/capture/" + ctx.GlobalVars().CaptureInfo.ID: `{"id":"` + ctx.GlobalVars().CaptureInfo.ID + `","address":"127.0.0.1:8300"}`,
-		"/tidb/cdc/changefeed/info/" + ctx.ChangefeedVars().ID: `{"sink-uri":"blackhole://","opts":{},"create-time":"2020-02-02T00:00:00.000000+00:00","start-ts":0,"target-ts":0,"admin-job-type":0,"sort-engine":"memory","sort-dir":".","config":{"case-sensitive":true,"enable-old-value":false,"force-replicate":false,"check-gc-safe-point":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"ddl-allow-list":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null,"protocol":"default"},"cyclic-replication":{"enable":false,"replica-id":0,"filter-replica-ids":null,"id-buckets":0,"sync-ddl":false},"scheduler":{"type":"table-number","polling-time":-1}},"state":"normal","history":null,"error":null,"sync-point-enabled":false,"sync-point-interval":600000000000}`,
+		"/tidb/cdc/changefeed/info/" + ctx.ChangefeedVars().ID: `{"sink-uri":"blackhole://","opts":{},"create-time":"2020-02-02T00:00:00.000000+00:00","start-ts":0,"target-ts":0,"admin-job-type":0,"sort-engine":"memory","sort-dir":".","config":{"case-sensitive":true,"enable-old-value":false,"force-replicate":false,"check-gc-safe-point":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"ddl-allow-list":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null,"protocol":"open-protocol"},"cyclic-replication":{"enable":false,"replica-id":0,"filter-replica-ids":null,"id-buckets":0,"sync-ddl":false},"scheduler":{"type":"table-number","polling-time":-1}},"state":"normal","history":null,"error":null,"sync-point-enabled":false,"sync-point-interval":600000000000}`,
 		"/tidb/cdc/job/" + ctx.ChangefeedVars().ID: `{"resolved-ts":0,"checkpoint-ts":0,"admin-job-type":0}`,
 		"/tidb/cdc/task/status/" + ctx.GlobalVars().CaptureInfo.ID + "/" + ctx.ChangefeedVars().ID: `{"tables":{},"operation":null,"admin-job-type":0}`,
 	})
diff --git a/cdc/sink/mq_test.go b/cdc/sink/mq_test.go
index 537bc7a69e6..417049c6552 100644
--- a/cdc/sink/mq_test.go
+++ b/cdc/sink/mq_test.go
@@ -54,7 +54,7 @@ func (s mqSinkSuite) TestKafkaSink(c *check.C) {
 	uriTemplate := "kafka://%s/%s?kafka-version=0.9.0.0&max-batch-size=1" +
 		"&max-message-bytes=1048576&partition-num=1" +
-		"&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip&protocol=default"
+		"&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip&protocol=open-protocol"
 	uri := fmt.Sprintf(uriTemplate, leader.Addr(), topic)
 	sinkURI, err := url.Parse(uri)
 	c.Assert(err, check.IsNil)
@@ -158,7 +158,7 @@ func (s mqSinkSuite) TestKafkaSinkFilter(c *check.C) {
 	prodSuccess := new(sarama.ProduceResponse)
 	prodSuccess.AddTopicPartition(topic, 0, sarama.ErrNoError)
-	uriTemplate := "kafka://%s/%s?kafka-version=0.9.0.0&auto-create-topic=false&protocol=default"
+	uriTemplate := "kafka://%s/%s?kafka-version=0.9.0.0&auto-create-topic=false&protocol=open-protocol"
 	uri := fmt.Sprintf(uriTemplate, leader.Addr(), topic)
 	sinkURI, err := url.Parse(uri)
 	c.Assert(err, check.IsNil)
@@ -257,7 +257,7 @@ func (s mqSinkSuite) TestFlushRowChangedEvents(c *check.C) {
 	uriTemplate := "kafka://%s/%s?kafka-version=0.9.0.0&max-batch-size=1" +
 		"&max-message-bytes=1048576&partition-num=1" +
-		"&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip&protocol=default"
+		"&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip&protocol=open-protocol"
 	uri := fmt.Sprintf(uriTemplate, leader.Addr(), topic)
 	sinkURI, err := url.Parse(uri)
 	c.Assert(err, check.IsNil)
+protocol = "open-protocol" [cyclic-replication] # 是否开启环形复制 diff --git a/pkg/cmd/util/helper_test.go b/pkg/cmd/util/helper_test.go index a6e466508f9..93b74d2bfda 100644 --- a/pkg/cmd/util/helper_test.go +++ b/pkg/cmd/util/helper_test.go @@ -193,7 +193,7 @@ func (s *utilsSuite) TestAndWriteExampleReplicaTOML(c *check.C) { {Matcher: []string{"test1.*", "test2.*"}, Columns: []string{"column1", "column2"}}, {Matcher: []string{"test3.*", "test4.*"}, Columns: []string{"!a", "column3"}}, }, - Protocol: "default", + Protocol: "open-protocol", }) c.Assert(cfg.Cyclic, check.DeepEquals, &config.CyclicConfig{ Enable: false, diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go index 09ec851e7d1..8f29bd88f94 100644 --- a/pkg/config/replica_config.go +++ b/pkg/config/replica_config.go @@ -35,9 +35,7 @@ var defaultReplicaConfig = &ReplicaConfig{ Mounter: &MounterConfig{ WorkerNum: 16, }, - Sink: &SinkConfig{ - Protocol: ProtocolOpen.String(), - }, + Sink: &SinkConfig{}, Cyclic: &CyclicConfig{ Enable: false, }, diff --git a/pkg/config/replica_config_test.go b/pkg/config/replica_config_test.go index d99d0efc22d..1fd6c93fc67 100644 --- a/pkg/config/replica_config_test.go +++ b/pkg/config/replica_config_test.go @@ -35,6 +35,7 @@ func TestReplicaConfigMarshal(t *testing.T) { conf.ForceReplicate = true conf.Filter.Rules = []string{"1.1"} conf.Mounter.WorkerNum = 3 + conf.Sink.Protocol = "open-protocol" conf.Sink.ColumnSelectors = []*ColumnSelector{ { Matcher: []string{"1.1"}, @@ -74,6 +75,7 @@ func TestReplicaConfigOutDated(t *testing.T) { conf.ForceReplicate = true conf.Filter.Rules = []string{"1.1"} conf.Mounter.WorkerNum = 3 + conf.Sink.Protocol = "open-protocol" conf.Sink.DispatchRules = []*DispatchRule{ {Matcher: []string{"a.b"}, Dispatcher: "r1"}, {Matcher: []string{"a.c"}, Dispatcher: "r2"}, diff --git a/pkg/orchestrator/reactor_state_test.go b/pkg/orchestrator/reactor_state_test.go index f9f83b58f51..8d51eaa6f4d 100644 --- a/pkg/orchestrator/reactor_state_test.go +++ b/pkg/orchestrator/reactor_state_test.go @@ -64,7 +64,7 @@ func (s *stateSuite) TestChangefeedStateUpdate(c *check.C) { "/tidb/cdc/capture/6bbc01c8-0605-4f86-a0f9-b3119109b225", }, updateValue: []string{ - `{"sink-uri":"blackhole://","opts":{},"create-time":"2020-02-02T00:00:00.000000+00:00","start-ts":421980685886554116,"target-ts":0,"admin-job-type":0,"sort-engine":"memory","sort-dir":"","config":{"case-sensitive":true,"enable-old-value":false,"force-replicate":false,"check-gc-safe-point":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"ddl-allow-list":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null,"protocol":"default"},"cyclic-replication":{"enable":false,"replica-id":0,"filter-replica-ids":null,"id-buckets":0,"sync-ddl":false},"scheduler":{"type":"table-number","polling-time":-1},"consistent":{"level":"normal","storage":"local"}},"state":"normal","history":null,"error":null,"sync-point-enabled":false,"sync-point-interval":600000000000}`, + 
`{"sink-uri":"blackhole://","opts":{},"create-time":"2020-02-02T00:00:00.000000+00:00","start-ts":421980685886554116,"target-ts":0,"admin-job-type":0,"sort-engine":"memory","sort-dir":"","config":{"case-sensitive":true,"enable-old-value":false,"force-replicate":false,"check-gc-safe-point":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"ddl-allow-list":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null,"protocol":"open-protocol"},"cyclic-replication":{"enable":false,"replica-id":0,"filter-replica-ids":null,"id-buckets":0,"sync-ddl":false},"scheduler":{"type":"table-number","polling-time":-1},"consistent":{"level":"normal","storage":"local"}},"state":"normal","history":null,"error":null,"sync-point-enabled":false,"sync-point-interval":600000000000}`, `{"resolved-ts":421980720003809281,"checkpoint-ts":421980719742451713,"admin-job-type":0}`, `{"checkpoint-ts":421980720003809281,"resolved-ts":421980720003809281,"count":0,"error":null}`, `{"tables":{"45":{"start-ts":421980685886554116,"mark-table-id":0}},"operation":null,"admin-job-type":0}`, @@ -86,7 +86,7 @@ func (s *stateSuite) TestChangefeedStateUpdate(c *check.C) { CheckGCSafePoint: true, Filter: &config.FilterConfig{Rules: []string{"*.*"}}, Mounter: &config.MounterConfig{WorkerNum: 16}, - Sink: &config.SinkConfig{Protocol: "default"}, + Sink: &config.SinkConfig{Protocol: "open-protocol"}, Cyclic: &config.CyclicConfig{}, Scheduler: &config.SchedulerConfig{Tp: "table-number", PollingTime: -1}, Consistent: &config.ConsistentConfig{Level: "normal", Storage: "local"}, @@ -121,7 +121,7 @@ func (s *stateSuite) TestChangefeedStateUpdate(c *check.C) { "/tidb/cdc/capture/666777888", }, updateValue: []string{ - `{"sink-uri":"blackhole://","opts":{},"create-time":"2020-02-02T00:00:00.000000+00:00","start-ts":421980685886554116,"target-ts":0,"admin-job-type":0,"sort-engine":"memory","sort-dir":"","config":{"case-sensitive":true,"enable-old-value":false,"force-replicate":false,"check-gc-safe-point":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"ddl-allow-list":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null,"protocol":"default"},"cyclic-replication":{"enable":false,"replica-id":0,"filter-replica-ids":null,"id-buckets":0,"sync-ddl":false},"scheduler":{"type":"table-number","polling-time":-1},"consistent":{"level":"normal","storage":"local"}},"state":"normal","history":null,"error":null,"sync-point-enabled":false,"sync-point-interval":600000000000}`, + `{"sink-uri":"blackhole://","opts":{},"create-time":"2020-02-02T00:00:00.000000+00:00","start-ts":421980685886554116,"target-ts":0,"admin-job-type":0,"sort-engine":"memory","sort-dir":"","config":{"case-sensitive":true,"enable-old-value":false,"force-replicate":false,"check-gc-safe-point":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"ddl-allow-list":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null,"protocol":"open-protocol"},"cyclic-replication":{"enable":false,"replica-id":0,"filter-replica-ids":null,"id-buckets":0,"sync-ddl":false},"scheduler":{"type":"table-number","polling-time":-1},"consistent":{"level":"normal","storage":"local"}},"state":"normal","history":null,"error":null,"sync-point-enabled":false,"sync-point-interval":600000000000}`, `{"resolved-ts":421980720003809281,"checkpoint-ts":421980719742451713,"admin-job-type":0}`, `{"checkpoint-ts":421980720003809281,"resolved-ts":421980720003809281,"count":0,"error":null}`, `{"tables":{"45":{"start-ts":421980685886554116,"mark-table-id":0}},"operation":null,"admin-job-type":0}`, @@ 
-147,7 +147,7 @@ func (s *stateSuite) TestChangefeedStateUpdate(c *check.C) { CheckGCSafePoint: true, Filter: &config.FilterConfig{Rules: []string{"*.*"}}, Mounter: &config.MounterConfig{WorkerNum: 16}, - Sink: &config.SinkConfig{Protocol: "default"}, + Sink: &config.SinkConfig{Protocol: "open-protocol"}, Cyclic: &config.CyclicConfig{}, Scheduler: &config.SchedulerConfig{Tp: "table-number", PollingTime: -1}, Consistent: &config.ConsistentConfig{Level: "normal", Storage: "local"}, @@ -188,7 +188,7 @@ func (s *stateSuite) TestChangefeedStateUpdate(c *check.C) { "/tidb/cdc/task/workload/6bbc01c8-0605-4f86-a0f9-b3119109b225/test-fake", }, updateValue: []string{ - `{"sink-uri":"blackhole://","opts":{},"create-time":"2020-02-02T00:00:00.000000+00:00","start-ts":421980685886554116,"target-ts":0,"admin-job-type":0,"sort-engine":"memory","sort-dir":"","config":{"case-sensitive":true,"enable-old-value":false,"force-replicate":false,"check-gc-safe-point":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"ddl-allow-list":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null,"protocol":"default"},"cyclic-replication":{"enable":false,"replica-id":0,"filter-replica-ids":null,"id-buckets":0,"sync-ddl":false},"scheduler":{"type":"table-number","polling-time":-1},"consistent":{"level":"normal","storage":"local"}},"state":"normal","history":null,"error":null,"sync-point-enabled":false,"sync-point-interval":600000000000}`, + `{"sink-uri":"blackhole://","opts":{},"create-time":"2020-02-02T00:00:00.000000+00:00","start-ts":421980685886554116,"target-ts":0,"admin-job-type":0,"sort-engine":"memory","sort-dir":"","config":{"case-sensitive":true,"enable-old-value":false,"force-replicate":false,"check-gc-safe-point":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"ddl-allow-list":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null,"protocol":"open-protocol"},"cyclic-replication":{"enable":false,"replica-id":0,"filter-replica-ids":null,"id-buckets":0,"sync-ddl":false},"scheduler":{"type":"table-number","polling-time":-1},"consistent":{"level":"normal","storage":"local"}},"state":"normal","history":null,"error":null,"sync-point-enabled":false,"sync-point-interval":600000000000}`, `{"resolved-ts":421980720003809281,"checkpoint-ts":421980719742451713,"admin-job-type":0}`, `{"checkpoint-ts":421980720003809281,"resolved-ts":421980720003809281,"count":0,"error":null}`, `{"tables":{"45":{"start-ts":421980685886554116,"mark-table-id":0}},"operation":null,"admin-job-type":0}`, @@ -215,7 +215,7 @@ func (s *stateSuite) TestChangefeedStateUpdate(c *check.C) { CheckGCSafePoint: true, Filter: &config.FilterConfig{Rules: []string{"*.*"}}, Mounter: &config.MounterConfig{WorkerNum: 16}, - Sink: &config.SinkConfig{Protocol: "default"}, + Sink: &config.SinkConfig{Protocol: "open-protocol"}, Cyclic: &config.CyclicConfig{}, Scheduler: &config.SchedulerConfig{Tp: "table-number", PollingTime: -1}, Consistent: &config.ConsistentConfig{Level: "normal", Storage: "local"}, @@ -257,7 +257,7 @@ func (s *stateSuite) TestChangefeedStateUpdate(c *check.C) { "/tidb/cdc/task/status/666777888/test1", }, updateValue: []string{ - 
`{"sink-uri":"blackhole://","opts":{},"create-time":"2020-02-02T00:00:00.000000+00:00","start-ts":421980685886554116,"target-ts":0,"admin-job-type":0,"sort-engine":"memory","sort-dir":"","config":{"case-sensitive":true,"enable-old-value":false,"force-replicate":false,"check-gc-safe-point":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"ddl-allow-list":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null,"protocol":"default"},"cyclic-replication":{"enable":false,"replica-id":0,"filter-replica-ids":null,"id-buckets":0,"sync-ddl":false},"scheduler":{"type":"table-number","polling-time":-1},"consistent":{"level":"normal","storage":"local"}},"state":"normal","history":null,"error":null,"sync-point-enabled":false,"sync-point-interval":600000000000}`, + `{"sink-uri":"blackhole://","opts":{},"create-time":"2020-02-02T00:00:00.000000+00:00","start-ts":421980685886554116,"target-ts":0,"admin-job-type":0,"sort-engine":"memory","sort-dir":"","config":{"case-sensitive":true,"enable-old-value":false,"force-replicate":false,"check-gc-safe-point":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"ddl-allow-list":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null,"protocol":"open-protocol"},"cyclic-replication":{"enable":false,"replica-id":0,"filter-replica-ids":null,"id-buckets":0,"sync-ddl":false},"scheduler":{"type":"table-number","polling-time":-1},"consistent":{"level":"normal","storage":"local"}},"state":"normal","history":null,"error":null,"sync-point-enabled":false,"sync-point-interval":600000000000}`, `{"resolved-ts":421980720003809281,"checkpoint-ts":421980719742451713,"admin-job-type":0}`, `{"checkpoint-ts":421980720003809281,"resolved-ts":421980720003809281,"count":0,"error":null}`, `{"tables":{"45":{"start-ts":421980685886554116,"mark-table-id":0}},"operation":null,"admin-job-type":0}`, diff --git a/tests/integration_tests/autorandom/run.sh b/tests/integration_tests/autorandom/run.sh index 3c5d16b06c9..1260bcc5557 100644 --- a/tests/integration_tests/autorandom/run.sh +++ b/tests/integration_tests/autorandom/run.sh @@ -22,12 +22,12 @@ function run() { TOPIC_NAME="ticdc-autorandom-test-$RANDOM" case $SINK_TYPE in - kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;; esac cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" fi run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} # sync_diff can't check non-exist table, so we check expected tables are created in downstream first diff --git a/tests/integration_tests/batch_add_table/run.sh b/tests/integration_tests/batch_add_table/run.sh index c2f543c0c2d..0978027b7be 100644 --- a/tests/integration_tests/batch_add_table/run.sh +++ b/tests/integration_tests/batch_add_table/run.sh @@ -24,12 +24,12 @@ function run() { TOPIC_NAME="ticdc-batch-add-table-test-$RANDOM" case $SINK_TYPE in - kafka) 
SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;; esac run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" fi run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} diff --git a/tests/integration_tests/capture_session_done_during_task/run.sh b/tests/integration_tests/capture_session_done_during_task/run.sh index 0205d74943d..ac42c3df067 100644 --- a/tests/integration_tests/capture_session_done_during_task/run.sh +++ b/tests/integration_tests/capture_session_done_during_task/run.sh @@ -16,7 +16,7 @@ function run() { pd_addr="http://$UP_PD_HOST_1:$UP_PD_PORT_1" TOPIC_NAME="ticdc-capture-session-done-during-task-$RANDOM" case $SINK_TYPE in - kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" ;; esac @@ -35,7 +35,7 @@ function run() { sleep 1 if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" fi capture_key=$(ETCDCTL_API=3 etcdctl get /tidb/cdc/capture --prefix | head -n 1) diff --git a/tests/integration_tests/cdc/run.sh b/tests/integration_tests/cdc/run.sh index 4ca76889705..e9eed66427b 100755 --- a/tests/integration_tests/cdc/run.sh +++ b/tests/integration_tests/cdc/run.sh @@ -23,12 +23,12 @@ function prepare() { TOPIC_NAME="ticdc-cdc-test-$RANDOM" case $SINK_TYPE in - kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;; esac run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" fi } diff --git a/tests/integration_tests/changefeed_auto_stop/run.sh b/tests/integration_tests/changefeed_auto_stop/run.sh index 19ce910a00b..305fe9e4528 100755 --- a/tests/integration_tests/changefeed_auto_stop/run.sh +++ b/tests/integration_tests/changefeed_auto_stop/run.sh @@ -50,12 +50,12 @@ function run() { TOPIC_NAME="ticdc-changefeed-auto-stop-test-$RANDOM" case $SINK_TYPE in - 
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;; esac changefeedid=$(cdc cli changefeed create --pd="http://${UP_PD_HOST_1}:${UP_PD_PORT_1}" --start-ts=$start_ts --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}') if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" fi ensure 10 check_changefeed_state "http://${UP_PD_HOST_1}:${UP_PD_PORT_1}" ${changefeedid} "normal" "null" diff --git a/tests/integration_tests/changefeed_error/run.sh b/tests/integration_tests/changefeed_error/run.sh index 6c5b878c187..28d62dda939 100755 --- a/tests/integration_tests/changefeed_error/run.sh +++ b/tests/integration_tests/changefeed_error/run.sh @@ -120,13 +120,13 @@ function run() { TOPIC_NAME="ticdc-sink-retry-test-$RANDOM" case $SINK_TYPE in - kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" ;; esac changefeedid="changefeed-error" run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" -c $changefeedid if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" fi ensure $MAX_RETRIES check_changefeed_mark_failed_regex http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid} ".*CDC:ErrStartTsBeforeGC.*" diff --git a/tests/integration_tests/changefeed_finish/run.sh b/tests/integration_tests/changefeed_finish/run.sh index 9857cd32448..58cf6be60ee 100755 --- a/tests/integration_tests/changefeed_finish/run.sh +++ b/tests/integration_tests/changefeed_finish/run.sh @@ -37,7 +37,7 @@ function run() { pd_addr="http://$UP_PD_HOST_1:$UP_PD_PORT_1" TOPIC_NAME="ticdc-changefeed-pause-resume-$RANDOM" case $SINK_TYPE in - kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" ;; esac @@ -48,7 +48,7 @@ function run() { changefeed_id=$(cdc cli changefeed create --sink-uri="$SINK_URI" --target-ts=$target_ts 2>&1 | tail -n2 | head -n1 | awk '{print $2}') if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" 
fi run_sql "CREATE DATABASE changefeed_finish;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} diff --git a/tests/integration_tests/changefeed_pause_resume/run.sh b/tests/integration_tests/changefeed_pause_resume/run.sh index 9be9837944d..5ecbb82db91 100755 --- a/tests/integration_tests/changefeed_pause_resume/run.sh +++ b/tests/integration_tests/changefeed_pause_resume/run.sh @@ -17,14 +17,14 @@ function run() { pd_addr="http://$UP_PD_HOST_1:$UP_PD_PORT_1" TOPIC_NAME="ticdc-changefeed-pause-resume-$RANDOM" case $SINK_TYPE in - kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" ;; esac run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}') if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" fi run_sql "CREATE DATABASE changefeed_pause_resume;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} diff --git a/tests/integration_tests/changefeed_reconstruct/run.sh b/tests/integration_tests/changefeed_reconstruct/run.sh index 54b47bc4601..439077dc189 100755 --- a/tests/integration_tests/changefeed_reconstruct/run.sh +++ b/tests/integration_tests/changefeed_reconstruct/run.sh @@ -40,7 +40,7 @@ function run() { pd_addr="http://$UP_PD_HOST_1:$UP_PD_PORT_1" TOPIC_NAME="ticdc-changefeed-reconstruct-$RANDOM" case $SINK_TYPE in - kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" ;; esac @@ -48,7 +48,7 @@ function run() { owner_pid=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}') changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}') if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" fi run_sql "CREATE DATABASE changefeed_reconstruct;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} diff --git a/tests/integration_tests/cli/run.sh b/tests/integration_tests/cli/run.sh index ce07735d410..feebb2a24de 100644 --- a/tests/integration_tests/cli/run.sh +++ b/tests/integration_tests/cli/run.sh @@ -48,14 +48,14 @@ function run() { TOPIC_NAME="ticdc-cli-test-$RANDOM" case $SINK_TYPE in - kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; *) 
SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;; esac uuid="custom-changefeed-name" run_cdc_cli changefeed create --start-ts=$start_ts --sort-engine=memory --sink-uri="$SINK_URI" --tz="Asia/Shanghai" -c="$uuid" if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" fi run_cdc_cli changefeed cyclic create-marktables \ @@ -151,7 +151,7 @@ EOF # Test Kafka SSL connection. if [ "$SINK_TYPE" == "kafka" ]; then SSL_TOPIC_NAME="ticdc-cli-test-ssl-$RANDOM" - SINK_URI="kafka://127.0.0.1:9093/$SSL_TOPIC_NAME?ca=${TLS_DIR}/ca.pem&cert=${TLS_DIR}/client.pem&key=${TLS_DIR}/client-key.pem&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" + SINK_URI="kafka://127.0.0.1:9093/$SSL_TOPIC_NAME?protocol=open-protocol&ca=${TLS_DIR}/ca.pem&cert=${TLS_DIR}/client.pem&key=${TLS_DIR}/client-key.pem&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --tz="Asia/Shanghai" fi diff --git a/tests/integration_tests/clustered_index/run.sh b/tests/integration_tests/clustered_index/run.sh index f25ecc27ad0..b16ceb53314 100755 --- a/tests/integration_tests/clustered_index/run.sh +++ b/tests/integration_tests/clustered_index/run.sh @@ -22,12 +22,12 @@ function run() { TOPIC_NAME="ticdc-clustered-index-test-$RANDOM" case $SINK_TYPE in - kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;; esac cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" fi run_sql "set global tidb_enable_clustered_index=1;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} # TiDB global variables cache 2 seconds at most diff --git a/tests/integration_tests/common_1/run.sh b/tests/integration_tests/common_1/run.sh index 07ebc1193a9..d7512d3121c 100644 --- a/tests/integration_tests/common_1/run.sh +++ b/tests/integration_tests/common_1/run.sh @@ -37,12 +37,12 @@ function run() { # can't use the normal user TOPIC_NAME="ticdc-common-1-test-$RANDOM" case $SINK_TYPE in - kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; *) SINK_URI="mysql://root@127.0.0.1:3306/" ;; esac run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" + run_kafka_consumer $WORK_DIR 
"kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" fi run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} diff --git a/tests/integration_tests/consistent_replicate_nfs/run.sh b/tests/integration_tests/consistent_replicate_nfs/run.sh index 3219102055a..3ffaf788638 100644 --- a/tests/integration_tests/consistent_replicate_nfs/run.sh +++ b/tests/integration_tests/consistent_replicate_nfs/run.sh @@ -39,12 +39,12 @@ function run() { TOPIC_NAME="ticdc-sink-retry-test-$RANDOM" case $SINK_TYPE in - kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&max-message-bytes=102400&kafka-version=${KAFKA_VERSION}" ;; + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&max-message-bytes=102400&kafka-version=${KAFKA_VERSION}" ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;; esac changefeed_id=$(cdc cli changefeed create --sink-uri="$SINK_URI" --config="$CUR/conf/changefeed.toml" 2>&1 | tail -n2 | head -n1 | awk '{print $2}') if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}" + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}" fi run_sql "CREATE DATABASE consistent_replicate_nfs;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} diff --git a/tests/integration_tests/consistent_replicate_s3/run.sh b/tests/integration_tests/consistent_replicate_s3/run.sh index 653bb21101c..253244cfc8c 100644 --- a/tests/integration_tests/consistent_replicate_s3/run.sh +++ b/tests/integration_tests/consistent_replicate_s3/run.sh @@ -72,12 +72,12 @@ function run() { TOPIC_NAME="ticdc-sink-retry-test-$RANDOM" case $SINK_TYPE in - kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&max-message-bytes=102400&kafka-version=${KAFKA_VERSION}" ;; + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&max-message-bytes=102400&kafka-version=${KAFKA_VERSION}" ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;; esac changefeed_id=$(cdc cli changefeed create --sink-uri="$SINK_URI" --config="$CUR/conf/changefeed.toml" 2>&1 | tail -n2 | head -n1 | awk '{print $2}') if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}" + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}" fi run_sql "CREATE DATABASE consistent_replicate_s3;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} diff --git a/tests/integration_tests/ddl_attributes/run.sh b/tests/integration_tests/ddl_attributes/run.sh index c1624d3e756..a8e8a7a65ab 100644 --- a/tests/integration_tests/ddl_attributes/run.sh +++ b/tests/integration_tests/ddl_attributes/run.sh @@ -22,12 +22,12 @@ function run() { TOPIC_NAME="ticdc-ddl-attributes-test-$RANDOM" case $SINK_TYPE in - kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}" ;; + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}" ;; *) SINK_URI="mysql://root@127.0.0.1:3306/" ;; esac run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}" + 
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}" fi run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} # sync_diff can't check non-exist table, so we check expected tables are created in downstream first diff --git a/tests/integration_tests/ddl_puller_lag/run.sh b/tests/integration_tests/ddl_puller_lag/run.sh index 8f9595d7e0a..959deee203c 100644 --- a/tests/integration_tests/ddl_puller_lag/run.sh +++ b/tests/integration_tests/ddl_puller_lag/run.sh @@ -25,12 +25,12 @@ function prepare() { TOPIC_NAME="ticdc-ddl-puller-lag-test-$RANDOM" case $SINK_TYPE in - kafka) SINK_URI="kafka+ssl://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-client-id=ddl_puller_lag&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + kafka) SINK_URI="kafka+ssl://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-client-id=ddl_puller_lag&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; *) SINK_URI="mysql+ssl://normal:123456@127.0.0.1:3306/" ;; esac run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" fi } diff --git a/tests/integration_tests/ddl_sequence/run.sh b/tests/integration_tests/ddl_sequence/run.sh index 8ab4b80e71f..07bb628ab9a 100644 --- a/tests/integration_tests/ddl_sequence/run.sh +++ b/tests/integration_tests/ddl_sequence/run.sh @@ -22,12 +22,12 @@ function run() { TOPIC_NAME="ticdc-ddl-sequence-test-$RANDOM" case $SINK_TYPE in - kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;; esac run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" fi run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} # sync_diff can't check non-exist table, so we check expected tables are created in downstream first diff --git a/tests/integration_tests/drop_many_tables/run.sh b/tests/integration_tests/drop_many_tables/run.sh index e5a70e53157..ccfcd6e6957 100644 --- a/tests/integration_tests/drop_many_tables/run.sh +++ b/tests/integration_tests/drop_many_tables/run.sh @@ -22,12 +22,12 @@ function run() { TOPIC_NAME="ticdc-drop-tables-test-$RANDOM" case $SINK_TYPE in - kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;; esac run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" if [ "$SINK_TYPE" == 
"kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" fi run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} # sync_diff can't check non-exist table, so we check expected tables are created in downstream first diff --git a/tests/integration_tests/force_replicate_table/run.sh b/tests/integration_tests/force_replicate_table/run.sh index 0890524a943..fe2816e3bf2 100755 --- a/tests/integration_tests/force_replicate_table/run.sh +++ b/tests/integration_tests/force_replicate_table/run.sh @@ -59,12 +59,12 @@ function run() { TOPIC_NAME="ticdc-force_replicate_table-test-$RANDOM" case $SINK_TYPE in - kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?safe-mode=true" ;; esac cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --config $CUR/conf/changefeed.toml if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" fi run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} diff --git a/tests/integration_tests/gc_safepoint/run.sh b/tests/integration_tests/gc_safepoint/run.sh index fa28076e00b..46165f1195d 100755 --- a/tests/integration_tests/gc_safepoint/run.sh +++ b/tests/integration_tests/gc_safepoint/run.sh @@ -83,14 +83,14 @@ function run() { pd_addr="http://$UP_PD_HOST_1:$UP_PD_PORT_1" TOPIC_NAME="ticdc-gc-safepoint-$RANDOM" case $SINK_TYPE in - kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" ;; esac export GO_FAILPOINTS='github.com/pingcap/tiflow/pkg/txnutil/gc/InjectGcSafepointUpdateInterval=return(500)' run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}') if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" fi clear_gc_worker_safepoint $pd_addr $pd_cluster_id diff --git a/tests/integration_tests/generate_column/run.sh b/tests/integration_tests/generate_column/run.sh index e701b123bc5..6ddbc25e246 100644 --- a/tests/integration_tests/generate_column/run.sh +++ b/tests/integration_tests/generate_column/run.sh @@ -22,12 +22,12 @@ function run() { TOPIC_NAME="ticdc-generate-column-test-$RANDOM" case $SINK_TYPE in - 
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;; esac run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" fi run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} # sync_diff can't check non-exist table, so we check expected tables are created in downstream first diff --git a/tests/integration_tests/kafka_messages/run.sh b/tests/integration_tests/kafka_messages/run.sh index 9f952728c9d..1fb75b5adfa 100755 --- a/tests/integration_tests/kafka_messages/run.sh +++ b/tests/integration_tests/kafka_messages/run.sh @@ -31,10 +31,10 @@ function run_length_limit() { run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --loglevel "info" TOPIC_NAME="ticdc-kafka-message-test-$RANDOM" - SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" + SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" fi # Add a check table to reduce check time, or if we check data with sync diff @@ -87,10 +87,10 @@ function run_batch_size_limit() { run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --loglevel "info" TOPIC_NAME="ticdc-kafka-message-test-$RANDOM" - SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&max-batch-size=3&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" + SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&max-batch-size=3&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760&max-batch-size=3" + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760&max-batch-size=3" fi # Add a check table to reduce check time, or if we check data with sync diff diff --git a/tests/integration_tests/kafka_sink_error_resume/run.sh b/tests/integration_tests/kafka_sink_error_resume/run.sh index 1020d5cd233..0560852fa6b 100755 --- a/tests/integration_tests/kafka_sink_error_resume/run.sh +++ b/tests/integration_tests/kafka_sink_error_resume/run.sh @@ -37,12 +37,12 @@ function run() { pd_addr="http://$UP_PD_HOST_1:$UP_PD_PORT_1" TOPIC_NAME="ticdc-kafka-sink-error-resume-test-$RANDOM" - 
SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" + SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/sink/producer/kafka/KafkaSinkAsyncSendError=4*return(true)' run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}') - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" run_sql "CREATE DATABASE kafka_sink_error_resume;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} run_sql "CREATE table kafka_sink_error_resume.t1(id int primary key auto_increment, val int);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} diff --git a/tests/integration_tests/kv_client_stream_reconnect/run.sh b/tests/integration_tests/kv_client_stream_reconnect/run.sh index ab39329419e..ac01e967428 100644 --- a/tests/integration_tests/kv_client_stream_reconnect/run.sh +++ b/tests/integration_tests/kv_client_stream_reconnect/run.sh @@ -19,7 +19,7 @@ function run() { pd_addr="http://$UP_PD_HOST_1:$UP_PD_PORT_1" TOPIC_NAME="kv-client-stream-reconnect-$RANDOM" case $SINK_TYPE in - kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" ;; esac @@ -28,7 +28,7 @@ function run() { run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}') if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" fi run_sql "CREATE DATABASE kv_client_stream_reconnect;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} diff --git a/tests/integration_tests/many_pk_or_uk/run.sh b/tests/integration_tests/many_pk_or_uk/run.sh index cec30b7acc8..e1da4c9f4d4 100755 --- a/tests/integration_tests/many_pk_or_uk/run.sh +++ b/tests/integration_tests/many_pk_or_uk/run.sh @@ -23,12 +23,12 @@ function prepare() { TOPIC_NAME="ticdc-many-pk-or-uk-test-$RANDOM" case $SINK_TYPE in - kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;; esac run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" + run_kafka_consumer 
$WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" fi } diff --git a/tests/integration_tests/move_table/run.sh b/tests/integration_tests/move_table/run.sh index 366c8074064..edd0b005662 100644 --- a/tests/integration_tests/move_table/run.sh +++ b/tests/integration_tests/move_table/run.sh @@ -27,13 +27,13 @@ function run() { TOPIC_NAME="ticdc-move-table-test-$RANDOM" case $SINK_TYPE in - kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" ;; esac run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" fi run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --loglevel "debug" --logsuffix "2" --addr 127.0.0.1:8301 diff --git a/tests/integration_tests/multi_capture/run.sh b/tests/integration_tests/multi_capture/run.sh index 965cddbe251..6b54c9186e7 100755 --- a/tests/integration_tests/multi_capture/run.sh +++ b/tests/integration_tests/multi_capture/run.sh @@ -35,12 +35,12 @@ function run() { TOPIC_NAME="ticdc-multi-capture-test-$RANDOM" case $SINK_TYPE in - kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;; esac run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" fi # check tables are created and data is synchronized diff --git a/tests/integration_tests/multi_source/run.sh b/tests/integration_tests/multi_source/run.sh index e0442983b64..1c38616dfb6 100755 --- a/tests/integration_tests/multi_source/run.sh +++ b/tests/integration_tests/multi_source/run.sh @@ -23,12 +23,12 @@ function prepare() { TOPIC_NAME="ticdc-multi-source-test-$RANDOM" case $SINK_TYPE in - kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;; esac run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" + run_kafka_consumer $WORK_DIR 
"kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" fi } diff --git a/tests/integration_tests/new_ci_collation_with_old_value/run.sh b/tests/integration_tests/new_ci_collation_with_old_value/run.sh index ff857f9365a..8df0ddf9a99 100755 --- a/tests/integration_tests/new_ci_collation_with_old_value/run.sh +++ b/tests/integration_tests/new_ci_collation_with_old_value/run.sh @@ -22,12 +22,12 @@ function run() { TOPIC_NAME="ticdc-new_ci_collation_with_old_value-test-$RANDOM" case $SINK_TYPE in - kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?safe-mode=true" ;; esac cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --config $CUR/conf/changefeed.toml if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" fi run_sql_file $CUR/data/test1.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} diff --git a/tests/integration_tests/new_ci_collation_without_old_value/run.sh b/tests/integration_tests/new_ci_collation_without_old_value/run.sh index f9551b4ff88..fbc0fd3f55c 100755 --- a/tests/integration_tests/new_ci_collation_without_old_value/run.sh +++ b/tests/integration_tests/new_ci_collation_without_old_value/run.sh @@ -22,12 +22,12 @@ function run() { TOPIC_NAME="ticdc-new_ci_collation_without_old_value-test-$RANDOM" case $SINK_TYPE in - kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}" ;; + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}" ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?safe-mode=true" ;; esac cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --config $CUR/conf/changefeed.toml if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" fi run_sql_file $CUR/data/test1.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} diff --git a/tests/integration_tests/partition_table/run.sh b/tests/integration_tests/partition_table/run.sh index 86a80e26e67..9d7a5592742 100644 --- a/tests/integration_tests/partition_table/run.sh +++ b/tests/integration_tests/partition_table/run.sh @@ -22,12 +22,12 @@ function run() { TOPIC_NAME="ticdc-partition-table-test-$RANDOM" case $SINK_TYPE in - kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;; esac run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR 
"kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" fi run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} # sync_diff can't check non-exist table, so we check expected tables are created in downstream first diff --git a/tests/integration_tests/processor_err_chan/run.sh b/tests/integration_tests/processor_err_chan/run.sh index 7e9ca97176c..ec5025f3d13 100644 --- a/tests/integration_tests/processor_err_chan/run.sh +++ b/tests/integration_tests/processor_err_chan/run.sh @@ -36,7 +36,7 @@ function run() { pd_addr="http://$UP_PD_HOST_1:$UP_PD_PORT_1" TOPIC_NAME="ticdc-processor-err-chan-$RANDOM" case $SINK_TYPE in - kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" ;; esac @@ -52,7 +52,7 @@ function run() { changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}') if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" fi retry_time=10 diff --git a/tests/integration_tests/processor_panic/run.sh b/tests/integration_tests/processor_panic/run.sh index 20f9aa6fd8b..c1724b41b9f 100644 --- a/tests/integration_tests/processor_panic/run.sh +++ b/tests/integration_tests/processor_panic/run.sh @@ -25,12 +25,12 @@ function prepare() { TOPIC_NAME="ticdc-processor-panic-test-$RANDOM" case $SINK_TYPE in - kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-client-id=cdc_test_processor_panic&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-client-id=cdc_test_processor_panic&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;; esac run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" fi } diff --git a/tests/integration_tests/processor_resolved_ts_fallback/run.sh b/tests/integration_tests/processor_resolved_ts_fallback/run.sh index b99754b42d6..4c3504911fe 100755 --- a/tests/integration_tests/processor_resolved_ts_fallback/run.sh +++ b/tests/integration_tests/processor_resolved_ts_fallback/run.sh @@ -28,7 +28,7 @@ function run() { esac run_cdc_cli changefeed create --sink-uri="$SINK_URI" if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" + run_kafka_consumer $WORK_DIR 
"kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" fi run_sql "CREATE database processor_resolved_ts_fallback;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} diff --git a/tests/integration_tests/processor_stop_delay/run.sh b/tests/integration_tests/processor_stop_delay/run.sh index 89d23b963fb..9240ec9a4e5 100644 --- a/tests/integration_tests/processor_stop_delay/run.sh +++ b/tests/integration_tests/processor_stop_delay/run.sh @@ -17,7 +17,7 @@ function run() { pd_addr="http://$UP_PD_HOST_1:$UP_PD_PORT_1" TOPIC_NAME="ticdc-processor-stop-delay-$RANDOM" case $SINK_TYPE in - kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" ;; esac export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/processor/processorStopDelay=1*sleep(10000)' @@ -25,7 +25,7 @@ function run() { run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}') if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" fi run_sql "CREATE DATABASE processor_stop_delay;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} diff --git a/tests/integration_tests/region_merge/run.sh b/tests/integration_tests/region_merge/run.sh index eb365e6347b..7d2e57bf0dc 100644 --- a/tests/integration_tests/region_merge/run.sh +++ b/tests/integration_tests/region_merge/run.sh @@ -37,12 +37,12 @@ function run() { run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY TOPIC_NAME="ticdc-region-merge-test-$RANDOM" case $SINK_TYPE in - kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;; esac run_cdc_cli changefeed create --sink-uri="$SINK_URI" if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" fi # set max_execution_time to 30s, because split region could block even region has been split. 
diff --git a/tests/integration_tests/resolve_lock/run.sh b/tests/integration_tests/resolve_lock/run.sh
index b02f989b57c..84aa2901d58 100755
--- a/tests/integration_tests/resolve_lock/run.sh
+++ b/tests/integration_tests/resolve_lock/run.sh
@@ -23,12 +23,12 @@ function prepare() {
 	TOPIC_NAME="ticdc-resolve-lock-test-$RANDOM"
 	case $SINK_TYPE in
-	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
 	*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?tidb-txn-mode=pessimistic" ;;
 	esac
 	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
 	if [ "$SINK_TYPE" == "kafka" ]; then
-		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
+		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
 	fi
 }
diff --git a/tests/integration_tests/row_format/run.sh b/tests/integration_tests/row_format/run.sh
index 67ed3ef06d7..9c7704d52d7 100644
--- a/tests/integration_tests/row_format/run.sh
+++ b/tests/integration_tests/row_format/run.sh
@@ -22,12 +22,12 @@ function run() {
 	TOPIC_NAME="ticdc-row-format-test-$RANDOM"
 	case $SINK_TYPE in
-	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
 	*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;;
 	esac
 	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
 	if [ "$SINK_TYPE" == "kafka" ]; then
-		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
+		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
 	fi
 
 	run_sql "SET GLOBAL tidb_row_format_version = 1;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
diff --git a/tests/integration_tests/simple/run.sh b/tests/integration_tests/simple/run.sh
index bb390e0dbb2..f629477a7d5 100644
--- a/tests/integration_tests/simple/run.sh
+++ b/tests/integration_tests/simple/run.sh
@@ -25,12 +25,12 @@ function prepare() {
 	TOPIC_NAME="ticdc-simple-test-$RANDOM"
 	case $SINK_TYPE in
-	kafka) SINK_URI="kafka+ssl://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-client-id=cdc_test_simple&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	kafka) SINK_URI="kafka+ssl://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-client-id=cdc_test_simple&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
 	*) SINK_URI="mysql+ssl://normal:123456@127.0.0.1:3306/" ;;
 	esac
 	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
 	if [ "$SINK_TYPE" == "kafka" ]; then
-		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
+		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
 	fi
 }
diff --git a/tests/integration_tests/sink_hang/run.sh b/tests/integration_tests/sink_hang/run.sh
index a75689e101d..aa7f675e378 100644
--- a/tests/integration_tests/sink_hang/run.sh
+++ b/tests/integration_tests/sink_hang/run.sh
@@ -38,7 +38,7 @@ function run() {
 	pd_addr="http://$UP_PD_HOST_1:$UP_PD_PORT_1"
 	TOPIC_NAME="ticdc-sink-hang-test-$RANDOM"
 	case $SINK_TYPE in
-	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
 	*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" ;;
 	esac
 
@@ -46,7 +46,7 @@ function run() {
 	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr
 	changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}')
 	if [ "$SINK_TYPE" == "kafka" ]; then
-		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
+		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
 	fi
 
 	run_sql "CREATE DATABASE sink_hang;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
diff --git a/tests/integration_tests/sink_retry/run.sh b/tests/integration_tests/sink_retry/run.sh
index 88b0219257a..6120f718404 100755
--- a/tests/integration_tests/sink_retry/run.sh
+++ b/tests/integration_tests/sink_retry/run.sh
@@ -26,12 +26,12 @@ function run() {
 	TOPIC_NAME="ticdc-sink-retry-test-$RANDOM"
 	case $SINK_TYPE in
-	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
 	*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" ;;
 	esac
 	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
 	if [ "$SINK_TYPE" == "kafka" ]; then
-		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
+		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
 	fi
 
 	check_table_exists "sink_retry.USERTABLE" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
diff --git a/tests/integration_tests/sorter/run.sh b/tests/integration_tests/sorter/run.sh
index 1020e4d7f98..a6e4d519b6d 100755
--- a/tests/integration_tests/sorter/run.sh
+++ b/tests/integration_tests/sorter/run.sh
@@ -65,12 +65,12 @@ function run() {
 	TOPIC_NAME="ticdc-unified-sorter-test-$RANDOM"
 	CF_NAME=$TOPIC_NAME
 	case $SINK_TYPE in
-	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
 	*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" ;;
 	esac
 	run_cdc_cli changefeed create -c $CF_NAME --start-ts=$start_ts --sink-uri="$SINK_URI" --sort-engine="unified"
 	if [ "$SINK_TYPE" == "kafka" ]; then
-		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
+		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
 	fi
 
 	run_test
@@ -88,12 +88,12 @@ function run() {
 	TOPIC_NAME="ticdc-leveldb-sorter-test-$RANDOM"
 	CF_NAME=$TOPIC_NAME
 	case $SINK_TYPE in
-	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
 	*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" ;;
 	esac
 	run_cdc_cli changefeed create -c $CF_NAME --start-ts=$start_ts --sink-uri="$SINK_URI" --sort-engine="unified"
 	if [ "$SINK_TYPE" == "kafka" ]; then
-		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
+		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
 	fi
 
 	run_test
diff --git a/tests/integration_tests/split_region/run.sh b/tests/integration_tests/split_region/run.sh
index 33b86ec2333..1e241b15d12 100755
--- a/tests/integration_tests/split_region/run.sh
+++ b/tests/integration_tests/split_region/run.sh
@@ -24,12 +24,12 @@ function run() {
 	TOPIC_NAME="ticdc-split-region-test-$RANDOM"
 	case $SINK_TYPE in
-	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
 	*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;;
 	esac
 	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
 	if [ "$SINK_TYPE" == "kafka" ]; then
-		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
+		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
 	fi
 
 	# sync_diff can't check non-exist table, so we check expected tables are created in downstream first
diff --git a/tests/integration_tests/tiflash/run.sh b/tests/integration_tests/tiflash/run.sh
index 1f14ee354ed..58d6d90c69a 100644
--- a/tests/integration_tests/tiflash/run.sh
+++ b/tests/integration_tests/tiflash/run.sh
@@ -22,12 +22,12 @@ function run() {
 	TOPIC_NAME="ticdc-tiflash-test-$RANDOM"
 	case $SINK_TYPE in
-	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
 	*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;;
 	esac
 	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
 	if [ "$SINK_TYPE" == "kafka" ]; then
-		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
+		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
 	fi
 
 	run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
 	# sync_diff can't check non-exist table, so we check expected tables are created in downstream first