sink(ticdc): set the default value of enable-partition-separator to true (#8771) (#8779)

close #8724
ti-chi-bot authored Apr 12, 2023
1 parent 5b3c852 commit 3ad19a3
Showing 19 changed files with 231 additions and 133 deletions.
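For context: enable-partition-separator controls whether the storage sink writes each partition of a partitioned table into its own sub-directory, so files from different partitions cannot land on colliding paths. The Go sketch below is illustrative only — the helper name and path template are assumptions made for this example, not TiCDC's actual code.

package main

import "fmt"

// dataFilePath is a hypothetical helper showing the behavior this commit
// enables by default: with the separator on, the partition ID becomes an
// extra directory level under the table's directory.
func dataFilePath(schema, table string, partitionID int64, partitionSeparator bool, idx int) string {
	if partitionSeparator && partitionID >= 0 {
		// e.g. "partition_table/t/3/CDC000001.csv"
		return fmt.Sprintf("%s/%s/%d/CDC%06d.csv", schema, table, partitionID, idx)
	}
	// e.g. "partition_table/t/CDC000001.csv" -- all partitions share one directory
	return fmt.Sprintf("%s/%s/CDC%06d.csv", schema, table, idx)
}

func main() {
	fmt.Println(dataFilePath("partition_table", "t", 3, true, 1))  // separated layout
	fmt.Println(dataFilePath("partition_table", "t", 3, false, 1)) // old default
}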
2 changes: 1 addition & 1 deletion cdc/api/v2/model_test.go
@@ -50,7 +50,7 @@ var defaultAPIConfig = &ReplicaConfig{
EncoderConcurrency: 16,
Terminator: config.CRLF,
DateSeparator: config.DateSeparatorNone.String(),
-EnablePartitionSeparator: false,
+EnablePartitionSeparator: true,
},
Consistent: &ConsistentConfig{
Level: "none",
3 changes: 2 additions & 1 deletion cdc/entry/schema/snapshot.go
@@ -975,7 +975,8 @@ func (s *snapshot) exchangePartition(targetTable *model.TableInfo, currentTS uin
return errors.Trace(err)
}

-newSourceTable := sourceTable.Clone()
+newSourceTable := model.WrapTableInfo(sourceTable.SchemaID, sourceTable.TableName.Schema,
+	targetTable.Version, sourceTable.TableInfo.Clone())
// 5.update the sourceTable
err = s.dropTable(sourceTable.ID, currentTS)
if err != nil {
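The snapshot.go change above is the functional core of the exchange-partition fix: a bare Clone() keeps the source table's stale schema version, while WrapTableInfo re-stamps it with the version of the EXCHANGE PARTITION DDL taken from targetTable. A minimal sketch of that idea, using simplified stand-in types rather than TiCDC's model package:

package main

import "fmt"

// tableInfo is a simplified stand-in for TiCDC's model.TableInfo, used only
// to illustrate the version-propagation bug fixed above.
type tableInfo struct {
	ID      int64
	Version uint64 // schema version this table info was captured at
}

// exchangePartition mirrors the shape of the fix: cloning alone would keep
// the source table's stale Version; adopting the target's Version stamps it
// with the EXCHANGE PARTITION DDL's schema version.
func exchangePartition(source, target tableInfo) tableInfo {
	newSource := source                // analogous to sourceTable.Clone()
	newSource.Version = target.Version // analogous to passing targetTable.Version
	return newSource
}

func main() {
	src := tableInfo{ID: 100, Version: 5}
	tgt := tableInfo{ID: 200, Version: 9}
	fmt.Printf("%+v\n", exchangePartition(src, tgt)) // {ID:100 Version:9}
}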
14 changes: 8 additions & 6 deletions pkg/cmd/util/helper_test.go
@@ -199,9 +199,10 @@ func TestAndWriteExampleReplicaTOML(t *testing.T) {
Delimiter: string(config.Comma),
NullString: config.NULL,
},
-Terminator:    "\r\n",
-DateSeparator: config.DateSeparatorNone.String(),
-Protocol:      "open-protocol",
+Terminator:               "\r\n",
+DateSeparator:            config.DateSeparatorNone.String(),
+EnablePartitionSeparator: true,
+Protocol:                 "open-protocol",
}, cfg.Sink)
}

@@ -213,9 +214,10 @@ func TestAndWriteStorageSinkTOML(t *testing.T) {
err = cfg.ValidateAndAdjust(nil)
require.Nil(t, err)
require.Equal(t, &config.SinkConfig{
-EncoderConcurrency: 16,
-Terminator:         "\r\n",
-DateSeparator:      "day",
+EncoderConcurrency:       16,
+Terminator:               "\r\n",
+DateSeparator:            "day",
+EnablePartitionSeparator: true,
CSVConfig: &config.CSVConfig{
Delimiter: ",",
Quote: "\"",
1 change: 1 addition & 0 deletions pkg/config/config_test_data.go
@@ -52,6 +52,7 @@ const (
"rule": "r2"
}
],
"enable-partition-separator": true,
"protocol": "open-protocol"
},
"consistent": {
2 changes: 1 addition & 1 deletion pkg/config/replica_config.go
@@ -57,7 +57,7 @@ var defaultReplicaConfig = &ReplicaConfig{
EncoderConcurrency: 16,
Terminator: CRLF,
DateSeparator: DateSeparatorNone.String(),
-EnablePartitionSeparator: false,
+EnablePartitionSeparator: true,
TiDBSourceID: 1,
},
Consistent: &ConsistentConfig{
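Because this flips a default rather than adding a knob, changefeeds that relied on the old flat layout now have to opt out explicitly. A minimal sketch of that, assuming the pkg/config accessor GetDefaultReplicaConfig and the tiflow import path (both assumptions here, not verified against this exact revision):

package main

import (
	"fmt"

	"github.com/pingcap/tiflow/pkg/config" // import path is an assumption
)

func main() {
	cfg := config.GetDefaultReplicaConfig()
	// Opt out of the new default if downstream tooling expects the flat layout.
	cfg.Sink.EnablePartitionSeparator = false
	fmt.Println(cfg.Sink.EnablePartitionSeparator) // false
}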
1 change: 1 addition & 0 deletions pkg/sink/cloudstorage/config_test.go
@@ -29,6 +29,7 @@ func TestConfigApply(t *testing.T) {
expected.FlushInterval = 10 * time.Second
expected.FileSize = 16 * 1024 * 1024
expected.DateSeparator = config.DateSeparatorNone.String()
+expected.EnablePartitionSeparator = true
uri := "s3://bucket/prefix?worker-count=32&flush-interval=10s&file-size=16777216"
sinkURI, err := url.Parse(uri)
require.Nil(t, err)
2 changes: 1 addition & 1 deletion tests/integration_tests/api_v2/cases.go
@@ -122,7 +122,7 @@ var defaultReplicaConfig = &ReplicaConfig{
EncoderConcurrency: 16,
Terminator: "\r\n",
DateSeparator: "none",
-EnablePartitionSeparator: false,
+EnablePartitionSeparator: true,
},
Consistent: &ConsistentConfig{
Level: "none",
10 changes: 2 additions & 8 deletions tests/integration_tests/canal_json_storage_basic/run.sh
@@ -8,20 +8,14 @@ WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1

rm -rf "$WORK_DIR"
mkdir -p "$WORK_DIR"

stop() {
stop_tidb_cluster
}

function run() {
	# For now, we run the storage tests as part of the mysql sink tests.
	# This is a temporary solution; we will move them to a dedicated test pipeline later.
if [ "$SINK_TYPE" != "mysql" ]; then
return
fi

+rm -rf $WORK_DIR && mkdir -p $WORK_DIR
start_tidb_cluster --workdir $WORK_DIR
cd $WORK_DIR
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
@@ -38,7 +32,7 @@ function run() {
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 100
}

-trap stop EXIT
+trap stop_tidb_cluster EXIT
run $*
check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
18 changes: 18 additions & 0 deletions tests/integration_tests/canal_json_storage_partition_table/conf/changefeed.toml
@@ -0,0 +1,18 @@
[sink]
protocol = "canal-json"
# Line terminator. An empty value means "\r\n" (CRLF) is used as the line terminator. The default value is empty.
terminator = "\n"
# Directory date separator. Optional values are `none`, `year`, `month`, `day`. The default value is `none`.
date-separator = 'day'
# Enable partition separator. The default value is true.
enable-partition-separator = true

[sink.csv]
# Delimiter between fields. Must be ASCII characters. The default value is ','.
delimiter = ','
# Quoting character. Empty value means no quoting. The default value is '"'.
quote = '"'
# Representation of null values in CSV files. The default value is '\N'.
null = '\N'
# Include commit-ts in the row data. The default value is false.
include-commit-ts = true
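To make the [sink.csv] options concrete, here is a hedged Go sketch of how one insert event would be laid out under this config (presumably the block only takes effect when protocol = "csv"; with canal-json it is carried along unused). The field order — operation, table, schema, commit-ts, then column values — follows the documented TiCDC CSV protocol; the encoder itself is invented for illustration and is not TiCDC's implementation.

package main

import (
	"fmt"
	"strings"
)

// encodeCSVRow serializes one row event the way the [sink.csv] options above
// describe: '"' as the quote, ',' as the delimiter, '\N' for NULL, and the
// commit-ts included because include-commit-ts = true.
func encodeCSVRow(op, table, schema string, commitTS uint64, cols []*string) string {
	fields := []string{quote(op), quote(table), quote(schema), fmt.Sprint(commitTS)}
	for _, c := range cols {
		if c == nil {
			fields = append(fields, `\N`) // NULL representation from the config
		} else {
			fields = append(fields, quote(*c))
		}
	}
	return strings.Join(fields, ",") // delimiter from the config
}

func quote(s string) string { return `"` + s + `"` }

func main() {
	v := "1"
	fmt.Println(encodeCSVRow("I", "t", "partition_table", 433305438660591626, []*string{&v, nil}))
	// "I","t","partition_table",433305438660591626,"1",\N  (ts and values made up for illustration)
}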
29 changes: 29 additions & 0 deletions tests/integration_tests/canal_json_storage_partition_table/conf/diff_config.toml
@@ -0,0 +1,29 @@
# diff Configuration.

check-thread-count = 4

export-fix-sql = true

check-struct-only = false

[task]
output-dir = "/tmp/tidb_cdc_test/partition_table/sync_diff/output"

source-instances = ["mysql1"]

target-instance = "tidb0"

target-check-tables = ["partition_table.?*"]

[data-sources]
[data-sources.mysql1]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""

[data-sources.tidb0]
host = "127.0.0.1"
port = 3306
user = "root"
password = ""
39 changes: 39 additions & 0 deletions tests/integration_tests/canal_json_storage_partition_table/data/prepare.sql
@@ -0,0 +1,39 @@
drop database if exists `partition_table`;
set @@global.tidb_enable_exchange_partition=on;
create database `partition_table`;
use `partition_table`;

create table t (a int, primary key (a)) partition by hash(a) partitions 5;
insert into t values (1),(2),(3),(4),(5),(6);
insert into t values (7),(8),(9);
alter table t truncate partition p3;
update t set a=a+10 where a=2;


create table t1 (a int primary key) PARTITION BY RANGE ( a ) ( PARTITION p0 VALUES LESS THAN (6),PARTITION p1 VALUES LESS THAN (11),PARTITION p2 VALUES LESS THAN (21));
insert into t1 values (1),(2),(3),(4),(5),(6);
insert into t1 values (7),(8),(9);
insert into t1 values (11),(12),(20);
alter table t1 add partition (partition p3 values less than (30), partition p4 values less than (40));
insert into t1 values (25),(29),(35); /* these values fall in p3 and p4 */
alter table t1 truncate partition p0;
alter table t1 drop partition p1;
insert into t1 values (7),(8),(9);
update t1 set a=a+10 where a=9;

create table t2 (a int primary key);
ALTER TABLE t1 EXCHANGE PARTITION p3 WITH TABLE t2;
insert into t2 values (100),(101),(102),(103),(104),(105); /* these values will be replicated to downstream t2 */
insert into t1 values (25),(29); /* these values will be replicated to downstream t1.p3 */

/* REORGANIZE is not supported by release v6.5 */
-- ALTER TABLE t1 REORGANIZE PARTITION p0,p2 INTO (PARTITION p0 VALUES LESS THAN (5), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN (21));
-- insert into t1 values (-1),(6),(13);
-- update t1 set a=a-22 where a=20;
-- delete from t1 where a = 5;
-- ALTER TABLE t1 REORGANIZE PARTITION p2,p3,p4 INTO (PARTITION p2 VALUES LESS THAN (20), PARTITION p3 VALUES LESS THAN (26), PARTITION p4 VALUES LESS THAN (35), PARTITION pMax VALUES LESS THAN (MAXVALUE));
-- insert into t1 values (-3),(5),(14),(22),(30),(100);
-- update t1 set a=a-16 where a=12;
-- delete from t1 where a = 29;

create table finish_mark (a int primary key);
51 changes: 51 additions & 0 deletions tests/integration_tests/canal_json_storage_partition_table/run.sh
@@ -0,0 +1,51 @@
#!/bin/bash

set -eu

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1

function run() {
	# For now, we run the storage tests as part of the mysql sink tests.
	# This is a temporary solution; we will move them to a dedicated test pipeline later.
if [ "$SINK_TYPE" != "mysql" ]; then
return
fi

rm -rf $WORK_DIR && mkdir -p $WORK_DIR
start_tidb_cluster --workdir $WORK_DIR
cd $WORK_DIR

run_sql "set @@global.tidb_enable_exchange_partition=on" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
# TODO(CharlesCheung): remove this after schema level ddl is supported by storage sink
run_sql "create database partition_table;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}

run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --logsuffix cdc0
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8301" --logsuffix cdc1
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8302" --logsuffix cdc2

SINK_URI="file://$WORK_DIR/storage_test?flush-interval=5s&enable-tidb-extension=true"
run_cdc_cli changefeed create --sink-uri="$SINK_URI" --config=$CUR/conf/changefeed.toml

run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}

run_storage_consumer $WORK_DIR $SINK_URI $CUR/conf/changefeed.toml ""
sleep 8

	# sync_diff can't check non-existent tables, so first make sure the expected tables exist downstream
check_table_exists partition_table.t ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_table_exists partition_table.t1 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
# check_table_exists partition_table.t2 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_table_exists partition_table.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

cleanup_process $CDC_BINARY
}

trap stop_tidb_cluster EXIT
run $*
check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
10 changes: 2 additions & 8 deletions tests/integration_tests/csv_storage_basic/run.sh
@@ -8,20 +8,14 @@ WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1

rm -rf "$WORK_DIR"
mkdir -p "$WORK_DIR"

stop() {
stop_tidb_cluster
}

function run() {
	# For now, we run the storage tests as part of the mysql sink tests.
	# This is a temporary solution; we will move them to a dedicated test pipeline later.
if [ "$SINK_TYPE" != "mysql" ]; then
return
fi

+rm -rf $WORK_DIR && mkdir -p $WORK_DIR
start_tidb_cluster --workdir $WORK_DIR
cd $WORK_DIR
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
@@ -37,7 +31,7 @@ function run() {
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 100
}

-trap stop EXIT
+trap stop_tidb_cluster EXIT
run $*
check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
57 changes: 11 additions & 46 deletions tests/integration_tests/csv_storage_multi_tables_ddl/run.sh
@@ -8,45 +8,14 @@ WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1

-# start the s3 server
-export MINIO_ACCESS_KEY=cdcs3accesskey
-export MINIO_SECRET_KEY=cdcs3secretkey
-export MINIO_BROWSER=off
-export AWS_ACCESS_KEY_ID=$MINIO_ACCESS_KEY
-export AWS_SECRET_ACCESS_KEY=$MINIO_SECRET_KEY
-export S3_ENDPOINT=127.0.0.1:24927
-rm -rf "$WORK_DIR"
-mkdir -p "$WORK_DIR"
-pkill -9 minio || true
-bin/minio server --address $S3_ENDPOINT "$WORK_DIR/s3" &
-MINIO_PID=$!
-i=0
-while ! curl -o /dev/null -v -s "http://$S3_ENDPOINT/"; do
-	i=$(($i + 1))
-	if [ $i -gt 30 ]; then
-		echo 'Failed to start minio'
-		exit 1
-	fi
-	sleep 2
-done
-
-stop_minio() {
-	kill -2 $MINIO_PID
-}
-
-stop() {
-	stop_minio
-	stop_tidb_cluster
-}
-s3cmd --access_key=$MINIO_ACCESS_KEY --secret_key=$MINIO_SECRET_KEY --host=$S3_ENDPOINT --host-bucket=$S3_ENDPOINT --no-ssl mb s3://logbucket

function run() {
	# For now, we run the storage tests as part of the mysql sink tests.
	# This is a temporary solution; we will move them to a dedicated test pipeline later.
if [ "$SINK_TYPE" != "mysql" ]; then
return
fi

+rm -rf $WORK_DIR && mkdir -p $WORK_DIR
start_tidb_cluster --workdir $WORK_DIR
cd $WORK_DIR

@@ -59,26 +28,22 @@ function run() {
TOPIC_NAME_2="ticdc-multi-tables-ddl-test-error-1-$RANDOM"
TOPIC_NAME_3="ticdc-multi-tables-ddl-test-error-2-$RANDOM"

-case $SINK_TYPE in
-*) ;;
-esac
-
cf_normal="test-normal"
cf_err1="test-error-1"
cf_err2="test-error-2"

run_sql "create database multi_tables_ddl_test" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
SINK_URI="s3://logbucket/$TOPIC_NAME_1?flush-interval=5s&endpoint=http://127.0.0.1:24927&protocol=csv"
cdc cli changefeed create -c=$cf_normal --start-ts=$start_ts --sink-uri="$SINK_URI" --config="$CUR/conf/normal.toml"
SINK_URI1="file://$WORK_DIR/storage_test/$TOPIC_NAME_1?flush-interval=5s&protocol=csv"
cdc cli changefeed create -c=$cf_normal --start-ts=$start_ts --sink-uri="$SINK_URI1" --config="$CUR/conf/normal.toml"

SINK_URI="s3://logbucket/$TOPIC_NAME_2?flush-interval=5s&endpoint=http://127.0.0.1:24927&protocol=csv"
cdc cli changefeed create -c=$cf_err1 --start-ts=$start_ts --sink-uri="$SINK_URI" --config="$CUR/conf/error-1.toml"
SINK_URI2="file://$WORK_DIR/storage_test/$TOPIC_NAME_2?flush-interval=5s&protocol=csv"
cdc cli changefeed create -c=$cf_err1 --start-ts=$start_ts --sink-uri="$SINK_URI2" --config="$CUR/conf/error-1.toml"

SINK_URI="s3://logbucket/$TOPIC_NAME_3?flush-interval=5s&endpoint=http://127.0.0.1:24927&protocol=csv"
cdc cli changefeed create -c=$cf_err2 --start-ts=$start_ts --sink-uri="$SINK_URI" --config="$CUR/conf/error-2.toml"
run_storage_consumer $WORK_DIR "s3://logbucket/$TOPIC_NAME_1?endpoint=http://127.0.0.1:24927&protocol=csv" "$CUR/conf/normal.toml" 1
run_storage_consumer $WORK_DIR "s3://logbucket/$TOPIC_NAME_2?endpoint=http://127.0.0.1:24927&protocol=csv" "$CUR/conf/error-1.toml" 2
run_storage_consumer $WORK_DIR "s3://logbucket/$TOPIC_NAME_3?endpoint=http://127.0.0.1:24927&protocol=csv" "$CUR/conf/error-2.toml" 3
SINK_URI3="file://$WORK_DIR/storage_test/$TOPIC_NAME_3?flush-interval=5s&protocol=csv"
cdc cli changefeed create -c=$cf_err2 --start-ts=$start_ts --sink-uri="$SINK_URI3" --config="$CUR/conf/error-2.toml"
run_storage_consumer $WORK_DIR $SINK_URI1 "$CUR/conf/normal.toml" 1
run_storage_consumer $WORK_DIR $SINK_URI2 "$CUR/conf/error-1.toml" 2
run_storage_consumer $WORK_DIR $SINK_URI3 "$CUR/conf/error-2.toml" 3

run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
check_table_exists multi_tables_ddl_test.t55 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
@@ -101,7 +66,7 @@ function run() {
cleanup_process $CDC_BINARY
}

-trap stop EXIT
+trap stop_tidb_cluster EXIT
run $*
check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
4 changes: 2 additions & 2 deletions tests/integration_tests/csv_storage_partition_table/conf/diff_config.toml
@@ -7,13 +7,13 @@ export-fix-sql = true
check-struct-only = false

[task]
output-dir = "/tmp/tidb_cdc_test/csv_storage_partition_table/sync_diff/output"
output-dir = "./sync_diff/output"

source-instances = ["mysql1"]

target-instance = "tidb0"

target-check-tables = ["test.?*"]
target-check-tables = ["partition_table.?*"]

[data-sources]
[data-sources.mysql1]

This file was deleted.
