diff --git a/cdc/api/v1/api.go b/cdc/api/v1/api.go index 66bc0d26d56..61e047a5e6f 100644 --- a/cdc/api/v1/api.go +++ b/cdc/api/v1/api.go @@ -90,10 +90,10 @@ func RegisterOpenAPIRoutes(router *gin.Engine, api OpenAPI) { changefeedGroup.GET("", controllerMiddleware, api.ListChangefeed) changefeedGroup.GET("/:changefeed_id", changefeedOwnerMiddleware, api.GetChangefeed) changefeedGroup.POST("", controllerMiddleware, api.CreateChangefeed) - changefeedGroup.PUT("/:changefeed_id", controllerMiddleware, api.UpdateChangefeed) + changefeedGroup.PUT("/:changefeed_id", changefeedOwnerMiddleware, api.UpdateChangefeed) changefeedGroup.POST("/:changefeed_id/pause", changefeedOwnerMiddleware, api.PauseChangefeed) changefeedGroup.POST("/:changefeed_id/resume", changefeedOwnerMiddleware, api.ResumeChangefeed) - changefeedGroup.DELETE("/:changefeed_id", changefeedOwnerMiddleware, api.RemoveChangefeed) + changefeedGroup.DELETE("/:changefeed_id", controllerMiddleware, api.RemoveChangefeed) changefeedGroup.POST("/:changefeed_id/tables/rebalance_table", changefeedOwnerMiddleware, api.RebalanceTables) changefeedGroup.POST("/:changefeed_id/tables/move_table", changefeedOwnerMiddleware, api.MoveTable) @@ -297,7 +297,13 @@ func (h *OpenAPI) CreateChangefeed(c *gin.Context) { return } - info, err := verifyCreateChangefeedConfig(ctx, changefeedConfig, h.capture) + ctrl, err := h.capture.GetController() + if err != nil { + _ = c.Error(err) + return + } + info, err := verifyCreateChangefeedConfig(ctx, changefeedConfig, + up, ctrl, h.capture.GetEtcdClient()) if err != nil { _ = c.Error(err) return @@ -486,11 +492,20 @@ func (h *OpenAPI) RemoveChangefeed(c *gin.Context) { return } // check if the changefeed exists - _, err := h.statusProvider().GetChangeFeedStatus(ctx, changefeedID) + ctrl, err := h.capture.GetController() if err != nil { _ = c.Error(err) return } + exist, err := ctrl.IsChangefeedExists(ctx, changefeedID) + if err != nil { + _ = c.Error(err) + return + } + if !exist { + _ = c.Error(cerror.ErrChangeFeedNotExists.GenWithStackByArgs(changefeedID)) + return + } job := model.AdminJob{ CfID: changefeedID, @@ -505,13 +520,16 @@ func (h *OpenAPI) RemoveChangefeed(c *gin.Context) { // Owner needs at least tow ticks to remove a changefeed, // we need to wait for it. err = retry.Do(ctx, func() error { - _, err := h.statusProvider().GetChangeFeedStatus(ctx, changefeedID) + exist, err = ctrl.IsChangefeedExists(ctx, changefeedID) if err != nil { if strings.Contains(err.Error(), "ErrChangeFeedNotExists") { return nil } return err } + if !exist { + return nil + } return cerror.ErrChangeFeedDeletionUnfinished.GenWithStackByArgs(changefeedID) }, retry.WithMaxTries(100), // max retry duration is 1 minute diff --git a/cdc/api/v1/api_test.go b/cdc/api/v1/api_test.go index 2e942a3c824..24b08e2d491 100644 --- a/cdc/api/v1/api_test.go +++ b/cdc/api/v1/api_test.go @@ -116,6 +116,12 @@ func newRouter(c capture.Capture, p owner.StatusProvider) *gin.Engine { return router } +func newRouterWithoutStatusProvider(c capture.Capture) *gin.Engine { + router := gin.New() + RegisterOpenAPIRoutes(router, NewOpenAPI(c)) + return router +} + func newStatusProvider() *mockStatusProvider { statusProvider := &mockStatusProvider{} statusProvider.On("GetChangeFeedStatus", mock.Anything, changeFeedID). @@ -415,22 +421,13 @@ func TestRemoveChangefeed(t *testing.T) { ctrl := gomock.NewController(t) mo := mock_owner.NewMockOwner(ctrl) - statusProvider := &mockStatusProvider{} - statusProvider.On("GetChangeFeedStatus", mock.Anything, changeFeedID). - Return(&model.ChangeFeedStatusForAPI{CheckpointTs: 1}, nil).Once() - statusProvider.On("GetChangeFeedStatus", mock.Anything, changeFeedID). - Return(new(model.ChangeFeedStatusForAPI), - cerror.ErrChangeFeedNotExists.FastGenByArgs(changeFeedID)).Once() - statusProvider.On("IsChangefeedOwner", mock.Anything, mock.Anything). - Return(true, nil).Times(3) controller := mock2.NewMockController(ctrl) capture := mock_capture.NewMockCapture(ctrl) capture.EXPECT().GetController().Return(controller, nil).AnyTimes() capture.EXPECT().GetOwner().Return(mo, nil).AnyTimes() capture.EXPECT().IsReady().Return(true).AnyTimes() capture.EXPECT().IsController().Return(true).AnyTimes() - capture.EXPECT().StatusProvider().Return(statusProvider).AnyTimes() - router1 := newRouter(capture, statusProvider) + router1 := newRouterWithoutStatusProvider(capture) // test remove changefeed succeeded mo.EXPECT(). @@ -440,13 +437,17 @@ func TestRemoveChangefeed(t *testing.T) { require.EqualValues(t, model.AdminRemove, adminJob.Type) close(done) }) + controller.EXPECT().IsChangefeedExists(gomock.Any(), gomock.Any()).Return(true, nil) + controller.EXPECT().IsChangefeedExists(gomock.Any(), gomock.Any()).Return(false, nil) api := testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s", changeFeedID.ID), method: "DELETE"} w := httptest.NewRecorder() req, _ := http.NewRequestWithContext(context.Background(), api.method, api.url, nil) router1.ServeHTTP(w, req) require.Equal(t, 202, w.Code) - router2 := newRouter(capture, newStatusProvider()) + router2 := newRouterWithoutStatusProvider(capture) + controller.EXPECT().IsChangefeedExists(gomock.Any(), gomock.Any()).Return(true, nil) + // test remove changefeed failed from owner side mo.EXPECT(). EnqueueJob(gomock.Any(), gomock.Any()). @@ -465,6 +466,7 @@ func TestRemoveChangefeed(t *testing.T) { require.Contains(t, respErr.Error, "changefeed not exists") // test remove changefeed failed + controller.EXPECT().IsChangefeedExists(gomock.Any(), nonExistChangefeedID).Return(false, nil) api = testCase{ url: fmt.Sprintf("/api/v1/changefeeds/%s", nonExistChangefeedID.ID), method: "DELETE", diff --git a/cdc/api/v1/validator.go b/cdc/api/v1/validator.go index 3499b544284..d8f564df40a 100644 --- a/cdc/api/v1/validator.go +++ b/cdc/api/v1/validator.go @@ -20,15 +20,17 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" - "github.com/pingcap/tiflow/cdc/capture" + "github.com/pingcap/tiflow/cdc/controller" "github.com/pingcap/tiflow/cdc/entry" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/owner" "github.com/pingcap/tiflow/cdc/sink/validator" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/etcd" "github.com/pingcap/tiflow/pkg/filter" "github.com/pingcap/tiflow/pkg/txnutil/gc" + "github.com/pingcap/tiflow/pkg/upstream" "github.com/pingcap/tiflow/pkg/util" "github.com/pingcap/tiflow/pkg/version" "github.com/r3labs/diff" @@ -39,17 +41,10 @@ import ( func verifyCreateChangefeedConfig( ctx context.Context, changefeedConfig model.ChangefeedConfig, - capture capture.Capture, + up *upstream.Upstream, + ctrl controller.Controller, + ectdClient etcd.CDCEtcdClient, ) (*model.ChangeFeedInfo, error) { - upManager, err := capture.GetUpstreamManager() - if err != nil { - return nil, err - } - up, err := upManager.GetDefaultUpstream() - if err != nil { - return nil, err - } - // verify sinkURI if changefeedConfig.SinkURI == "" { return nil, cerror.ErrSinkURIInvalid.GenWithStackByArgs("sink-uri is empty, can't not create a changefeed without sink-uri") @@ -60,12 +55,11 @@ func verifyCreateChangefeedConfig( return nil, cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedConfig.ID) } // check if the changefeed exists - cfStatus, err := capture.StatusProvider().GetChangeFeedStatus(ctx, - model.DefaultChangeFeedID(changefeedConfig.ID)) + ok, err := ctrl.IsChangefeedExists(ctx, model.DefaultChangeFeedID(changefeedConfig.ID)) if err != nil && cerror.ErrChangeFeedNotExists.NotEqual(err) { return nil, err } - if cfStatus != nil { + if ok { return nil, cerror.ErrChangeFeedAlreadyExists.GenWithStackByArgs(changefeedConfig.ID) } @@ -83,7 +77,7 @@ func verifyCreateChangefeedConfig( if err := gc.EnsureChangefeedStartTsSafety( ctx, up.PDClient, - capture.GetEtcdClient().GetEnsureGCServiceID(gc.EnsureGCServiceCreating), + ectdClient.GetEnsureGCServiceID(gc.EnsureGCServiceCreating), model.DefaultChangeFeedID(changefeedConfig.ID), ensureTTL, changefeedConfig.StartTS); err != nil { if !cerror.ErrStartTsBeforeGC.Equal(err) { @@ -122,7 +116,7 @@ func verifyCreateChangefeedConfig( return nil, err } - captureInfos, err := capture.StatusProvider().GetCaptures(ctx) + captureInfos, err := ctrl.GetCaptures(ctx) if err != nil { return nil, err } diff --git a/cdc/api/v2/api.go b/cdc/api/v2/api.go index 6674cbf50e7..09a2849cdb2 100644 --- a/cdc/api/v2/api.go +++ b/cdc/api/v2/api.go @@ -57,8 +57,8 @@ func RegisterOpenAPIV2Routes(router *gin.Engine, api OpenAPIV2) { changefeedGroup.GET("/:changefeed_id", changefeedOwnerMiddleware, api.getChangeFeed) changefeedGroup.POST("", controllerMiddleware, api.createChangefeed) changefeedGroup.GET("", controllerMiddleware, api.listChangeFeeds) - changefeedGroup.PUT("/:changefeed_id", controllerMiddleware, api.updateChangefeed) - changefeedGroup.DELETE("/:changefeed_id", changefeedOwnerMiddleware, api.deleteChangefeed) + changefeedGroup.PUT("/:changefeed_id", changefeedOwnerMiddleware, api.updateChangefeed) + changefeedGroup.DELETE("/:changefeed_id", controllerMiddleware, api.deleteChangefeed) changefeedGroup.GET("/:changefeed_id/meta_info", changefeedOwnerMiddleware, api.getChangeFeedMetaInfo) changefeedGroup.POST("/:changefeed_id/resume", changefeedOwnerMiddleware, api.resumeChangefeed) changefeedGroup.POST("/:changefeed_id/pause", changefeedOwnerMiddleware, api.pauseChangefeed) diff --git a/cdc/api/v2/api_helpers.go b/cdc/api/v2/api_helpers.go index 0ac63eb5289..426cfda68e5 100644 --- a/cdc/api/v2/api_helpers.go +++ b/cdc/api/v2/api_helpers.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" tidbkv "github.com/pingcap/tidb/kv" + "github.com/pingcap/tiflow/cdc/controller" "github.com/pingcap/tiflow/cdc/entry" "github.com/pingcap/tiflow/cdc/kv" "github.com/pingcap/tiflow/cdc/model" @@ -55,12 +56,12 @@ const RegisterImportTaskPrefix = "/tidb/brie/import" // Defining it as an interface to make APIs more testable. type APIV2Helpers interface { // verifyCreateChangefeedConfig verifies the changefeedConfig, - // and yield an valid changefeedInfo or error + // and yield a valid changefeedInfo or error verifyCreateChangefeedConfig( ctx context.Context, cfg *ChangefeedConfig, pdClient pd.Client, - statusProvider owner.StatusProvider, + ctrl controller.Controller, ensureGCServiceID string, kvStorage tidbkv.Storage, ) (*model.ChangeFeedInfo, error) @@ -127,7 +128,7 @@ func (APIV2HelpersImpl) verifyCreateChangefeedConfig( ctx context.Context, cfg *ChangefeedConfig, pdClient pd.Client, - statusProvider owner.StatusProvider, + ctrl controller.Controller, ensureGCServiceID string, kvStorage tidbkv.Storage, ) (*model.ChangeFeedInfo, error) { @@ -154,12 +155,12 @@ func (APIV2HelpersImpl) verifyCreateChangefeedConfig( "invalid namespace: %s", cfg.Namespace) } - cfStatus, err := statusProvider.GetChangeFeedStatus(ctx, + exists, err := ctrl.IsChangefeedExists(ctx, model.ChangeFeedID{Namespace: cfg.Namespace, ID: cfg.ID}) if err != nil && cerror.ErrChangeFeedNotExists.NotEqual(err) { return nil, err } - if cfStatus != nil { + if exists { return nil, cerror.ErrChangeFeedAlreadyExists.GenWithStackByArgs(cfg.ID) } diff --git a/cdc/api/v2/api_helpers_mock.go b/cdc/api/v2/api_helpers_mock.go index 98dbd26ad45..733c48140fa 100644 --- a/cdc/api/v2/api_helpers_mock.go +++ b/cdc/api/v2/api_helpers_mock.go @@ -11,8 +11,8 @@ import ( gomock "github.com/golang/mock/gomock" kv "github.com/pingcap/tidb/kv" + controller "github.com/pingcap/tiflow/cdc/controller" model "github.com/pingcap/tiflow/cdc/model" - owner "github.com/pingcap/tiflow/cdc/owner" config "github.com/pingcap/tiflow/pkg/config" security "github.com/pingcap/tiflow/pkg/security" client "github.com/tikv/pd/client" @@ -104,18 +104,18 @@ func (mr *MockAPIV2HelpersMockRecorder) getVerfiedTables(replicaConfig, storage, } // verifyCreateChangefeedConfig mocks base method. -func (m *MockAPIV2Helpers) verifyCreateChangefeedConfig(ctx context.Context, cfg *ChangefeedConfig, pdClient client.Client, statusProvider owner.StatusProvider, ensureGCServiceID string, kvStorage kv.Storage) (*model.ChangeFeedInfo, error) { +func (m *MockAPIV2Helpers) verifyCreateChangefeedConfig(ctx context.Context, cfg *ChangefeedConfig, pdClient client.Client, ctrl controller.Controller, ensureGCServiceID string, kvStorage kv.Storage) (*model.ChangeFeedInfo, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "verifyCreateChangefeedConfig", ctx, cfg, pdClient, statusProvider, ensureGCServiceID, kvStorage) + ret := m.ctrl.Call(m, "verifyCreateChangefeedConfig", ctx, cfg, pdClient, ctrl, ensureGCServiceID, kvStorage) ret0, _ := ret[0].(*model.ChangeFeedInfo) ret1, _ := ret[1].(error) return ret0, ret1 } // verifyCreateChangefeedConfig indicates an expected call of verifyCreateChangefeedConfig. -func (mr *MockAPIV2HelpersMockRecorder) verifyCreateChangefeedConfig(ctx, cfg, pdClient, statusProvider, ensureGCServiceID, kvStorage interface{}) *gomock.Call { +func (mr *MockAPIV2HelpersMockRecorder) verifyCreateChangefeedConfig(ctx, cfg, pdClient, ctrl, ensureGCServiceID, kvStorage interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "verifyCreateChangefeedConfig", reflect.TypeOf((*MockAPIV2Helpers)(nil).verifyCreateChangefeedConfig), ctx, cfg, pdClient, statusProvider, ensureGCServiceID, kvStorage) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "verifyCreateChangefeedConfig", reflect.TypeOf((*MockAPIV2Helpers)(nil).verifyCreateChangefeedConfig), ctx, cfg, pdClient, ctrl, ensureGCServiceID, kvStorage) } // verifyResumeChangefeedConfig mocks base method. diff --git a/cdc/api/v2/api_helpers_test.go b/cdc/api/v2/api_helpers_test.go index 60d29e36a43..51b9608d598 100644 --- a/cdc/api/v2/api_helpers_test.go +++ b/cdc/api/v2/api_helpers_test.go @@ -18,6 +18,8 @@ import ( "testing" "time" + "github.com/golang/mock/gomock" + mock_controller "github.com/pingcap/tiflow/cdc/controller/mock" "github.com/pingcap/tiflow/cdc/entry" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/config" @@ -32,64 +34,70 @@ func TestVerifyCreateChangefeedConfig(t *testing.T) { helper := entry.NewSchemaTestHelper(t) helper.Tk().MustExec("use test;") storage := helper.Storage() - provider := &mockStatusProvider{} + ctrl := mock_controller.NewMockController(gomock.NewController(t)) cfg := &ChangefeedConfig{} h := &APIV2HelpersImpl{} - cfInfo, err := h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage) + cfInfo, err := h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, ctrl, "en", storage) require.Nil(t, cfInfo) require.NotNil(t, err) cfg.SinkURI = "blackhole://" + ctrl.EXPECT().IsChangefeedExists(gomock.Any(), gomock.Any()).Return(false, nil) // repliconfig is nil require.Panics(t, func() { - _, _ = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage) + _, _ = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, ctrl, "en", storage) }) cfg.ReplicaConfig = GetDefaultReplicaConfig() cfg.ReplicaConfig.ForceReplicate = true cfg.ReplicaConfig.EnableOldValue = false cfg.SinkURI = "mysql://" + ctrl.EXPECT().IsChangefeedExists(gomock.Any(), gomock.Any()).Return(false, nil) // disable old value but force replicate, and using mysql sink. - cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage) + cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, ctrl, "en", storage) require.NotNil(t, err) require.Nil(t, cfInfo) cfg.ReplicaConfig.ForceReplicate = false cfg.ReplicaConfig.IgnoreIneligibleTable = true cfg.SinkURI = "blackhole://" - cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage) + ctrl.EXPECT().IsChangefeedExists(gomock.Any(), gomock.Any()).Return(false, nil) + cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, ctrl, "en", storage) require.Nil(t, err) require.NotNil(t, cfInfo) require.NotEqual(t, "", cfInfo.ID) require.Equal(t, model.DefaultNamespace, cfInfo.Namespace) require.NotEqual(t, 0, cfInfo.Epoch) + // invalid changefeed id or namespace id cfg.ID = "abdc/sss" - cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage) + cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, ctrl, "en", storage) require.NotNil(t, err) cfg.ID = "" cfg.Namespace = "abdc/sss" - cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage) + cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, ctrl, "en", storage) require.NotNil(t, err) cfg.ID = "" cfg.Namespace = "" // changefeed already exists - provider.changefeedStatus = &model.ChangeFeedStatusForAPI{} - cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage) + ctrl.EXPECT().IsChangefeedExists(gomock.Any(), gomock.Any()).Return(true, nil) + cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, ctrl, "en", storage) require.NotNil(t, err) - provider.changefeedStatus = nil - provider.err = cerror.ErrChangeFeedNotExists.GenWithStackByArgs("aaa") - cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage) + ctrl.EXPECT().IsChangefeedExists(gomock.Any(), gomock.Any()).Return(false, cerror.ErrChangeFeedNotExists.GenWithStackByArgs("aaa")) + cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, ctrl, "en", storage) require.Nil(t, err) require.Equal(t, uint64(123), cfInfo.UpstreamID) cfg.TargetTs = 3 cfg.StartTs = 4 - cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage) + ctrl.EXPECT().IsChangefeedExists(gomock.Any(), gomock.Any()).Return(false, nil) + cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, ctrl, "en", storage) require.NotNil(t, err) cfg.TargetTs = 6 cfg.ReplicaConfig.EnableOldValue = false cfg.SinkURI = "aaab://" - cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage) + ctrl.EXPECT().IsChangefeedExists(gomock.Any(), gomock.Any()).Return(false, nil) + cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, ctrl, "en", storage) require.NotNil(t, err) cfg.SinkURI = string([]byte{0x7f, ' '}) - cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage) + ctrl.EXPECT().IsChangefeedExists(gomock.Any(), gomock.Any()).Return(false, nil) + cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, ctrl, "en", storage) require.NotNil(t, err) cfg.StartTs = 0 @@ -97,12 +105,14 @@ func TestVerifyCreateChangefeedConfig(t *testing.T) { cfg.SinkURI = "blackhole://127.0.0.1:9092/test?protocol=avro" cfg.ReplicaConfig.EnableOldValue = true cfg.ReplicaConfig.ForceReplicate = false - cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage) + ctrl.EXPECT().IsChangefeedExists(gomock.Any(), gomock.Any()).Return(false, nil) + cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, ctrl, "en", storage) require.NoError(t, err) require.False(t, cfInfo.Config.EnableOldValue) cfg.ReplicaConfig.ForceReplicate = true - cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage) + ctrl.EXPECT().IsChangefeedExists(gomock.Any(), gomock.Any()).Return(false, nil) + cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, ctrl, "en", storage) require.Error(t, cerror.ErrOldValueNotEnabled, err) } diff --git a/cdc/api/v2/changefeed.go b/cdc/api/v2/changefeed.go index b962bc0a25e..c64f1f1b6f6 100644 --- a/cdc/api/v2/changefeed.go +++ b/cdc/api/v2/changefeed.go @@ -91,6 +91,12 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) { _ = c.Error(cerror.WrapError(cerror.ErrNewStore, err)) return } + ctrl, err := h.capture.GetController() + if err != nil { + _ = c.Error(err) + return + } + // We should not close kvStorage since all kvStorage in cdc is the same one. // defer kvStorage.Close() // TODO: We should get a kvStorage from upstream instead of creating a new one @@ -98,7 +104,7 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) { ctx, cfg, pdClient, - h.capture.StatusProvider(), + ctrl, h.capture.GetEtcdClient().GetEnsureGCServiceID(gc.EnsureGCServiceCreating), kvStorage) if err != nil { @@ -564,7 +570,12 @@ func (h *OpenAPIV2) deleteChangefeed(c *gin.Context) { changefeedID.ID)) return } - _, err := h.capture.StatusProvider().GetChangeFeedStatus(ctx, changefeedID) + ctrl, err := h.capture.GetController() + if err != nil { + _ = c.Error(err) + return + } + exist, err := ctrl.IsChangefeedExists(ctx, changefeedID) if err != nil { if cerror.ErrChangeFeedNotExists.Equal(err) { c.JSON(http.StatusOK, &EmptyResponse{}) @@ -573,7 +584,12 @@ func (h *OpenAPIV2) deleteChangefeed(c *gin.Context) { _ = c.Error(err) return } + if !exist { + c.JSON(http.StatusOK, &EmptyResponse{}) + return + } + // todo: controller call metastroe api to remove the changefeed job := model.AdminJob{ CfID: changefeedID, Type: model.AdminRemove, @@ -587,13 +603,16 @@ func (h *OpenAPIV2) deleteChangefeed(c *gin.Context) { // Owner needs at least two ticks to remove a changefeed, // we need to wait for it. err = retry.Do(ctx, func() error { - _, err := h.capture.StatusProvider().GetChangeFeedStatus(ctx, changefeedID) + exist, err = ctrl.IsChangefeedExists(ctx, changefeedID) if err != nil { if strings.Contains(err.Error(), "ErrChangeFeedNotExists") { return nil } return err } + if !exist { + return nil + } return cerror.ErrChangeFeedDeletionUnfinished.GenWithStackByArgs(changefeedID) }, retry.WithMaxTries(100), // max retry duration is 1 minute diff --git a/cdc/api/v2/changefeed_test.go b/cdc/api/v2/changefeed_test.go index 64217a0db40..d4390d3ed46 100644 --- a/cdc/api/v2/changefeed_test.go +++ b/cdc/api/v2/changefeed_test.go @@ -27,9 +27,9 @@ import ( "github.com/golang/mock/gomock" tidbkv "github.com/pingcap/tidb/kv" mock_capture "github.com/pingcap/tiflow/cdc/capture/mock" + "github.com/pingcap/tiflow/cdc/controller" mock_controller "github.com/pingcap/tiflow/cdc/controller/mock" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/cdc/owner" mock_owner "github.com/pingcap/tiflow/cdc/owner/mock" "github.com/pingcap/tiflow/pkg/config" cerrors "github.com/pingcap/tiflow/pkg/errors" @@ -66,15 +66,15 @@ func TestCreateChangefeed(t *testing.T) { defer testEtcdCluster.Terminate(t) mockUpManager := upstream.NewManager4Test(pdClient) - statusProvider := &mockStatusProvider{} etcdClient.EXPECT(). GetEnsureGCServiceID(gomock.Any()). Return(etcd.GcServiceIDForTest()).AnyTimes() - cp.EXPECT().StatusProvider().Return(statusProvider).AnyTimes() cp.EXPECT().GetEtcdClient().Return(etcdClient).AnyTimes() cp.EXPECT().GetUpstreamManager().Return(mockUpManager, nil).AnyTimes() cp.EXPECT().IsReady().Return(true).AnyTimes() cp.EXPECT().IsController().Return(true).AnyTimes() + ctrl := mock_controller.NewMockController(gomock.NewController(t)) + cp.EXPECT().GetController().Return(ctrl, nil).AnyTimes() // case 1: json format mismatches with the spec. errConfig := struct { @@ -187,7 +187,7 @@ func TestCreateChangefeed(t *testing.T) { DoAndReturn(func(ctx context.Context, cfg *ChangefeedConfig, pdClient pd.Client, - statusProvider owner.StatusProvider, + ctrl controller.Controller, ensureGCServiceID string, kvStorage tidbkv.Storage, ) (*model.ChangeFeedInfo, error) { @@ -813,19 +813,17 @@ func TestDeleteChangefeed(t *testing.T) { pdClient := &mockPDClient{} etcdClient := mock_etcd.NewMockCDCEtcdClient(gomock.NewController(t)) mockUpManager := upstream.NewManager4Test(pdClient) - statusProvider := mock_owner.NewMockStatusProvider(gomock.NewController(t)) - statusProvider.EXPECT().IsChangefeedOwner(gomock.Any(), gomock.Any()). - Return(true, nil).AnyTimes() etcdClient.EXPECT(). GetEnsureGCServiceID(gomock.Any()). Return(etcd.GcServiceIDForTest()).AnyTimes() - cp.EXPECT().StatusProvider().Return(statusProvider).AnyTimes() + ctrl := mock_controller.NewMockController(gomock.NewController(t)) cp.EXPECT().GetEtcdClient().Return(etcdClient).AnyTimes() cp.EXPECT().GetUpstreamManager().Return(mockUpManager, nil).AnyTimes() cp.EXPECT().IsReady().Return(true).AnyTimes() cp.EXPECT().IsController().Return(true).AnyTimes() cp.EXPECT().GetOwner().Return(owner, nil).AnyTimes() + cp.EXPECT().GetController().Return(ctrl, nil).AnyTimes() owner.EXPECT().EnqueueJob(gomock.Any(), gomock.Any()). Do(func(adminJob model.AdminJob, done chan<- error) { require.EqualValues(t, changeFeedID, adminJob.CfID) @@ -846,8 +844,7 @@ func TestDeleteChangefeed(t *testing.T) { // case 2: changefeed not exists validID := changeFeedID.ID - statusProvider.EXPECT().GetChangeFeedStatus(gomock.Any(), gomock.Any()).Return( - nil, cerrors.ErrChangeFeedNotExists.GenWithStackByArgs(validID)) + ctrl.EXPECT().IsChangefeedExists(gomock.Any(), changeFeedID).Return(false, nil) w = httptest.NewRecorder() req, _ = http.NewRequestWithContext(context.Background(), remove.method, fmt.Sprintf(remove.url, validID), nil) @@ -855,8 +852,8 @@ func TestDeleteChangefeed(t *testing.T) { require.Equal(t, http.StatusOK, w.Code) // case 3: query changefeed error - statusProvider.EXPECT().GetChangeFeedStatus(gomock.Any(), gomock.Any()).Return( - nil, cerrors.ErrChangefeedUpdateRefused.GenWithStackByArgs(validID)) + ctrl.EXPECT().IsChangefeedExists(gomock.Any(), changeFeedID).Return(false, + cerrors.ErrChangefeedUpdateRefused.GenWithStackByArgs(validID)) w = httptest.NewRecorder() req, _ = http.NewRequestWithContext(context.Background(), remove.method, fmt.Sprintf(remove.url, validID), nil) @@ -867,10 +864,9 @@ func TestDeleteChangefeed(t *testing.T) { require.Contains(t, respErr.Code, "ErrChangefeedUpdateRefused") // case 4: remove changefeed - statusProvider.EXPECT().GetChangeFeedStatus(gomock.Any(), gomock.Any()).Return( - &model.ChangeFeedStatusForAPI{}, nil) - statusProvider.EXPECT().GetChangeFeedStatus(gomock.Any(), gomock.Any()).Return( - nil, cerrors.ErrChangeFeedNotExists.GenWithStackByArgs(validID)) + ctrl.EXPECT().IsChangefeedExists(gomock.Any(), changeFeedID).Return(true, nil) + ctrl.EXPECT().IsChangefeedExists(gomock.Any(), changeFeedID).Return(false, + cerrors.ErrChangeFeedNotExists.GenWithStackByArgs(validID)) w = httptest.NewRecorder() req, _ = http.NewRequestWithContext(context.Background(), remove.method, fmt.Sprintf(remove.url, validID), nil) @@ -878,8 +874,7 @@ func TestDeleteChangefeed(t *testing.T) { require.Equal(t, http.StatusOK, w.Code) // case 5: remove changefeed failed - statusProvider.EXPECT().GetChangeFeedStatus(gomock.Any(), gomock.Any()).AnyTimes().Return( - &model.ChangeFeedStatusForAPI{}, nil) + ctrl.EXPECT().IsChangefeedExists(gomock.Any(), changeFeedID).Return(true, nil).AnyTimes() w = httptest.NewRecorder() req, _ = http.NewRequestWithContext(context.Background(), remove.method, fmt.Sprintf(remove.url, validID), nil)