From 9b918e76302979155ad2f230d32adafa9d2fe96e Mon Sep 17 00:00:00 2001 From: leoppro Date: Thu, 19 Mar 2020 11:31:34 +0800 Subject: [PATCH] ci: add kafka integration test (#346) --- Makefile | 11 +- kafka_consumer/main.go | 84 ++++++-- .../cdc_ghpr_integration_test.groovy | 179 ++---------------- .../cdc_ghpr_kafka_integration_test.groovy | 59 ++++++ .../jenkins_ci/integration_test_common.groovy | 171 +++++++++++++++++ tests/_utils/run_kafka_consumer | 17 ++ tests/cdc/run.sh | 13 +- tests/multi_capture/run.sh | 13 +- tests/row_format/run.sh | 13 +- tests/run.sh | 16 +- tests/simple/run.sh | 13 +- tests/split_region/run.sh | 13 +- 12 files changed, 406 insertions(+), 196 deletions(-) create mode 100644 scripts/jenkins_ci/cdc_ghpr_kafka_integration_test.groovy create mode 100644 scripts/jenkins_ci/integration_test_common.groovy create mode 100755 tests/_utils/run_kafka_consumer diff --git a/Makefile b/Makefile index e41b7d96fbf..29f263e0d67 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ ### Makefile for ticdc .PHONY: build test check clean fmt cdc kafka_consumer coverage \ - integration_test_build integration_test + integration_test_build integration_test integration_test_mysql integration_test_kafka PROJECT=ticdc @@ -86,8 +86,13 @@ integration_test_build: check_failpoint_ctl || { $(FAILPOINT_DISABLE); exit 1; } $(FAILPOINT_DISABLE) -integration_test: check_third_party_binary - tests/run.sh $(CASE) +integration_test: integration_test_mysql + +integration_test_mysql: check_third_party_binary + tests/run.sh $(CASE) mysql + +integration_test_kafka: check_third_party_binary + tests/run.sh $(CASE) kafka fmt: @echo "gofmt (simplify)" diff --git a/kafka_consumer/main.go b/kafka_consumer/main.go index 24c36a6927a..2328a3fe2bf 100644 --- a/kafka_consumer/main.go +++ b/kafka_consumer/main.go @@ -3,6 +3,7 @@ package main import ( "context" "flag" + "fmt" "math" "net/url" "os" @@ -14,6 +15,8 @@ import ( "syscall" "time" + "github.com/google/uuid" + "github.com/pingcap/ticdc/pkg/util" "go.uber.org/zap" @@ -29,7 +32,7 @@ var ( kafkaAddrs []string kafkaTopic string kafkaPartitionNum int32 - kafkaGroupID = "ticdc_kafka_consumer" + kafkaGroupID = fmt.Sprintf("ticdc_kafka_consumer_%s", uuid.New().String()) kafkaVersion = "2.4.0" downstreamURIStr string @@ -61,18 +64,9 @@ func init() { } scheme := strings.ToLower(upstreamURI.Scheme) if scheme != "kafka" { - log.Fatal("invalid upstream-uri scheme, the scheme of upstream-uri must be `kafka`") - } - s := upstreamURI.Query().Get("partition-num") - if s == "" { - log.Fatal("partition-num of upstream-uri can not be empty") - } - c, err := strconv.Atoi(s) - if err != nil { - log.Fatal("invalid partition-num of upstream-uri") + log.Fatal("invalid upstream-uri scheme, the scheme of upstream-uri must be `kafka`", zap.String("upstream-uri", upstreamURIStr)) } - kafkaPartitionNum = int32(c) - s = upstreamURI.Query().Get("version") + s := upstreamURI.Query().Get("version") if s != "" { kafkaVersion = s } @@ -84,8 +78,69 @@ func init() { return r == '/' }) kafkaAddrs = strings.Split(upstreamURI.Host, ",") + + config, err := newSaramaConfig() + if err != nil { + log.Fatal("Error creating sarama config", zap.Error(err)) + } + + s = upstreamURI.Query().Get("partition-num") + if s == "" { + partition, err := getPartitionNum(kafkaAddrs, kafkaTopic, config) + if err != nil { + log.Fatal("can not get partition number", zap.String("topic", kafkaTopic), zap.Error(err)) + } + kafkaPartitionNum = partition + } else { + c, err := strconv.Atoi(s) + if err != nil { + log.Fatal("invalid partition-num of upstream-uri") + } + kafkaPartitionNum = int32(c) + } } +func getPartitionNum(address []string, topic string, cfg *sarama.Config) (int32, error) { + // get partition number or create topic automatically + admin, err := sarama.NewClusterAdmin(address, cfg) + if err != nil { + return 0, errors.Trace(err) + } + topics, err := admin.ListTopics() + if err != nil { + return 0, errors.Trace(err) + } + err = admin.Close() + if err != nil { + return 0, errors.Trace(err) + } + topicDetail, exist := topics[topic] + if !exist { + return 0, errors.Errorf("can not find topic %s", topic) + } + log.Info("get partition number of topic", zap.String("topic", topic), zap.Int32("partition_num", topicDetail.NumPartitions)) + return topicDetail.NumPartitions, nil +} + +func waitTopicCreated(address []string, topic string, cfg *sarama.Config) error { + admin, err := sarama.NewClusterAdmin(address, cfg) + if err != nil { + return errors.Trace(err) + } + defer admin.Close() + for i := 0; i <= 10; i++ { + topics, err := admin.ListTopics() + if err != nil { + return errors.Trace(err) + } + if _, ok := topics[topic]; ok { + return nil + } + log.Info("wait the topic created", zap.String("topic", topic)) + time.Sleep(1 * time.Second) + } + return errors.Errorf("wait the topic(%s) created timeout", topic) +} func newSaramaConfig() (*sarama.Config, error) { config := sarama.NewConfig() @@ -116,7 +171,10 @@ func main() { if err != nil { log.Fatal("Error creating sarama config", zap.Error(err)) } - + err = waitTopicCreated(kafkaAddrs, kafkaTopic, config) + if err != nil { + log.Fatal("wait topic created failed", zap.Error(err)) + } /** * Setup a new Sarama consumer group */ diff --git a/scripts/jenkins_ci/cdc_ghpr_integration_test.groovy b/scripts/jenkins_ci/cdc_ghpr_integration_test.groovy index bdb97de19d6..912b39664f7 100644 --- a/scripts/jenkins_ci/cdc_ghpr_integration_test.groovy +++ b/scripts/jenkins_ci/cdc_ghpr_integration_test.groovy @@ -1,171 +1,14 @@ -def test_case_names = ["simple", "cdc", "multi_capture", "split_region", "row_format"] -catchError { - stage('Prepare Binaries') { - def prepares = [:] - - prepares["download third binaries"] = { - container("golang") { - def ws = pwd() - deleteDir() - - sh "mkdir -p third_bin" - - sh "mkdir -p tmp" - - sh "curl ${FILE_SERVER_URL}/download/builds/pingcap/tidb/700d9def026185fe836dd56b0c39e0b4df3c320b/centos7/tidb-server.tar.gz | tar xz -C ./tmp bin/tidb-server" - - sh "curl ${FILE_SERVER_URL}/download/builds/pingcap/pd/08d927675c8feb30552f9fb27246b120cc9ed6d7/centos7/pd-server.tar.gz | tar xz -C ./tmp bin/*" - - sh "curl ${FILE_SERVER_URL}/download/builds/pingcap/tikv/eeaf4be81fabb71c30f62bc9fd11e77860d47d02/centos7/tikv-server.tar.gz | tar xz -C ./tmp bin/tikv-server" - - sh "mv tmp/bin/* third_bin" - - sh "curl ${FILE_SERVER_URL}/download/builds/pingcap/go-ycsb/test-br/go-ycsb -o third_bin/go-ycsb" - - sh "curl https://download.pingcap.org/tidb-tools-v2.1.6-linux-amd64.tar.gz | tar xz -C ./tmp tidb-tools-v2.1.6-linux-amd64/bin/sync_diff_inspector" - - sh "mv tmp/tidb-tools-v2.1.6-linux-amd64/bin/* third_bin" - - sh "chmod a+x third_bin/*" - - sh "rm -rf tmp" - - stash includes: "third_bin/**", name: "third_binaries" - } - } - - prepares["build binaries"] = { - container("golang") { - def ws = pwd() - deleteDir() - unstash 'ticdc' - - dir("go/src/github.com/pingcap/ticdc") { - sh """ - GO111MODULE=off GOPATH=\$GOPATH:${ws}/go PATH=\$GOPATH/bin:${ws}/go/bin:\$PATH make cdc - GO111MODULE=off GOPATH=\$GOPATH:${ws}/go PATH=\$GOPATH/bin:${ws}/go/bin:\$PATH make integration_test_build - GO111MODULE=off GOPATH=\$GOPATH:${ws}/go PATH=\$GOPATH/bin:${ws}/go/bin:\$PATH make kafka_consumer - GO111MODULE=off GOPATH=\$GOPATH:${ws}/go PATH=\$GOPATH/bin:${ws}/go/bin:\$PATH make check_failpoint_ctl - """ - } - stash includes: "go/src/github.com/pingcap/ticdc/bin/**", name: "ticdc_binaries", useDefaultExcludes: false - } - } +def script_path = "go/src/github.com/pingcap/ticdc/scripts/jenkins_ci/integration_test_common.groovy" +println script_path +sh""" +wc -l ${script_path} +""" +def common = load script_path - parallel prepares - - } - - stage("Tests") { - def tests = [:] - - tests["unit test"] = { - node ("${GO_TEST_SLAVE}") { - container("golang") { - def ws = pwd() - deleteDir() - unstash 'ticdc' - unstash 'ticdc_binaries' - - dir("go/src/github.com/pingcap/ticdc") { - sh """ - rm -rf /tmp/tidb_cdc_test - mkdir -p /tmp/tidb_cdc_test - GO111MODULE=off GOPATH=\$GOPATH:${ws}/go PATH=\$GOPATH/bin:${ws}/go/bin:\$PATH make test - rm -rf cov_dir - mkdir -p cov_dir - ls /tmp/tidb_cdc_test - cp /tmp/tidb_cdc_test/cov*out cov_dir - """ - sh """ - tail /tmp/tidb_cdc_test/cov* - """ - } - stash includes: "go/src/github.com/pingcap/ticdc/cov_dir/**", name: "unit_test", useDefaultExcludes: false - } - } - } - - def run_integration_test = { case_name -> - node ("${GO_TEST_SLAVE}") { - container("golang") { - def ws = pwd() - deleteDir() - unstash 'ticdc' - unstash 'third_binaries' - unstash 'ticdc_binaries' - - dir("go/src/github.com/pingcap/ticdc") { - sh "mv ${ws}/third_bin/* ./bin/" - try { - sh """ - rm -rf /tmp/tidb_cdc_test - mkdir -p /tmp/tidb_cdc_test - GO111MODULE=off GOPATH=\$GOPATH:${ws}/go PATH=\$GOPATH/bin:${ws}/go/bin:\$PATH make integration_test CASE=${case_name} - rm -rf cov_dir - mkdir -p cov_dir - ls /tmp/tidb_cdc_test - cp /tmp/tidb_cdc_test/cov*out cov_dir || touch cov_dir/dummy_file_${case_name} - """ - sh """ - tail /tmp/tidb_cdc_test/cov* - """ - } catch (Exception e) { - sh """ - echo "print all log" - for log in `ls /tmp/tidb_cdc_test/*/*.log`; do - echo "____________________________________" - echo "\$log" - cat "\$log" - echo "____________________________________" - done - """ - throw e; - } - } - stash includes: "go/src/github.com/pingcap/ticdc/cov_dir/**", name: "integration_test_${case_name}", useDefaultExcludes: false - } - } - } - - test_case_names.each{ case_name -> - tests["integration test ${case_name}"] = { - run_integration_test(case_name) - } - } - - parallel tests - } - - stage('Coverage') { - node("${GO_TEST_SLAVE}") { - def ws = pwd() - deleteDir() - unstash 'ticdc' - unstash 'unit_test' - - test_case_names.each{ case_name -> - unstash "integration_test_${case_name}" - } - - dir("go/src/github.com/pingcap/ticdc") { - container("golang") { - archiveArtifacts artifacts: 'cov_dir/*', fingerprint: true - - timeout(30) { - sh """ - rm -rf /tmp/tidb_cdc_test - mkdir -p /tmp/tidb_cdc_test - cp cov_dir/* /tmp/tidb_cdc_test - set +x - BUILD_NUMBER=${env.BUILD_NUMBER} CODECOV_TOKEN="${CODECOV_TOKEN}" COVERALLS_TOKEN="${COVERALLS_TOKEN}" GOPATH=${ws}/go:\$GOPATH PATH=${ws}/go/bin:/go/bin:\$PATH JenkinsCI=1 make coverage - set -x - """ - } - } - } - } - } +catchError { + common.prepare_binaries() + common.tests("mysql", "${GO_TEST_SLAVE}") + common.coverage() currentBuild.result = "SUCCESS" } @@ -174,7 +17,7 @@ stage('Summary') { def slackmsg = "[#${ghprbPullId}: ${ghprbPullTitle}]" + "\n" + "${ghprbPullLink}" + "\n" + "${ghprbPullDescription}" + "\n" + - "Unit Test Result: `${currentBuild.result}`" + "\n" + + "Integration Test Result: `${currentBuild.result}`" + "\n" + "Elapsed Time: `${duration} mins` " + "\n" + "${env.RUN_DISPLAY_URL}" diff --git a/scripts/jenkins_ci/cdc_ghpr_kafka_integration_test.groovy b/scripts/jenkins_ci/cdc_ghpr_kafka_integration_test.groovy new file mode 100644 index 00000000000..61712e158e8 --- /dev/null +++ b/scripts/jenkins_ci/cdc_ghpr_kafka_integration_test.groovy @@ -0,0 +1,59 @@ +def script_path = "go/src/github.com/pingcap/ticdc/scripts/jenkins_ci/integration_test_common.groovy" +println script_path +sh""" +wc -l ${script_path} +""" +def common = load script_path + +catchError { + common.prepare_binaries() + + def label = "cdc-kafka-integration-${UUID.randomUUID().toString()}" + podTemplate(label: label, idleMinutes: 0, + containers: [ + containerTemplate(name: 'golang',alwaysPullImage: false, image: "${GO_DOCKER_IMAGE}", + resourceRequestCpu: '2000m', resourceRequestMemory: '4Gi', + ttyEnabled: true, command: 'cat'), + containerTemplate(name: 'zookeeper',alwaysPullImage: false, image: 'wurstmeister/zookeeper', + resourceRequestCpu: '2000m', resourceRequestMemory: '4Gi', + ttyEnabled: true), + containerTemplate( + name: 'kafka', + image: 'wurstmeister/kafka', + resourceRequestCpu: '2000m', resourceRequestMemory: '4Gi', + ttyEnabled: true, + alwaysPullImage: false, + envVars: [ + envVar(key: 'KAFKA_MESSAGE_MAX_BYTES', value: '1073741824'), + envVar(key: 'KAFKA_REPLICA_FETCH_MAX_BYTES', value: '1073741824'), + envVar(key: 'KAFKA_ADVERTISED_PORT', value: '9092'), + envVar(key: 'KAFKA_ADVERTISED_HOST_NAME', value:'127.0.0.1'), + envVar(key: 'KAFKA_BROKER_ID', value: '1'), + envVar(key: 'ZK', value: 'zk'), + envVar(key: 'KAFKA_ZOOKEEPER_CONNECT', value: 'localhost:2181'), + ] + )], + volumes:[ + emptyDirVolume(mountPath: '/tmp', memory: true), + emptyDirVolume(mountPath: '/home/jenkins', memory: true) + ] + ) { + common.tests("kafka", label) + } + + currentBuild.result = "SUCCESS" +} + +stage('Summary') { + def duration = ((System.currentTimeMillis() - currentBuild.startTimeInMillis) / 1000 / 60).setScale(2, BigDecimal.ROUND_HALF_UP) + def slackmsg = "[#${ghprbPullId}: ${ghprbPullTitle}]" + "\n" + + "${ghprbPullLink}" + "\n" + + "${ghprbPullDescription}" + "\n" + + "Integration Kafka Test Result: `${currentBuild.result}`" + "\n" + + "Elapsed Time: `${duration} mins` " + "\n" + + "${env.RUN_DISPLAY_URL}" + + if (currentBuild.result != "SUCCESS") { + slackSend channel: '#jenkins-ci', color: 'danger', teamDomain: 'pingcap', tokenCredentialId: 'slack-pingcap-token', message: "${slackmsg}" + } +} diff --git a/scripts/jenkins_ci/integration_test_common.groovy b/scripts/jenkins_ci/integration_test_common.groovy new file mode 100644 index 00000000000..7ba2aaf4c26 --- /dev/null +++ b/scripts/jenkins_ci/integration_test_common.groovy @@ -0,0 +1,171 @@ +test_case_names = ["simple", "cdc", "multi_capture", "split_region", "row_format"] + +def prepare_binaries() { + stage('Prepare Binaries') { + def prepares = [:] + + prepares["download third binaries"] = { + node ("${GO_TEST_SLAVE}") { + container("golang") { + def ws = pwd() + deleteDir() + + sh """ + mkdir -p third_bin + mkdir -p tmp + curl ${FILE_SERVER_URL}/download/builds/pingcap/tidb/700d9def026185fe836dd56b0c39e0b4df3c320b/centos7/tidb-server.tar.gz | tar xz -C ./tmp bin/tidb-server + curl ${FILE_SERVER_URL}/download/builds/pingcap/pd/08d927675c8feb30552f9fb27246b120cc9ed6d7/centos7/pd-server.tar.gz | tar xz -C ./tmp bin/* + curl ${FILE_SERVER_URL}/download/builds/pingcap/tikv/eeaf4be81fabb71c30f62bc9fd11e77860d47d02/centos7/tikv-server.tar.gz | tar xz -C ./tmp bin/tikv-server + mv tmp/bin/* third_bin + curl ${FILE_SERVER_URL}/download/builds/pingcap/go-ycsb/test-br/go-ycsb -o third_bin/go-ycsb + curl https://download.pingcap.org/tidb-tools-v2.1.6-linux-amd64.tar.gz | tar xz -C ./tmp tidb-tools-v2.1.6-linux-amd64/bin/sync_diff_inspector + mv tmp/tidb-tools-v2.1.6-linux-amd64/bin/* third_bin + chmod a+x third_bin/* + rm -rf tmp + """ + + stash includes: "third_bin/**", name: "third_binaries" + } + } + } + + prepares["build binaries"] = { + node ("${GO_TEST_SLAVE}") { + container("golang") { + def ws = pwd() + deleteDir() + unstash 'ticdc' + + dir("go/src/github.com/pingcap/ticdc") { + sh """ + GO111MODULE=off GOPATH=\$GOPATH:${ws}/go PATH=\$GOPATH/bin:${ws}/go/bin:\$PATH make cdc + GO111MODULE=off GOPATH=\$GOPATH:${ws}/go PATH=\$GOPATH/bin:${ws}/go/bin:\$PATH make integration_test_build + GO111MODULE=off GOPATH=\$GOPATH:${ws}/go PATH=\$GOPATH/bin:${ws}/go/bin:\$PATH make kafka_consumer + GO111MODULE=off GOPATH=\$GOPATH:${ws}/go PATH=\$GOPATH/bin:${ws}/go/bin:\$PATH make check_failpoint_ctl + """ + } + stash includes: "go/src/github.com/pingcap/ticdc/bin/**", name: "ticdc_binaries", useDefaultExcludes: false + } + } + } + + parallel prepares + } +} + +def tests(sink_type, node_label) { + stage("Tests") { + def test_cases = [:] + + test_cases["unit test"] = { + node (node_label) { + container("golang") { + def ws = pwd() + deleteDir() + unstash 'ticdc' + unstash 'ticdc_binaries' + + dir("go/src/github.com/pingcap/ticdc") { + sh """ + rm -rf /tmp/tidb_cdc_test + mkdir -p /tmp/tidb_cdc_test + GO111MODULE=off GOPATH=\$GOPATH:${ws}/go PATH=\$GOPATH/bin:${ws}/go/bin:\$PATH make test + rm -rf cov_dir + mkdir -p cov_dir + ls /tmp/tidb_cdc_test + cp /tmp/tidb_cdc_test/cov*out cov_dir + """ + sh """ + tail /tmp/tidb_cdc_test/cov* + """ + } + stash includes: "go/src/github.com/pingcap/ticdc/cov_dir/**", name: "unit_test", useDefaultExcludes: false + } + } + } + + def run_integration_test = { case_name -> + node (node_label) { + container("golang") { + def ws = pwd() + deleteDir() + unstash 'ticdc' + unstash 'third_binaries' + unstash 'ticdc_binaries' + + dir("go/src/github.com/pingcap/ticdc") { + sh "mv ${ws}/third_bin/* ./bin/" + try { + sh """ + rm -rf /tmp/tidb_cdc_test + mkdir -p /tmp/tidb_cdc_test + GO111MODULE=off GOPATH=\$GOPATH:${ws}/go PATH=\$GOPATH/bin:${ws}/go/bin:\$PATH make integration_test_${sink_type} CASE=${case_name} + rm -rf cov_dir + mkdir -p cov_dir + ls /tmp/tidb_cdc_test + cp /tmp/tidb_cdc_test/cov*out cov_dir || touch cov_dir/dummy_file_${case_name} + """ + sh """ + tail /tmp/tidb_cdc_test/cov* + """ + } catch (Exception e) { + sh """ + echo "print all log" + for log in `ls /tmp/tidb_cdc_test/*/*.log`; do + echo "____________________________________" + echo "\$log" + cat "\$log" + echo "____________________________________" + done + """ + throw e; + } + } + stash includes: "go/src/github.com/pingcap/ticdc/cov_dir/**", name: "integration_test_${case_name}", useDefaultExcludes: false + } + } + } + + test_case_names.each{ case_name -> + test_cases["integration test ${case_name}"] = { + run_integration_test(case_name) + } + } + + parallel test_cases + } +} + +def coverage() { + stage('Coverage') { + node("${GO_TEST_SLAVE}") { + def ws = pwd() + deleteDir() + unstash 'ticdc' + unstash 'unit_test' + + test_case_names.each{ case_name -> + unstash "integration_test_${case_name}" + } + + dir("go/src/github.com/pingcap/ticdc") { + container("golang") { + archiveArtifacts artifacts: 'cov_dir/*', fingerprint: true + + timeout(30) { + sh """ + rm -rf /tmp/tidb_cdc_test + mkdir -p /tmp/tidb_cdc_test + cp cov_dir/* /tmp/tidb_cdc_test + set +x + BUILD_NUMBER=${env.BUILD_NUMBER} CODECOV_TOKEN="${CODECOV_TOKEN}" COVERALLS_TOKEN="${COVERALLS_TOKEN}" GOPATH=${ws}/go:\$GOPATH PATH=${ws}/go/bin:/go/bin:\$PATH JenkinsCI=1 make coverage + set -x + """ + } + } + } + } + } +} + +return this \ No newline at end of file diff --git a/tests/_utils/run_kafka_consumer b/tests/_utils/run_kafka_consumer new file mode 100755 index 00000000000..e6e5a3630dc --- /dev/null +++ b/tests/_utils/run_kafka_consumer @@ -0,0 +1,17 @@ +#!/bin/bash + +# parameter 1: work directory +# parameter 2: sink-uri +# parameter 3: log suffix + +set -e + +workdir=$1 +sink_uri=$2 +log_suffix=$3 +pwd=$pwd + +echo "[$(date)] <<<<<< START kafka consumer in $TEST_NAME case >>>>>>" +cd $workdir +cdc_kafka_consumer --log-file $workdir/cdc_kafka_consumer$log_suffix.log --log-level info --upstream-uri $sink_uri --downstream-uri mysql://root@127.0.0.1:3306/ >> $workdir/cdc_kafka_consumer_stdout$log_suffix.log 2>&1 & +cd $pwd diff --git a/tests/cdc/run.sh b/tests/cdc/run.sh index af96e44fdbb..d5aee16fe78 100755 --- a/tests/cdc/run.sh +++ b/tests/cdc/run.sh @@ -6,6 +6,7 @@ CUR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd ) source $CUR/../_utils/test_prepare WORK_DIR=$OUT_DIR/$TEST_NAME CDC_BINARY=cdc.test +SINK_TYPE=$1 function prepare() { rm -rf $WORK_DIR && mkdir -p $WORK_DIR @@ -19,7 +20,17 @@ function prepare() { start_ts=$(cdc cli tso query --pd=http://$UP_PD_HOST:$UP_PD_PORT) run_cdc_server $WORK_DIR $CDC_BINARY - cdc cli changefeed create --start-ts=$start_ts --sink-uri="dsn://root@tcp(127.0.0.1:3306)/" + + TOPIC_NAME="ticdc-cdc-test-$RANDOM" + case $SINK_TYPE in + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4";; + mysql) ;& + *) SINK_URI="mysql://root@127.0.0.1:3306/";; + esac + cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" + if [ "$SINK_TYPE" == "kafka" ]; then + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4" + fi } trap stop_tidb_cluster EXIT diff --git a/tests/multi_capture/run.sh b/tests/multi_capture/run.sh index 59bd031eef4..8f927ff155d 100755 --- a/tests/multi_capture/run.sh +++ b/tests/multi_capture/run.sh @@ -6,6 +6,7 @@ CUR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd ) source $CUR/../_utils/test_prepare WORK_DIR=$OUT_DIR/$TEST_NAME CDC_BINARY=cdc.test +SINK_TYPE=$1 CDC_COUNT=3 DB_COUNT=4 @@ -31,7 +32,17 @@ function run() { for i in $(seq $CDC_COUNT); do run_cdc_server $WORK_DIR $CDC_BINARY "$i" done - cdc cli changefeed create --start-ts=$start_ts --sink-uri="mysql://root@127.0.0.1:3306/" + + TOPIC_NAME="ticdc-multi-capture-test-$RANDOM" + case $SINK_TYPE in + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4";; + mysql) ;& + *) SINK_URI="mysql://root@127.0.0.1:3306/";; + esac + cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" + if [ "$SINK_TYPE" == "kafka" ]; then + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4" + fi # check tables are created and data is synchronized for i in $(seq $DB_COUNT); do diff --git a/tests/row_format/run.sh b/tests/row_format/run.sh index 4d2c34ab104..880099d8599 100644 --- a/tests/row_format/run.sh +++ b/tests/row_format/run.sh @@ -6,6 +6,7 @@ CUR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd ) source $CUR/../_utils/test_prepare WORK_DIR=$OUT_DIR/$TEST_NAME CDC_BINARY=cdc.test +SINK_TYPE=$1 function run() { rm -rf $WORK_DIR && mkdir -p $WORK_DIR @@ -18,7 +19,17 @@ function run() { start_ts=$(cdc cli tso query --pd=http://$UP_PD_HOST:$UP_PD_PORT) run_cdc_server $WORK_DIR $CDC_BINARY - cdc cli changefeed create --start-ts=$start_ts --sink-uri="mysql://root@127.0.0.1:3306/" + + TOPIC_NAME="ticdc-row-format-test-$RANDOM" + case $SINK_TYPE in + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4";; + mysql) ;& + *) SINK_URI="mysql://root@127.0.0.1:3306/";; + esac + cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" + if [ "$SINK_TYPE" == "kafka" ]; then + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4" + fi run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} # sync_diff can't check non-exist table, so we check expected tables are created in downstream first check_table_exists row_format.multi_data_type ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} diff --git a/tests/run.sh b/tests/run.sh index fc8ff3db1f2..93bc4bbbd2f 100755 --- a/tests/run.sh +++ b/tests/run.sh @@ -31,28 +31,30 @@ fi run_case() { local case=$1 local script=$2 - echo "Running test $script..." + local sink_type=$3 + echo "Running test $script using Sink-Type: $sink_type..." PATH="$CUR/../bin:$CUR/_utils:$PATH" \ OUT_DIR=$OUT_DIR \ TEST_NAME=$case \ - bash "$script" + bash "$script" "$sink_type" } -if [ "$#" -ge 1 ]; then - test_case=$1 -else +test_case=$1 +sink_type=$2 + +if [ -z "$test_case" ]; then test_case="*" fi if [ "$test_case" == "*" ]; then for script in $CUR/*/run.sh; do test_name="$(basename "$(dirname "$script")")" - run_case $test_name $script + run_case $test_name $script $sink_type done else for name in $test_case; do script="$CUR/$name/run.sh" - run_case $name $script + run_case $name $script $sink_type done fi diff --git a/tests/simple/run.sh b/tests/simple/run.sh index fd99a2f888e..334c50875a3 100644 --- a/tests/simple/run.sh +++ b/tests/simple/run.sh @@ -6,6 +6,7 @@ CUR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd ) source $CUR/../_utils/test_prepare WORK_DIR=$OUT_DIR/$TEST_NAME CDC_BINARY=cdc.test +SINK_TYPE=$1 function prepare() { rm -rf $WORK_DIR && mkdir -p $WORK_DIR @@ -21,7 +22,17 @@ function prepare() { run_sql "CREATE table test.simple2(id int primary key, val int);" run_cdc_server $WORK_DIR $CDC_BINARY - cdc cli changefeed create --start-ts=$start_ts --sink-uri="mysql://root@127.0.0.1:3306/" + + TOPIC_NAME="ticdc-simple-test-$RANDOM" + case $SINK_TYPE in + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4";; + mysql) ;& + *) SINK_URI="mysql://root@127.0.0.1:3306/";; + esac + cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" + if [ "$SINK_TYPE" == "kafka" ]; then + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4" + fi } function sql_check() { diff --git a/tests/split_region/run.sh b/tests/split_region/run.sh index 3768207cf65..af2779e0b2a 100755 --- a/tests/split_region/run.sh +++ b/tests/split_region/run.sh @@ -6,6 +6,7 @@ CUR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd ) source $CUR/../_utils/test_prepare WORK_DIR=$OUT_DIR/$TEST_NAME CDC_BINARY=cdc.test +SINK_TYPE=$1 function run() { rm -rf $WORK_DIR && mkdir -p $WORK_DIR @@ -20,7 +21,17 @@ function run() { run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} run_cdc_server $WORK_DIR $CDC_BINARY - cdc cli changefeed create --start-ts=$start_ts --sink-uri="mysql://root@127.0.0.1:3306/" + + TOPIC_NAME="ticdc-split-region-test-$RANDOM" + case $SINK_TYPE in + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4";; + mysql) ;& + *) SINK_URI="mysql://root@127.0.0.1:3306/";; + esac + cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" + if [ "$SINK_TYPE" == "kafka" ]; then + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4" + fi # sync_diff can't check non-exist table, so we check expected tables are created in downstream first check_table_exists split_region.test1 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}