Skip to content

Commit

Permalink
api(ticdc): adjust the api forward rules (#9423)
Browse files Browse the repository at this point in the history
close #9422
  • Loading branch information
sdojjy authored Jul 27, 2023
1 parent 9d6622e commit 57d2ccc
Show file tree
Hide file tree
Showing 9 changed files with 121 additions and 82 deletions.
28 changes: 23 additions & 5 deletions cdc/api/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,10 @@ func RegisterOpenAPIRoutes(router *gin.Engine, api OpenAPI) {
changefeedGroup.GET("", controllerMiddleware, api.ListChangefeed)
changefeedGroup.GET("/:changefeed_id", changefeedOwnerMiddleware, api.GetChangefeed)
changefeedGroup.POST("", controllerMiddleware, api.CreateChangefeed)
changefeedGroup.PUT("/:changefeed_id", controllerMiddleware, api.UpdateChangefeed)
changefeedGroup.PUT("/:changefeed_id", changefeedOwnerMiddleware, api.UpdateChangefeed)
changefeedGroup.POST("/:changefeed_id/pause", changefeedOwnerMiddleware, api.PauseChangefeed)
changefeedGroup.POST("/:changefeed_id/resume", changefeedOwnerMiddleware, api.ResumeChangefeed)
changefeedGroup.DELETE("/:changefeed_id", changefeedOwnerMiddleware, api.RemoveChangefeed)
changefeedGroup.DELETE("/:changefeed_id", controllerMiddleware, api.RemoveChangefeed)
changefeedGroup.POST("/:changefeed_id/tables/rebalance_table", changefeedOwnerMiddleware, api.RebalanceTables)
changefeedGroup.POST("/:changefeed_id/tables/move_table", changefeedOwnerMiddleware, api.MoveTable)

Expand Down Expand Up @@ -297,7 +297,13 @@ func (h *OpenAPI) CreateChangefeed(c *gin.Context) {
return
}

info, err := verifyCreateChangefeedConfig(ctx, changefeedConfig, h.capture)
ctrl, err := h.capture.GetController()
if err != nil {
_ = c.Error(err)
return
}
info, err := verifyCreateChangefeedConfig(ctx, changefeedConfig,
up, ctrl, h.capture.GetEtcdClient())
if err != nil {
_ = c.Error(err)
return
Expand Down Expand Up @@ -486,11 +492,20 @@ func (h *OpenAPI) RemoveChangefeed(c *gin.Context) {
return
}
// check if the changefeed exists
_, err := h.statusProvider().GetChangeFeedStatus(ctx, changefeedID)
ctrl, err := h.capture.GetController()
if err != nil {
_ = c.Error(err)
return
}
exist, err := ctrl.IsChangefeedExists(ctx, changefeedID)
if err != nil {
_ = c.Error(err)
return
}
if !exist {
_ = c.Error(cerror.ErrChangeFeedNotExists.GenWithStackByArgs(changefeedID))
return
}

job := model.AdminJob{
CfID: changefeedID,
Expand All @@ -505,13 +520,16 @@ func (h *OpenAPI) RemoveChangefeed(c *gin.Context) {
// Owner needs at least tow ticks to remove a changefeed,
// we need to wait for it.
err = retry.Do(ctx, func() error {
_, err := h.statusProvider().GetChangeFeedStatus(ctx, changefeedID)
exist, err = ctrl.IsChangefeedExists(ctx, changefeedID)
if err != nil {
if strings.Contains(err.Error(), "ErrChangeFeedNotExists") {
return nil
}
return err
}
if !exist {
return nil
}
return cerror.ErrChangeFeedDeletionUnfinished.GenWithStackByArgs(changefeedID)
},
retry.WithMaxTries(100), // max retry duration is 1 minute
Expand Down
24 changes: 13 additions & 11 deletions cdc/api/v1/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,12 @@ func newRouter(c capture.Capture, p owner.StatusProvider) *gin.Engine {
return router
}

func newRouterWithoutStatusProvider(c capture.Capture) *gin.Engine {
router := gin.New()
RegisterOpenAPIRoutes(router, NewOpenAPI(c))
return router
}

func newStatusProvider() *mockStatusProvider {
statusProvider := &mockStatusProvider{}
statusProvider.On("GetChangeFeedStatus", mock.Anything, changeFeedID).
Expand Down Expand Up @@ -415,22 +421,13 @@ func TestRemoveChangefeed(t *testing.T) {
ctrl := gomock.NewController(t)
mo := mock_owner.NewMockOwner(ctrl)

statusProvider := &mockStatusProvider{}
statusProvider.On("GetChangeFeedStatus", mock.Anything, changeFeedID).
Return(&model.ChangeFeedStatusForAPI{CheckpointTs: 1}, nil).Once()
statusProvider.On("GetChangeFeedStatus", mock.Anything, changeFeedID).
Return(new(model.ChangeFeedStatusForAPI),
cerror.ErrChangeFeedNotExists.FastGenByArgs(changeFeedID)).Once()
statusProvider.On("IsChangefeedOwner", mock.Anything, mock.Anything).
Return(true, nil).Times(3)
controller := mock2.NewMockController(ctrl)
capture := mock_capture.NewMockCapture(ctrl)
capture.EXPECT().GetController().Return(controller, nil).AnyTimes()
capture.EXPECT().GetOwner().Return(mo, nil).AnyTimes()
capture.EXPECT().IsReady().Return(true).AnyTimes()
capture.EXPECT().IsController().Return(true).AnyTimes()
capture.EXPECT().StatusProvider().Return(statusProvider).AnyTimes()
router1 := newRouter(capture, statusProvider)
router1 := newRouterWithoutStatusProvider(capture)

// test remove changefeed succeeded
mo.EXPECT().
Expand All @@ -440,13 +437,17 @@ func TestRemoveChangefeed(t *testing.T) {
require.EqualValues(t, model.AdminRemove, adminJob.Type)
close(done)
})
controller.EXPECT().IsChangefeedExists(gomock.Any(), gomock.Any()).Return(true, nil)
controller.EXPECT().IsChangefeedExists(gomock.Any(), gomock.Any()).Return(false, nil)
api := testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s", changeFeedID.ID), method: "DELETE"}
w := httptest.NewRecorder()
req, _ := http.NewRequestWithContext(context.Background(), api.method, api.url, nil)
router1.ServeHTTP(w, req)
require.Equal(t, 202, w.Code)

router2 := newRouter(capture, newStatusProvider())
router2 := newRouterWithoutStatusProvider(capture)
controller.EXPECT().IsChangefeedExists(gomock.Any(), gomock.Any()).Return(true, nil)

// test remove changefeed failed from owner side
mo.EXPECT().
EnqueueJob(gomock.Any(), gomock.Any()).
Expand All @@ -465,6 +466,7 @@ func TestRemoveChangefeed(t *testing.T) {
require.Contains(t, respErr.Error, "changefeed not exists")

// test remove changefeed failed
controller.EXPECT().IsChangefeedExists(gomock.Any(), nonExistChangefeedID).Return(false, nil)
api = testCase{
url: fmt.Sprintf("/api/v1/changefeeds/%s", nonExistChangefeedID.ID),
method: "DELETE",
Expand Down
26 changes: 10 additions & 16 deletions cdc/api/v1/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,17 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/capture"
"github.com/pingcap/tiflow/cdc/controller"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/owner"
"github.com/pingcap/tiflow/cdc/sink/validator"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/txnutil/gc"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/util"
"github.com/pingcap/tiflow/pkg/version"
"github.com/r3labs/diff"
Expand All @@ -39,17 +41,10 @@ import (
func verifyCreateChangefeedConfig(
ctx context.Context,
changefeedConfig model.ChangefeedConfig,
capture capture.Capture,
up *upstream.Upstream,
ctrl controller.Controller,
ectdClient etcd.CDCEtcdClient,
) (*model.ChangeFeedInfo, error) {
upManager, err := capture.GetUpstreamManager()
if err != nil {
return nil, err
}
up, err := upManager.GetDefaultUpstream()
if err != nil {
return nil, err
}

// verify sinkURI
if changefeedConfig.SinkURI == "" {
return nil, cerror.ErrSinkURIInvalid.GenWithStackByArgs("sink-uri is empty, can't not create a changefeed without sink-uri")
Expand All @@ -60,12 +55,11 @@ func verifyCreateChangefeedConfig(
return nil, cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedConfig.ID)
}
// check if the changefeed exists
cfStatus, err := capture.StatusProvider().GetChangeFeedStatus(ctx,
model.DefaultChangeFeedID(changefeedConfig.ID))
ok, err := ctrl.IsChangefeedExists(ctx, model.DefaultChangeFeedID(changefeedConfig.ID))
if err != nil && cerror.ErrChangeFeedNotExists.NotEqual(err) {
return nil, err
}
if cfStatus != nil {
if ok {
return nil, cerror.ErrChangeFeedAlreadyExists.GenWithStackByArgs(changefeedConfig.ID)
}

Expand All @@ -83,7 +77,7 @@ func verifyCreateChangefeedConfig(
if err := gc.EnsureChangefeedStartTsSafety(
ctx,
up.PDClient,
capture.GetEtcdClient().GetEnsureGCServiceID(gc.EnsureGCServiceCreating),
ectdClient.GetEnsureGCServiceID(gc.EnsureGCServiceCreating),
model.DefaultChangeFeedID(changefeedConfig.ID),
ensureTTL, changefeedConfig.StartTS); err != nil {
if !cerror.ErrStartTsBeforeGC.Equal(err) {
Expand Down Expand Up @@ -122,7 +116,7 @@ func verifyCreateChangefeedConfig(
return nil, err
}

captureInfos, err := capture.StatusProvider().GetCaptures(ctx)
captureInfos, err := ctrl.GetCaptures(ctx)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions cdc/api/v2/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ func RegisterOpenAPIV2Routes(router *gin.Engine, api OpenAPIV2) {
changefeedGroup.GET("/:changefeed_id", changefeedOwnerMiddleware, api.getChangeFeed)
changefeedGroup.POST("", controllerMiddleware, api.createChangefeed)
changefeedGroup.GET("", controllerMiddleware, api.listChangeFeeds)
changefeedGroup.PUT("/:changefeed_id", controllerMiddleware, api.updateChangefeed)
changefeedGroup.DELETE("/:changefeed_id", changefeedOwnerMiddleware, api.deleteChangefeed)
changefeedGroup.PUT("/:changefeed_id", changefeedOwnerMiddleware, api.updateChangefeed)
changefeedGroup.DELETE("/:changefeed_id", controllerMiddleware, api.deleteChangefeed)
changefeedGroup.GET("/:changefeed_id/meta_info", changefeedOwnerMiddleware, api.getChangeFeedMetaInfo)
changefeedGroup.POST("/:changefeed_id/resume", changefeedOwnerMiddleware, api.resumeChangefeed)
changefeedGroup.POST("/:changefeed_id/pause", changefeedOwnerMiddleware, api.pauseChangefeed)
Expand Down
11 changes: 6 additions & 5 deletions cdc/api/v2/api_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tiflow/cdc/controller"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/kv"
"github.com/pingcap/tiflow/cdc/model"
Expand Down Expand Up @@ -55,12 +56,12 @@ const RegisterImportTaskPrefix = "/tidb/brie/import"
// Defining it as an interface to make APIs more testable.
type APIV2Helpers interface {
// verifyCreateChangefeedConfig verifies the changefeedConfig,
// and yield an valid changefeedInfo or error
// and yield a valid changefeedInfo or error
verifyCreateChangefeedConfig(
ctx context.Context,
cfg *ChangefeedConfig,
pdClient pd.Client,
statusProvider owner.StatusProvider,
ctrl controller.Controller,
ensureGCServiceID string,
kvStorage tidbkv.Storage,
) (*model.ChangeFeedInfo, error)
Expand Down Expand Up @@ -127,7 +128,7 @@ func (APIV2HelpersImpl) verifyCreateChangefeedConfig(
ctx context.Context,
cfg *ChangefeedConfig,
pdClient pd.Client,
statusProvider owner.StatusProvider,
ctrl controller.Controller,
ensureGCServiceID string,
kvStorage tidbkv.Storage,
) (*model.ChangeFeedInfo, error) {
Expand All @@ -154,12 +155,12 @@ func (APIV2HelpersImpl) verifyCreateChangefeedConfig(
"invalid namespace: %s", cfg.Namespace)
}

cfStatus, err := statusProvider.GetChangeFeedStatus(ctx,
exists, err := ctrl.IsChangefeedExists(ctx,
model.ChangeFeedID{Namespace: cfg.Namespace, ID: cfg.ID})
if err != nil && cerror.ErrChangeFeedNotExists.NotEqual(err) {
return nil, err
}
if cfStatus != nil {
if exists {
return nil, cerror.ErrChangeFeedAlreadyExists.GenWithStackByArgs(cfg.ID)
}

Expand Down
10 changes: 5 additions & 5 deletions cdc/api/v2/api_helpers_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 57d2ccc

Please sign in to comment.