Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: implement CANCEL/DROP LOAD DATA JOB #42137

Merged
merged 19 commits into from
Mar 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions errno/errcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -1059,6 +1059,7 @@ const (
ErrDuplicateOption = 8165
ErrLoadDataUnsupportedOption = 8166
ErrLoadDataJobNotFound = 8170
ErrLoadDataInvalidOperation = 8171
ErrLoadDataCantDetachWithLocal = 8172

// Error codes used by TiDB ddl package
Expand Down
1 change: 1 addition & 0 deletions errno/errname.go
Original file line number Diff line number Diff line change
Expand Up @@ -1053,6 +1053,7 @@ var MySQLErrName = map[uint16]*mysql.ErrMessage{
ErrDuplicateOption: mysql.Message("Option %s specified more than once", nil),
ErrLoadDataUnsupportedOption: mysql.Message("Unsupported option %s for %s import mode", nil),
ErrLoadDataJobNotFound: mysql.Message("Job ID %d doesn't exist", nil),
ErrLoadDataInvalidOperation: mysql.Message("The current job status cannot perform the operation. %s", nil),
ErrLoadDataCantDetachWithLocal: mysql.Message("The job can not be DETACHED when LOAD DATA LOCAL INFILE", nil),

ErrWarnOptimizerHintInvalidInteger: mysql.Message("integer value is out of range in '%s'", nil),
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1736,6 +1736,11 @@ error = '''
Job ID %d doesn't exist
'''

["executor:8171"]
error = '''
The current job status cannot perform the operation. %s
'''

["executor:8172"]
error = '''
The job can not be DETACHED when LOAD DATA LOCAL INFILE
Expand Down
1 change: 1 addition & 0 deletions executor/asyncloaddata/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ go_test(
srcs = [
"detach_test.go",
"main_test.go",
"operate_test.go",
"show_test.go",
"util_test.go",
],
Expand Down
118 changes: 118 additions & 0 deletions executor/asyncloaddata/operate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package asyncloaddata_test

import (
"fmt"
"sync"
"time"

"github.com/fsouza/fake-gcs-server/fakestorage"
. "github.com/pingcap/tidb/executor/asyncloaddata"
"github.com/pingcap/tidb/parser/auth"
"github.com/pingcap/tidb/testkit"
"github.com/stretchr/testify/require"
)

// TestOperateRunningJob exercises DROP LOAD DATA JOB and CANCEL LOAD DATA JOB
// while a LOAD DATA statement is being executed by a second session.
//
// The flow is: start a load in the background, look up the newest job via
// SHOW LOAD DATA JOBS, operate on it from the suite's own session, then wait
// for the background statement to return.
func (s *mockGCSSuite) TestOperateRunningJob() {
	for _, stmt := range []string{
		"DROP DATABASE IF EXISTS test_operate;",
		"CREATE DATABASE test_operate;",
		"CREATE TABLE test_operate.t (i INT PRIMARY KEY);",
	} {
		s.tk.MustExec(stmt)
	}
	s.server.CreateObject(fakestorage.Object{
		ObjectAttrs: fakestorage.ObjectAttrs{
			BucketName: "test-operate",
			Name:       "t.tsv",
		},
		Content: []byte("1\n2\n3\n4\n5\n6\n7\n8\n9\n10"),
	})

	// Shorten the heartbeat for this test and restore it afterwards.
	originalHeartBeat := HeartBeatInSec
	HeartBeatInSec = 1
	s.T().Cleanup(func() {
		HeartBeatInSec = originalHeartBeat
	})

	// Every failpoint is configured with sleep(1000), presumably so the job
	// stays in flight long enough to be dropped/canceled — TODO confirm.
	for _, failpointName := range []string{
		"github.com/pingcap/tidb/executor/AfterCreateLoadDataJob",
		"github.com/pingcap/tidb/executor/AfterStartJob",
		"github.com/pingcap/tidb/executor/AfterCommitOneTask",
	} {
		s.enableFailpoint(failpointName, `sleep(1000)`)
	}

	// Both sessions act as the same user.
	identity := &auth.UserIdentity{
		AuthUsername: "test-load-3",
		AuthHostname: "test-host",
	}
	s.tk.Session().GetSessionVars().User = identity
	tk2 := testkit.NewTestKit(s.T(), s.store)
	tk2.Session().GetSessionVars().User = identity

	var wg sync.WaitGroup
	// loadInBackground runs stmt on the second session and requires it to fail
	// with an error containing "failed to keepalive".
	loadInBackground := func(stmt string) {
		wg.Add(1)
		go func() {
			defer wg.Done()
			tk2.MustContainErrMsg(stmt, "failed to keepalive")
		}()
	}
	// newestJobID returns the first column of the last row reported by
	// SHOW LOAD DATA JOBS, requiring at least one row to exist.
	newestJobID := func() string {
		jobs := s.tk.MustQuery("SHOW LOAD DATA JOBS;").Rows()
		require.Greater(s.T(), len(jobs), 0)
		return jobs[len(jobs)-1][0].(string)
	}

	// DROP can happen anytime
	loadInBackground(fmt.Sprintf(`LOAD DATA INFILE 'gs://test-operate/t.tsv?endpoint=%s'
INTO TABLE test_operate.t WITH batch_size = 1;`, gcsEndpoint))

	// TODO: remove this sleep after moving mysql.load_data_jobs to bootstrap
	time.Sleep(3 * time.Second)
	jobID := newestJobID()
	s.tk.MustExec("DROP LOAD DATA JOB " + jobID)
	wg.Wait()

	// test CANCEL
	loadInBackground(fmt.Sprintf(`LOAD DATA INFILE 'gs://test-operate/t.tsv?endpoint=%s'
REPLACE INTO TABLE test_operate.t;`, gcsEndpoint))

	time.Sleep(3 * time.Second)
	jobID = newestJobID()
	s.tk.MustExec("CANCEL LOAD DATA JOB " + jobID)
	wg.Wait()

	showJob := "SHOW LOAD DATA JOB " + jobID
	result := s.tk.MustQuery(showJob).Rows()
	require.Len(s.T(), result, 1)
	got := result[0]
	// The file-size columns are copied from the actual row, so only the
	// remaining fields are really being asserted.
	want := expectedRecord{
		jobID:          jobID,
		dataSource:     "gs://test-operate/t.tsv",
		targetTable:    "`test_operate`.`t`",
		importMode:     "logical",
		createdBy:      "test-load-3@test-host",
		jobState:       "loading",
		jobStatus:      "canceled",
		sourceFileSize: got[10].(string),
		loadedFileSize: got[11].(string),
		resultCode:     "0",
		resultMessage:  "canceled by user",
	}
	want.checkIgnoreTimes(s.T(), got)

	// Canceling an already canceled job is rejected with an error, and the
	// job record stays unchanged.
	err := s.tk.ExecToErr("CANCEL LOAD DATA JOB " + jobID)
	require.ErrorContains(s.T(), err, "The current job status cannot perform the operation. need status running or paused, but got canceled")
	result = s.tk.MustQuery(showJob).Rows()
	require.Len(s.T(), result, 1)
	got = result[0]
	want.checkIgnoreTimes(s.T(), got)
}
97 changes: 96 additions & 1 deletion executor/asyncloaddata/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func UpdateJobProgress(
`UPDATE mysql.load_data_jobs
SET progress = %?, update_time = CURRENT_TIMESTAMP(6)
WHERE job_id = %?
AND end_time IS NULL
AND (update_time >= DATE_SUB(CURRENT_TIMESTAMP(6), INTERVAL %? SECOND)
OR update_time IS NULL);`,
progress, jobID, OfflineThresholdInSec)
Expand Down Expand Up @@ -178,6 +179,95 @@ func FailJob(
return err
}

// CancelJob cancels a load data job. Only a running/paused job can be canceled.
//
// The job row is looked up by jobID and user (matched against create_user);
// ErrLoadDataJobNotFound is returned when no such row exists, and
// ErrLoadDataInvalidOperation is returned when the job is not in a cancelable
// state. On success the row's expected_status is set to 'canceled', end_time
// is set to the current timestamp and error_message to 'canceled by user'.
//
// The lookup and the update run inside one pessimistic transaction that is
// rolled back if any step fails and committed otherwise.
func CancelJob(
ctx context.Context,
conn sqlexec.SQLExecutor,
jobID int64,
user string,
) (err error) {
ctx = util.WithInternalSourceType(ctx, kv.InternalLoadData)
_, err = conn.ExecuteInternal(ctx, "BEGIN PESSIMISTIC;")
if err != nil {
return err
}
// err is a named result so that this deferred function can both observe the
// outcome of the body and replace it with the error returned by COMMIT.
defer func() {
if err != nil {
// A failed ROLLBACK is only logged; the original error is kept.
_, err1 := conn.ExecuteInternal(ctx, "ROLLBACK;")
terror.Log(err1)
return
}

_, err = conn.ExecuteInternal(ctx, "COMMIT;")
if err != nil {
return
}
}()

var (
rs sqlexec.RecordSet
rows []chunk.Row
)
// Restricting by create_user means a job created by a different user is
// reported as not found.
rs, err = conn.ExecuteInternal(ctx,
`SELECT expected_status, end_time, error_message FROM mysql.load_data_jobs
WHERE job_id = %? AND create_user = %?;`,
jobID, user)
if err != nil {
return err
}
defer terror.Call(rs.Close)
rows, err = sqlexec.DrainRecordSet(ctx, rs, 1)
if err != nil {
return err
}

if len(rows) < 1 {
return exeerrors.ErrLoadDataJobNotFound.GenWithStackByArgs(jobID)
}
// Column 0 is expected_status.
status := rows[0].GetEnum(0).String()
if status != "running" && status != "paused" {
return exeerrors.ErrLoadDataInvalidOperation.GenWithStackByArgs(fmt.Sprintf("need status running or paused, but got %s", status))
}
// Column 1 is end_time and column 2 is error_message. A non-NULL end_time
// means the job has already ended: it is reported as failed when an error
// message is present and as finished otherwise.
endTimeIsNull := rows[0].IsNull(1)
if !endTimeIsNull {
hasError := !rows[0].IsNull(2)
if hasError {
return exeerrors.ErrLoadDataInvalidOperation.GenWithStackByArgs("need status running or paused, but got failed")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make failed a status in job table?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

currently we only have expected_status. but no actual_status, so these lines is to calculate actual_status from other columns. I plan to add actual_status later when implement PAUSE/RESUME LOAD DATA JOBS, to let PAUSE don't return until actual_status is paused 😂

}
return exeerrors.ErrLoadDataInvalidOperation.GenWithStackByArgs("need status running or paused, but got finished")
}

// The job is still active: mark it canceled and stamp its end time.
_, err = conn.ExecuteInternal(ctx,
`UPDATE mysql.load_data_jobs
SET expected_status = 'canceled',
end_time = CURRENT_TIMESTAMP(6),
error_message = 'canceled by user'
WHERE job_id = %?;`,
jobID)
return err
}

// DropJob drops a load data job by deleting its row from
// mysql.load_data_jobs.
//
// The DELETE only matches a row whose job_id equals jobID and whose
// create_user equals user. An error from executing the statement is returned
// unchanged. If the statement succeeds but affects no row,
// ErrLoadDataJobNotFound is returned.
func DropJob(
	ctx context.Context,
	conn sqlexec.SQLExecutor,
	jobID int64,
	user string,
) error {
	ctx = util.WithInternalSourceType(ctx, kv.InternalLoadData)
	_, err := conn.ExecuteInternal(ctx,
		`DELETE FROM mysql.load_data_jobs
WHERE job_id = %? AND create_user = %?;`,
		jobID, user)
	// Propagate execution failures. The previous check was inverted
	// (err == nil), which returned early on success and so never reached the
	// affected-rows check, while silently discarding real errors.
	if err != nil {
		return err
	}
	// Nothing deleted means there is no job with this ID for this user.
	if conn.GetSessionVars().StmtCtx.AffectedRows() < 1 {
		return exeerrors.ErrLoadDataJobNotFound.GenWithStackByArgs(jobID)
	}
	return nil
}

// JobExpectedStatus is the expected status of a load data job. User can set the
// expected status of a job and worker will respect it.
type JobExpectedStatus int
Expand All @@ -192,6 +282,7 @@ const (
)

// UpdateJobExpectedStatus updates the expected status of a load data job.
// TODO: remove it?
func UpdateJobExpectedStatus(
ctx context.Context,
conn sqlexec.SQLExecutor,
Expand Down Expand Up @@ -297,20 +388,24 @@ func GetJobStatus(
// start_time).
func getJobStatus(row chunk.Row) (JobStatus, string, error) {
// ending status has the highest priority
expectedStatus := row.GetEnum(0).String()
endTimeIsNull := row.IsNull(2)
if !endTimeIsNull {
resultMsgIsNull := row.IsNull(3)
if !resultMsgIsNull {
resultMessage := row.GetString(3)
return JobFinished, resultMessage, nil
}

errorMessage := row.GetString(4)
if expectedStatus == "canceled" {
return JobCanceled, errorMessage, nil
}
return JobFailed, errorMessage, nil
}

isAlive := row.GetInt64(1) == 1
startTimeIsNull := row.IsNull(5)
expectedStatus := row.GetEnum(0).String()

switch expectedStatus {
case "canceled":
Expand Down
12 changes: 5 additions & 7 deletions executor/asyncloaddata/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,13 +201,14 @@ func TestKeepAlive(t *testing.T) {
expected.StatusMessage = ""
checkEqualIgnoreTimes(t, expected, info)

// Now the worker calls FailJob
// Now the worker calls FailJob, but the status should still be canceled,
// that's more friendly.

err = FailJob(ctx, tk.Session(), id, "failed to keepalive")
require.NoError(t, err)
info, err = GetJobInfo(ctx, tk.Session(), id, "user")
require.NoError(t, err)
expected.Status = JobFailed
expected.Status = JobCanceled
expected.StatusMessage = "failed to keepalive"
checkEqualIgnoreTimes(t, expected, info)
}
Expand Down Expand Up @@ -264,11 +265,8 @@ func TestJobIsFailedAndGetAllJobs(t *testing.T) {
info, err = GetJobInfo(ctx, tk.Session(), id, "user")
require.NoError(t, err)
checkEqualIgnoreTimes(t, expected, info)
err = UpdateJobExpectedStatus(ctx, tk.Session(), id, JobExpectedCanceled)
require.NoError(t, err)
info, err = GetJobInfo(ctx, tk.Session(), id, "user")
require.NoError(t, err)
checkEqualIgnoreTimes(t, expected, info)
err = CancelJob(ctx, tk.Session(), id, "user")
require.ErrorContains(t, err, "The current job status cannot perform the operation. need status running or paused, but got failed")

// add job of another user and test GetAllJobInfo

Expand Down
6 changes: 6 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,12 @@ func (b *executorBuilder) buildSimple(v *plannercore.Simple) Executor {
}
}
}
case *ast.LoadDataActionStmt:
return &LoadDataActionExec{
baseExecutor: newBaseExecutor(b.ctx, nil, 0),
tp: s.Tp,
jobID: s.JobID,
}
}
base := newBaseExecutor(b.ctx, v.Schema(), v.ID())
base.initCap = chunk.ZeroCapacity
Expand Down
Loading