Skip to content

Commit

Permalink
*: implement CANCEL/DROP LOAD DATA JOB (#42137)
Browse files Browse the repository at this point in the history
ref #40499
  • Loading branch information
lance6716 authored Mar 17, 2023
1 parent ad30aaf commit 04741ae
Show file tree
Hide file tree
Showing 14 changed files with 295 additions and 23 deletions.
1 change: 1 addition & 0 deletions errno/errcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -1059,6 +1059,7 @@ const (
ErrDuplicateOption = 8165
ErrLoadDataUnsupportedOption = 8166
ErrLoadDataJobNotFound = 8170
ErrLoadDataInvalidOperation = 8171
ErrLoadDataCantDetachWithLocal = 8172

// Error codes used by TiDB ddl package
Expand Down
1 change: 1 addition & 0 deletions errno/errname.go
Original file line number Diff line number Diff line change
Expand Up @@ -1053,6 +1053,7 @@ var MySQLErrName = map[uint16]*mysql.ErrMessage{
ErrDuplicateOption: mysql.Message("Option %s specified more than once", nil),
ErrLoadDataUnsupportedOption: mysql.Message("Unsupported option %s for %s import mode", nil),
ErrLoadDataJobNotFound: mysql.Message("Job ID %d doesn't exist", nil),
ErrLoadDataInvalidOperation: mysql.Message("The current job status cannot perform the operation. %s", nil),
ErrLoadDataCantDetachWithLocal: mysql.Message("The job can not be DETACHED when LOAD DATA LOCAL INFILE", nil),

ErrWarnOptimizerHintInvalidInteger: mysql.Message("integer value is out of range in '%s'", nil),
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1736,6 +1736,11 @@ error = '''
Job ID %d doesn't exist
'''

["executor:8171"]
error = '''
The current job status cannot perform the operation. %s
'''

["executor:8172"]
error = '''
The job can not be DETACHED when LOAD DATA LOCAL INFILE
Expand Down
1 change: 1 addition & 0 deletions executor/asyncloaddata/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ go_test(
srcs = [
"detach_test.go",
"main_test.go",
"operate_test.go",
"show_test.go",
"util_test.go",
],
Expand Down
118 changes: 118 additions & 0 deletions executor/asyncloaddata/operate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package asyncloaddata_test

import (
"fmt"
"sync"
"time"

"github.com/fsouza/fake-gcs-server/fakestorage"
. "github.com/pingcap/tidb/executor/asyncloaddata"
"github.com/pingcap/tidb/parser/auth"
"github.com/pingcap/tidb/testkit"
"github.com/stretchr/testify/require"
)

// TestOperateRunningJob exercises DROP LOAD DATA JOB and CANCEL LOAD DATA JOB
// against a LOAD DATA statement that is still executing in another session.
// It checks that the running statement fails with "failed to keepalive" after
// either operation, that a canceled job is reported with status "canceled" and
// message "canceled by user", and that canceling it a second time is rejected.
func (s *mockGCSSuite) TestOperateRunningJob() {
// Fresh database/table and a 10-line source object in the fake GCS server.
s.tk.MustExec("DROP DATABASE IF EXISTS test_operate;")
s.tk.MustExec("CREATE DATABASE test_operate;")
s.tk.MustExec("CREATE TABLE test_operate.t (i INT PRIMARY KEY);")
s.server.CreateObject(fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{
BucketName: "test-operate",
Name: "t.tsv",
},
Content: []byte("1\n2\n3\n4\n5\n6\n7\n8\n9\n10"),
})

// Override the package-level heartbeat interval for this test and restore it
// on cleanup. NOTE(review): presumably this makes the worker notice the
// DROP/CANCEL sooner — confirm against the keepalive loop.
backup := HeartBeatInSec
HeartBeatInSec = 1
s.T().Cleanup(func() {
HeartBeatInSec = backup
})

// Inject sleeps at three failpoints in the executor. NOTE(review): presumably
// this keeps the job running long enough for the other session to operate on
// it — confirm the unit of sleep() in the failpoint library.
s.enableFailpoint("github.com/pingcap/tidb/executor/AfterCreateLoadDataJob", `sleep(1000)`)
s.enableFailpoint("github.com/pingcap/tidb/executor/AfterStartJob", `sleep(1000)`)
s.enableFailpoint("github.com/pingcap/tidb/executor/AfterCommitOneTask", `sleep(1000)`)
sql := fmt.Sprintf(`LOAD DATA INFILE 'gs://test-operate/t.tsv?endpoint=%s'
INTO TABLE test_operate.t WITH batch_size = 1;`, gcsEndpoint)

// Part 1: DROP. Both sessions run as the same user so that the operating
// session (s.tk) can see and drop the job created by the loading session (tk2).
user := &auth.UserIdentity{
AuthUsername: "test-load-3",
AuthHostname: "test-host",
}
s.tk.Session().GetSessionVars().User = user
tk2 := testkit.NewTestKit(s.T(), s.store)
tk2.Session().GetSessionVars().User = user
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
// The LOAD DATA statement is expected to fail once its job row is dropped.
tk2.MustContainErrMsg(sql, "failed to keepalive")
}()

// TODO: remove this sleep after moving mysql.load_data_jobs to bootstrap
time.Sleep(3 * time.Second)
rows := s.tk.MustQuery("SHOW LOAD DATA JOBS;").Rows()
require.Greater(s.T(), len(rows), 0)
// Use the first column of the last row as the ID of the job just started.
jobID := rows[len(rows)-1][0].(string)
s.tk.MustExec("DROP LOAD DATA JOB " + jobID)
wg.Wait()

// Part 2: CANCEL. Start another load (REPLACE mode, default batch size) and
// cancel it while it is running.

sql = fmt.Sprintf(`LOAD DATA INFILE 'gs://test-operate/t.tsv?endpoint=%s'
REPLACE INTO TABLE test_operate.t;`, gcsEndpoint)
wg.Add(1)
go func() {
defer wg.Done()
tk2.MustContainErrMsg(sql, "failed to keepalive")
}()

time.Sleep(3 * time.Second)
rows = s.tk.MustQuery("SHOW LOAD DATA JOBS;").Rows()
require.Greater(s.T(), len(rows), 0)
jobID = rows[len(rows)-1][0].(string)
s.tk.MustExec("CANCEL LOAD DATA JOB " + jobID)
wg.Wait()
// Unlike DROP, CANCEL keeps the job row, so it can still be inspected.
rows = s.tk.MustQuery("SHOW LOAD DATA JOB " + jobID).Rows()
require.Len(s.T(), rows, 1)
row := rows[0]
e := expectedRecord{
jobID: jobID,
dataSource: "gs://test-operate/t.tsv",
targetTable: "`test_operate`.`t`",
importMode: "logical",
createdBy: "test-load-3@test-host",
jobState: "loading",
jobStatus: "canceled",
// The two size fields are copied from the actual row, so presumably they are
// not effectively asserted here — verify against checkIgnoreTimes.
sourceFileSize: row[10].(string),
loadedFileSize: row[11].(string),
resultCode: "0",
resultMessage: "canceled by user",
}
e.checkIgnoreTimes(s.T(), row)

// Canceling an already-canceled job is rejected with an invalid-operation
// error, and the job record stays unchanged.

err := s.tk.ExecToErr("CANCEL LOAD DATA JOB " + jobID)
require.ErrorContains(s.T(), err, "The current job status cannot perform the operation. need status running or paused, but got canceled")
rows = s.tk.MustQuery("SHOW LOAD DATA JOB " + jobID).Rows()
require.Len(s.T(), rows, 1)
row = rows[0]
e.checkIgnoreTimes(s.T(), row)
}
97 changes: 96 additions & 1 deletion executor/asyncloaddata/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func UpdateJobProgress(
`UPDATE mysql.load_data_jobs
SET progress = %?, update_time = CURRENT_TIMESTAMP(6)
WHERE job_id = %?
AND end_time IS NULL
AND (update_time >= DATE_SUB(CURRENT_TIMESTAMP(6), INTERVAL %? SECOND)
OR update_time IS NULL);`,
progress, jobID, OfflineThresholdInSec)
Expand Down Expand Up @@ -178,6 +179,95 @@ func FailJob(
return err
}

// CancelJob marks the load data job jobID created by user as canceled.
//
// The whole operation runs inside one pessimistic transaction on conn: the
// job row is read, validated, and then updated with expected_status
// 'canceled', the current timestamp as end_time and the error message
// 'canceled by user'. The transaction is committed when the function is about
// to return nil and rolled back otherwise.
//
// It returns ErrLoadDataJobNotFound when no row matches jobID and user, and
// ErrLoadDataInvalidOperation when the job's expected status is neither
// "running" nor "paused", or when the job already has an end_time (reported
// as "failed" if error_message is set, "finished" otherwise).
func CancelJob(
	ctx context.Context,
	conn sqlexec.SQLExecutor,
	jobID int64,
	user string,
) (err error) {
	ctx = util.WithInternalSourceType(ctx, kv.InternalLoadData)
	if _, err = conn.ExecuteInternal(ctx, "BEGIN PESSIMISTIC;"); err != nil {
		return err
	}
	// Close the transaction based on the final value of the named result:
	// COMMIT on success (its error becomes the return value), ROLLBACK on
	// failure (its error is only logged so the original one is preserved).
	defer func() {
		if err == nil {
			_, err = conn.ExecuteInternal(ctx, "COMMIT;")
			return
		}
		_, rollbackErr := conn.ExecuteInternal(ctx, "ROLLBACK;")
		terror.Log(rollbackErr)
	}()

	var recordSet sqlexec.RecordSet
	recordSet, err = conn.ExecuteInternal(ctx,
		`SELECT expected_status, end_time, error_message FROM mysql.load_data_jobs
WHERE job_id = %? AND create_user = %?;`,
		jobID, user)
	if err != nil {
		return err
	}
	// Registered after the transaction defer, so the record set is closed
	// before COMMIT/ROLLBACK is issued.
	defer terror.Call(recordSet.Close)

	var jobRows []chunk.Row
	jobRows, err = sqlexec.DrainRecordSet(ctx, recordSet, 1)
	if err != nil {
		return err
	}
	if len(jobRows) == 0 {
		return exeerrors.ErrLoadDataJobNotFound.GenWithStackByArgs(jobID)
	}

	job := jobRows[0]
	// Column 0: expected_status. Only running or paused jobs may be canceled.
	switch expected := job.GetEnum(0).String(); expected {
	case "running", "paused":
	default:
		return exeerrors.ErrLoadDataInvalidOperation.GenWithStackByArgs(fmt.Sprintf("need status running or paused, but got %s", expected))
	}
	// Column 1: end_time. A non-NULL value means the job has already ended;
	// column 2 (error_message) tells a failed job from a finished one.
	if !job.IsNull(1) {
		if !job.IsNull(2) {
			return exeerrors.ErrLoadDataInvalidOperation.GenWithStackByArgs("need status running or paused, but got failed")
		}
		return exeerrors.ErrLoadDataInvalidOperation.GenWithStackByArgs("need status running or paused, but got finished")
	}

	_, err = conn.ExecuteInternal(ctx,
		`UPDATE mysql.load_data_jobs
SET expected_status = 'canceled',
end_time = CURRENT_TIMESTAMP(6),
error_message = 'canceled by user'
WHERE job_id = %?;`,
		jobID)
	return err
}

// DropJob deletes the record of load data job jobID created by user from
// mysql.load_data_jobs.
//
// It returns the error of the DELETE statement if execution fails, and
// ErrLoadDataJobNotFound when the statement succeeds but affects no row
// (no job with this ID belongs to user). It returns nil when a row was
// deleted.
func DropJob(
	ctx context.Context,
	conn sqlexec.SQLExecutor,
	jobID int64,
	user string,
) error {
	ctx = util.WithInternalSourceType(ctx, kv.InternalLoadData)
	_, err := conn.ExecuteInternal(ctx,
		`DELETE FROM mysql.load_data_jobs
WHERE job_id = %? AND create_user = %?;`,
		jobID, user)
	// Propagate execution failures. The affected-rows check below is only
	// meaningful after a successful DELETE.
	if err != nil {
		return err
	}
	if conn.GetSessionVars().StmtCtx.AffectedRows() < 1 {
		return exeerrors.ErrLoadDataJobNotFound.GenWithStackByArgs(jobID)
	}
	return nil
}

// JobExpectedStatus is the expected status of a load data job. User can set the
// expected status of a job and worker will respect it.
type JobExpectedStatus int
Expand All @@ -192,6 +282,7 @@ const (
)

// UpdateJobExpectedStatus updates the expected status of a load data job.
// TODO: remove it?
func UpdateJobExpectedStatus(
ctx context.Context,
conn sqlexec.SQLExecutor,
Expand Down Expand Up @@ -297,20 +388,24 @@ func GetJobStatus(
// start_time).
func getJobStatus(row chunk.Row) (JobStatus, string, error) {
// ending status has the highest priority
expectedStatus := row.GetEnum(0).String()
endTimeIsNull := row.IsNull(2)
if !endTimeIsNull {
resultMsgIsNull := row.IsNull(3)
if !resultMsgIsNull {
resultMessage := row.GetString(3)
return JobFinished, resultMessage, nil
}

errorMessage := row.GetString(4)
if expectedStatus == "canceled" {
return JobCanceled, errorMessage, nil
}
return JobFailed, errorMessage, nil
}

isAlive := row.GetInt64(1) == 1
startTimeIsNull := row.IsNull(5)
expectedStatus := row.GetEnum(0).String()

switch expectedStatus {
case "canceled":
Expand Down
12 changes: 5 additions & 7 deletions executor/asyncloaddata/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,13 +201,14 @@ func TestKeepAlive(t *testing.T) {
expected.StatusMessage = ""
checkEqualIgnoreTimes(t, expected, info)

// Now the worker calls FailJob
// Now the worker calls FailJob, but the status should still be canceled,
// that's more friendly.

err = FailJob(ctx, tk.Session(), id, "failed to keepalive")
require.NoError(t, err)
info, err = GetJobInfo(ctx, tk.Session(), id, "user")
require.NoError(t, err)
expected.Status = JobFailed
expected.Status = JobCanceled
expected.StatusMessage = "failed to keepalive"
checkEqualIgnoreTimes(t, expected, info)
}
Expand Down Expand Up @@ -264,11 +265,8 @@ func TestJobIsFailedAndGetAllJobs(t *testing.T) {
info, err = GetJobInfo(ctx, tk.Session(), id, "user")
require.NoError(t, err)
checkEqualIgnoreTimes(t, expected, info)
err = UpdateJobExpectedStatus(ctx, tk.Session(), id, JobExpectedCanceled)
require.NoError(t, err)
info, err = GetJobInfo(ctx, tk.Session(), id, "user")
require.NoError(t, err)
checkEqualIgnoreTimes(t, expected, info)
err = CancelJob(ctx, tk.Session(), id, "user")
require.ErrorContains(t, err, "The current job status cannot perform the operation. need status running or paused, but got failed")

// add job of another user and test GetAllJobInfo

Expand Down
6 changes: 6 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,12 @@ func (b *executorBuilder) buildSimple(v *plannercore.Simple) Executor {
}
}
}
case *ast.LoadDataActionStmt:
return &LoadDataActionExec{
baseExecutor: newBaseExecutor(b.ctx, nil, 0),
tp: s.Tp,
jobID: s.JobID,
}
}
base := newBaseExecutor(b.ctx, v.Schema(), v.ID())
base.initCap = chunk.ZeroCapacity
Expand Down
Loading

0 comments on commit 04741ae

Please sign in to comment.