Skip to content

Commit

Permalink
tests: add max-message-bytes to kafka integration tests (#3105)
Browse files Browse the repository at this point in the history
* revert back to 1M.

* add max-message-bytes to all integration test.

* add max-message-bytes to all consumer.
  • Loading branch information
3AceShowHand authored Nov 1, 2021
1 parent ecd6f55 commit dc32dd6
Show file tree
Hide file tree
Showing 42 changed files with 86 additions and 86 deletions.
4 changes: 2 additions & 2 deletions tests/autorandom/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ function run() {

TOPIC_NAME="ticdc-autorandom-test-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}" ;;
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;;
esac
cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}"
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
fi
run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
# sync_diff can't check non-exist table, so we check expected tables are created in downstream first
Expand Down
4 changes: 2 additions & 2 deletions tests/batch_add_table/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ function run() {

TOPIC_NAME="ticdc-batch-add-table-test-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}" ;;
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;;
esac
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}"
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
fi

run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
Expand Down
4 changes: 2 additions & 2 deletions tests/capture_session_done_during_task/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ function run() {
pd_addr="http://$UP_PD_HOST_1:$UP_PD_PORT_1"
TOPIC_NAME="ticdc-capture-session-done-during-task-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}" ;;
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" ;;
esac

Expand All @@ -35,7 +35,7 @@ function run() {
sleep 1

if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}"
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
fi

capture_key=$(ETCDCTL_API=3 etcdctl get /tidb/cdc/capture --prefix | head -n 1)
Expand Down
4 changes: 2 additions & 2 deletions tests/cdc/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ function prepare() {

TOPIC_NAME="ticdc-cdc-test-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}" ;;
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;;
esac
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}"
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
fi
}

Expand Down
4 changes: 2 additions & 2 deletions tests/changefeed_auto_stop/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,12 @@ function run() {

TOPIC_NAME="ticdc-changefeed-auto-stop-test-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}" ;;
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;;
esac
changefeedid=$(cdc cli changefeed create --pd="http://${UP_PD_HOST_1}:${UP_PD_PORT_1}" --start-ts=$start_ts --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}')
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}"
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
fi

ensure 10 check_changefeed_state "http://${UP_PD_HOST_1}:${UP_PD_PORT_1}" ${changefeedid} "normal" "null"
Expand Down
4 changes: 2 additions & 2 deletions tests/changefeed_error/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,13 @@ function run() {

TOPIC_NAME="ticdc-sink-retry-test-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}" ;;
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" ;;
esac
changefeedid="changefeed-error"
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" -c $changefeedid
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}"
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
fi

ensure $MAX_RETRIES check_changefeed_mark_failed_regex http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid} ".*CDC:ErrStartTsBeforeGC.*"
Expand Down
4 changes: 2 additions & 2 deletions tests/changefeed_finish/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ function run() {
pd_addr="http://$UP_PD_HOST_1:$UP_PD_PORT_1"
TOPIC_NAME="ticdc-changefeed-pause-resume-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}" ;;
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" ;;
esac

Expand All @@ -48,7 +48,7 @@ function run() {
changefeed_id=$(cdc cli changefeed create --sink-uri="$SINK_URI" --target-ts=$target_ts 2>&1 | tail -n2 | head -n1 | awk '{print $2}')

if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}"
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
fi

run_sql "CREATE DATABASE changefeed_finish;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
Expand Down
4 changes: 2 additions & 2 deletions tests/changefeed_pause_resume/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ function run() {
pd_addr="http://$UP_PD_HOST_1:$UP_PD_PORT_1"
TOPIC_NAME="ticdc-changefeed-pause-resume-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}" ;;
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" ;;
esac

run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr
changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}')
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}"
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
fi

run_sql "CREATE DATABASE changefeed_pause_resume;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
Expand Down
4 changes: 2 additions & 2 deletions tests/changefeed_reconstruct/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,15 @@ function run() {
pd_addr="http://$UP_PD_HOST_1:$UP_PD_PORT_1"
TOPIC_NAME="ticdc-changefeed-reconstruct-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}" ;;
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" ;;
esac

run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --logsuffix server1 --pd $pd_addr
owner_pid=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}')
changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}')
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}"
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
fi

run_sql "CREATE DATABASE changefeed_reconstruct;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
Expand Down
6 changes: 3 additions & 3 deletions tests/cli/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@ function run() {

TOPIC_NAME="ticdc-cli-test-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}" ;;
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;;
esac

uuid="custom-changefeed-name"
run_cdc_cli changefeed create --start-ts=$start_ts --sort-engine=memory --sink-uri="$SINK_URI" --tz="Asia/Shanghai" -c="$uuid"
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}"
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
fi

