Skip to content

Commit

Permalink
Merge branch 'master' into fix_pd_hang
Browse files Browse the repository at this point in the history
  • Loading branch information
leoppro authored Dec 16, 2020
2 parents b3c370b + 7c6c754 commit da57ce6
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 1 deletion.
21 changes: 21 additions & 0 deletions cdc/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,27 @@ func (o *Owner) loadChangeFeeds(ctx context.Context) error {
}
continue
}

// remaining task status means some processors are not exited, wait until
// all these statuses cleaned. If the capture of pending processor loses
// etcd session, the cleanUpStaleTasks will clean these statuses later.
allMetadataCleaned := true
allTaskStatus, err := o.etcdClient.GetAllTaskStatus(ctx, changeFeedID)
if err != nil {
return err
}
for _, taskStatus := range allTaskStatus {
if taskStatus.AdminJobType == model.AdminStop || taskStatus.AdminJobType == model.AdminRemove {
log.Info("stale task status is not deleted, wait metadata cleaned to create new changefeed",
zap.Reflect("task status", taskStatus), zap.String("changefeed", changeFeedID))
allMetadataCleaned = false
break
}
}
if !allMetadataCleaned {
continue
}

checkpointTs := cfInfo.GetCheckpointTs(status)

newCf, err := o.newChangeFeed(ctx, changeFeedID, taskStatus, taskPositions, cfInfo, checkpointTs)
Expand Down
1 change: 1 addition & 0 deletions cdc/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1230,6 +1230,7 @@ func (p *processor) stop(ctx context.Context) error {
p.ddlPullerCancel()
// mark tables share the same context with its original table, don't need to cancel
p.stateMu.Unlock()
failpoint.Inject("processorStopDelay", nil)
atomic.StoreInt32(&p.stopped, 1)
if err := p.etcdCli.DeleteTaskPosition(ctx, p.changefeedID, p.captureInfo.ID); err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/context/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,12 @@ func (s *contextSuite) TestCancel(c *check.C) {

func (s *contextSuite) TestCancelCascade(c *check.C) {
defer testleak.AfterTest(c)()
startTime := time.Now()
stdCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(1*time.Second))
ctx := NewContext(stdCtx, &Vars{})
ctx1, _ := WithCancel(ctx)
ctx2, cancel2 := WithCancel(ctx)
cancel2()
startTime := time.Now()
<-ctx2.StdContext().Done()
<-ctx2.Done()
c.Assert(time.Since(startTime), check.Less, time.Second)
Expand Down
27 changes: 27 additions & 0 deletions tests/processor_stop_delay/conf/diff_config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# diff Configuration.
# sync-diff-inspector config used by the processor_stop_delay integration
# test to verify that upstream and downstream data are consistent.

log-level = "info"
# rows per comparison chunk
chunk-size = 10
check-thread-count = 4
# compare 100% of the rows (no sampling)
sample-percent = 100
use-rowid = false
# compare chunks by checksum instead of row-by-row
use-checksum = true
# SQL statements that would repair any found differences are written here
fix-sql-file = "fix.sql"

# tables need to check.
[[check-tables]]
schema = "processor_stop_delay"
# regex: match every table in the schema
tables = ["~.*"]

# upstream TiDB
[[source-db]]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""
instance-id = "source-1"

# downstream MySQL
[target-db]
host = "127.0.0.1"
port = 3306
user = "root"
password = ""
54 changes: 54 additions & 0 deletions tests/processor_stop_delay/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#!/bin/bash
# Integration test: pause and then resume a changefeed while processor
# shutdown is artificially delayed (processorStopDelay failpoint), and
# verify via sync-diff that no data is lost downstream.

set -e

CUR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
# test_prepare supplies the helper functions (start_tidb_cluster,
# run_cdc_server, check_sync_diff, ...) and the UP_/DOWN_ host/port vars.
source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1
TABLE_COUNT=3

function run() {
rm -rf $WORK_DIR && mkdir -p $WORK_DIR
start_tidb_cluster --workdir $WORK_DIR
cd $WORK_DIR

pd_addr="http://$UP_PD_HOST_1:$UP_PD_PORT_1"
# $RANDOM makes the topic name unique per run so reruns don't collide.
TOPIC_NAME="ticdc-processor-stop-delay-$RANDOM"
# Choose the sink URI based on the sink type under test (kafka or mysql).
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4";;
*) SINK_URI="mysql://root@127.0.0.1:3306/?max-txn-row=1";;
esac
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4"
fi

# Arm the failpoint injected in cdc/processor.go: the first processor stop
# sleeps 10s (1*sleep(10000)) before completing.
export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/processorStopDelay=1*sleep(10000)'

run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr
# Parse the changefeed id out of the CLI output; it is needed for the
# pause/resume commands below.
changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1|tail -n2|head -n1|awk '{print $2}')
run_sql "CREATE DATABASE processor_stop_delay;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "CREATE table processor_stop_delay.t (id int primary key auto_increment, t datetime DEFAULT CURRENT_TIMESTAMP)" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "INSERT INTO processor_stop_delay.t values (),(),(),(),(),(),()" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
# Baseline: confirm replication works before exercising pause/resume.
check_table_exists "processor_stop_delay.t" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

# pause changefeed first, and then resume the changefeed. The processor stop
# logic will be delayed by 10s, which is controlled by failpoint injection.
# The changefeed should be resumed and no data loss.
cdc cli changefeed pause --changefeed-id=$changefeed_id --pd=$pd_addr
sleep 1
# Rows inserted while paused and while the processor is still stopping ...
run_sql "INSERT INTO processor_stop_delay.t values (),(),(),(),(),(),()" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
cdc cli changefeed resume --changefeed-id=$changefeed_id --pd=$pd_addr
# ... and rows inserted after resume must all reach the downstream.
run_sql "INSERT INTO processor_stop_delay.t values (),(),(),(),(),(),()" ${UP_TIDB_HOST} ${UP_TIDB_PORT}

check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

export GO_FAILPOINTS=''
cleanup_process $CDC_BINARY
}

# Always tear the cluster down, even if the test fails mid-way.
trap stop_tidb_cluster EXIT
run $*
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"

0 comments on commit da57ce6

Please sign in to comment.