ticdc: Support `create table` DDLs that appear only in tidb_ddl_history instead of the tidb_ddl_job table (#10907)

close #10908
hongyunyan authored May 7, 2024
1 parent 99d3755 commit 8d6a9e1
Showing 5 changed files with 181 additions and 26 deletions.
88 changes: 76 additions & 12 deletions cdc/entry/mounter.go
@@ -38,6 +38,7 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
pfilter "github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/integrity"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)
@@ -64,6 +65,19 @@ type rowKVEntry struct {
PreRowExist bool
}

// DDLTableInfo contains the tableInfo of tidb_ddl_job and tidb_ddl_history,
// and the column id of `job_meta` in these two tables.
type DDLTableInfo struct {
// DDLJobTable is used to parse all DDL jobs except `create table`.
DDLJobTable *model.TableInfo
// JobMetaColumnIDinJobTable is the column id of `job_meta` in table `tidb_ddl_job`.
JobMetaColumnIDinJobTable int64
// DDLHistoryTable is only used to parse the `create table` DDL job.
DDLHistoryTable *model.TableInfo
// JobMetaColumnIDinHistoryTable is the column id of `job_meta` in table `tidb_ddl_history`.
JobMetaColumnIDinHistoryTable int64
}
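// A minimal illustrative sketch, assuming decodeRow in this package yields a
// map from column ID to datum (as it is used below): the cached column IDs in
// DDLTableInfo let a caller pull the JSON-encoded job bytes out of a decoded
// tidb_ddl_history row. This helper is hypothetical, not code from this commit.
func jobMetaBytesSketch(info *DDLTableInfo, row map[int64]types.Datum) []byte {
	// `job_meta` stores the serialized timodel.Job.
	datum := row[info.JobMetaColumnIDinHistoryTable]
	return datum.GetBytes()
}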

// Mounter is used to parse SQL events from KV events
type Mounter interface {
// DecodeEvent accepts `model.PolymorphicEvent` with `RawKVEntry` filled and
@@ -298,39 +312,89 @@ func IsLegacyFormatJob(rawKV *model.RawKVEntry) bool {
return bytes.HasPrefix(rawKV.Key, metaPrefix)
}

// ParseDDLJob parses the job from the raw KV entry.
func ParseDDLJob(rawKV *model.RawKVEntry, ddlTableInfo *DDLTableInfo) (*timodel.Job, error) {
var v []byte
var datum types.Datum

// for test case only
if bytes.HasPrefix(rawKV.Key, metaPrefix) {
// old queue base job.
v = rawKV.Value
job, err := parseJob(v, rawKV.StartTs, rawKV.CRTs, false)
if err != nil || job == nil {
job, err = parseJob(v, rawKV.StartTs, rawKV.CRTs, true)
}
return job, err
}

recordID, err := tablecodec.DecodeRowKey(rawKV.Key)
if err != nil {
return nil, errors.Trace(err)
}

tableID := tablecodec.DecodeTableID(rawKV.Key)

// parse it with tidb_ddl_job
if tableID == spanz.JobTableID {
row, err := decodeRow(rawKV.Value, recordID, ddlTableInfo.DDLJobTable, time.UTC)
if err != nil {
return nil, errors.Trace(err)
}
datum = row[ddlTableInfo.JobMetaColumnIDinJobTable]
v = datum.GetBytes()

return parseJob(v, rawKV.StartTs, rawKV.CRTs, false)
} else if tableID == spanz.JobHistoryID {
// parse it with tidb_ddl_history
row, err := decodeRow(rawKV.Value, recordID, ddlTableInfo.DDLHistoryTable, time.UTC)
if err != nil {
return nil, errors.Trace(err)
}
datum = row[ddlTableInfo.JobMetaColumnIDinHistoryTable]
v = datum.GetBytes()

return parseJob(v, rawKV.StartTs, rawKV.CRTs, true)
}

return nil, fmt.Errorf("invalid tableID %v in rawKV.Key", tableID)
}

// parseJob unmarshals the job from "v".
// fromHistoryTable is used to distinguish whether the job comes from tidb_ddl_job or tidb_ddl_history.
// We need to be compatible with two modes: enable_fast_create_table=on and enable_fast_create_table=off.
// When enable_fast_create_table=on, `create table` is only inserted into tidb_ddl_history after it executes successfully.
// When enable_fast_create_table=off, `create table`, like other DDLs, is first inserted into tidb_ddl_job,
// and then inserted into tidb_ddl_history after it executes successfully.
// In both modes, all other DDLs are first inserted into tidb_ddl_job, and then inserted into tidb_ddl_history after they execute successfully.
//
// To be compatible with both modes, we read the `create table` DDL from tidb_ddl_history, and all DDLs from tidb_ddl_job.
// When enable_fast_create_table=off, we therefore see each `create table` DDL twice (once from tidb_ddl_history, once from tidb_ddl_job).
// Because `handleJob` skips repeated DDLs, it is fine for us to get `create table` twice.
// Besides, the `create table` from tidb_ddl_job always has an earlier commitTs than the one from tidb_ddl_history.
// Therefore, we always use the commitTs of the DDL from `tidb_ddl_job` as StartTs, which ensures we can get all the DMLs.
func parseJob(v []byte, startTs, CRTs uint64, fromHistoryTable bool) (*timodel.Job, error) {
var job timodel.Job
err := json.Unmarshal(v, &job)
if err != nil {
return nil, errors.Trace(err)
}

if fromHistoryTable {
// We only want the `create table` DDL from tidb_ddl_history, so we discard all other DDLs.
// We only want jobs in `JobStateSynced`, which means the DDL job completed successfully.
// Besides, to satisfy the subsequent processing,
// we need to set the job state to `Done` so that it will be replayed in schemaStorage.
if job.Type != timodel.ActionCreateTable || job.State != timodel.JobStateSynced {
return nil, nil
}
job.State = timodel.JobStateDone
} else {
// we need to get all DDL jobs that are done from tidb_ddl_job
if !job.IsDone() {
return nil, nil
}
}

// FinishedTS is only set when the job is synced,
// but we can use the entry's ts here
job.StartTS = startTs
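The dedup mentioned in the comment above deserves a concrete picture: with enable_fast_create_table=off, the same `create table` job is observed twice, and `handleJob` is described as skipping the repeat. The following is a minimal, self-contained sketch of ID-based dedup along those lines; it is an illustration under that assumption, not the actual handleJob implementation.

package main

import "fmt"

// jobDeduper sketches the "skip repeated DDLs" step: the first sighting of a
// job ID (from tidb_ddl_job, with the earlier commitTs) wins, and the later
// copy from tidb_ddl_history is dropped.
type jobDeduper struct {
	seen map[int64]struct{}
}

func newJobDeduper() *jobDeduper {
	return &jobDeduper{seen: make(map[int64]struct{})}
}

// isRepeated reports whether jobID was already observed, recording it if not.
func (d *jobDeduper) isRepeated(jobID int64) bool {
	if _, ok := d.seen[jobID]; ok {
		return true
	}
	d.seen[jobID] = struct{}{}
	return false
}

func main() {
	d := newJobDeduper()
	fmt.Println(d.isRepeated(101)) // false: first copy, from tidb_ddl_job
	fmt.Println(d.isRepeated(101)) // true: duplicate, from tidb_ddl_history
}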
38 changes: 27 additions & 11 deletions cdc/puller/ddl_puller.go
@@ -75,11 +75,8 @@ type ddlJobPullerImpl struct {
resolvedTs uint64
schemaVersion int64
filter filter.Filter
// ddlTableInfo is initialized when the first concurrent DDL job is received.
ddlTableInfo *entry.DDLTableInfo
// outputCh sends the DDL job entries to the caller.
outputCh chan *model.DDLJobEntry
}
@@ -239,13 +236,14 @@ func (p *ddlJobPullerImpl) unmarshalDDL(rawKV *model.RawKVEntry) (*timodel.Job,
if rawKV.OpType != model.OpTypePut {
return nil, nil
}
if p.ddlTableInfo == nil && !entry.IsLegacyFormatJob(rawKV) {
err := p.initDDLTableInfo()
if err != nil {
return nil, errors.Trace(err)
}
}

return entry.ParseDDLJob(rawKV, p.ddlTableInfo)
}

func (p *ddlJobPullerImpl) getResolvedTs() uint64 {
@@ -256,7 +254,7 @@ func (p *ddlJobPullerImpl) setResolvedTs(ts uint64) {
atomic.StoreUint64(&p.resolvedTs, ts)
}

func (p *ddlJobPullerImpl) initDDLTableInfo() error {
version, err := p.kvStorage.CurrentVersion(tidbkv.GlobalTxnScope)
if err != nil {
return errors.Trace(err)
@@ -277,6 +275,8 @@ if err != nil {
if err != nil {
return errors.Trace(err)
}

// for tidb_ddl_job
tableInfo, err := findTableByName(tbls, "tidb_ddl_job")
if err != nil {
return errors.Trace(err)
@@ -287,8 +287,24 @@ func (p *ddlJobPullerImpl) initJobTableMeta() error {
return errors.Trace(err)
}

p.ddlTableInfo = &entry.DDLTableInfo{}
p.ddlTableInfo.DDLJobTable = model.WrapTableInfo(db.ID, db.Name.L, 0, tableInfo)
p.ddlTableInfo.JobMetaColumnIDinJobTable = col.ID

// for tidb_ddl_history
historyTableInfo, err := findTableByName(tbls, "tidb_ddl_history")
if err != nil {
return errors.Trace(err)
}

historyTableCol, err := findColumnByName(historyTableInfo.Columns, "job_meta")
if err != nil {
return errors.Trace(err)
}

p.ddlTableInfo.DDLHistoryTable = model.WrapTableInfo(db.ID, db.Name.L, 0, historyTableInfo)
p.ddlTableInfo.JobMetaColumnIDinHistoryTable = historyTableCol.ID

return nil
}

9 changes: 8 additions & 1 deletion pkg/spanz/span.go
@@ -29,6 +29,8 @@ import (
const (
// JobTableID is the id of `tidb_ddl_job`.
JobTableID = ddl.JobTableID
// JobHistoryID is the id of `tidb_ddl_history`.
JobHistoryID = ddl.HistoryTableID
)

// UpperBoundKey represents the maximum value.
@@ -62,12 +64,17 @@ func GetTableRange(tableID int64) (startKey, endKey []byte) {

// GetAllDDLSpan return all cdc interested spans for DDL.
func GetAllDDLSpan() []tablepb.Span {
spans := make([]tablepb.Span, 0, 2)
start, end := GetTableRange(JobTableID)
spans = append(spans, tablepb.Span{
StartKey: ToComparableKey(start),
EndKey: ToComparableKey(end),
})
start, end = GetTableRange(JobHistoryID)
spans = append(spans, tablepb.Span{
StartKey: ToComparableKey(start),
EndKey: ToComparableKey(end),
})
return spans
}

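With tidb_ddl_history now watched as well, GetAllDDLSpan returns two spans instead of one. A standalone sketch of consuming it, assuming this repo's spanz package (illustrative only, not part of the commit):

package main

import (
	"fmt"

	"github.com/pingcap/tiflow/pkg/spanz"
)

func main() {
	// Expect two DDL spans now: tidb_ddl_job and tidb_ddl_history.
	spans := spanz.GetAllDDLSpan()
	fmt.Println("number of DDL spans:", len(spans))
	for _, span := range spans {
		fmt.Printf("span [%x, %x)\n", span.StartKey, span.EndKey)
	}
}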
47 changes: 45 additions & 2 deletions tests/integration_tests/batch_add_table/run.sh
@@ -8,13 +8,54 @@ WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1

function run_with_fast_create_table() {
rm -rf $WORK_DIR && mkdir -p $WORK_DIR

start_tidb_cluster --workdir $WORK_DIR

cd $WORK_DIR

run_sql "set global tidb_enable_fast_create_table=on" ${UP_TIDB_HOST} ${UP_TIDB_PORT}

run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql_file $CUR/data/prepare.sql ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}

run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY

TOPIC_NAME="ticdc-batch-add-table-test-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
pulsar)
run_pulsar_cluster $WORK_DIR normal
SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true"
;;
*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;;
esac
run_cdc_cli changefeed create --sink-uri="$SINK_URI"
case $SINK_TYPE in
kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
esac

run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
# sync_diff can't check tables that don't exist, so we first check that the expected tables are created downstream
check_table_exists batch_add_table.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

cleanup_process $CDC_BINARY
}

function run_without_fast_create_table() {
rm -rf $WORK_DIR && mkdir -p $WORK_DIR

start_tidb_cluster --workdir $WORK_DIR

cd $WORK_DIR

run_sql "set global tidb_enable_fast_create_table=off" ${UP_TIDB_HOST} ${UP_TIDB_PORT}

run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql_file $CUR/data/prepare.sql ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}

@@ -46,6 +87,8 @@ function run() {
}

trap stop_tidb_cluster EXIT
run_without_fast_create_table $*
stop_tidb_cluster
run_with_fast_create_table $*
check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
25 changes: 25 additions & 0 deletions tests/integration_tests/multi_source/run.sh
@@ -41,12 +41,14 @@ function prepare() {

trap stop_tidb_cluster EXIT
# storage is not supported yet.
# test without fast create table
if [ "$SINK_TYPE" != "storage" ]; then
# TODO(dongmen): enable pulsar in the future.
if [ "$SINK_TYPE" == "pulsar" ]; then
exit 0
fi
prepare $*
run_sql "set global tidb_enable_fast_create_table=off" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
cd "$(dirname "$0")"
set -o pipefail
GO111MODULE=on go run main.go -config ./config.toml 2>&1 | tee $WORK_DIR/tester.log
@@ -55,6 +57,29 @@ if [ "$SINK_TYPE" != "storage" ]; then
check_table_exists mark.finish_mark_2 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
check_table_exists mark.finish_mark_3 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
check_table_exists mark.finish_mark_4 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
check_table_exists mark.finish_mark_5 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
check_table_exists mark.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
check_sync_diff $WORK_DIR $CUR/diff_config.toml
cleanup_process $CDC_BINARY
check_logs $WORK_DIR
fi
# test with fast create table
if [ "$SINK_TYPE" != "storage" ]; then
# TODO(dongmen): enable pulsar in the future.
if [ "$SINK_TYPE" == "pulsar" ]; then
exit 0
fi
prepare $*
run_sql "set global tidb_enable_fast_create_table=on" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
cd "$(dirname "$0")"
set -o pipefail
GO111MODULE=on go run main.go -config ./config.toml 2>&1 | tee $WORK_DIR/tester.log
check_table_exists mark.finish_mark_0 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
check_table_exists mark.finish_mark_1 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
check_table_exists mark.finish_mark_2 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
check_table_exists mark.finish_mark_3 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
check_table_exists mark.finish_mark_4 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
check_table_exists mark.finish_mark_5 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
check_table_exists mark.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
check_sync_diff $WORK_DIR $CUR/diff_config.toml
cleanup_process $CDC_BINARY
