Skip to content

Commit

Permalink
openapi(dm): add task config template releated openapi (#3656)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ehco1996 authored Dec 21, 2021
1 parent d5644a0 commit 9ef831f
Show file tree
Hide file tree
Showing 6 changed files with 1,571 additions and 251 deletions.
118 changes: 118 additions & 0 deletions dm/dm/master/openapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tiflow/dm/dm/pb"
"github.com/pingcap/tiflow/dm/openapi"
"github.com/pingcap/tiflow/dm/pkg/conn"
"github.com/pingcap/tiflow/dm/pkg/ha"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/dm/pkg/terror"
"github.com/pingcap/tiflow/dm/pkg/utils"
Expand Down Expand Up @@ -822,6 +823,123 @@ func (s *Server) DMAPIOperateTableStructure(c *gin.Context, taskName string, sou
}
}

// DMAPIImportTaskConfig create task_config_template url is: (POST /api/v1/task/configs/import).
func (s *Server) DMAPIImportTaskConfig(c *gin.Context) {
var req openapi.TaskConfigRequest
if err := c.Bind(&req); err != nil {
_ = c.Error(err)
return
}
resp := openapi.TaskConfigResponse{
FailedTaskList: []struct {
ErrorMsg string `json:"error_msg"`
TaskName string `json:"task_name"`
}{},
SuccessTaskList: []string{},
}
for _, task := range config.SubTaskConfigsToOpenAPITask(s.scheduler.GetSubTaskCfgs()) {
if err := ha.PutOpenAPITaskConfig(s.etcdClient, task, req.Overwrite); err != nil {
resp.FailedTaskList = append(resp.FailedTaskList, struct {
ErrorMsg string `json:"error_msg"`
TaskName string `json:"task_name"`
}{
ErrorMsg: err.Error(),
TaskName: task.Name,
})
} else {
resp.SuccessTaskList = append(resp.SuccessTaskList, task.Name)
}
}
c.IndentedJSON(http.StatusAccepted, resp)
}

// DMAPICreateTaskConfig create task_config_template url is: (POST /api/task/configs).
func (s *Server) DMAPICreateTaskConfig(c *gin.Context) {
task := &openapi.Task{}
if err := c.Bind(task); err != nil {
_ = c.Error(err)
return
}
if err := task.Adjust(); err != nil {
_ = c.Error(err)
return
}
// prepare target db config
newCtx := c.Request.Context()
toDBCfg := config.GetTargetDBCfgFromOpenAPITask(task)
if adjustDBErr := adjustTargetDB(newCtx, toDBCfg); adjustDBErr != nil {
_ = c.Error(terror.WithClass(adjustDBErr, terror.ClassDMMaster))
return
}
if err := ha.PutOpenAPITaskConfig(s.etcdClient, *task, false); err != nil {
_ = c.Error(err)
return
}
c.IndentedJSON(http.StatusCreated, task)
}

// DMAPIGetTaskConfigList get task_config_template list url is: (GET /api/v1/task/configs).
func (s *Server) DMAPIGetTaskConfigList(c *gin.Context) {
TaskConfigList, err := ha.GetAllOpenAPITaskConfig(s.etcdClient)
if err != nil {
_ = c.Error(err)
return
}
taskList := make([]openapi.Task, len(TaskConfigList))
for i, TaskConfig := range TaskConfigList {
taskList[i] = *TaskConfig
}
resp := openapi.GetTaskListResponse{Total: len(TaskConfigList), Data: taskList}
c.IndentedJSON(http.StatusOK, resp)
}

// DMAPIDeleteTaskConfig delete task_config_template url is: (DELETE /api/v1/task/configs/{task-name}).
func (s *Server) DMAPIDeleteTaskConfig(c *gin.Context, taskName string) {
if err := ha.DeleteOpenAPITaskConfig(s.etcdClient, taskName); err != nil {
_ = c.Error(err)
return
}
c.Status(http.StatusNoContent)
}

// DMAPIGetTaskConfig get task_config_template url is: (GET /api/v1/task/configs/{task-name}).
func (s *Server) DMAPIGetTaskConfig(c *gin.Context, taskName string) {
task, err := ha.GetOpenAPITaskConfig(s.etcdClient, taskName)
if err != nil {
_ = c.Error(err)
return
}
if task == nil {
_ = c.Error(terror.ErrOpenAPITaskConfigNotExist.Generate(taskName))
return
}
c.IndentedJSON(http.StatusOK, task)
}

