Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

owner: fix gc safepoint larger by one #2647

Merged
merged 5 commits into from
Aug 26, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions cdc/owner/gc_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,11 @@ func (m *gcManager) updateGCSafePoint(ctx cdcContext.Context, state *model.Globa
default:
continue
}
checkpointTs := cfState.Info.GetCheckpointTs(cfState.Status)
if minCheckpointTs > checkpointTs {
minCheckpointTs = checkpointTs
// When the changefeed starts up, CDC will do a snapshot read at (checkpoint-ts - 1) from TiKV,
// so (checkpoint - 1) should be an upper bound for the GC safepoint.
gcSafepointUpperBound := cfState.Info.GetCheckpointTs(cfState.Status) - 1
if minCheckpointTs > gcSafepointUpperBound {
minCheckpointTs = gcSafepointUpperBound
}
}
m.lastUpdatedTime = time.Now()
Expand Down Expand Up @@ -129,17 +131,18 @@ func (m *gcManager) currentTimeFromPDCached(ctx cdcContext.Context) (time.Time,
}

func (m *gcManager) checkStaleCheckpointTs(ctx cdcContext.Context, checkpointTs model.Ts) error {
gcSafepointUpperBound := checkpointTs - 1
if m.isTiCDCBlockGC {
pdTime, err := m.currentTimeFromPDCached(ctx)
if err != nil {
return errors.Trace(err)
}
if pdTime.Sub(oracle.GetTimeFromTS(checkpointTs)) > time.Duration(m.gcTTL)*time.Second {
if pdTime.Sub(oracle.GetTimeFromTS(gcSafepointUpperBound)) > time.Duration(m.gcTTL)*time.Second {
return cerror.ErrGCTTLExceeded.GenWithStackByArgs(checkpointTs, ctx.ChangefeedVars().ID)
}
} else {
// if `isTiCDCBlockGC` is false, it means there is another service gc point less than the min checkpoint ts.
if checkpointTs < m.lastSafePointTs {
if gcSafepointUpperBound < m.lastSafePointTs {
return cerror.ErrSnapshotLostByGC.GenWithStackByArgs(checkpointTs, m.lastSafePointTs)
}
}
Expand Down
2 changes: 1 addition & 1 deletion cdc/owner/gc_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (s *gcManagerSuite) TestUpdateGCSafePoint(c *check.C) {
mockPDClient.updateServiceGCSafePointFunc = func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
c.Assert(serviceID, check.Equals, cdcServiceSafePointID)
c.Assert(ttl, check.Equals, gcManager.gcTTL)
c.Assert(safePoint, check.Equals, uint64(20))
c.Assert(safePoint, check.Equals, uint64(19))
return 0, nil
}
err = gcManager.updateGCSafePoint(ctx, state)
Expand Down
9 changes: 9 additions & 0 deletions tests/gc_safepoint/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ function get_safepoint() {
echo $safe_point
}

function clear_gc_worker_safepoint() {
pd_addr=$1
pd_cluster_id=$2
ETCDCTL_API=3 etcdctl --endpoints=$pd_addr del /pd/$pd_cluster_id/gc/safe_point/service/ticdc
}

function check_safepoint_cleared() {
pd_addr=$1
pd_cluster_id=$2
Expand Down Expand Up @@ -67,6 +73,7 @@ export -f check_safepoint_forward
export -f check_safepoint_cleared
export -f check_safepoint_equal
export -f check_changefeed_state
export -f get_clear_gc_worker_safepoint
amyangfei marked this conversation as resolved.
Show resolved Hide resolved

function run() {
rm -rf $WORK_DIR && mkdir -p $WORK_DIR
Expand All @@ -86,6 +93,8 @@ function run() {
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr
changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1|tail -n2|head -n1|awk '{print $2}')

clear_gc_worker_safepoint $pd_addr $pd_cluster_id

run_sql "CREATE DATABASE gc_safepoint;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "CREATE table gc_safepoint.simple(id int primary key auto_increment, val int);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "INSERT INTO gc_safepoint.simple VALUES (),();" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
Expand Down