From 3e5518d73ae0aceb0dd6226f12697c736b99c8a4 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 1 Mar 2023 22:13:09 +0800 Subject: [PATCH] asyncloaddata: add table and helper functions (#41765) ref pingcap/tidb#40499 --- executor/asyncloaddata/BUILD.bazel | 30 ++ executor/asyncloaddata/util.go | 422 ++++++++++++++++++++++++++++ executor/asyncloaddata/util_test.go | 314 +++++++++++++++++++++ kv/option.go | 2 + 4 files changed, 768 insertions(+) create mode 100644 executor/asyncloaddata/BUILD.bazel create mode 100644 executor/asyncloaddata/util.go create mode 100644 executor/asyncloaddata/util_test.go diff --git a/executor/asyncloaddata/BUILD.bazel b/executor/asyncloaddata/BUILD.bazel new file mode 100644 index 0000000000000..291c3021e6e6f --- /dev/null +++ b/executor/asyncloaddata/BUILD.bazel @@ -0,0 +1,30 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "asyncloaddata", + srcs = ["util.go"], + importpath = "github.com/pingcap/tidb/executor/asyncloaddata", + visibility = ["//visibility:public"], + deps = [ + "//kv", + "//parser/terror", + "//session", + "//util/chunk", + "//util/sqlexec", + "@com_github_pingcap_errors//:errors", + "@com_github_tikv_client_go_v2//util", + ], +) + +go_test( + name = "asyncloaddata_test", + timeout = "short", + srcs = ["util_test.go"], + embed = [":asyncloaddata"], + flaky = True, + deps = [ + "//testkit", + "//util/sqlexec", + "@com_github_stretchr_testify//require", + ], +) diff --git a/executor/asyncloaddata/util.go b/executor/asyncloaddata/util.go new file mode 100644 index 0000000000000..9816ac8b41ef1 --- /dev/null +++ b/executor/asyncloaddata/util.go @@ -0,0 +1,422 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package asyncloaddata + +import ( + "context" + "fmt" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/parser/terror" + "github.com/pingcap/tidb/session" + "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/sqlexec" + "github.com/tikv/client-go/v2/util" +) + +const ( + // CreateLoadDataJobs is a table that LOAD DATA uses + // TODO: move it to bootstrap.go and create it in bootstrap + CreateLoadDataJobs = `CREATE TABLE IF NOT EXISTS mysql.load_data_jobs ( + job_id bigint(64) NOT NULL AUTO_INCREMENT, + expected_status ENUM('running', 'paused', 'canceled') NOT NULL DEFAULT 'running', + create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + start_time TIMESTAMP NULL DEFAULT NULL, + update_time TIMESTAMP NULL DEFAULT NULL, + end_time TIMESTAMP NULL DEFAULT NULL, + data_source TEXT NOT NULL, + table_schema VARCHAR(64) NOT NULL, + table_name VARCHAR(64) NOT NULL, + import_mode VARCHAR(64) NOT NULL, + create_user VARCHAR(32) NOT NULL, + progress TEXT DEFAULT NULL, + result_message TEXT DEFAULT NULL, + error_message TEXT DEFAULT NULL, + PRIMARY KEY (job_id), + KEY (create_user));` +) + +// CreateLoadDataJob creates a load data job. +func CreateLoadDataJob( + ctx context.Context, + conn sqlexec.SQLExecutor, + source, db, table string, + importMode string, + user string, +) (int64, error) { + ctx = util.WithInternalSourceType(ctx, kv.InternalLoadData) + _, err := conn.ExecuteInternal(ctx, + `INSERT INTO mysql.load_data_jobs + (data_source, table_schema, table_name, import_mode, create_user) + VALUES (%?, %?, %?, %?, %?);`, + source, db, table, importMode, user) + if err != nil { + return 0, err + } + rs, err := conn.ExecuteInternal(ctx, `SELECT LAST_INSERT_ID();`) + if err != nil { + return 0, err + } + //nolint: errcheck + defer rs.Close() + rows, err := sqlexec.DrainRecordSet(ctx, rs, 1) + if err != nil { + return 0, err + } + if len(rows) != 1 { + return 0, errors.Errorf("unexpected result length: %d", len(rows)) + } + return rows[0].GetInt64(0), nil +} + +// StartJob starts a load data job. A job can only be started once. +func StartJob( + ctx context.Context, + conn sqlexec.SQLExecutor, + jobID int64, +) error { + ctx = util.WithInternalSourceType(ctx, kv.InternalLoadData) + _, err := conn.ExecuteInternal(ctx, + `UPDATE mysql.load_data_jobs + SET start_time = CURRENT_TIMESTAMP, update_time = CURRENT_TIMESTAMP + WHERE job_id = %? AND start_time IS NULL;`, + jobID) + return err +} + +var ( + // HeartBeatInSec is the interval of heartbeat. + HeartBeatInSec = 5 + // OfflineThresholdInSec means after failing to update heartbeat for 3 times, + // we treat the worker of the job as offline. + OfflineThresholdInSec = HeartBeatInSec * 3 +) + +// UpdateJobProgress updates the progress of a load data job. It should be called +// periodically as heartbeat. +func UpdateJobProgress( + ctx context.Context, + conn session.Session, + jobID int64, + progress string, +) (bool, error) { + ctx = util.WithInternalSourceType(ctx, kv.InternalLoadData) + // let TiDB handle heartbeat check for concurrent SQL + // we tolerate 2 times of failure/timeout when updating heartbeat + _, err := conn.ExecuteInternal(ctx, + `UPDATE mysql.load_data_jobs + SET progress = %?, update_time = CURRENT_TIMESTAMP + WHERE job_id = %? + AND ( + update_time >= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL %? SECOND) + OR update_time IS NULL);`, + progress, jobID, OfflineThresholdInSec) + if err != nil { + return false, err + } + return conn.GetSessionVars().StmtCtx.AffectedRows() == 1, nil +} + +// FinishJob finishes a load data job. A job can only be started once. +func FinishJob( + ctx context.Context, + conn sqlexec.SQLExecutor, + jobID int64, + result string, +) error { + ctx = util.WithInternalSourceType(ctx, kv.InternalLoadData) + _, err := conn.ExecuteInternal(ctx, + `UPDATE mysql.load_data_jobs + SET end_time = CURRENT_TIMESTAMP, result_message = %? + WHERE job_id = %? AND result_message IS NULL AND error_message IS NULL;`, + result, jobID) + return err +} + +// FailJob fails a load data job. A job can only be started once. +func FailJob( + ctx context.Context, + conn sqlexec.SQLExecutor, + jobID int64, + result string, +) error { + ctx = util.WithInternalSourceType(ctx, kv.InternalLoadData) + _, err := conn.ExecuteInternal(ctx, + `UPDATE mysql.load_data_jobs + SET end_time = CURRENT_TIMESTAMP, error_message = %? + WHERE job_id = %? AND result_message IS NULL AND error_message IS NULL;`, + result, jobID) + return err +} + +// JobExpectedStatus is the expected status of a load data job. User can set the +// expected status of a job and worker will respect it. +type JobExpectedStatus int + +const ( + // JobExpectedRunning means the job is expected to be running. + JobExpectedRunning JobExpectedStatus = iota + // JobExpectedPaused means the job is expected to be paused. + JobExpectedPaused + // JobExpectedCanceled means the job is expected to be canceled. + JobExpectedCanceled +) + +// UpdateJobExpectedStatus updates the expected status of a load data job. +func UpdateJobExpectedStatus( + ctx context.Context, + conn sqlexec.SQLExecutor, + jobID int64, + status JobExpectedStatus, +) error { + ctx = util.WithInternalSourceType(ctx, kv.InternalLoadData) + var sql string + switch status { + case JobExpectedRunning: + sql = `UPDATE mysql.load_data_jobs + SET expected_status = 'running' + WHERE job_id = %? AND expected_status = 'paused';` + case JobExpectedPaused: + sql = `UPDATE mysql.load_data_jobs + SET expected_status = 'paused' + WHERE job_id = %? AND expected_status = 'running';` + case JobExpectedCanceled: + sql = `UPDATE mysql.load_data_jobs + SET expected_status = 'canceled' + WHERE job_id = %? AND expected_status != 'canceled';` + } + _, err := conn.ExecuteInternal(ctx, sql, jobID) + return err +} + +// JobStatus represents the status of a load data job. +type JobStatus int + +const ( + // JobFailed means the job is failed and can't be resumed. + JobFailed JobStatus = iota + // JobCanceled means the job is canceled by user and can't be resumed. + JobCanceled + // JobPaused means the job is paused by user and can be resumed. + JobPaused + // JobFinished means the job is finished. + JobFinished + // JobPending means the job is pending to be started. + JobPending + // JobRunning means the job is running. + JobRunning +) + +// GetJobStatus gets the status of a load data job. The returned error means +// something wrong when querying the database. Other business logic errors are +// returned as JobFailed with message. +func GetJobStatus( + ctx context.Context, + conn sqlexec.SQLExecutor, + jobID int64, +) (JobStatus, string, error) { + ctx = util.WithInternalSourceType(ctx, kv.InternalLoadData) + rs, err := conn.ExecuteInternal(ctx, + `SELECT + expected_status, + update_time >= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL %? SECOND) AS is_alive, + end_time, + result_message, + error_message, + start_time + FROM mysql.load_data_jobs + WHERE job_id = %?;`, + OfflineThresholdInSec, jobID) + if err != nil { + return JobFailed, "", err + } + defer terror.Call(rs.Close) + rows, err := sqlexec.DrainRecordSet(ctx, rs, 1) + if err != nil { + return JobFailed, "", err + } + if len(rows) != 1 { + return JobFailed, fmt.Sprintf("job %d not found", jobID), nil + } + + return getJobStatus(rows[0]) +} + +// getJobStatus expected the first 6 columns of input row is (expected_status, +// is_alive (derived from update_time), end_time, result_message, error_message, +// start_time). +func getJobStatus(row chunk.Row) (JobStatus, string, error) { + // ending status has the highest priority + endTimeIsNull := row.IsNull(2) + if !endTimeIsNull { + resultMsgIsNull := row.IsNull(3) + if !resultMsgIsNull { + resultMessage := row.GetString(3) + return JobFinished, resultMessage, nil + } + errorMessage := row.GetString(4) + return JobFailed, errorMessage, nil + } + + isAlive := row.GetInt64(1) == 1 + startTimeIsNull := row.IsNull(5) + expectedStatus := row.GetEnum(0).String() + + switch expectedStatus { + case "canceled": + if startTimeIsNull || isAlive { + return JobCanceled, "", nil + } + return JobFailed, "job expected canceled but the node is timeout", nil + case "paused": + if startTimeIsNull || isAlive { + return JobPaused, "", nil + } + return JobFailed, "job expected paused but the node is timeout", nil + case "running": + if startTimeIsNull { + return JobPending, "", nil + } + if isAlive { + return JobRunning, "", nil + } + return JobFailed, "job expected running but the node is timeout", nil + default: + return JobFailed, fmt.Sprintf("unexpected job status %s", expectedStatus), nil + } +} + +// JobInfo is the information of a load data job. +type JobInfo struct { + JobID int64 + User string + DataSource string + TableSchema string + TableName string + ImportMode string + Progress string + Status JobStatus + StatusMessage string +} + +// GetJobInfo gets all needed information of a load data job. +func GetJobInfo( + ctx context.Context, + conn sqlexec.SQLExecutor, + jobID int64, +) (*JobInfo, error) { + ctx = util.WithInternalSourceType(ctx, kv.InternalLoadData) + rs, err := conn.ExecuteInternal(ctx, + `SELECT + expected_status, + update_time >= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL %? SECOND) AS is_alive, + end_time, + result_message, + error_message, + start_time, + + job_id, + data_source, + table_schema, + table_name, + import_mode, + progress, + create_user + FROM mysql.load_data_jobs + WHERE job_id = %?;`, + OfflineThresholdInSec, jobID) + if err != nil { + return nil, err + } + defer terror.Call(rs.Close) + rows, err := sqlexec.DrainRecordSet(ctx, rs, 1) + if err != nil { + return nil, err + } + if len(rows) != 1 { + return nil, fmt.Errorf("job %d not found", jobID) + } + + return getJobInfo(rows[0]) +} + +// getJobInfo expected the columns of input row is (expected_status, +// is_alive (derived from update_time), end_time, result_message, error_message, +// start_time, job_id, data_source, table_schema, table_name, import_mode, +// progress, create_user). +func getJobInfo(row chunk.Row) (*JobInfo, error) { + var err error + jobInfo := JobInfo{ + JobID: row.GetInt64(6), + DataSource: row.GetString(7), + TableSchema: row.GetString(8), + TableName: row.GetString(9), + ImportMode: row.GetString(10), + Progress: row.GetString(11), + User: row.GetString(12), + } + jobInfo.Status, jobInfo.StatusMessage, err = getJobStatus(row) + if err != nil { + return nil, err + } + return &jobInfo, nil +} + +// GetAllJobInfo gets all jobs status of a user. +func GetAllJobInfo( + ctx context.Context, + conn sqlexec.SQLExecutor, + user string, +) ([]*JobInfo, error) { + ctx = util.WithInternalSourceType(ctx, kv.InternalLoadData) + rs, err := conn.ExecuteInternal(ctx, + `SELECT + expected_status, + update_time >= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL %? SECOND) AS is_alive, + end_time, + result_message, + error_message, + start_time, + + job_id, + data_source, + table_schema, + table_name, + import_mode, + progress, + create_user + FROM mysql.load_data_jobs + WHERE create_user = %?;`, + OfflineThresholdInSec, user) + if err != nil { + return nil, err + } + defer terror.Call(rs.Close) + rows, err := sqlexec.DrainRecordSet(ctx, rs, 1) + if err != nil { + return nil, err + } + ret := make([]*JobInfo, 0, len(rows)) + for _, row := range rows { + jobInfo, err := getJobInfo(row) + if err != nil { + return nil, err + } + ret = append(ret, jobInfo) + } + + return ret, nil +} diff --git a/executor/asyncloaddata/util_test.go b/executor/asyncloaddata/util_test.go new file mode 100644 index 0000000000000..8ae7580e98b4e --- /dev/null +++ b/executor/asyncloaddata/util_test.go @@ -0,0 +1,314 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package asyncloaddata + +import ( + "context" + "testing" + "time" + + "github.com/pingcap/tidb/testkit" + "github.com/pingcap/tidb/util/sqlexec" + "github.com/stretchr/testify/require" +) + +func createJob(t *testing.T, conn sqlexec.SQLExecutor, user string) (int64, *JobInfo) { + id, err := CreateLoadDataJob(context.Background(), conn, "/tmp/test.csv", "test", "t", "logical", user) + require.NoError(t, err) + info, err := GetJobInfo(context.Background(), conn, id) + require.NoError(t, err) + expected := &JobInfo{ + JobID: id, + User: user, + DataSource: "/tmp/test.csv", + TableSchema: "test", + TableName: "t", + ImportMode: "logical", + Progress: "", + Status: JobPending, + StatusMessage: "", + } + require.Equal(t, expected, info) + return id, info +} + +func TestHappyPath(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec(CreateLoadDataJobs) + defer tk.MustExec("DROP TABLE IF EXISTS mysql.load_data_jobs") + + ctx := context.Background() + + // job is created + + id, expected := createJob(t, tk.Session(), "user") + + // job is started by a worker + + backup := OfflineThresholdInSec + OfflineThresholdInSec = 1000 + t.Cleanup(func() { + OfflineThresholdInSec = backup + }) + err := StartJob(ctx, tk.Session(), id) + require.NoError(t, err) + info, err := GetJobInfo(ctx, tk.Session(), id) + require.NoError(t, err) + expected.Status = JobRunning + require.Equal(t, expected, info) + + // job is periodically updated by worker + + ok, err := UpdateJobProgress(ctx, tk.Session(), id, "imported 10%") + require.NoError(t, err) + require.True(t, ok) + info, err = GetJobInfo(ctx, tk.Session(), id) + require.NoError(t, err) + expected.Progress = "imported 10%" + require.Equal(t, expected, info) + + // job is paused + + err = UpdateJobExpectedStatus(ctx, tk.Session(), id, JobExpectedPaused) + require.NoError(t, err) + info, err = GetJobInfo(ctx, tk.Session(), id) + require.NoError(t, err) + expected.Status = JobPaused + require.Equal(t, expected, info) + + // worker still can update progress, maybe response to pausing is delayed + + ok, err = UpdateJobProgress(ctx, tk.Session(), id, "imported 20%") + require.NoError(t, err) + require.True(t, ok) + info, err = GetJobInfo(ctx, tk.Session(), id) + require.NoError(t, err) + expected.Progress = "imported 20%" + require.Equal(t, expected, info) + + // job is resumed + + err = UpdateJobExpectedStatus(ctx, tk.Session(), id, JobExpectedRunning) + require.NoError(t, err) + info, err = GetJobInfo(ctx, tk.Session(), id) + require.NoError(t, err) + expected.Status = JobRunning + require.Equal(t, expected, info) + + // job is finished + + err = FinishJob(ctx, tk.Session(), id, "finished message") + require.NoError(t, err) + info, err = GetJobInfo(ctx, tk.Session(), id) + require.NoError(t, err) + expected.Status = JobFinished + expected.StatusMessage = "finished message" + require.Equal(t, expected, info) +} + +func TestKeepAlive(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec(CreateLoadDataJobs) + defer tk.MustExec("DROP TABLE IF EXISTS mysql.load_data_jobs") + ctx := context.Background() + + // job is created + + id, expected := createJob(t, tk.Session(), "user") + + backup := OfflineThresholdInSec + OfflineThresholdInSec = 1 + t.Cleanup(func() { + OfflineThresholdInSec = backup + }) + + // before job is started, worker don't need to keepalive + + time.Sleep(2 * time.Second) + info, err := GetJobInfo(ctx, tk.Session(), id) + require.NoError(t, err) + require.Equal(t, expected, info) + + err = StartJob(ctx, tk.Session(), id) + require.NoError(t, err) + info, err = GetJobInfo(ctx, tk.Session(), id) + require.NoError(t, err) + expected.Status = JobRunning + require.Equal(t, expected, info) + + // if worker failed to keepalive, job will fail + + time.Sleep(2 * time.Second) + info, err = GetJobInfo(ctx, tk.Session(), id) + require.NoError(t, err) + expected.Status = JobFailed + expected.StatusMessage = "job expected running but the node is timeout" + require.Equal(t, expected, info) + + // after the worker is failed to keepalive, further keepalive will fail + + ok, err := UpdateJobProgress(ctx, tk.Session(), id, "imported 20%") + require.NoError(t, err) + require.False(t, ok) + info, err = GetJobInfo(ctx, tk.Session(), id) + require.NoError(t, err) + require.Equal(t, expected, info) + + // can change expected status. + + err = UpdateJobExpectedStatus(ctx, tk.Session(), id, JobExpectedPaused) + require.NoError(t, err) + info, err = GetJobInfo(ctx, tk.Session(), id) + require.NoError(t, err) + expected.StatusMessage = "job expected paused but the node is timeout" + require.Equal(t, expected, info) + err = UpdateJobExpectedStatus(ctx, tk.Session(), id, JobExpectedRunning) + require.NoError(t, err) + info, err = GetJobInfo(ctx, tk.Session(), id) + require.NoError(t, err) + expected.StatusMessage = "job expected running but the node is timeout" + require.Equal(t, expected, info) + err = UpdateJobExpectedStatus(ctx, tk.Session(), id, JobExpectedCanceled) + require.NoError(t, err) + info, err = GetJobInfo(ctx, tk.Session(), id) + require.NoError(t, err) + expected.StatusMessage = "job expected canceled but the node is timeout" + require.Equal(t, expected, info) +} + +func TestJobIsFailedAndGetAllJobs(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec(CreateLoadDataJobs) + defer tk.MustExec("DROP TABLE IF EXISTS mysql.load_data_jobs") + ctx := context.Background() + + // job is created + + id, expected := createJob(t, tk.Session(), "user") + + // job can be failed directly when it's pending, although it's not possible + + err := FailJob(ctx, tk.Session(), id, "failed message") + require.NoError(t, err) + info, err := GetJobInfo(ctx, tk.Session(), id) + require.NoError(t, err) + expected.Status = JobFailed + expected.StatusMessage = "failed message" + require.Equal(t, expected, info) + + // create another job and fail it + + id, expected = createJob(t, tk.Session(), "user") + + err = StartJob(ctx, tk.Session(), id) + require.NoError(t, err) + info, err = GetJobInfo(ctx, tk.Session(), id) + require.NoError(t, err) + expected.Status = JobRunning + require.Equal(t, expected, info) + + err = FailJob(ctx, tk.Session(), id, "failed message") + require.NoError(t, err) + info, err = GetJobInfo(ctx, tk.Session(), id) + require.NoError(t, err) + expected.Status = JobFailed + expected.StatusMessage = "failed message" + require.Equal(t, expected, info) + + // test change expected status of a failed job. + + err = UpdateJobExpectedStatus(ctx, tk.Session(), id, JobExpectedPaused) + require.NoError(t, err) + info, err = GetJobInfo(ctx, tk.Session(), id) + require.NoError(t, err) + require.Equal(t, expected, info) + err = UpdateJobExpectedStatus(ctx, tk.Session(), id, JobExpectedRunning) + require.NoError(t, err) + info, err = GetJobInfo(ctx, tk.Session(), id) + require.NoError(t, err) + require.Equal(t, expected, info) + err = UpdateJobExpectedStatus(ctx, tk.Session(), id, JobExpectedCanceled) + require.NoError(t, err) + info, err = GetJobInfo(ctx, tk.Session(), id) + require.NoError(t, err) + require.Equal(t, expected, info) + + // add job of another user and test GetAllJobInfo + + _, _ = createJob(t, tk.Session(), "user2") + + jobs, err := GetAllJobInfo(ctx, tk.Session(), "user") + require.NoError(t, err) + require.Equal(t, 2, len(jobs)) + require.Equal(t, JobFailed, jobs[0].Status) + require.Equal(t, JobFailed, jobs[1].Status) + + jobs, err = GetAllJobInfo(ctx, tk.Session(), "user2") + require.NoError(t, err) + require.Equal(t, 1, len(jobs)) + require.Equal(t, JobPending, jobs[0].Status) +} + +func TestGetJobStatus(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec(CreateLoadDataJobs) + defer tk.MustExec("DROP TABLE IF EXISTS mysql.load_data_jobs") + ctx := context.Background() + + // job is created + + id, _ := createJob(t, tk.Session(), "user") + + // job is pending + + status, msg, err := GetJobStatus(ctx, tk.Session(), id) + require.NoError(t, err) + require.Equal(t, JobPending, status) + require.Equal(t, "", msg) + + // job is running + + backup := OfflineThresholdInSec + OfflineThresholdInSec = 1000 + t.Cleanup(func() { + OfflineThresholdInSec = backup + }) + err = StartJob(ctx, tk.Session(), id) + require.NoError(t, err) + status, msg, err = GetJobStatus(ctx, tk.Session(), id) + require.NoError(t, err) + require.Equal(t, JobRunning, status) + require.Equal(t, "", msg) + + // job is finished + + err = FinishJob(ctx, tk.Session(), id, "finished message") + require.NoError(t, err) + status, msg, err = GetJobStatus(ctx, tk.Session(), id) + require.NoError(t, err) + require.Equal(t, JobFinished, status) + require.Equal(t, "finished message", msg) + + // wrong ID + + status, msg, err = GetJobStatus(ctx, tk.Session(), id+1) + require.NoError(t, err) + require.Equal(t, JobFailed, status) + require.Contains(t, msg, "not found") +} diff --git a/kv/option.go b/kv/option.go index 631e69da7613d..50710a74da4c7 100644 --- a/kv/option.go +++ b/kv/option.go @@ -186,4 +186,6 @@ const ( InternalTxnTrace = "Trace" // InternalTxnTTL is the type of TTL usage InternalTxnTTL = "TTL" + // InternalLoadData is the type of LOAD DATA usage + InternalLoadData = "LoadData" )