// DMAPUpdateTaskConfig update task_config_template url is: (PUT /api/v1/task/configs/{task-name}).
func (s *Server) DMAPUpdateTaskConfig(c *gin.Context, taskName string) {
task := &openapi.Task{}
if err := c.Bind(task); err != nil {
_ = c.Error(err)
return
}
if err := task.Adjust(); err != nil {
_ = c.Error(err)
return
}
newCtx := c.Request.Context()
toDBCfg := config.GetTargetDBCfgFromOpenAPITask(task)
if adjustDBErr := adjustTargetDB(newCtx, toDBCfg); adjustDBErr != nil {
_ = c.Error(terror.WithClass(adjustDBErr, terror.ClassDMMaster))
return
}
if err := ha.UpdateOpenAPITaskConfig(s.etcdClient, *task); err != nil {
_ = c.Error(err)
return
}
c.IndentedJSON(http.StatusOK, task)
}

func terrorHTTPErrorHandler() gin.HandlerFunc {
return func(c *gin.Context) {
c.Next()
Expand Down
133 changes: 133 additions & 0 deletions dm/dm/master/openapi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,33 @@ func (t *openAPISuite) TestTaskAPI(c *check.C) {
c.Assert(resultTaskList.Total, check.Equals, 1)
c.Assert(resultTaskList.Data[0].Name, check.Equals, task.Name)

// test batch import task config
taskBatchImportURL := "/api/v1/task/configs/import"
req := openapi.TaskConfigRequest{Overwrite: false}
result = testutil.NewRequest().Post(taskBatchImportURL).WithJsonBody(req).GoWithHTTPHandler(t.testT, s.openapiHandles)
c.Assert(result.Code(), check.Equals, http.StatusAccepted)
var resp openapi.TaskConfigResponse
c.Assert(result.UnmarshalBodyToObject(&resp), check.IsNil)
c.Assert(resp.SuccessTaskList, check.HasLen, 1)
c.Assert(resp.SuccessTaskList[0], check.Equals, task.Name)
c.Assert(resp.FailedTaskList, check.HasLen, 0)

// import again without overwrite will fail
result = testutil.NewRequest().Post(taskBatchImportURL).WithJsonBody(req).GoWithHTTPHandler(t.testT, s.openapiHandles)
c.Assert(result.Code(), check.Equals, http.StatusAccepted)
c.Assert(result.UnmarshalBodyToObject(&resp), check.IsNil)
c.Assert(resp.SuccessTaskList, check.HasLen, 0)
c.Assert(resp.FailedTaskList, check.HasLen, 1)
c.Assert(resp.FailedTaskList[0].TaskName, check.Equals, task.Name)

// import again with overwrite will success
req.Overwrite = true
result = testutil.NewRequest().Post(taskBatchImportURL).WithJsonBody(req).GoWithHTTPHandler(t.testT, s.openapiHandles)
c.Assert(result.UnmarshalBodyToObject(&resp), check.IsNil)
c.Assert(resp.SuccessTaskList, check.HasLen, 1)
c.Assert(resp.SuccessTaskList[0], check.Equals, task.Name)
c.Assert(resp.FailedTaskList, check.HasLen, 0)

// pause and resume task
pauseTaskURL := fmt.Sprintf("%s/%s/pause", taskURL, task.Name)
result = testutil.NewRequest().Post(pauseTaskURL).GoWithHTTPHandler(t.testT, s.openapiHandles)
Expand Down Expand Up @@ -634,6 +661,112 @@ func (t *openAPISuite) TestClusterAPI(c *check.C) {
cancel1()
}

func (t *openAPISuite) TestTaskConfigsAPI(c *check.C) {
ctx, cancel := context.WithCancel(context.Background())
s := setupServer(ctx, c)
c.Assert(failpoint.Enable("github.com/pingcap/tiflow/dm/dm/master/MockSkipAdjustTargetDB", `return(true)`), check.IsNil)
checker.CheckSyncConfigFunc = mockCheckSyncConfig
defer func() {
checker.CheckSyncConfigFunc = checker.CheckSyncConfig
cancel()
s.Close()
c.Assert(failpoint.Disable("github.com/pingcap/tiflow/dm/dm/master/MockSkipAdjustTargetDB"), check.IsNil)
}()

dbCfg := config.GetDBConfigForTest()
source1 := openapi.Source{
SourceName: source1Name,
EnableGtid: false,
Host: dbCfg.Host,
Password: dbCfg.Password,
Port: dbCfg.Port,
User: dbCfg.User,
}
// create source
sourceURL := "/api/v1/sources"
result := testutil.NewRequest().Post(sourceURL).WithJsonBody(source1).GoWithHTTPHandler(t.testT, s.openapiHandles)
// check http status code
c.Assert(result.Code(), check.Equals, http.StatusCreated)

// create task config template
url := "/api/v1/task/configs"

task, err := fixtures.GenNoShardOpenAPITaskForTest()
c.Assert(err, check.IsNil)
// use a valid target db
task.TargetConfig.Host = dbCfg.Host
task.TargetConfig.Port = dbCfg.Port
task.TargetConfig.User = dbCfg.User
task.TargetConfig.Password = dbCfg.Password

// create one
result = testutil.NewRequest().Post(url).WithJsonBody(task).GoWithHTTPHandler(t.testT, s.openapiHandles)
c.Assert(result.Code(), check.Equals, http.StatusCreated)
var createTaskResp openapi.Task
err = result.UnmarshalBodyToObject(&createTaskResp)
c.Assert(err, check.IsNil)
c.Assert(task.Name, check.Equals, createTaskResp.Name)

// create again will fail
result = testutil.NewRequest().Post(url).WithJsonBody(task).GoWithHTTPHandler(t.testT, s.openapiHandles)
c.Assert(result.Code(), check.Equals, http.StatusBadRequest)
var errResp openapi.ErrorWithMessage
err = result.UnmarshalBodyToObject(&errResp)
c.Assert(err, check.IsNil)
c.Assert(errResp.ErrorCode, check.Equals, int(terror.ErrOpenAPITaskConfigExist.Code()))

// list templates
result = testutil.NewRequest().Get(url).GoWithHTTPHandler(t.testT, s.openapiHandles)
c.Assert(result.Code(), check.Equals, http.StatusOK)
var resultTaskList openapi.GetTaskListResponse
err = result.UnmarshalBodyToObject(&resultTaskList)
c.Assert(err, check.IsNil)
c.Assert(resultTaskList.Total, check.Equals, 1)
c.Assert(resultTaskList.Data[0].Name, check.Equals, task.Name)

// get detail
oneURL := fmt.Sprintf("%s/%s", url, task.Name)
result = testutil.NewRequest().Get(oneURL).GoWithHTTPHandler(t.testT, s.openapiHandles)
c.Assert(result.Code(), check.Equals, http.StatusOK)
var respTask openapi.Task
err = result.UnmarshalBodyToObject(&respTask)
c.Assert(err, check.IsNil)
c.Assert(respTask.Name, check.Equals, task.Name)

// get not exist
notExistURL := fmt.Sprintf("%s/%s", url, "notexist")
result = testutil.NewRequest().Get(notExistURL).GoWithHTTPHandler(t.testT, s.openapiHandles)
c.Assert(result.Code(), check.Equals, http.StatusBadRequest)
err = result.UnmarshalBodyToObject(&errResp)
c.Assert(err, check.IsNil)
c.Assert(errResp.ErrorCode, check.Equals, int(terror.ErrOpenAPITaskConfigNotExist.Code()))

// update
task.TaskMode = openapi.TaskTaskModeAll
result = testutil.NewRequest().Put(oneURL).WithJsonBody(task).GoWithHTTPHandler(t.testT, s.openapiHandles)
c.Assert(result.Code(), check.Equals, http.StatusOK)
err = result.UnmarshalBodyToObject(&respTask)
c.Assert(err, check.IsNil)
c.Assert(respTask.Name, check.Equals, task.Name)

// update not exist will fail
task.Name = "notexist"
result = testutil.NewRequest().Put(notExistURL).WithJsonBody(task).GoWithHTTPHandler(t.testT, s.openapiHandles)
c.Assert(result.Code(), check.Equals, http.StatusBadRequest)
err = result.UnmarshalBodyToObject(&errResp)
c.Assert(err, check.IsNil)
c.Assert(errResp.ErrorCode, check.Equals, int(terror.ErrOpenAPITaskConfigNotExist.Code()))

// delete task config template
result = testutil.NewRequest().Delete(oneURL).GoWithHTTPHandler(t.testT, s.openapiHandles)
c.Assert(result.Code(), check.Equals, http.StatusNoContent)
result = testutil.NewRequest().Get(url).GoWithHTTPHandler(t.testT, s.openapiHandles)
c.Assert(result.Code(), check.Equals, http.StatusOK)
err = result.UnmarshalBodyToObject(&resultTaskList)
c.Assert(err, check.IsNil)
c.Assert(resultTaskList.Total, check.Equals, 0)
}

func setupServer(ctx context.Context, c *check.C) *Server {
// create a new cluster
cfg1 := NewConfig()
Expand Down
Loading

0 comments on commit 9ef831f

Please sign in to comment.