Skip to content

Commit 45d36ae

Browse files
authored
[pingcap#2165] dmctl: change pause-task/resume-task/stop-task to support operate tasks bound to a source. (pingcap#2166)
1 parent 2cae49e commit 45d36ae

File tree

12 files changed

+405
-83
lines changed

12 files changed

+405
-83
lines changed

dm/ctl/master/operate_task.go

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
// Copyright 2021 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package master
15+
16+
import (
17+
"context"
18+
"errors"
19+
"os"
20+
"sort"
21+
"strings"
22+
"sync"
23+
24+
"github.com/spf13/cobra"
25+
26+
"github.com/pingcap/dm/dm/ctl/common"
27+
"github.com/pingcap/dm/dm/pb"
28+
)
29+
30+
const (
31+
batchSizeFlag = "batch-size"
32+
defaultBatchSize = 5
33+
)
34+
35+
type batchTaskResult struct {
36+
Result bool `json:"result"`
37+
Msg string `json:"msg"`
38+
Tasks []*operateTaskResult `json:"tasks"`
39+
}
40+
41+
type operateTaskResult struct {
42+
Task string `json:"task"`
43+
Op string `json:"op"`
44+
Result bool `json:"result"`
45+
Msg string `json:"msg"`
46+
Sources []*pb.CommonWorkerResponse `json:"sources"`
47+
}
48+
49+
func operateTaskFunc(taskOp pb.TaskOp, cmd *cobra.Command) error {
50+
argLen := len(cmd.Flags().Args())
51+
if argLen == 0 {
52+
// may want to operate tasks bound to a source
53+
return operateSourceTaskFunc(taskOp, cmd)
54+
} else if argLen > 1 {
55+
// can pass at most one task-name/task-conf
56+
cmd.SetOut(os.Stdout)
57+
common.PrintCmdUsage(cmd)
58+
return errors.New("please check output to see error")
59+
}
60+
61+
name := common.GetTaskNameFromArgOrFile(cmd.Flags().Arg(0))
62+
sources, err := common.GetSourceArgs(cmd)
63+
if err != nil {
64+
return err
65+
}
66+
67+
resp, err := common.OperateTask(taskOp, name, sources)
68+
if err != nil {
69+
common.PrintLinesf("can not %s task %s", strings.ToLower(taskOp.String()), name)
70+
return err
71+
}
72+
73+
common.PrettyPrintResponse(resp)
74+
return nil
75+
}
76+
77+
func addOperateSourceTaskFlags(cmd *cobra.Command) {
78+
// control workload to dm-cluster for sources with large number of tasks.
79+
cmd.Flags().Int(batchSizeFlag, defaultBatchSize, "batch size when operating all (sub)tasks bound to a source")
80+
}
81+
82+
func operateSourceTaskFunc(taskOp pb.TaskOp, cmd *cobra.Command) error {
83+
source, batchSize, err := parseOperateSourceTaskParams(cmd)
84+
if err != nil {
85+
cmd.SetOut(os.Stdout)
86+
common.PrintCmdUsage(cmd)
87+
return errors.New("please check output to see error")
88+
}
89+
90+
sources := []string{source}
91+
ctx, cancel := context.WithTimeout(context.Background(), common.GlobalConfig().RPCTimeout)
92+
defer cancel()
93+
94+
req := pb.QueryStatusListRequest{Sources: sources}
95+
resp := &pb.QueryStatusListResponse{}
96+
if err := common.SendRequest(ctx, "QueryStatus", &req, &resp); err != nil {
97+
common.PrintLinesf("cannot query status of source: %v", sources)
98+
return err
99+
}
100+
101+
if !resp.Result || len(resp.Sources) == 0 {
102+
common.PrettyPrintInterface(&batchTaskResult{Result: false, Msg: resp.Msg, Tasks: []*operateTaskResult{}})
103+
return nil
104+
}
105+
106+
result := batchOperateTask(taskOp, batchSize, sources, resp.Sources[0].SubTaskStatus)
107+
common.PrettyPrintInterface(result)
108+
109+
return nil
110+
}
111+
112+
func batchOperateTask(taskOp pb.TaskOp, batchSize int, sources []string, subTaskStatus []*pb.SubTaskStatus) *batchTaskResult {
113+
result := batchTaskResult{Result: true, Tasks: []*operateTaskResult{}}
114+
115+
if len(subTaskStatus) < batchSize {
116+
batchSize = len(subTaskStatus)
117+
}
118+
119+
workCh := make(chan string)
120+
go func() {
121+
for _, subTask := range subTaskStatus {
122+
workCh <- subTask.Name
123+
}
124+
close(workCh)
125+
}()
126+
127+
var wg sync.WaitGroup
128+
resultCh := make(chan *operateTaskResult, 1)
129+
for i := 0; i < batchSize; i++ {
130+
wg.Add(1)
131+
go func() {
132+
defer wg.Done()
133+
134+
for name := range workCh {
135+
taskResult := operateTaskResult{Task: name, Op: taskOp.String()}
136+
taskOpResp, err := common.OperateTask(taskOp, name, sources)
137+
if err != nil {
138+
taskResult.Result = false
139+
taskResult.Msg = err.Error()
140+
} else {
141+
taskResult.Result = taskOpResp.Result
142+
taskResult.Msg = taskOpResp.Msg
143+
taskResult.Sources = taskOpResp.Sources
144+
}
145+
resultCh <- &taskResult
146+
}
147+
}()
148+
}
149+
150+
go func() {
151+
wg.Wait()
152+
close(resultCh)
153+
}()
154+
155+
for item := range resultCh {
156+
result.Tasks = append(result.Tasks, item)
157+
}
158+
159+
sort.Slice(result.Tasks, func(i, j int) bool {
160+
return result.Tasks[i].Task < result.Tasks[j].Task
161+
})
162+
163+
return &result
164+
}
165+
166+
func parseOperateSourceTaskParams(cmd *cobra.Command) (string, int, error) {
167+
sources, err := common.GetSourceArgs(cmd)
168+
if err != nil {
169+
return "", 0, err
170+
}
171+
if len(sources) == 0 {
172+
common.PrintLinesf(`must give one source-name when task-name/task-conf is not specified`)
173+
return "", 0, errors.New("missing source")
174+
} else if len(sources) > 1 {
175+
common.PrintLinesf(`can give only one source-name when task-name/task-conf is not specified`)
176+
return "", 0, errors.New("too many source")
177+
}
178+
batchSize, err := cmd.Flags().GetInt(batchSizeFlag)
179+
if err != nil {
180+
common.PrintLinesf("error in parse `--" + batchSizeFlag + "`")
181+
return "", 0, err
182+
}
183+
return sources[0], batchSize, nil
184+
}

dm/ctl/master/operate_task_test.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
// Copyright 2021 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package master
15+
16+
import (
17+
"github.com/pingcap/check"
18+
"github.com/spf13/cobra"
19+
)
20+
21+
func (t *testCtlMaster) TestParseBatchTaskParameters(c *check.C) {
22+
{
23+
cmd := prepareTestCmd()
24+
_ = cmd.ParseFlags([]string{"task-name"})
25+
_, _, err := parseOperateSourceTaskParams(cmd)
26+
c.Assert(err, check.Not(check.IsNil))
27+
}
28+
{
29+
cmd := prepareTestCmd()
30+
_, _, err := parseOperateSourceTaskParams(cmd)
31+
c.Assert(err, check.Not(check.IsNil))
32+
}
33+
{
34+
cmd := prepareTestCmd()
35+
_ = cmd.ParseFlags([]string{"-s", "source-name", "-s", "source-name2"})
36+
_, _, err := parseOperateSourceTaskParams(cmd)
37+
c.Assert(err, check.Not(check.IsNil))
38+
}
39+
{
40+
cmd := prepareTestCmd()
41+
_ = cmd.ParseFlags([]string{"-s", "source-name"})
42+
source, _, err := parseOperateSourceTaskParams(cmd)
43+
c.Assert(source, check.Equals, "source-name")
44+
c.Assert(err, check.IsNil)
45+
}
46+
{
47+
cmd := prepareTestCmd()
48+
_ = cmd.ParseFlags([]string{"-s", "source-name"})
49+
source, batchSize, err := parseOperateSourceTaskParams(cmd)
50+
c.Assert(source, check.Equals, "source-name")
51+
c.Assert(batchSize, check.Equals, defaultBatchSize)
52+
c.Assert(err, check.IsNil)
53+
}
54+
{
55+
cmd := prepareTestCmd()
56+
_ = cmd.ParseFlags([]string{"-s", "source-name", "--batch-size", "2"})
57+
source, batchSize, err := parseOperateSourceTaskParams(cmd)
58+
c.Assert(source, check.Equals, "source-name")
59+
c.Assert(batchSize, check.Equals, 2)
60+
c.Assert(err, check.IsNil)
61+
}
62+
}
63+
64+
func prepareTestCmd() *cobra.Command {
65+
cmd := NewPauseTaskCmd()
66+
// --source is added in ctl package, import it may cause cyclic import, so we mock one
67+
cmd.PersistentFlags().StringSliceVarP(&[]string{}, "source", "s", []string{}, "MySQL Source ID.")
68+
return cmd
69+
}

dm/ctl/master/pause_task.go

Lines changed: 4 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -14,46 +14,23 @@
1414
package master
1515

1616
import (
17-
"errors"
18-
"os"
19-
2017
"github.com/spf13/cobra"
2118

22-
"github.com/pingcap/dm/dm/ctl/common"
2319
"github.com/pingcap/dm/dm/pb"
2420
)
2521

2622
// NewPauseTaskCmd creates a PauseTask command.
2723
func NewPauseTaskCmd() *cobra.Command {
2824
cmd := &cobra.Command{
29-
Use: "pause-task [-s source ...] <task-name | task-file>",
30-
Short: "Pauses a specified running task",
25+
Use: `pause-task [-s source ...] [task-name | task-file]`,
26+
Short: "Pauses a specified running task or all (sub)tasks bound to a source",
3127
RunE: pauseTaskFunc,
3228
}
29+
addOperateSourceTaskFlags(cmd)
3330
return cmd
3431
}
3532

3633
// pauseTaskFunc does pause task request.
3734
func pauseTaskFunc(cmd *cobra.Command, _ []string) (err error) {
38-
if len(cmd.Flags().Args()) != 1 {
39-
cmd.SetOut(os.Stdout)
40-
common.PrintCmdUsage(cmd)
41-
err = errors.New("please check output to see error")
42-
return
43-
}
44-
name := common.GetTaskNameFromArgOrFile(cmd.Flags().Arg(0))
45-
46-
sources, err := common.GetSourceArgs(cmd)
47-
if err != nil {
48-
return
49-
}
50-
51-
resp, err := common.OperateTask(pb.TaskOp_Pause, name, sources)
52-
if err != nil {
53-
common.PrintLinesf("can not pause task %s", name)
54-
return
55-
}
56-
57-
common.PrettyPrintResponse(resp)
58-
return
35+
return operateTaskFunc(pb.TaskOp_Pause, cmd)
5936
}

dm/ctl/master/resume_task.go

Lines changed: 4 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -14,46 +14,23 @@
1414
package master
1515

1616
import (
17-
"errors"
18-
"os"
19-
2017
"github.com/spf13/cobra"
2118

22-
"github.com/pingcap/dm/dm/ctl/common"
2319
"github.com/pingcap/dm/dm/pb"
2420
)
2521

2622
// NewResumeTaskCmd creates a ResumeTask command.
2723
func NewResumeTaskCmd() *cobra.Command {
2824
cmd := &cobra.Command{
29-
Use: "resume-task [-s source ...] <task-name | task-file>",
30-
Short: "Resumes a specified paused task",
25+
Use: "resume-task [-s source ...] [task-name | task-file]",
26+
Short: "Resumes a specified paused task or all (sub)tasks bound to a source",
3127
RunE: resumeTaskFunc,
3228
}
29+
addOperateSourceTaskFlags(cmd)
3330
return cmd
3431
}
3532

3633
// resumeTaskFunc does resume task request.
3734
func resumeTaskFunc(cmd *cobra.Command, _ []string) (err error) {
38-
if len(cmd.Flags().Args()) != 1 {
39-
cmd.SetOut(os.Stdout)
40-
common.PrintCmdUsage(cmd)
41-
err = errors.New("please check output to see error")
42-
return
43-
}
44-
name := common.GetTaskNameFromArgOrFile(cmd.Flags().Arg(0))
45-
46-
sources, err := common.GetSourceArgs(cmd)
47-
if err != nil {
48-
return
49-
}
50-
51-
resp, err := common.OperateTask(pb.TaskOp_Resume, name, sources)
52-
if err != nil {
53-
common.PrintLinesf("can not resume task %s", name)
54-
return
55-
}
56-
57-
common.PrettyPrintResponse(resp)
58-
return
35+
return operateTaskFunc(pb.TaskOp_Resume, cmd)
5936
}

0 commit comments

Comments
 (0)