Skip to content

Commit

Permalink
disttask: fix cancel from grpc mark subtasks as failed (pingcap#48339)
Browse files Browse the repository at this point in the history
  • Loading branch information
ywqzzy authored and ti-chi-bot committed Nov 8, 2023
1 parent 157a7cc commit 42b3bdf
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 6 deletions.
4 changes: 4 additions & 0 deletions pkg/disttask/framework/scheduler/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ go_library(
"//pkg/util/memory",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
"@org_uber_go_zap//:zap",
],
)
Expand All @@ -56,6 +58,8 @@ go_test(
"//pkg/resourcemanager/util",
"@com_github_pkg_errors//:errors",
"@com_github_stretchr_testify//require",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
"@org_uber_go_mock//gomock",
],
)
1 change: 0 additions & 1 deletion pkg/disttask/framework/scheduler/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,6 @@ func (m *Manager) onRunnableTask(ctx context.Context, task *proto.Task) {
return
}
scheduler := factory(ctx, m.id, task, m.taskTable)

taskCtx, taskCancel := context.WithCancelCause(ctx)
m.registerCancelFunc(task.ID, taskCancel)
defer taskCancel(nil)
Expand Down
5 changes: 4 additions & 1 deletion pkg/disttask/framework/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ import (
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/memory"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

const (
Expand Down Expand Up @@ -605,12 +607,13 @@ func isRetryableError(err error) bool {
// 3. When other errors are met, don't change the subtasks' state.
func (s *BaseScheduler) markSubTaskCanceledOrFailed(ctx context.Context, subtask *proto.Subtask) bool {
if err := s.getError(); err != nil {
err := errors.Cause(err)
if ctx.Err() != nil && context.Cause(ctx) == ErrCancelSubtask {
logutil.Logger(s.logCtx).Warn("subtask canceled", zap.Error(err))
s.updateSubtaskStateAndError(subtask, proto.TaskStateCanceled, nil)
} else if common.IsRetryableError(err) || isRetryableError(err) {
logutil.Logger(s.logCtx).Warn("met retryable error", zap.Error(err))
} else if errors.Cause(err) != context.Canceled {
} else if errors.Cause(err) != context.Canceled && status.Code(err) != codes.Canceled {
logutil.Logger(s.logCtx).Warn("subtask failed", zap.Error(err))
s.updateSubtaskStateAndError(subtask, proto.TaskStateFailed, err)
} else {
Expand Down
40 changes: 38 additions & 2 deletions pkg/disttask/framework/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

var (
Expand Down Expand Up @@ -206,6 +208,41 @@ func TestSchedulerRun(t *testing.T) {
err = scheduler.Run(runCtx, task)
require.EqualError(t, err, context.Canceled.Error())

// 8. grpc cancel
mockSubtaskTable.EXPECT().GetSubtasksInStates("id", taskID, proto.StepOne,
unfinishedNormalSubtaskStates...).Return([]*proto.Subtask{{
ID: 2, Type: tp, Step: proto.StepOne, State: proto.TaskStatePending}}, nil)
mockSubtaskExecutor.EXPECT().Init(gomock.Any()).Return(nil)
mockSubtaskTable.EXPECT().GetFirstSubtaskInStates("id", taskID, proto.StepOne,
unfinishedNormalSubtaskStates...).Return(&proto.Subtask{
ID: 1, Type: tp, Step: proto.StepOne, State: proto.TaskStatePending}, nil)
mockSubtaskTable.EXPECT().StartSubtask(taskID).Return(nil)
grpcErr := status.Error(codes.Canceled, "test cancel")
mockSubtaskExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).Return(grpcErr)
mockSubtaskExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil)
err = scheduler.Run(runCtx, task)
require.EqualError(t, err, grpcErr.Error())

// 9. annotated grpc cancel
mockSubtaskTable.EXPECT().GetSubtasksInStates("id", taskID, proto.StepOne,
unfinishedNormalSubtaskStates...).Return([]*proto.Subtask{{
ID: 2, Type: tp, Step: proto.StepOne, State: proto.TaskStatePending}}, nil)
mockSubtaskExecutor.EXPECT().Init(gomock.Any()).Return(nil)
mockSubtaskTable.EXPECT().GetFirstSubtaskInStates("id", taskID, proto.StepOne,
unfinishedNormalSubtaskStates...).Return(&proto.Subtask{
ID: 1, Type: tp, Step: proto.StepOne, State: proto.TaskStatePending}, nil)
mockSubtaskTable.EXPECT().StartSubtask(taskID).Return(nil)
grpcErr = status.Error(codes.Canceled, "test cancel")
annotatedError := errors.Annotatef(
grpcErr,
" %s",
"test annotate",
)
mockSubtaskExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).Return(annotatedError)
mockSubtaskExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil)
err = scheduler.Run(runCtx, task)
require.EqualError(t, err, annotatedError.Error())

runCancel()
}

Expand Down Expand Up @@ -329,7 +366,7 @@ func TestScheduler(t *testing.T) {
mockSubtaskExecutor := mockexecute.NewMockSubtaskExecutor(ctrl)
mockExtension := mock.NewMockExtension(ctrl)
mockSubtaskTable.EXPECT().IsSchedulerCanceled(gomock.Any(), gomock.Any()).Return(false, nil).AnyTimes()

mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(taskID, proto.TaskStateFailed, gomock.Any()).Return(nil)
mockExtension.EXPECT().GetSubtaskExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockSubtaskExecutor, nil).AnyTimes()

scheduler := NewBaseScheduler(ctx, "id", 1, mockSubtaskTable)
Expand All @@ -346,7 +383,6 @@ func TestScheduler(t *testing.T) {
ID: 1, Type: tp, Step: proto.StepOne, State: proto.TaskStatePending}, nil)
mockSubtaskTable.EXPECT().StartSubtask(taskID).Return(nil)
mockSubtaskExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).Return(runSubtaskErr)
mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(taskID, proto.TaskStateFailed, gomock.Any()).Return(nil)
mockSubtaskExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil)
err := scheduler.run(runCtx, &proto.Task{Step: proto.StepOne, Type: tp, ID: taskID, Concurrency: concurrency})
require.EqualError(t, err, runSubtaskErr.Error())
Expand Down
2 changes: 1 addition & 1 deletion tests/realtikvtest/addindextest3/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_test")

go_test(
name = "addindextest3_test",
timeout = "short",
timeout = "long",
srcs = [
"main_test.go",
"operator_test.go",
Expand Down
2 changes: 1 addition & 1 deletion tests/realtikvtest/addindextest4/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_test")

go_test(
name = "addindextest4_test",
timeout = "short",
timeout = "long",
srcs = [
"ingest_test.go",
"main_test.go",
Expand Down

0 comments on commit 42b3bdf

Please sign in to comment.