run_cdc_cli changefeed cyclic create-marktables \
Expand Down Expand Up @@ -151,7 +151,7 @@ EOF
# Test Kafka SSL connection.
if [ "$SINK_TYPE" == "kafka" ]; then
SSL_TOPIC_NAME="ticdc-cli-test-ssl-$RANDOM"
SINK_URI="kafka://127.0.0.1:9093/$SSL_TOPIC_NAME?ca=${TLS_DIR}/ca.pem&cert=${TLS_DIR}/client.pem&key=${TLS_DIR}/client-key.pem&kafka-version=${KAFKA_VERSION}"
SINK_URI="kafka://127.0.0.1:9093/$SSL_TOPIC_NAME?ca=${TLS_DIR}/ca.pem&cert=${TLS_DIR}/client.pem&key=${TLS_DIR}/client-key.pem&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760"
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --tz="Asia/Shanghai"
fi

Expand Down
4 changes: 2 additions & 2 deletions tests/clustered_index/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ function run() {

TOPIC_NAME="ticdc-clustered-index-test-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}" ;;
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;;
esac
cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}"
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
fi
run_sql "set global tidb_enable_clustered_index=1;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
# TiDB global variables cache 2 seconds at most
Expand Down
4 changes: 2 additions & 2 deletions tests/common_1/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ function run() {
# can't use the normal user
TOPIC_NAME="ticdc-common-1-test-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}" ;;
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
*) SINK_URI="mysql://root@127.0.0.1:3306/" ;;
esac
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}"
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
fi

run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
Expand Down
4 changes: 2 additions & 2 deletions tests/dailytest/case.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,9 @@ func RunCase(src *sql.DB, dst *sql.DB, schema string) {
if err != nil {
log.S().Fatal(err)
}
// insert 5 * 512KB
// insert 5 * 1M
// note limitation of TiDB: https://github.com/pingcap/docs/blob/733a5b0284e70c5b4d22b93a818210a3f6fbb5a0/FAQ.md#the-error-message-transaction-too-large-is-displayed
data := make([]byte, 1<<19)
data := make([]byte, 1<<20)
for i := 0; i < 5; i++ {
_, err = tx.Query("INSERT INTO binlog_big(id, data) VALUES(?, ?);", i, data)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions tests/ddl_puller_lag/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ function prepare() {

TOPIC_NAME="ticdc-ddl-puller-lag-test-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka+ssl://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-client-id=ddl_puller_lag&kafka-version=${KAFKA_VERSION}" ;;
kafka) SINK_URI="kafka+ssl://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-client-id=ddl_puller_lag&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
*) SINK_URI="mysql+ssl://normal:123456@127.0.0.1:3306/" ;;
esac
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}"
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
fi
}

Expand Down
4 changes: 2 additions & 2 deletions tests/ddl_sequence/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ function run() {

TOPIC_NAME="ticdc-ddl-sequence-test-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}" ;;
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;;
esac
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}"
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
fi
run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
# sync_diff can't check non-exist table, so we check expected tables are created in downstream first
Expand Down
4 changes: 2 additions & 2 deletions tests/drop_many_tables/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ function run() {

TOPIC_NAME="ticdc-drop-tables-test-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}" ;;
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;;
esac
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}"
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
fi
run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
# sync_diff can't check non-exist table, so we check expected tables are created in downstream first
Expand Down
4 changes: 2 additions & 2 deletions tests/force_replicate_table/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ function run() {

TOPIC_NAME="ticdc-force_replicate_table-test-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}" ;;
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?safe-mode=true" ;;
esac
cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --config $CUR/conf/changefeed.toml
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}"
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
fi

run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
Expand Down
4 changes: 2 additions & 2 deletions tests/gc_safepoint/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,14 @@ function run() {
pd_addr="http://$UP_PD_HOST_1:$UP_PD_PORT_1"
TOPIC_NAME="ticdc-gc-safepoint-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}" ;;
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" ;;
esac
export GO_FAILPOINTS='github.com/pingcap/ticdc/pkg/txnutil/gc/InjectGcSafepointUpdateInterval=return(500)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr
changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}')
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}"
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
fi

clear_gc_worker_safepoint $pd_addr $pd_cluster_id
Expand Down
4 changes: 2 additions & 2 deletions tests/generate_column/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ function run() {

TOPIC_NAME="ticdc-generate-column-test-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}" ;;
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;;
esac
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}"
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
fi
run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
# sync_diff can't check non-exist table, so we check expected tables are created in downstream first
Expand Down
8 changes: 4 additions & 4 deletions tests/kafka_messages/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ function run_length_limit() {
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --loglevel "info"

TOPIC_NAME="ticdc-kafka-message-test-$RANDOM"
SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&max-message-bytes=4096&kafka-version=${KAFKA_VERSION}"
SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760"
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&max-message-bytes=4096&version=${KAFKA_VERSION}"
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
fi

# Add a check table to reduce check time, or if we check data with sync diff
Expand Down Expand Up @@ -87,10 +87,10 @@ function run_batch_size_limit() {
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --loglevel "info"

TOPIC_NAME="ticdc-kafka-message-test-$RANDOM"
SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&max-batch-size=3&kafka-version=${KAFKA_VERSION}"
SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&max-batch-size=3&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760"
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&max-batch-size=3&version=${KAFKA_VERSION}"
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760&max-batch-size=3"
fi

# Add a check table to reduce check time, or if we check data with sync diff
Expand Down
Loading

0 comments on commit dc32dd6

Please sign in to comment.