Skip to content

Commit

Permalink
owner: fix gc safepoint larger by one (#2647)
Browse files Browse the repository at this point in the history
  • Loading branch information
liuzix authored Aug 26, 2021
1 parent 196112d commit 81c22b1
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 6 deletions.
13 changes: 8 additions & 5 deletions cdc/owner/gc_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,11 @@ func (m *gcManager) updateGCSafePoint(ctx cdcContext.Context, state *model.Globa
default:
continue
}
checkpointTs := cfState.Info.GetCheckpointTs(cfState.Status)
if minCheckpointTs > checkpointTs {
minCheckpointTs = checkpointTs
// When the changefeed starts up, CDC will do a snapshot read at (checkpoint-ts - 1) from TiKV,
// so (checkpoint - 1) should be an upper bound for the GC safepoint.
gcSafepointUpperBound := cfState.Info.GetCheckpointTs(cfState.Status) - 1
if minCheckpointTs > gcSafepointUpperBound {
minCheckpointTs = gcSafepointUpperBound
}
}
m.lastUpdatedTime = time.Now()
Expand Down Expand Up @@ -129,17 +131,18 @@ func (m *gcManager) currentTimeFromPDCached(ctx cdcContext.Context) (time.Time,
}

func (m *gcManager) checkStaleCheckpointTs(ctx cdcContext.Context, checkpointTs model.Ts) error {
gcSafepointUpperBound := checkpointTs - 1
if m.isTiCDCBlockGC {
pdTime, err := m.currentTimeFromPDCached(ctx)
if err != nil {
return errors.Trace(err)
}
if pdTime.Sub(oracle.GetTimeFromTS(checkpointTs)) > time.Duration(m.gcTTL)*time.Second {
if pdTime.Sub(oracle.GetTimeFromTS(gcSafepointUpperBound)) > time.Duration(m.gcTTL)*time.Second {
return cerror.ErrGCTTLExceeded.GenWithStackByArgs(checkpointTs, ctx.ChangefeedVars().ID)
}
} else {
// if `isTiCDCBlockGC` is false, it means there is another service gc point less than the min checkpoint ts.
if checkpointTs < m.lastSafePointTs {
if gcSafepointUpperBound < m.lastSafePointTs {
return cerror.ErrSnapshotLostByGC.GenWithStackByArgs(checkpointTs, m.lastSafePointTs)
}
}
Expand Down
2 changes: 1 addition & 1 deletion cdc/owner/gc_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (s *gcManagerSuite) TestUpdateGCSafePoint(c *check.C) {
mockPDClient.updateServiceGCSafePointFunc = func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
c.Assert(serviceID, check.Equals, cdcServiceSafePointID)
c.Assert(ttl, check.Equals, gcManager.gcTTL)
c.Assert(safePoint, check.Equals, uint64(20))
c.Assert(safePoint, check.Equals, uint64(19))
return 0, nil
}
err = gcManager.updateGCSafePoint(ctx, state)
Expand Down
9 changes: 9 additions & 0 deletions tests/gc_safepoint/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ function get_safepoint() {
echo $safe_point
}

function clear_gc_worker_safepoint() {
pd_addr=$1
pd_cluster_id=$2
ETCDCTL_API=3 etcdctl --endpoints=$pd_addr del /pd/$pd_cluster_id/gc/safe_point/service/ticdc
}

function check_safepoint_cleared() {
pd_addr=$1
pd_cluster_id=$2
Expand Down Expand Up @@ -67,6 +73,7 @@ export -f check_safepoint_forward
export -f check_safepoint_cleared
export -f check_safepoint_equal
export -f check_changefeed_state
export -f clear_gc_worker_safepoint

function run() {
rm -rf $WORK_DIR && mkdir -p $WORK_DIR
Expand All @@ -86,6 +93,8 @@ function run() {
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr
changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1|tail -n2|head -n1|awk '{print $2}')

clear_gc_worker_safepoint $pd_addr $pd_cluster_id

run_sql "CREATE DATABASE gc_safepoint;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "CREATE table gc_safepoint.simple(id int primary key auto_increment, val int);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "INSERT INTO gc_safepoint.simple VALUES (),();" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
Expand Down

0 comments on commit 81c22b1

Please sign in to comment.