diff --git a/cdc/model/changefeed.go b/cdc/model/changefeed.go
index 1a3709d7521..db1fdbcaee0 100644
--- a/cdc/model/changefeed.go
+++ b/cdc/model/changefeed.go
@@ -179,6 +179,28 @@ func ValidateNamespace(namespace string) error {
 	return nil
 }
 
+// NeedBlockGC returns true if the changefeed needs to block the GC safepoint.
+// Note: if the changefeed failed because of GC, it should not block the GC safepoint.
+func (info *ChangeFeedInfo) NeedBlockGC() bool {
+	switch info.State {
+	case StateNormal, StateStopped, StateError:
+		return true
+	case StateFailed:
+		return !info.isFailedByGC()
+	case StateFinished, StateRemoved:
+	default:
+	}
+	return false
+}
+
+func (info *ChangeFeedInfo) isFailedByGC() bool {
+	if info.Error == nil {
+		log.Panic("changefeed info is not consistent",
+			zap.Any("state", info.State), zap.Any("error", info.Error))
+	}
+	return cerror.IsChangefeedGCFastFailErrorCode(errors.RFCErrorCode(info.Error.Code))
+}
+
 // String implements fmt.Stringer interface, but hide some sensitive information
 func (info *ChangeFeedInfo) String() (str string) {
 	var err error
@@ -343,7 +365,7 @@ func (info *ChangeFeedInfo) fixState() {
 		// This corresponds to the case of failure or error.
 	case AdminNone, AdminResume:
 		if info.Error != nil {
-			if cerror.IsChangefeedFastFailErrorCode(errors.RFCErrorCode(info.Error.Code)) {
+			if cerror.IsChangefeedGCFastFailErrorCode(errors.RFCErrorCode(info.Error.Code)) {
 				state = StateFailed
 			} else {
 				state = StateError
diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go
index 06d326cb809..59de6b8882a 100644
--- a/cdc/owner/changefeed.go
+++ b/cdc/owner/changefeed.go
@@ -291,8 +291,8 @@ func (c *changefeed) handleWarning(ctx cdcContext.Context, err error) {
 }
 
 func (c *changefeed) checkStaleCheckpointTs(ctx cdcContext.Context, checkpointTs uint64) error {
-	state := c.state.Info.State
-	if state == model.StateNormal || state == model.StateStopped || state == model.StateError {
+	cfInfo := c.state.Info
+	if cfInfo.NeedBlockGC() {
 		failpoint.Inject("InjectChangefeedFastFailError", func() error {
 			return cerror.ErrStartTsBeforeGC.FastGen("InjectChangefeedFastFailError")
 		})
diff --git a/cdc/owner/feed_state_manager.go b/cdc/owner/feed_state_manager.go
index 4bcdf0dd7f8..9d689dd5b00 100644
--- a/cdc/owner/feed_state_manager.go
+++ b/cdc/owner/feed_state_manager.go
@@ -487,7 +487,7 @@ func (m *feedStateManager) handleError(errs ...*model.RunningError) {
 	// if there are a fastFail error in errs, we can just fastFail the changefeed
 	// and no need to patch other error to the changefeed info
 	for _, err := range errs {
-		if cerrors.IsChangefeedFastFailErrorCode(errors.RFCErrorCode(err.Code)) {
+		if cerrors.IsChangefeedGCFastFailErrorCode(errors.RFCErrorCode(err.Code)) {
 			m.state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
 				if info == nil {
 					return nil, false, nil
diff --git a/cdc/owner/owner.go b/cdc/owner/owner.go
index b3b8fe160a9..f116914c980 100644
--- a/cdc/owner/owner.go
+++ b/cdc/owner/owner.go
@@ -691,26 +691,6 @@ func (o *ownerImpl) updateGCSafepoint(
 	return nil
 }
 
-// ignoreFailedChangeFeedWhenGC checks if a failed changefeed should be ignored
-// when calculating the gc safepoint of the associated upstream.
-func (o *ownerImpl) ignoreFailedChangeFeedWhenGC(
-	state *orchestrator.ChangefeedReactorState,
-) bool {
-	upID := state.Info.UpstreamID
-	us, exist := o.upstreamManager.Get(upID)
-	if !exist {
-		log.Warn("upstream not found", zap.Uint64("ID", upID))
-		return false
-	}
-	// in case the changefeed failed right after it is created
-	// and the status is not initialized yet.
-	ts := state.Info.StartTs
-	if state.Status != nil {
-		ts = state.Status.CheckpointTs
-	}
-	return us.GCManager.IgnoreFailedChangeFeed(ts)
-}
-
 // calculateGCSafepoint calculates GCSafepoint for different upstream.
 // Note: we need to maintain a TiCDC service GC safepoint for each upstream TiDB cluster
 // to prevent upstream TiDB GC from removing data that is still needed by TiCDC.
@@ -722,17 +702,7 @@ func (o *ownerImpl) calculateGCSafepoint(state *orchestrator.GlobalReactorState)
 	forceUpdateMap := make(map[uint64]interface{})
 
 	for changefeedID, changefeedState := range state.Changefeeds {
-		if changefeedState.Info == nil {
-			continue
-		}
-
-		switch changefeedState.Info.State {
-		case model.StateNormal, model.StateStopped, model.StateError:
-		case model.StateFailed:
-			if o.ignoreFailedChangeFeedWhenGC(changefeedState) {
-				continue
-			}
-		default:
+		if changefeedState.Info == nil || !changefeedState.Info.NeedBlockGC() {
 			continue
 		}
 
diff --git a/cdc/owner/owner_test.go b/cdc/owner/owner_test.go
index 0171b756da5..fbc77601ef0 100644
--- a/cdc/owner/owner_test.go
+++ b/cdc/owner/owner_test.go
@@ -18,10 +18,10 @@ import (
 	"context"
 	"fmt"
 	"math"
+	"math/rand"
 	"testing"
 	"time"
 
-	"github.com/pingcap/errors"
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/tiflow/cdc/entry"
 	"github.com/pingcap/tiflow/cdc/model"
@@ -30,7 +30,7 @@ import (
 	"github.com/pingcap/tiflow/cdc/scheduler"
 	"github.com/pingcap/tiflow/pkg/config"
 	cdcContext "github.com/pingcap/tiflow/pkg/context"
-	cerror "github.com/pingcap/tiflow/pkg/errors"
+	"github.com/pingcap/tiflow/pkg/errors"
 	"github.com/pingcap/tiflow/pkg/etcd"
 	"github.com/pingcap/tiflow/pkg/filter"
 	"github.com/pingcap/tiflow/pkg/orchestrator"
@@ -51,7 +51,7 @@ type mockManager struct {
 func (m *mockManager) CheckStaleCheckpointTs(
 	ctx context.Context, changefeedID model.ChangeFeedID, checkpointTs model.Ts,
 ) error {
-	return cerror.ErrStartTsBeforeGC.GenWithStackByArgs()
+	return errors.ErrStartTsBeforeGC.GenWithStackByArgs()
 }
 
 var _ gc.Manager = (*mockManager)(nil)
@@ -474,6 +474,17 @@ func TestUpdateGCSafePoint(t *testing.T) {
 		changefeedID1.ID), []byte(`{"config":{},"state":"failed"}`))
 	tester.MustApplyPatches()
 
+	gcErr := errors.ChangeFeedGCFastFailError[rand.Intn(len(errors.ChangeFeedGCFastFailError))]
+	errCode, ok := errors.RFCCode(gcErr)
+	require.True(t, ok)
+	state.Changefeeds[changefeedID1].PatchInfo(
+		func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
+			if info == nil {
+				return nil, false, nil
+			}
+			info.Error = &model.RunningError{Code: string(errCode), Message: gcErr.Error()}
+			return info, true, nil
+		})
 	state.Changefeeds[changefeedID1].PatchStatus(
 		func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
 			return &model.ChangeFeedStatus{CheckpointTs: 2}, true, nil
@@ -687,11 +698,22 @@ func TestCalculateGCSafepointTs(t *testing.T) {
 	expectForceUpdateMap := make(map[uint64]interface{})
 	o := ownerImpl{changefeeds: make(map[model.ChangeFeedID]*changefeed)}
+	stateMap := []model.FeedState{
+		model.StateNormal, model.StateStopped, model.StateError,
+		model.StateFailed, /* failed changefeed with normal error should not be ignored */
+	}
 
 	for i := 0; i < 100; i++ {
 		cfID := model.DefaultChangeFeedID(fmt.Sprintf("testChangefeed-%d", i))
 		upstreamID := uint64(i / 10)
-		cfInfo := &model.ChangeFeedInfo{UpstreamID: upstreamID, State: model.StateNormal}
-		cfStatus := &model.ChangeFeedStatus{CheckpointTs: uint64(i)}
+		cfStatus := &model.ChangeFeedStatus{CheckpointTs: uint64(i) + 100}
+		cfInfo := &model.ChangeFeedInfo{UpstreamID: upstreamID, State: stateMap[rand.Intn(4)]}
+		if cfInfo.State == model.StateFailed {
+			cfInfo.Error = &model.RunningError{
+				Addr:    "test",
+				Code:    "test",
+				Message: "test",
+			}
+		}
 		changefeed := &orchestrator.ChangefeedReactorState{
 			ID:     cfID,
 			Info:   cfInfo,
@@ -701,7 +723,7 @@ func TestCalculateGCSafepointTs(t *testing.T) {
 
 		// expectMinTsMap will be like map[upstreamID]{0, 10, 20, ..., 90}
 		if i%10 == 0 {
-			expectMinTsMap[upstreamID] = uint64(i)
+			expectMinTsMap[upstreamID] = uint64(i) + 100
 		}
 
 		// If a changefeed does not exist in ownerImpl.changefeeds,
@@ -713,6 +735,26 @@ func TestCalculateGCSafepointTs(t *testing.T) {
 		}
 	}
 
+	for i := 0; i < 10; i++ {
+		cfID := model.DefaultChangeFeedID(fmt.Sprintf("testChangefeed-ignored-%d", i))
+		upstreamID := uint64(i)
+		cfStatus := &model.ChangeFeedStatus{CheckpointTs: uint64(i)}
+		err := errors.ChangeFeedGCFastFailError[rand.Intn(len(errors.ChangeFeedGCFastFailError))]
+		errCode, ok := errors.RFCCode(err)
+		require.True(t, ok)
+		cfInfo := &model.ChangeFeedInfo{
+			UpstreamID: upstreamID,
+			State:      model.StateFailed,
+			Error:      &model.RunningError{Code: string(errCode), Message: err.Error()},
+		}
+		changefeed := &orchestrator.ChangefeedReactorState{
+			ID:     cfID,
+			Info:   cfInfo,
+			Status: cfStatus,
+		}
+		state.Changefeeds[cfID] = changefeed
+	}
+
 	minCheckpoinTsMap, forceUpdateMap := o.calculateGCSafepoint(state)
 
 	require.Equal(t, expectMinTsMap, minCheckpoinTsMap)
diff --git a/pkg/errors/helper.go b/pkg/errors/helper.go
index 0e7df7549b6..c8e5019a639 100644
--- a/pkg/errors/helper.go
+++ b/pkg/errors/helper.go
@@ -34,20 +34,20 @@ func WrapError(rfcError *errors.Error, err error, args ...interface{}) error {
 	return rfcError.Wrap(err).GenWithStackByArgs(args...)
 }
 
-// changeFeedFastFailError is read only.
+// ChangeFeedGCFastFailError is read only.
 // If this type of error occurs in a changefeed, it means that the data it
 // wants to replicate has been or will be GC. So it makes no sense to try to
 // resume the changefeed, and the changefeed should immediately be failed.
-var changeFeedFastFailError = []*errors.Error{
+var ChangeFeedGCFastFailError = []*errors.Error{
 	ErrGCTTLExceeded, ErrSnapshotLostByGC, ErrStartTsBeforeGC,
 }
 
-// IsChangefeedFastFailError checks if an error is a ChangefeedFastFailError
-func IsChangefeedFastFailError(err error) bool {
+// IsChangefeedGCFastFailError checks if an error is a ChangefeedFastFailError
+func IsChangefeedGCFastFailError(err error) bool {
 	if err == nil {
 		return false
 	}
-	for _, e := range changeFeedFastFailError {
+	for _, e := range ChangeFeedGCFastFailError {
 		if e.Equal(err) {
 			return true
 		}
@@ -59,10 +59,10 @@ func IsChangefeedFastFailError(err error) bool {
 	return false
 }
 
-// IsChangefeedFastFailErrorCode checks the error code, returns true if it is a
+// IsChangefeedGCFastFailErrorCode checks the error code, returns true if it is a
 // ChangefeedFastFailError code
-func IsChangefeedFastFailErrorCode(errCode errors.RFCErrorCode) bool {
-	for _, e := range changeFeedFastFailError {
+func IsChangefeedGCFastFailErrorCode(errCode errors.RFCErrorCode) bool {
+	for _, e := range ChangeFeedGCFastFailError {
 		if errCode == e.RFCCode() {
 			return true
 		}
diff --git a/pkg/errors/helper_test.go b/pkg/errors/helper_test.go
index bd19b5c8a58..e48282fb870 100644
--- a/pkg/errors/helper_test.go
+++ b/pkg/errors/helper_test.go
@@ -100,18 +100,18 @@ func TestChangefeedFastFailError(t *testing.T) {
 	t.Parallel()
 	err := ErrSnapshotLostByGC.FastGenByArgs()
 	rfcCode, _ := RFCCode(err)
-	require.Equal(t, true, IsChangefeedFastFailError(err))
-	require.Equal(t, true, IsChangefeedFastFailErrorCode(rfcCode))
+	require.Equal(t, true, IsChangefeedGCFastFailError(err))
+	require.Equal(t, true, IsChangefeedGCFastFailErrorCode(rfcCode))
 
 	err = ErrStartTsBeforeGC.FastGenByArgs()
 	rfcCode, _ = RFCCode(err)
-	require.Equal(t, true, IsChangefeedFastFailError(err))
-	require.Equal(t, true, IsChangefeedFastFailErrorCode(rfcCode))
+	require.Equal(t, true, IsChangefeedGCFastFailError(err))
+	require.Equal(t, true, IsChangefeedGCFastFailErrorCode(rfcCode))
 
 	err = ErrToTLSConfigFailed.FastGenByArgs()
 	rfcCode, _ = RFCCode(err)
-	require.Equal(t, false, IsChangefeedFastFailError(err))
-	require.Equal(t, false, IsChangefeedFastFailErrorCode(rfcCode))
+	require.Equal(t, false, IsChangefeedGCFastFailError(err))
+	require.Equal(t, false, IsChangefeedGCFastFailErrorCode(rfcCode))
 }
 
 func TestShouldFailChangefeed(t *testing.T) {
diff --git a/pkg/txnutil/gc/gc_manager.go b/pkg/txnutil/gc/gc_manager.go
index 639aba6e2be..db1882d0089 100644
--- a/pkg/txnutil/gc/gc_manager.go
+++ b/pkg/txnutil/gc/gc_manager.go
@@ -38,9 +38,6 @@ type Manager interface {
 	// Set `forceUpdate` to force Manager update.
 	TryUpdateGCSafePoint(ctx context.Context, checkpointTs model.Ts, forceUpdate bool) error
 	CheckStaleCheckpointTs(ctx context.Context, changefeedID model.ChangeFeedID, checkpointTs model.Ts) error
-	// IgnoreFailedChangeFeed verifies whether a failed changefeed should be
-	// disregarded. When calculating the GC safepoint of the related upstream,
-	IgnoreFailedChangeFeed(checkpointTs uint64) bool
 }
 
 type gcManager struct {
@@ -139,22 +136,3 @@ func (m *gcManager) CheckStaleCheckpointTs(
 	}
 	return nil
 }
-
-func (m *gcManager) IgnoreFailedChangeFeed(
-	checkpointTs uint64,
-) bool {
-	pdTime, err := m.pdClock.CurrentTime()
-	if err != nil {
-		log.Warn("failed to get ts",
-			zap.String("GcManagerID", m.gcServiceID),
-			zap.Error(err),
-		)
-		return false
-	}
-	// ignore the changefeed if its current checkpoint TS is earlier
-	// than the (currentPDTso - failedFeedDataRetentionTime).
-	gcSafepointUpperBound := checkpointTs - 1
-	return pdTime.Sub(
-		oracle.GetTimeFromTS(gcSafepointUpperBound),
-	) > time.Duration(m.gcTTL)*time.Second
-}
diff --git a/pkg/txnutil/gc/gc_manager_test.go b/pkg/txnutil/gc/gc_manager_test.go
index 990c936faa5..4d68044bf1a 100644
--- a/pkg/txnutil/gc/gc_manager_test.go
+++ b/pkg/txnutil/gc/gc_manager_test.go
@@ -102,30 +102,5 @@ func TestCheckStaleCheckpointTs(t *testing.T) {
 	gcManager.lastSafePointTs = 20
 	err = gcManager.CheckStaleCheckpointTs(ctx, cfID, 10)
 	require.True(t, cerror.ErrSnapshotLostByGC.Equal(errors.Cause(err)))
-	require.True(t, cerror.IsChangefeedFastFailError(err))
-}
-
-func TestIgnoreFailedFeed(t *testing.T) {
-	t.Parallel()
-
-	mockPDClient := &MockPDClient{}
-	pdClock := pdutil.NewClock4Test()
-	gcManager := NewManager(etcd.GcServiceIDForTest(),
-		mockPDClient, pdClock).(*gcManager)
-	gcManager.gcTTL = 24 * 60 * 60
-
-	// 5 hours ago
-	ts1 := oracle.GoTimeToTS(time.Now().Add(-time.Hour * 5))
-	ret1 := gcManager.IgnoreFailedChangeFeed(ts1)
-	require.False(t, ret1)
-
-	// 20 hours ago
-	ts2 := oracle.GoTimeToTS(time.Now().Add(-time.Hour * 20))
-	ret2 := gcManager.IgnoreFailedChangeFeed(ts2)
-	require.False(t, ret2)
-
-	// 25 hours ago
-	ts3 := oracle.GoTimeToTS(time.Now().Add(-time.Hour * 25))
-	ret3 := gcManager.IgnoreFailedChangeFeed(ts3)
-	require.True(t, ret3)
+	require.True(t, cerror.IsChangefeedGCFastFailError(err))
 }
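Reviewer note (not part of the patch): the change replaces the TTL-based gcManager.IgnoreFailedChangeFeed heuristic with a single predicate, ChangeFeedInfo.NeedBlockGC, which both owner.calculateGCSafepoint and changefeed.checkStaleCheckpointTs now consult. Below is a self-contained, simplified sketch of that decision for reasoning about the new behavior. The type, constant, and error-code names are illustrative stand-ins for the real definitions in cdc/model and pkg/errors ("CDC:ErrSomethingElse" is a made-up non-GC code), and unlike the real isFailedByGC, which log.Panics on a nil Error, the sketch treats a missing error as "not failed by GC".

// needblockgc_sketch.go: illustration only; not part of the patch.
package main

import "fmt"

// Simplified stand-ins for the real cdc/model types.
type feedState string

const (
	stateNormal   feedState = "normal"
	stateStopped  feedState = "stopped"
	stateError    feedState = "error"
	stateFailed   feedState = "failed"
	stateFinished feedState = "finished"
	stateRemoved  feedState = "removed"
)

type runningError struct{ code string }

type changeFeedInfo struct {
	state feedState
	err   *runningError
}

// gcFastFailCodes plays the role of errors.ChangeFeedGCFastFailError: the codes
// that mean the data to replicate has been (or will be) garbage-collected.
var gcFastFailCodes = map[string]bool{
	"CDC:ErrGCTTLExceeded":    true,
	"CDC:ErrSnapshotLostByGC": true,
	"CDC:ErrStartTsBeforeGC":  true,
}

// needBlockGC mirrors ChangeFeedInfo.NeedBlockGC: every live state blocks the
// GC safepoint, and a failed changefeed blocks it only when its error is not
// one of the GC fast-fail codes.
func (i changeFeedInfo) needBlockGC() bool {
	switch i.state {
	case stateNormal, stateStopped, stateError:
		return true
	case stateFailed:
		return i.err == nil || !gcFastFailCodes[i.err.code]
	}
	return false
}

func main() {
	type feed struct {
		name         string
		info         changeFeedInfo
		checkpointTs uint64
	}
	feeds := []feed{
		{"normal", changeFeedInfo{state: stateNormal}, 100},
		{"failed-by-gc", changeFeedInfo{state: stateFailed, err: &runningError{code: "CDC:ErrSnapshotLostByGC"}}, 10},
		{"failed-other", changeFeedInfo{state: stateFailed, err: &runningError{code: "CDC:ErrSomethingElse"}}, 50},
	}

	// The owner's calculateGCSafepoint loop now reduces to: skip anything that
	// does not need to block GC, then take the minimum checkpoint of the rest.
	var minTs uint64
	for _, f := range feeds {
		if !f.info.needBlockGC() {
			fmt.Printf("%s: ignored for the GC safepoint\n", f.name)
			continue
		}
		if minTs == 0 || f.checkpointTs < minTs {
			minTs = f.checkpointTs
		}
	}
	fmt.Println("GC safepoint held at:", minTs) // 50: the GC-failed feed no longer pins it at 10
}

The practical difference: under the removed code, a changefeed that failed because its data was already garbage-collected kept holding the upstream's service GC safepoint until gcTTL elapsed (24h in the removed TestIgnoreFailedFeed); with NeedBlockGC, the owner drops it from the safepoint calculation as soon as its stored error carries one of the ChangeFeedGCFastFailError codes, which is also why TestUpdateGCSafePoint and TestCalculateGCSafepointTs now attach such an error to their failed changefeeds.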