owner(ticdc): calculate pending changefeed in gc (#9541) (#9554)
close #9543
ti-chi-bot authored Aug 22, 2023
1 parent 141df43 commit a754418
Showing 9 changed files with 90 additions and 103 deletions.
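
At a high level, this commit folds the owner's per-state switch and the ignoreFailedChangeFeedWhenGC helper into a single NeedBlockGC predicate on the changefeed info: a changefeed keeps holding back the GC safepoint unless it is finished, removed, or failed specifically because of a GC-related error. The following is a minimal standalone sketch of that rule, using placeholder state names rather than the tiflow types shown in the hunks below:

package main

import "fmt"

type feedState string

const (
	stateNormal   feedState = "normal"
	stateStopped  feedState = "stopped"
	stateError    feedState = "error"
	stateFailed   feedState = "failed"
	stateFinished feedState = "finished"
	stateRemoved  feedState = "removed"
)

// needBlockGC mirrors the rule added in cdc/model/changefeed.go:
// normal/stopped/error changefeeds always hold the GC safepoint;
// a failed changefeed holds it only if its failure is NOT a GC
// fast-fail error (e.g. ErrStartTsBeforeGC, ErrSnapshotLostByGC).
func needBlockGC(state feedState, failedByGC bool) bool {
	switch state {
	case stateNormal, stateStopped, stateError:
		return true
	case stateFailed:
		return !failedByGC
	}
	// Finished and removed changefeeds never block the GC safepoint.
	return false
}

func main() {
	fmt.Println(needBlockGC(stateFailed, true))  // false: GC already invalidated its data
	fmt.Println(needBlockGC(stateFailed, false)) // true: still pending, keep the safepoint
}
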
24 changes: 23 additions & 1 deletion cdc/model/changefeed.go
@@ -179,6 +179,28 @@ func ValidateNamespace(namespace string) error {
return nil
}

// NeedBlockGC returns true if the changefeed needs to block the GC safepoint.
// Note: a changefeed that failed because of GC should not block the GC safepoint.
func (info *ChangeFeedInfo) NeedBlockGC() bool {
switch info.State {
case StateNormal, StateStopped, StateError:
return true
case StateFailed:
return !info.isFailedByGC()
case StateFinished, StateRemoved:
default:
}
return false
}

func (info *ChangeFeedInfo) isFailedByGC() bool {
if info.Error == nil {
log.Panic("changefeed info is not consistent",
zap.Any("state", info.State), zap.Any("error", info.Error))
}
return cerror.IsChangefeedGCFastFailErrorCode(errors.RFCErrorCode(info.Error.Code))
}

// String implements the fmt.Stringer interface, but hides some sensitive information
func (info *ChangeFeedInfo) String() (str string) {
var err error
@@ -343,7 +365,7 @@ func (info *ChangeFeedInfo) fixState() {
// This corresponds to the case of failure or error.
case AdminNone, AdminResume:
if info.Error != nil {
if cerror.IsChangefeedFastFailErrorCode(errors.RFCErrorCode(info.Error.Code)) {
if cerror.IsChangefeedGCFastFailErrorCode(errors.RFCErrorCode(info.Error.Code)) {
state = StateFailed
} else {
state = StateError
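
The contract of the new NeedBlockGC method above could be pinned down with a table-driven test along the following lines; this is an illustrative sketch, not part of the commit, and it assumes the tiflow model and errors packages plus testify's require, as already used elsewhere in this diff:

package model_test

import (
	"testing"

	"github.com/pingcap/tiflow/cdc/model"
	cerror "github.com/pingcap/tiflow/pkg/errors"
	"github.com/stretchr/testify/require"
)

func TestNeedBlockGCSketch(t *testing.T) {
	gcCode := string(cerror.ErrStartTsBeforeGC.RFCCode())

	// Normal, stopped and error changefeeds always hold the GC safepoint.
	for _, s := range []model.FeedState{model.StateNormal, model.StateStopped, model.StateError} {
		info := &model.ChangeFeedInfo{State: s}
		require.True(t, info.NeedBlockGC())
	}

	// A changefeed failed by a GC fast-fail error releases the safepoint ...
	failedByGC := &model.ChangeFeedInfo{
		State: model.StateFailed,
		Error: &model.RunningError{Code: gcCode, Message: "start ts before GC"},
	}
	require.False(t, failedByGC.NeedBlockGC())

	// ... while a changefeed failed by any other error keeps holding it.
	failedOther := &model.ChangeFeedInfo{
		State: model.StateFailed,
		Error: &model.RunningError{Code: "CDC:ErrOther", Message: "some other failure"},
	}
	require.True(t, failedOther.NeedBlockGC())

	// Finished and removed changefeeds never block the GC safepoint.
	for _, s := range []model.FeedState{model.StateFinished, model.StateRemoved} {
		info := &model.ChangeFeedInfo{State: s}
		require.False(t, info.NeedBlockGC())
	}
}
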
4 changes: 2 additions & 2 deletions cdc/owner/changefeed.go
@@ -291,8 +291,8 @@ func (c *changefeed) handleWarning(ctx cdcContext.Context, err error) {
}

func (c *changefeed) checkStaleCheckpointTs(ctx cdcContext.Context, checkpointTs uint64) error {
state := c.state.Info.State
if state == model.StateNormal || state == model.StateStopped || state == model.StateError {
cfInfo := c.state.Info
if cfInfo.NeedBlockGC() {
failpoint.Inject("InjectChangefeedFastFailError", func() error {
return cerror.ErrStartTsBeforeGC.FastGen("InjectChangefeedFastFailError")
})
2 changes: 1 addition & 1 deletion cdc/owner/feed_state_manager.go
@@ -487,7 +487,7 @@ func (m *feedStateManager) handleError(errs ...*model.RunningError) {
// if there is a fastFail error in errs, we can just fail the changefeed immediately
// and there is no need to patch other errors into the changefeed info
for _, err := range errs {
if cerrors.IsChangefeedFastFailErrorCode(errors.RFCErrorCode(err.Code)) {
if cerrors.IsChangefeedGCFastFailErrorCode(errors.RFCErrorCode(err.Code)) {
m.state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
if info == nil {
return nil, false, nil
32 changes: 1 addition & 31 deletions cdc/owner/owner.go
@@ -691,26 +691,6 @@ func (o *ownerImpl) updateGCSafepoint(
return nil
}

// ignoreFailedChangeFeedWhenGC checks if a failed changefeed should be ignored
// when calculating the gc safepoint of the associated upstream.
func (o *ownerImpl) ignoreFailedChangeFeedWhenGC(
state *orchestrator.ChangefeedReactorState,
) bool {
upID := state.Info.UpstreamID
us, exist := o.upstreamManager.Get(upID)
if !exist {
log.Warn("upstream not found", zap.Uint64("ID", upID))
return false
}
// in case the changefeed failed right after it is created
// and the status is not initialized yet.
ts := state.Info.StartTs
if state.Status != nil {
ts = state.Status.CheckpointTs
}
return us.GCManager.IgnoreFailedChangeFeed(ts)
}

// calculateGCSafepoint calculates the GC safepoint for each upstream.
// Note: we need to maintain a TiCDC service GC safepoint for each upstream TiDB cluster
// to prevent upstream TiDB GC from removing data that is still needed by TiCDC.
@@ -722,17 +702,7 @@ func (o *ownerImpl) calculateGCSafepoint(state *orchestrator.GlobalReactorState)
forceUpdateMap := make(map[uint64]interface{})

for changefeedID, changefeedState := range state.Changefeeds {
if changefeedState.Info == nil {
continue
}

switch changefeedState.Info.State {
case model.StateNormal, model.StateStopped, model.StateError:
case model.StateFailed:
if o.ignoreFailedChangeFeedWhenGC(changefeedState) {
continue
}
default:
if changefeedState.Info == nil || !changefeedState.Info.NeedBlockGC() {
continue
}

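
For reference, the aggregation performed by calculateGCSafepoint boils down to taking, per upstream, the minimum checkpoint of the changefeeds that still block GC. The sketch below is a condensed standalone model with placeholder types; the real function also tracks a forceUpdate map for changefeeds the owner has not loaded yet, which is omitted here:

package main

import "fmt"

type changefeed struct {
	upstreamID   uint64
	checkpointTs uint64
	blocksGC     bool // i.e. Info.NeedBlockGC() in the real code
}

// minCheckpointPerUpstream keeps, for every upstream cluster, the smallest
// checkpoint among the changefeeds that still need to hold the GC safepoint.
// Finished, removed, and GC-failed changefeeds simply drop out of the map.
func minCheckpointPerUpstream(feeds []changefeed) map[uint64]uint64 {
	minTs := make(map[uint64]uint64)
	for _, cf := range feeds {
		if !cf.blocksGC {
			continue
		}
		if ts, ok := minTs[cf.upstreamID]; !ok || cf.checkpointTs < ts {
			minTs[cf.upstreamID] = cf.checkpointTs
		}
	}
	return minTs
}

func main() {
	feeds := []changefeed{
		{upstreamID: 1, checkpointTs: 105, blocksGC: true},
		{upstreamID: 1, checkpointTs: 100, blocksGC: true},
		{upstreamID: 1, checkpointTs: 3, blocksGC: false}, // failed by GC: ignored
		{upstreamID: 2, checkpointTs: 42, blocksGC: true},
	}
	fmt.Println(minCheckpointPerUpstream(feeds)) // map[1:100 2:42]
}
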
54 changes: 48 additions & 6 deletions cdc/owner/owner_test.go
@@ -18,10 +18,10 @@ import (
"context"
"fmt"
"math"
"math/rand"
"testing"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
@@ -30,7 +30,7 @@ import (
"github.com/pingcap/tiflow/cdc/scheduler"
"github.com/pingcap/tiflow/pkg/config"
cdcContext "github.com/pingcap/tiflow/pkg/context"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/orchestrator"
@@ -51,7 +51,7 @@ type mockManager struct {
func (m *mockManager) CheckStaleCheckpointTs(
ctx context.Context, changefeedID model.ChangeFeedID, checkpointTs model.Ts,
) error {
return cerror.ErrStartTsBeforeGC.GenWithStackByArgs()
return errors.ErrStartTsBeforeGC.GenWithStackByArgs()
}

var _ gc.Manager = (*mockManager)(nil)
@@ -474,6 +474,17 @@ func TestUpdateGCSafePoint(t *testing.T) {
changefeedID1.ID),
[]byte(`{"config":{},"state":"failed"}`))
tester.MustApplyPatches()
gcErr := errors.ChangeFeedGCFastFailError[rand.Intn(len(errors.ChangeFeedGCFastFailError))]
errCode, ok := errors.RFCCode(gcErr)
require.True(t, ok)
state.Changefeeds[changefeedID1].PatchInfo(
func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
if info == nil {
return nil, false, nil
}
info.Error = &model.RunningError{Code: string(errCode), Message: gcErr.Error()}
return info, true, nil
})
state.Changefeeds[changefeedID1].PatchStatus(
func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
return &model.ChangeFeedStatus{CheckpointTs: 2}, true, nil
@@ -687,11 +698,22 @@ func TestCalculateGCSafepointTs(t *testing.T) {
expectForceUpdateMap := make(map[uint64]interface{})
o := ownerImpl{changefeeds: make(map[model.ChangeFeedID]*changefeed)}

stateMap := []model.FeedState{
model.StateNormal, model.StateStopped, model.StateError,
model.StateFailed, /* failed changefeed with normal error should not be ignored */
}
for i := 0; i < 100; i++ {
cfID := model.DefaultChangeFeedID(fmt.Sprintf("testChangefeed-%d", i))
upstreamID := uint64(i / 10)
cfInfo := &model.ChangeFeedInfo{UpstreamID: upstreamID, State: model.StateNormal}
cfStatus := &model.ChangeFeedStatus{CheckpointTs: uint64(i)}
cfStatus := &model.ChangeFeedStatus{CheckpointTs: uint64(i) + 100}
cfInfo := &model.ChangeFeedInfo{UpstreamID: upstreamID, State: stateMap[rand.Intn(4)]}
if cfInfo.State == model.StateFailed {
cfInfo.Error = &model.RunningError{
Addr: "test",
Code: "test",
Message: "test",
}
}
changefeed := &orchestrator.ChangefeedReactorState{
ID: cfID,
Info: cfInfo,
@@ -701,7 +723,7 @@ func TestCalculateGCSafepointTs(t *testing.T) {

// expectMinTsMap will be like map[upstreamID]{100, 110, 120, ..., 190}
if i%10 == 0 {
expectMinTsMap[upstreamID] = uint64(i)
expectMinTsMap[upstreamID] = uint64(i) + 100
}

// If a changefeed does not exist in ownerImpl.changefeeds,
@@ -713,6 +735,26 @@ func TestCalculateGCSafepointTs(t *testing.T) {
}
}

for i := 0; i < 10; i++ {
cfID := model.DefaultChangeFeedID(fmt.Sprintf("testChangefeed-ignored-%d", i))
upstreamID := uint64(i)
cfStatus := &model.ChangeFeedStatus{CheckpointTs: uint64(i)}
err := errors.ChangeFeedGCFastFailError[rand.Intn(len(errors.ChangeFeedGCFastFailError))]
errCode, ok := errors.RFCCode(err)
require.True(t, ok)
cfInfo := &model.ChangeFeedInfo{
UpstreamID: upstreamID,
State: model.StateFailed,
Error: &model.RunningError{Code: string(errCode), Message: err.Error()},
}
changefeed := &orchestrator.ChangefeedReactorState{
ID: cfID,
Info: cfInfo,
Status: cfStatus,
}
state.Changefeeds[cfID] = changefeed
}

minCheckpoinTsMap, forceUpdateMap := o.calculateGCSafepoint(state)

require.Equal(t, expectMinTsMap, minCheckpoinTsMap)
16 changes: 8 additions & 8 deletions pkg/errors/helper.go
@@ -34,20 +34,20 @@ func WrapError(rfcError *errors.Error, err error, args ...interface{}) error {
return rfcError.Wrap(err).GenWithStackByArgs(args...)
}

// changeFeedFastFailError is read only.
// ChangeFeedGCFastFailError is read only.
// If this type of error occurs in a changefeed, it means that the data it
// wants to replicate has been or will be garbage collected. So it makes no sense to try to
// resume the changefeed, and the changefeed should immediately be failed.
var changeFeedFastFailError = []*errors.Error{
var ChangeFeedGCFastFailError = []*errors.Error{
ErrGCTTLExceeded, ErrSnapshotLostByGC, ErrStartTsBeforeGC,
}

// IsChangefeedFastFailError checks if an error is a ChangefeedFastFailError
func IsChangefeedFastFailError(err error) bool {
// IsChangefeedGCFastFailError checks if an error is a ChangefeedFastFailError
func IsChangefeedGCFastFailError(err error) bool {
if err == nil {
return false
}
for _, e := range changeFeedFastFailError {
for _, e := range ChangeFeedGCFastFailError {
if e.Equal(err) {
return true
}
@@ -59,10 +59,10 @@ func IsChangefeedFastFailError(err error) bool {
return false
}

// IsChangefeedFastFailErrorCode checks the error code, returns true if it is a
// IsChangefeedGCFastFailErrorCode checks the error code, returns true if it is a
// ChangefeedFastFailError code
func IsChangefeedFastFailErrorCode(errCode errors.RFCErrorCode) bool {
for _, e := range changeFeedFastFailError {
func IsChangefeedGCFastFailErrorCode(errCode errors.RFCErrorCode) bool {
for _, e := range ChangeFeedGCFastFailError {
if errCode == e.RFCCode() {
return true
}
12 changes: 6 additions & 6 deletions pkg/errors/helper_test.go
@@ -100,18 +100,18 @@ func TestChangefeedFastFailError(t *testing.T) {
t.Parallel()
err := ErrSnapshotLostByGC.FastGenByArgs()
rfcCode, _ := RFCCode(err)
require.Equal(t, true, IsChangefeedFastFailError(err))
require.Equal(t, true, IsChangefeedFastFailErrorCode(rfcCode))
require.Equal(t, true, IsChangefeedGCFastFailError(err))
require.Equal(t, true, IsChangefeedGCFastFailErrorCode(rfcCode))

err = ErrStartTsBeforeGC.FastGenByArgs()
rfcCode, _ = RFCCode(err)
require.Equal(t, true, IsChangefeedFastFailError(err))
require.Equal(t, true, IsChangefeedFastFailErrorCode(rfcCode))
require.Equal(t, true, IsChangefeedGCFastFailError(err))
require.Equal(t, true, IsChangefeedGCFastFailErrorCode(rfcCode))

err = ErrToTLSConfigFailed.FastGenByArgs()
rfcCode, _ = RFCCode(err)
require.Equal(t, false, IsChangefeedFastFailError(err))
require.Equal(t, false, IsChangefeedFastFailErrorCode(rfcCode))
require.Equal(t, false, IsChangefeedGCFastFailError(err))
require.Equal(t, false, IsChangefeedGCFastFailErrorCode(rfcCode))
}

func TestShouldFailChangefeed(t *testing.T) {
22 changes: 0 additions & 22 deletions pkg/txnutil/gc/gc_manager.go
@@ -38,9 +38,6 @@ type Manager interface {
// Set `forceUpdate` to force Manager update.
TryUpdateGCSafePoint(ctx context.Context, checkpointTs model.Ts, forceUpdate bool) error
CheckStaleCheckpointTs(ctx context.Context, changefeedID model.ChangeFeedID, checkpointTs model.Ts) error
// IgnoreFailedChangeFeed verifies whether a failed changefeed should be
// disregarded. When calculating the GC safepoint of the related upstream,
IgnoreFailedChangeFeed(checkpointTs uint64) bool
}

type gcManager struct {
@@ -139,22 +136,3 @@ func (m *gcManager) CheckStaleCheckpointTs(
}
return nil
}

func (m *gcManager) IgnoreFailedChangeFeed(
checkpointTs uint64,
) bool {
pdTime, err := m.pdClock.CurrentTime()
if err != nil {
log.Warn("failed to get ts",
zap.String("GcManagerID", m.gcServiceID),
zap.Error(err),
)
return false
}
// ignore the changefeed if its current checkpoint TS is earlier
// than the (currentPDTso - failedFeedDataRetentionTime).
gcSafepointUpperBound := checkpointTs - 1
return pdTime.Sub(
oracle.GetTimeFromTS(gcSafepointUpperBound),
) > time.Duration(m.gcTTL)*time.Second
}
27 changes: 1 addition & 26 deletions pkg/txnutil/gc/gc_manager_test.go
@@ -102,30 +102,5 @@ func TestCheckStaleCheckpointTs(t *testing.T) {
gcManager.lastSafePointTs = 20
err = gcManager.CheckStaleCheckpointTs(ctx, cfID, 10)
require.True(t, cerror.ErrSnapshotLostByGC.Equal(errors.Cause(err)))
require.True(t, cerror.IsChangefeedFastFailError(err))
}

func TestIgnoreFailedFeed(t *testing.T) {
t.Parallel()

mockPDClient := &MockPDClient{}
pdClock := pdutil.NewClock4Test()
gcManager := NewManager(etcd.GcServiceIDForTest(),
mockPDClient, pdClock).(*gcManager)
gcManager.gcTTL = 24 * 60 * 60

// 5 hours ago
ts1 := oracle.GoTimeToTS(time.Now().Add(-time.Hour * 5))
ret1 := gcManager.IgnoreFailedChangeFeed(ts1)
require.False(t, ret1)

// 20 hours ago
ts2 := oracle.GoTimeToTS(time.Now().Add(-time.Hour * 20))
ret2 := gcManager.IgnoreFailedChangeFeed(ts2)
require.False(t, ret2)

// 25 hours ago
ts3 := oracle.GoTimeToTS(time.Now().Add(-time.Hour * 25))
ret3 := gcManager.IgnoreFailedChangeFeed(ts3)
require.True(t, ret3)
require.True(t, cerror.IsChangefeedGCFastFailError(err))
}
