diff --git a/Makefile b/Makefile index c9df25116be41..321c05fd636c7 100644 --- a/Makefile +++ b/Makefile @@ -385,6 +385,7 @@ mock_lightning: tools/bin/mockgen gen_mock: tools/bin/mockgen tools/bin/mockgen -package mock github.com/pingcap/tidb/disttask/framework/scheduler TaskTable,Pool,Scheduler,Extension > disttask/framework/mock/scheduler_mock.go + tools/bin/mockgen -package mock github.com/pingcap/tidb/disttask/framework/dispatcher Dispatcher > disttask/framework/mock/dispatcher_mock.go tools/bin/mockgen -package execute github.com/pingcap/tidb/disttask/framework/scheduler/execute SubtaskExecutor > disttask/framework/mock/execute/execute_mock.go tools/bin/mockgen -package mock github.com/pingcap/tidb/disttask/importinto MiniTaskExecutor > disttask/importinto/mock/import_mock.go tools/bin/mockgen -package mock github.com/pingcap/tidb/disttask/framework/planner LogicalPlan,PipelineSpec > disttask/framework/mock/plan_mock.go diff --git a/br/cmd/tidb-lightning-ctl/main.go b/br/cmd/tidb-lightning-ctl/main.go index ea1a48b095298..fd36380ff1252 100644 --- a/br/cmd/tidb-lightning-ctl/main.go +++ b/br/cmd/tidb-lightning-ctl/main.go @@ -200,7 +200,7 @@ func checkpointErrorDestroy(ctx context.Context, cfg *config.Config, tls *common for _, table := range targetTables { for engineID := table.MinEngineID; engineID <= table.MaxEngineID; engineID++ { fmt.Fprintln(os.Stderr, "Closing and cleaning up engine:", table.TableName, engineID) - _, eID := backend.MakeUUID(table.TableName, engineID) + _, eID := backend.MakeUUID(table.TableName, int64(engineID)) engine := local.Engine{UUID: eID} err := engine.Cleanup(cfg.TikvImporter.SortedKVDir) if err != nil { diff --git a/br/pkg/lightning/backend/backend.go b/br/pkg/lightning/backend/backend.go index be55a88fc4a96..d50731dac713a 100644 --- a/br/pkg/lightning/backend/backend.go +++ b/br/pkg/lightning/backend/backend.go @@ -36,7 +36,7 @@ const ( importMaxRetryTimes = 3 // tikv-importer has done retry internally. so we don't retry many times. ) -func makeTag(tableName string, engineID int32) string { +func makeTag(tableName string, engineID int64) string { return fmt.Sprintf("%s:%d", tableName, engineID) } @@ -48,7 +48,7 @@ func makeLogger(logger log.Logger, tag string, engineUUID uuid.UUID) log.Logger } // MakeUUID generates a UUID for the engine and a tag for the engine. -func MakeUUID(tableName string, engineID int32) (string, uuid.UUID) { +func MakeUUID(tableName string, engineID int64) (string, uuid.UUID) { tag := makeTag(tableName, engineID) engineUUID := uuid.NewSHA1(engineNamespace, []byte(tag)) return tag, engineUUID @@ -229,7 +229,7 @@ func MakeEngineManager(ab Backend) EngineManager { // OpenEngine opens an engine with the given table name and engine ID. func (be EngineManager) OpenEngine(ctx context.Context, config *EngineConfig, tableName string, engineID int32) (*OpenedEngine, error) { - tag, engineUUID := MakeUUID(tableName, engineID) + tag, engineUUID := MakeUUID(tableName, int64(engineID)) logger := makeLogger(log.FromContext(ctx), tag, engineUUID) if err := be.backend.OpenEngine(ctx, config, engineUUID); err != nil { @@ -298,7 +298,7 @@ func (engine *OpenedEngine) LocalWriter(ctx context.Context, cfg *LocalWriterCon // resuming from a checkpoint. 
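The engine ID parameter of makeTag/MakeUUID is widened from int32 to int64 so callers that use an index ID as the engine ID (see the add-index cloud import executor later in this diff) can pass it directly; existing int32 engine IDs are simply converted at the call sites above. A minimal sketch of the derivation, using a placeholder namespace (the real engineNamespace is defined elsewhere in backend.go):

    package main

    import (
        "fmt"

        "github.com/google/uuid"
    )

    // placeholder namespace for illustration only; the real engineNamespace lives in backend.go.
    var engineNamespace = uuid.MustParse("5e161000-29f2-11ee-be56-0242ac120002")

    // makeEngineUUID mirrors MakeUUID: the "<table>:<engineID>" tag is hashed with SHA-1 into a
    // deterministic UUID, so the same (table, engine ID) pair always resolves to the same engine.
    func makeEngineUUID(tableName string, engineID int64) (string, uuid.UUID) {
        tag := fmt.Sprintf("%s:%d", tableName, engineID)
        return tag, uuid.NewSHA1(engineNamespace, []byte(tag))
    }

    func main() {
        tag, id := makeEngineUUID("`db`.`tbl`", int64(int32(-1))) // int32 engine IDs still fit after widening
        fmt.Println(tag, id)
    }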
func (be EngineManager) UnsafeCloseEngine(ctx context.Context, cfg *EngineConfig, tableName string, engineID int32) (*ClosedEngine, error) { - tag, engineUUID := MakeUUID(tableName, engineID) + tag, engineUUID := MakeUUID(tableName, int64(engineID)) return be.UnsafeCloseEngineWithUUID(ctx, cfg, tag, engineUUID, engineID) } diff --git a/br/pkg/lightning/backend/external/BUILD.bazel b/br/pkg/lightning/backend/external/BUILD.bazel index 222586fb92db0..8dc5e30b31ffd 100644 --- a/br/pkg/lightning/backend/external/BUILD.bazel +++ b/br/pkg/lightning/backend/external/BUILD.bazel @@ -19,6 +19,9 @@ go_library( importpath = "github.com/pingcap/tidb/br/pkg/lightning/backend/external", visibility = ["//visibility:public"], deps = [ + "//br/pkg/lightning/backend", + "//br/pkg/lightning/backend/encode", + "//br/pkg/lightning/backend/kv", "//br/pkg/lightning/common", "//br/pkg/lightning/log", "//br/pkg/membuf", diff --git a/br/pkg/lightning/backend/external/util.go b/br/pkg/lightning/backend/external/util.go index 4529ae14f5451..d433e4816998c 100644 --- a/br/pkg/lightning/backend/external/util.go +++ b/br/pkg/lightning/backend/external/util.go @@ -239,3 +239,71 @@ func GetMaxOverlapping(points []Endpoint) int64 { } return maxWeight } + +// SortedKVMeta is the meta of sorted kv. +type SortedKVMeta struct { + MinKey []byte `json:"min_key"` + MaxKey []byte `json:"max_key"` + TotalKVSize uint64 `json:"total_kv_size"` + DataFiles []string `json:"data_files"` + StatFiles []string `json:"stat_files"` +} + +// NewSortedKVMeta creates a SortedKVMeta from a WriterSummary. +func NewSortedKVMeta(summary *WriterSummary) *SortedKVMeta { + meta := &SortedKVMeta{ + MinKey: summary.Min.Clone(), + MaxKey: summary.Max.Clone(), + TotalKVSize: summary.TotalSize, + } + for _, f := range summary.MultipleFilesStats { + for _, filename := range f.Filenames { + meta.DataFiles = append(meta.DataFiles, filename[0]) + meta.StatFiles = append(meta.StatFiles, filename[1]) + } + } + return meta +} + +// Merge merges the other SortedKVMeta into this one. +func (m *SortedKVMeta) Merge(other *SortedKVMeta) { + m.MinKey = NotNilMin(m.MinKey, other.MinKey) + m.MaxKey = NotNilMax(m.MaxKey, other.MaxKey) + m.TotalKVSize += other.TotalKVSize + + m.DataFiles = append(m.DataFiles, other.DataFiles...) + m.StatFiles = append(m.StatFiles, other.StatFiles...) +} + +// MergeSummary merges the WriterSummary into this SortedKVMeta. +func (m *SortedKVMeta) MergeSummary(summary *WriterSummary) { + m.Merge(NewSortedKVMeta(summary)) +} + +// NotNilMin returns the smallest of a and b, ignoring nil values. +func NotNilMin(a, b []byte) []byte { + if len(a) == 0 { + return b + } + if len(b) == 0 { + return a + } + if bytes.Compare(a, b) < 0 { + return a + } + return b +} + +// NotNilMax returns the largest of a and b, ignoring nil values. 
+func NotNilMax(a, b []byte) []byte { + if len(a) == 0 { + return b + } + if len(b) == 0 { + return a + } + if bytes.Compare(a, b) > 0 { + return a + } + return b +} diff --git a/br/pkg/lightning/backend/external/writer.go b/br/pkg/lightning/backend/external/writer.go index 8d7642841f403..522705540b6e5 100644 --- a/br/pkg/lightning/backend/external/writer.go +++ b/br/pkg/lightning/backend/external/writer.go @@ -24,6 +24,9 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/tidb/br/pkg/lightning/backend" + "github.com/pingcap/tidb/br/pkg/lightning/backend/encode" + "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/br/pkg/membuf" "github.com/pingcap/tidb/br/pkg/storage" @@ -442,3 +445,39 @@ func (w *Writer) createStorageWriter(ctx context.Context) ( } return dataPath, statPath, dataWriter, statsWriter, nil } + +// EngineWriter implements backend.EngineWriter interface. +type EngineWriter struct { + w *Writer +} + +// NewEngineWriter creates a new EngineWriter. +func NewEngineWriter(w *Writer) *EngineWriter { + return &EngineWriter{w: w} +} + +// AppendRows implements backend.EngineWriter interface. +func (e *EngineWriter) AppendRows(ctx context.Context, _ []string, rows encode.Rows) error { + kvs := kv.Rows2KvPairs(rows) + if len(kvs) == 0 { + return nil + } + for _, item := range kvs { + err := e.w.WriteRow(ctx, item.Key, item.Val, nil) + if err != nil { + return err + } + } + return nil +} + +// IsSynced implements backend.EngineWriter interface. +func (e *EngineWriter) IsSynced() bool { + // only used when saving checkpoint + return true +} + +// Close implements backend.EngineWriter interface. +func (e *EngineWriter) Close(ctx context.Context) (backend.ChunkFlushStatus, error) { + return nil, e.w.Close(ctx) +} diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index b8dafdaeee342..cf39bbfb73b52 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -1469,7 +1469,7 @@ func (local *Backend) ImportEngine( log.FromContext(ctx).Info("engine contains no kv, skip import", zap.Stringer("engine", engineUUID)) return nil } - kvRegionSplitSize, kvRegionSplitKeys, err := getRegionSplitSizeKeys(ctx, local.pdCtl.GetPDClient(), local.tls) + kvRegionSplitSize, kvRegionSplitKeys, err := GetRegionSplitSizeKeys(ctx, local.pdCtl.GetPDClient(), local.tls) if err == nil { if kvRegionSplitSize > regionSplitSize { regionSplitSize = kvRegionSplitSize @@ -1549,7 +1549,7 @@ func (local *Backend) ImportEngine( // GetRegionSplitSizeKeys gets the region split size and keys from PD. func (local *Backend) GetRegionSplitSizeKeys(ctx context.Context) (finalSize int64, finalKeys int64, err error) { - return getRegionSplitSizeKeys(ctx, local.pdCtl.GetPDClient(), local.tls) + return GetRegionSplitSizeKeys(ctx, local.pdCtl.GetPDClient(), local.tls) } // expose these variables to unit test. @@ -1689,6 +1689,16 @@ func (local *Backend) GetImportedKVCount(engineUUID uuid.UUID) int64 { return e.importedKVCount.Load() } +// GetExternalEngineKVStatistics returns kv statistics of some engine. +func (local *Backend) GetExternalEngineKVStatistics(engineUUID uuid.UUID) ( + totalKVSize int64, totalKVCount int64) { + v, ok := local.externalEngine[engineUUID] + if !ok { + return 0, 0 + } + return v.KVStatistics() +} + // ResetEngine reset the engine and reclaim the space. 
func (local *Backend) ResetEngine(ctx context.Context, engineUUID uuid.UUID) error { // the only way to reset the engine + reclaim the space is to delete and reopen it 🤷 @@ -1927,8 +1937,8 @@ func getSplitConfFromStore(ctx context.Context, host string, tls *common.TLS) ( return splitSize, nested.Coprocessor.RegionSplitKeys, nil } -// return region split size, region split keys, error -func getRegionSplitSizeKeys(ctx context.Context, cli pd.Client, tls *common.TLS) ( +// GetRegionSplitSizeKeys return region split size, region split keys, error +func GetRegionSplitSizeKeys(ctx context.Context, cli pd.Client, tls *common.TLS) ( regionSplitSize int64, regionSplitKeys int64, err error) { stores, err := cli.GetAllStores(ctx, pd.WithExcludeTombstone()) if err != nil { diff --git a/br/pkg/lightning/backend/local/local_test.go b/br/pkg/lightning/backend/local/local_test.go index 1976d97e00f52..87aae043d7ffa 100644 --- a/br/pkg/lightning/backend/local/local_test.go +++ b/br/pkg/lightning/backend/local/local_test.go @@ -1123,7 +1123,7 @@ func TestGetRegionSplitSizeKeys(t *testing.T) { } return 0, 0, errors.New("invalid connection") } - splitSize, splitKeys, err := getRegionSplitSizeKeys(ctx, cli, nil) + splitSize, splitKeys, err := GetRegionSplitSizeKeys(ctx, cli, nil) require.NoError(t, err) require.Equal(t, int64(1), splitSize) require.Equal(t, int64(2), splitKeys) diff --git a/br/pkg/lightning/importer/import.go b/br/pkg/lightning/importer/import.go index 9ae64c784403a..2f41331b3ce1a 100644 --- a/br/pkg/lightning/importer/import.go +++ b/br/pkg/lightning/importer/import.go @@ -978,7 +978,7 @@ func verifyLocalFile(ctx context.Context, cpdb checkpoints.DB, dir string) error } for tableName, engineIDs := range targetTables { for _, engineID := range engineIDs { - _, eID := backend.MakeUUID(tableName, engineID) + _, eID := backend.MakeUUID(tableName, int64(engineID)) file := local.Engine{UUID: eID} err := file.Exist(dir) if err != nil { diff --git a/ddl/backfilling_dispatcher.go b/ddl/backfilling_dispatcher.go index f10b78a49f049..ff5f57f6f34d8 100644 --- a/ddl/backfilling_dispatcher.go +++ b/ddl/backfilling_dispatcher.go @@ -367,12 +367,14 @@ func generateMergeSortPlan( hex.EncodeToString(startKey), hex.EncodeToString(endKey)) } m := &BackfillSubTaskMeta{ - MinKey: startKey, - MaxKey: endKey, - DataFiles: dataFiles, - StatFiles: statFiles, + SortedKVMeta: external.SortedKVMeta{ + MinKey: startKey, + MaxKey: endKey, + DataFiles: dataFiles, + StatFiles: statFiles, + TotalKVSize: totalSize / uint64(len(instanceIDs)), + }, RangeSplitKeys: rangeSplitKeys, - TotalKVSize: totalSize / uint64(len(instanceIDs)), } metaBytes, err := json.Marshal(m) if err != nil { @@ -404,7 +406,9 @@ func generateMergePlan( end = len(dataFiles) } m := &BackfillSubTaskMeta{ - DataFiles: dataFiles[start:end], + SortedKVMeta: external.SortedKVMeta{ + DataFiles: dataFiles[start:end], + }, } metaBytes, err := json.Marshal(m) if err != nil { @@ -475,8 +479,8 @@ func getSummaryFromLastStep( } // Skip empty subtask.MinKey/MaxKey because it means // no records need to be written in this subtask. - minKey = notNilMin(minKey, subtask.MinKey) - maxKey = notNilMax(maxKey, subtask.MaxKey) + minKey = external.NotNilMin(minKey, subtask.MinKey) + maxKey = external.NotNilMax(maxKey, subtask.MaxKey) totalKVSize += subtask.TotalKVSize for _, stat := range subtask.MultipleFilesStats { @@ -502,31 +506,3 @@ func redactCloudStorageURI( } gTask.Meta = metaBytes } - -// notNilMin returns the smaller of a and b, ignoring nil values. 
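These min/max helpers now live in the external package, and BackfillSubTaskMeta embeds external.SortedKVMeta instead of carrying the fields individually (see backfilling_dist_scheduler.go below). A minimal, self-contained sketch of the merge semantics the dispatcher relies on, using only the SortedKVMeta API shown in util.go above:

    package main

    import (
        "fmt"

        "github.com/pingcap/tidb/br/pkg/lightning/backend/external"
    )

    func main() {
        a := &external.SortedKVMeta{MinKey: []byte("b"), MaxKey: []byte("d"), TotalKVSize: 10, DataFiles: []string{"f1"}, StatFiles: []string{"s1"}}
        b := &external.SortedKVMeta{MinKey: []byte("a"), MaxKey: []byte("c"), TotalKVSize: 5, DataFiles: []string{"f2"}, StatFiles: []string{"s2"}}
        a.Merge(b)
        // min/max keys are taken via NotNilMin/NotNilMax (empty keys are ignored),
        // sizes are summed and the file lists are concatenated.
        fmt.Printf("%s..%s size=%d data=%v stat=%v\n", a.MinKey, a.MaxKey, a.TotalKVSize, a.DataFiles, a.StatFiles)
        // => a..d size=15 data=[f1 f2] stat=[s1 s2]
    }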
-func notNilMin(a, b []byte) []byte { - if len(a) == 0 { - return b - } - if len(b) == 0 { - return a - } - if bytes.Compare(a, b) < 0 { - return a - } - return b -} - -// notNilMax returns the larger of a and b, ignoring nil values. -func notNilMax(a, b []byte) []byte { - if len(a) == 0 { - return b - } - if len(b) == 0 { - return a - } - if bytes.Compare(a, b) > 0 { - return a - } - return b -} diff --git a/ddl/backfilling_dist_scheduler.go b/ddl/backfilling_dist_scheduler.go index e811e814a6e06..4622c15bca5b1 100644 --- a/ddl/backfilling_dist_scheduler.go +++ b/ddl/backfilling_dist_scheduler.go @@ -45,12 +45,8 @@ type BackfillSubTaskMeta struct { StartKey []byte `json:"start_key"` EndKey []byte `json:"end_key"` - DataFiles []string `json:"data_files"` - StatFiles []string `json:"stat_files"` - RangeSplitKeys [][]byte `json:"range_split_keys"` - MinKey []byte `json:"min_key"` - MaxKey []byte `json:"max_key"` - TotalKVSize uint64 `json:"total_kv_size"` + RangeSplitKeys [][]byte `json:"range_split_keys"` + external.SortedKVMeta `json:",inline"` // MultipleFilesStats is the output of subtask, it will be used by the next subtask. MultipleFilesStats []external.MultipleFilesStat `json:"multiple_files_stats"` } diff --git a/ddl/backfilling_import_cloud.go b/ddl/backfilling_import_cloud.go index b5eaa3a013aa0..e658803bde5c6 100644 --- a/ddl/backfilling_import_cloud.go +++ b/ddl/backfilling_import_cloud.go @@ -77,7 +77,7 @@ func (m *cloudImportExecutor) RunSubtask(ctx context.Context, subtask *proto.Sub if local == nil { return errors.Errorf("local backend not found") } - _, engineUUID := backend.MakeUUID(m.ptbl.Meta().Name.L, int32(m.index.ID)) + _, engineUUID := backend.MakeUUID(m.ptbl.Meta().Name.L, m.index.ID) err = local.CloseEngine(ctx, &backend.EngineConfig{ External: &backend.ExternalEngineConfig{ StorageURI: m.cloudStoreURI, diff --git a/disttask/framework/dispatcher/BUILD.bazel b/disttask/framework/dispatcher/BUILD.bazel index 8fc2e14326112..edc610b3d2f1c 100644 --- a/disttask/framework/dispatcher/BUILD.bazel +++ b/disttask/framework/dispatcher/BUILD.bazel @@ -37,8 +37,9 @@ go_test( embed = [":dispatcher"], flaky = True, race = "off", - shard_count = 10, + shard_count = 11, deps = [ + "//disttask/framework/mock", "//disttask/framework/proto", "//disttask/framework/storage", "//domain/infosync", @@ -52,5 +53,6 @@ go_test( "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//util", "@org_uber_go_goleak//:goleak", + "@org_uber_go_mock//gomock", ], ) diff --git a/disttask/framework/dispatcher/dispatcher.go b/disttask/framework/dispatcher/dispatcher.go index e0c3ac3d58044..9d15de58550b7 100644 --- a/disttask/framework/dispatcher/dispatcher.go +++ b/disttask/framework/dispatcher/dispatcher.go @@ -67,9 +67,13 @@ type TaskHandle interface { // Dispatcher manages the lifetime of a task // including submitting subtasks and updating the status of a task. type Dispatcher interface { + // Init initializes the dispatcher, should be called before ExecuteTask. + // if Init returns error, dispatcher manager will fail the task directly, + // so the returned error should be a fatal error. + Init() error + // ExecuteTask start to schedule a task. ExecuteTask() - // Close closes the dispatcher, not routine-safe, and should be called - // after ExecuteTask finished. + // Close closes the dispatcher, should be called if Init returns nil. 
Close() } @@ -116,7 +120,12 @@ func NewBaseDispatcher(ctx context.Context, taskMgr *storage.TaskManager, server } } -// ExecuteTask start to schedule a task. +// Init implements the Dispatcher interface. +func (*BaseDispatcher) Init() error { + return nil +} + +// ExecuteTask implements the Dispatcher interface. func (d *BaseDispatcher) ExecuteTask() { logutil.Logger(d.logCtx).Info("execute one task", zap.String("state", d.Task.State), zap.Uint64("concurrency", d.Task.Concurrency)) diff --git a/disttask/framework/dispatcher/dispatcher_manager.go b/disttask/framework/dispatcher/dispatcher_manager.go index e13e769d9e109..e15fb1def2d54 100644 --- a/disttask/framework/dispatcher/dispatcher_manager.go +++ b/disttask/framework/dispatcher/dispatcher_manager.go @@ -176,12 +176,7 @@ func (dm *Manager) dispatchTaskLoop() { if GetDispatcherFactory(task.Type) == nil { logutil.BgLogger().Warn("unknown task type", zap.Int64("task-id", task.ID), zap.String("task-type", task.Type)) - prevState := task.State - task.State = proto.TaskStateFailed - task.Error = errors.New("unknown task type") - if _, err2 := dm.taskMgr.UpdateGlobalTaskAndAddSubTasks(task, nil, prevState); err2 != nil { - logutil.BgLogger().Warn("update task state of unknown type failed", zap.Error(err2)) - } + dm.failTask(task, errors.New("unknown task type")) continue } // the task is not in runningTasks set when: @@ -201,6 +196,16 @@ func (dm *Manager) dispatchTaskLoop() { } } +func (dm *Manager) failTask(task *proto.Task, err error) { + prevState := task.State + task.State = proto.TaskStateFailed + task.Error = err + if _, err2 := dm.taskMgr.UpdateGlobalTaskAndAddSubTasks(task, nil, prevState); err2 != nil { + logutil.BgLogger().Warn("failed to update task state to failed", + zap.Int64("task-id", task.ID), zap.Error(err2)) + } +} + func (dm *Manager) gcSubtaskHistoryTable() { historySubtaskTableGcInterval := defaultHistorySubtaskTableGcInterval failpoint.Inject("historySubtaskTableGcInterval", func(val failpoint.Value) { @@ -244,6 +249,11 @@ func (dm *Manager) startDispatcher(task *proto.Task) { _ = dm.gPool.Run(func() { dispatcherFactory := GetDispatcherFactory(task.Type) dispatcher := dispatcherFactory(dm.ctx, dm.taskMgr, dm.serverID, task) + if err := dispatcher.Init(); err != nil { + logutil.BgLogger().Error("init dispatcher failed", zap.Error(err)) + dm.failTask(task, err) + return + } defer dispatcher.Close() dm.setRunningTask(task, dispatcher) dispatcher.ExecuteTask() diff --git a/disttask/framework/dispatcher/dispatcher_test.go b/disttask/framework/dispatcher/dispatcher_test.go index bda19c0fb799d..93dfc1e956893 100644 --- a/disttask/framework/dispatcher/dispatcher_test.go +++ b/disttask/framework/dispatcher/dispatcher_test.go @@ -17,6 +17,7 @@ package dispatcher_test import ( "context" "fmt" + "strings" "testing" "time" @@ -24,6 +25,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/disttask/framework/dispatcher" + "github.com/pingcap/tidb/disttask/framework/mock" "github.com/pingcap/tidb/disttask/framework/proto" "github.com/pingcap/tidb/disttask/framework/storage" "github.com/pingcap/tidb/domain/infosync" @@ -32,6 +34,7 @@ import ( "github.com/pingcap/tidb/util/logutil" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/util" + "go.uber.org/mock/gomock" ) var ( @@ -200,6 +203,48 @@ func TestGetInstance(t *testing.T) { require.ElementsMatch(t, instanceIDs, serverIDs) } +func TestTaskFailInManager(t *testing.T) { + store := testkit.CreateMockStore(t) + gtk := 
testkit.NewTestKit(t, store) + pool := pools.NewResourcePool(func() (pools.Resource, error) { + return gtk.Session(), nil + }, 1, 1, time.Second) + defer pool.Close() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockDispatcher := mock.NewMockDispatcher(ctrl) + mockDispatcher.EXPECT().Init().Return(errors.New("mock dispatcher init error")) + + dspManager, mgr := MockDispatcherManager(t, pool) + dispatcher.RegisterDispatcherFactory(proto.TaskTypeExample, + func(ctx context.Context, taskMgr *storage.TaskManager, serverID string, task *proto.Task) dispatcher.Dispatcher { + return mockDispatcher + }) + dspManager.Start() + defer dspManager.Stop() + + // unknown task type + taskID, err := mgr.AddNewGlobalTask("test", "test-type", 1, nil) + require.NoError(t, err) + require.Eventually(t, func() bool { + task, err := mgr.GetGlobalTaskByID(taskID) + require.NoError(t, err) + return task.State == proto.TaskStateFailed && + strings.Contains(task.Error.Error(), "unknown task type") + }, time.Second*10, time.Millisecond*300) + + // dispatcher init error + taskID, err = mgr.AddNewGlobalTask("test2", proto.TaskTypeExample, 1, nil) + require.NoError(t, err) + require.Eventually(t, func() bool { + task, err := mgr.GetGlobalTaskByID(taskID) + require.NoError(t, err) + return task.State == proto.TaskStateFailed && + strings.Contains(task.Error.Error(), "mock dispatcher init error") + }, time.Second*10, time.Millisecond*300) +} + func checkDispatch(t *testing.T, taskCnt int, isSucc, isCancel, isSubtaskCancel bool) { require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/domain/MockDisableDistTask", "return(true)")) defer func() { diff --git a/disttask/framework/mock/BUILD.bazel b/disttask/framework/mock/BUILD.bazel index d16d12502c432..1e02ac8e55499 100644 --- a/disttask/framework/mock/BUILD.bazel +++ b/disttask/framework/mock/BUILD.bazel @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "mock", srcs = [ + "dispatcher_mock.go", "plan_mock.go", "scheduler_mock.go", ], diff --git a/disttask/framework/mock/dispatcher_mock.go b/disttask/framework/mock/dispatcher_mock.go new file mode 100644 index 0000000000000..abb3ad16df85f --- /dev/null +++ b/disttask/framework/mock/dispatcher_mock.go @@ -0,0 +1,72 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/pingcap/tidb/disttask/framework/dispatcher (interfaces: Dispatcher) + +// Package mock is a generated GoMock package. +package mock + +import ( + reflect "reflect" + + gomock "go.uber.org/mock/gomock" +) + +// MockDispatcher is a mock of Dispatcher interface. +type MockDispatcher struct { + ctrl *gomock.Controller + recorder *MockDispatcherMockRecorder +} + +// MockDispatcherMockRecorder is the mock recorder for MockDispatcher. +type MockDispatcherMockRecorder struct { + mock *MockDispatcher +} + +// NewMockDispatcher creates a new mock instance. +func NewMockDispatcher(ctrl *gomock.Controller) *MockDispatcher { + mock := &MockDispatcher{ctrl: ctrl} + mock.recorder = &MockDispatcherMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockDispatcher) EXPECT() *MockDispatcherMockRecorder { + return m.recorder +} + +// Close mocks base method. +func (m *MockDispatcher) Close() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Close") +} + +// Close indicates an expected call of Close. 
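The generated MockDispatcher above is used by TestTaskFailInManager to drive the new Init error path. A minimal sketch of the complementary happy-path expectations (hypothetical test code, not part of this PR): when Init returns nil, the manager is expected to call ExecuteTask and then Close, in that order:

    ctrl := gomock.NewController(t)
    defer ctrl.Finish()
    d := mock.NewMockDispatcher(ctrl)
    gomock.InOrder(
        d.EXPECT().Init().Return(nil),
        d.EXPECT().ExecuteTask(),
        d.EXPECT().Close(),
    )
    dispatcher.RegisterDispatcherFactory(proto.TaskTypeExample,
        func(ctx context.Context, taskMgr *storage.TaskManager, serverID string, task *proto.Task) dispatcher.Dispatcher {
            return d
        })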
+func (mr *MockDispatcherMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockDispatcher)(nil).Close)) +} + +// ExecuteTask mocks base method. +func (m *MockDispatcher) ExecuteTask() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "ExecuteTask") +} + +// ExecuteTask indicates an expected call of ExecuteTask. +func (mr *MockDispatcherMockRecorder) ExecuteTask() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ExecuteTask", reflect.TypeOf((*MockDispatcher)(nil).ExecuteTask)) +} + +// Init mocks base method. +func (m *MockDispatcher) Init() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Init") + ret0, _ := ret[0].(error) + return ret0 +} + +// Init indicates an expected call of Init. +func (mr *MockDispatcherMockRecorder) Init() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Init", reflect.TypeOf((*MockDispatcher)(nil).Init)) +} diff --git a/disttask/framework/planner/plan.go b/disttask/framework/planner/plan.go index 2e20be064826d..022b071c7475a 100644 --- a/disttask/framework/planner/plan.go +++ b/disttask/framework/planner/plan.go @@ -33,6 +33,8 @@ type PlanCtx struct { // PreviousSubtaskMetas is a list of subtask metas from previous step. // We can remove this field if we find a better way to pass the result between steps. PreviousSubtaskMetas [][]byte + CurrTaskStep int64 + NextTaskStep int64 } // LogicalPlan represents a logical plan in distribute framework. diff --git a/disttask/importinto/BUILD.bazel b/disttask/importinto/BUILD.bazel index 782bfd16f9bda..ecab609ebc2c7 100644 --- a/disttask/importinto/BUILD.bazel +++ b/disttask/importinto/BUILD.bazel @@ -17,6 +17,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//br/pkg/lightning/backend", + "//br/pkg/lightning/backend/external", "//br/pkg/lightning/backend/kv", "//br/pkg/lightning/backend/local", "//br/pkg/lightning/checkpoints", @@ -26,6 +27,7 @@ go_library( "//br/pkg/lightning/metric", "//br/pkg/lightning/mydump", "//br/pkg/lightning/verification", + "//br/pkg/storage", "//br/pkg/utils", "//config", "//disttask/framework/dispatcher", @@ -86,6 +88,7 @@ go_test( race = "on", shard_count = 8, deps = [ + "//br/pkg/lightning/backend", "//br/pkg/lightning/checkpoints", "//br/pkg/lightning/mydump", "//br/pkg/lightning/verification", @@ -99,7 +102,6 @@ go_test( "//executor/importer", "//meta/autoid", "//parser/model", - "//parser/mysql", "//testkit", "//util/logutil", "//util/sqlexec", diff --git a/disttask/importinto/dispatcher.go b/disttask/importinto/dispatcher.go index 19dfffa75c04d..e7bc3d464c79e 100644 --- a/disttask/importinto/dispatcher.go +++ b/disttask/importinto/dispatcher.go @@ -122,7 +122,8 @@ func (t *taskInfo) close(ctx context.Context) { // ImportDispatcherExt is an extension of ImportDispatcher, exported for test. type ImportDispatcherExt struct { - mu sync.RWMutex + globalSort bool + mu sync.RWMutex // NOTE: there's no need to sync for below 2 fields actually, since we add a restriction that only one // task can be running at a time. but we might support task queuing in the future, leave it for now. 
// the last time we switch TiKV into IMPORT mode, this is a global operation, do it for one task makes @@ -196,13 +197,14 @@ func (dsp *ImportDispatcherExt) OnNextSubtasksBatch( ctx context.Context, taskHandle dispatcher.TaskHandle, gTask *proto.Task, - _ int64, + nextStep int64, ) ( resSubtaskMeta [][]byte, err error) { logger := logutil.BgLogger().With( zap.String("type", gTask.Type), zap.Int64("task-id", gTask.ID), - zap.String("step", stepStr(gTask.Step)), + zap.String("curr-step", stepStr(gTask.Step)), + zap.String("next-step", stepStr(nextStep)), ) taskMeta := &TaskMeta{} err = json.Unmarshal(gTask.Meta, taskMeta) @@ -228,18 +230,28 @@ func (dsp *ImportDispatcherExt) OnNextSubtasksBatch( } }() + var previousSubtaskMetas [][]byte switch gTask.Step { case proto.StepInit: if metrics, ok := metric.GetCommonMetric(ctx); ok { metrics.BytesCounter.WithLabelValues(metric.StateTotalRestore).Add(float64(taskMeta.Plan.TotalFileSize)) } - if err := preProcess(ctx, taskHandle, gTask, taskMeta, logger); err != nil { + jobStep := importer.JobStepImporting + if dsp.globalSort { + jobStep = importer.JobStepGlobalSorting + } + if err = startJob(ctx, logger, taskHandle, taskMeta, jobStep); err != nil { return nil, err } - if err = startJob(ctx, logger, taskHandle, taskMeta); err != nil { + case StepEncodeAndSort: + previousSubtaskMetas, err = taskHandle.GetPreviousSubtaskMetas(gTask.ID, StepEncodeAndSort) + if err != nil { return nil, err } - case StepImport: + if err = job2Step(ctx, logger, taskMeta, importer.JobStepImporting); err != nil { + return nil, err + } + case StepImport, StepWriteAndIngest: dsp.switchTiKV2NormalMode(ctx, gTask, logger) failpoint.Inject("clearLastSwitchTime", func() { dsp.lastSwitchTime.Store(time.Time{}) @@ -253,6 +265,15 @@ func (dsp *ImportDispatcherExt) OnNextSubtasksBatch( if err := updateResult(taskHandle, gTask, taskMeta); err != nil { return nil, err } + // we need get metas where checksum is stored. + step := StepImport + if dsp.globalSort { + step = StepEncodeAndSort + } + previousSubtaskMetas, err = taskHandle.GetPreviousSubtaskMetas(gTask.ID, step) + if err != nil { + return nil, err + } logger.Info("move to post-process step ", zap.Any("result", taskMeta.Result)) case StepPostProcess: return nil, nil @@ -260,11 +281,12 @@ func (dsp *ImportDispatcherExt) OnNextSubtasksBatch( return nil, errors.Errorf("unknown step %d", gTask.Step) } - previousSubtaskMetas, err := taskHandle.GetPreviousSubtaskMetas(gTask.ID, gTask.Step) - if err != nil { - return nil, err + planCtx := planner.PlanCtx{ + Ctx: ctx, + PreviousSubtaskMetas: previousSubtaskMetas, + CurrTaskStep: gTask.Step, + NextTaskStep: nextStep, } - planCtx := planner.PlanCtx{Ctx: ctx, PreviousSubtaskMetas: previousSubtaskMetas} logicalPlan := &LogicalPlan{} if err := logicalPlan.FromTaskMeta(gTask.Meta); err != nil { return nil, err @@ -273,7 +295,7 @@ func (dsp *ImportDispatcherExt) OnNextSubtasksBatch( if err != nil { return nil, err } - metaBytes, err := physicalPlan.ToSubtaskMetas(planCtx, dsp.GetNextStep(taskHandle, gTask)) + metaBytes, err := physicalPlan.ToSubtaskMetas(planCtx, nextStep) if err != nil { return nil, err } @@ -340,11 +362,16 @@ func (*ImportDispatcherExt) IsRetryableErr(error) bool { } // GetNextStep implements dispatcher.Extension interface. 
-func (*ImportDispatcherExt) GetNextStep(_ dispatcher.TaskHandle, task *proto.Task) int64 { +func (dsp *ImportDispatcherExt) GetNextStep(_ dispatcher.TaskHandle, task *proto.Task) int64 { switch task.Step { case proto.StepInit: + if dsp.globalSort { + return StepEncodeAndSort + } return StepImport - case StepImport: + case StepEncodeAndSort: + return StepWriteAndIngest + case StepImport, StepWriteAndIngest: return StepPostProcess default: // current step must be StepPostProcess @@ -391,11 +418,28 @@ func newImportDispatcher(ctx context.Context, taskMgr *storage.TaskManager, serverID string, task *proto.Task) dispatcher.Dispatcher { metrics := metricsManager.getOrCreateMetrics(task.ID) subCtx := metric.WithCommonMetric(ctx, metrics) - dis := importDispatcher{ + dsp := importDispatcher{ BaseDispatcher: dispatcher.NewBaseDispatcher(subCtx, taskMgr, serverID, task), } - dis.BaseDispatcher.Extension = &ImportDispatcherExt{} - return &dis + return &dsp +} + +func (dsp *importDispatcher) Init() (err error) { + defer func() { + if err != nil { + // if init failed, close is not called, so we need to unregister here. + metricsManager.unregister(dsp.Task.ID) + } + }() + taskMeta := &TaskMeta{} + if err = json.Unmarshal(dsp.BaseDispatcher.Task.Meta, taskMeta); err != nil { + return errors.Annotate(err, "unmarshal task meta failed") + } + + dsp.BaseDispatcher.Extension = &ImportDispatcherExt{ + globalSort: taskMeta.Plan.CloudStorageURI != "", + } + return dsp.BaseDispatcher.Init() } func (dsp *importDispatcher) Close() { @@ -403,16 +447,6 @@ func (dsp *importDispatcher) Close() { dsp.BaseDispatcher.Close() } -// preProcess does the pre-processing for the task. -func preProcess(_ context.Context, _ dispatcher.TaskHandle, gTask *proto.Task, taskMeta *TaskMeta, logger *zap.Logger) error { - logger.Info("pre process") - // TODO: drop table indexes depends on the option. 
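With the globalSort flag set by importDispatcher.Init (further down in this file), GetNextStep produces a different step sequence. A minimal in-package sketch of the expected transitions (hypothetical test fragment, not part of this PR):

    ext := &ImportDispatcherExt{globalSort: true}
    task := &proto.Task{Step: proto.StepInit}
    var got []int64
    for i := 0; i < 3; i++ {
        task.Step = ext.GetNextStep(nil, task)
        got = append(got, task.Step)
    }
    // StepInit -> StepEncodeAndSort -> StepWriteAndIngest -> StepPostProcess
    require.Equal(t, []int64{StepEncodeAndSort, StepWriteAndIngest, StepPostProcess}, got)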
- // if err := dropTableIndexes(ctx, handle, taskMeta, logger); err != nil { - // return err - // } - return updateMeta(gTask, taskMeta) -} - // nolint:deadcode func dropTableIndexes(ctx context.Context, handle dispatcher.TaskHandle, taskMeta *TaskMeta, logger *zap.Logger) error { tblInfo := taskMeta.Plan.TableInfo @@ -530,7 +564,7 @@ func updateResult(handle dispatcher.TaskHandle, gTask *proto.Task, taskMeta *Tas return updateMeta(gTask, taskMeta) } -func startJob(ctx context.Context, logger *zap.Logger, taskHandle dispatcher.TaskHandle, taskMeta *TaskMeta) error { +func startJob(ctx context.Context, logger *zap.Logger, taskHandle dispatcher.TaskHandle, taskMeta *TaskMeta, jobStep string) error { failpoint.Inject("syncBeforeJobStarted", func() { TestSyncChan <- struct{}{} <-TestSyncChan @@ -544,7 +578,7 @@ func startJob(ctx context.Context, logger *zap.Logger, taskHandle dispatcher.Tas func(ctx context.Context) (bool, error) { return true, taskHandle.WithNewSession(func(se sessionctx.Context) error { exec := se.(sqlexec.SQLExecutor) - return importer.StartJob(ctx, exec, taskMeta.JobID) + return importer.StartJob(ctx, exec, taskMeta.JobID, jobStep) }) }, ) @@ -610,6 +644,9 @@ func (dsp *ImportDispatcherExt) failJob(ctx context.Context, taskHandle dispatch func redactSensitiveInfo(gTask *proto.Task, taskMeta *TaskMeta) { taskMeta.Stmt = "" taskMeta.Plan.Path = ast.RedactURL(taskMeta.Plan.Path) + if taskMeta.Plan.CloudStorageURI != "" { + taskMeta.Plan.CloudStorageURI = ast.RedactURL(taskMeta.Plan.CloudStorageURI) + } if err := updateMeta(gTask, taskMeta); err != nil { // marshal failed, should not happen logutil.BgLogger().Warn("failed to update task meta", zap.Error(err)) @@ -645,10 +682,18 @@ func rollback(ctx context.Context, handle dispatcher.TaskHandle, gTask *proto.Ta func stepStr(step int64) string { switch step { + case proto.StepInit: + return "init" case StepImport: return "import" case StepPostProcess: return "postprocess" + case StepEncodeAndSort: + return "encode&sort" + case StepWriteAndIngest: + return "write&ingest" + case proto.StepDone: + return "done" default: return "unknown" } diff --git a/disttask/importinto/dispatcher_testkit_test.go b/disttask/importinto/dispatcher_testkit_test.go index 4a499e6d45728..fc8bae2681a1b 100644 --- a/disttask/importinto/dispatcher_testkit_test.go +++ b/disttask/importinto/dispatcher_testkit_test.go @@ -139,7 +139,7 @@ func TestDispatcherExt(t *testing.T) { task.Meta = bs // Set step to StepPostProcess to skip the rollback sql. 
task.Step = importinto.StepPostProcess - require.NoError(t, importer.StartJob(ctx, conn, jobID)) + require.NoError(t, importer.StartJob(ctx, conn, jobID, importer.JobStepImporting)) _, err = ext.OnErrStage(ctx, d, task, []error{errors.New("test")}) require.NoError(t, err) gotJobInfo, err = importer.GetJob(ctx, conn, jobID, "root", true) diff --git a/disttask/importinto/encode_and_sort_operator.go b/disttask/importinto/encode_and_sort_operator.go index fa5e32e7b48f5..f6ac48f57628d 100644 --- a/disttask/importinto/encode_and_sort_operator.go +++ b/disttask/importinto/encode_and_sort_operator.go @@ -16,9 +16,15 @@ package importinto import ( "context" + "path" + "strconv" + "time" + "github.com/google/uuid" "github.com/pingcap/errors" + "github.com/pingcap/tidb/br/pkg/lightning/backend/external" "github.com/pingcap/tidb/disttask/operator" + "github.com/pingcap/tidb/executor/importer" "github.com/pingcap/tidb/resourcemanager/pool/workerpool" "github.com/pingcap/tidb/resourcemanager/util" tidbutil "github.com/pingcap/tidb/util" @@ -26,6 +32,10 @@ import ( "go.uber.org/zap" ) +const ( + maxWaitDuration = 30 * time.Second +) + // encodeAndSortOperator is an operator that encodes and sorts data. // this operator process data of a subtask, i.e. one engine, it contains a lot // of data chunks, each chunk is a data file or part of it. @@ -39,31 +49,35 @@ type encodeAndSortOperator struct { ctx context.Context cancel context.CancelFunc - logger *zap.Logger - errCh chan error + taskID, subtaskID int64 + tableImporter *importer.TableImporter + sharedVars *SharedVars + logger *zap.Logger + errCh chan error } var _ operator.Operator = (*encodeAndSortOperator)(nil) var _ operator.WithSource[*importStepMinimalTask] = (*encodeAndSortOperator)(nil) var _ operator.WithSink[workerpool.None] = (*encodeAndSortOperator)(nil) -func newEncodeAndSortOperator(ctx context.Context, concurrency int, logger *zap.Logger) *encodeAndSortOperator { +func newEncodeAndSortOperator(ctx context.Context, executor *importStepExecutor, sharedVars *SharedVars, subtaskID int64) *encodeAndSortOperator { subCtx, cancel := context.WithCancel(ctx) op := &encodeAndSortOperator{ - ctx: subCtx, - cancel: cancel, - logger: logger, - errCh: make(chan error), + ctx: subCtx, + cancel: cancel, + taskID: executor.taskID, + subtaskID: subtaskID, + tableImporter: executor.tableImporter, + sharedVars: sharedVars, + logger: executor.logger, + errCh: make(chan error), } pool := workerpool.NewWorkerPool( "encodeAndSortOperator", util.ImportInto, - concurrency, + int(executor.taskMeta.Plan.ThreadCnt), func() workerpool.Worker[*importStepMinimalTask, workerpool.None] { - return &chunkWorker{ - ctx: subCtx, - op: op, - } + return newChunkWorker(ctx, op) }, ) op.AsyncOperator = operator.NewAsyncOperator(subCtx, pool) @@ -116,6 +130,42 @@ func (op *encodeAndSortOperator) Done() <-chan struct{} { type chunkWorker struct { ctx context.Context op *encodeAndSortOperator + + dataWriter *external.EngineWriter + indexWriter *importer.IndexRouteWriter +} + +func newChunkWorker(ctx context.Context, op *encodeAndSortOperator) *chunkWorker { + w := &chunkWorker{ + ctx: ctx, + op: op, + } + if op.tableImporter.IsGlobalSort() { + // in case on network partition, 2 nodes might run the same subtask. + workerUUID := uuid.New().String() + // sorted index kv storage path: /{taskID}/{subtaskID}/index/{indexID}/{workerID} + indexWriterFn := func(indexID int64) *external.Writer { + builder := external.NewWriterBuilder(). 
+ SetOnCloseFunc(func(summary *external.WriterSummary) { + op.sharedVars.mergeIndexSummary(indexID, summary) + }) + prefix := path.Join(strconv.Itoa(int(op.taskID)), strconv.Itoa(int(op.subtaskID))) + writerID := path.Join("index", strconv.Itoa(int(indexID)), workerUUID) + writer := builder.Build(op.tableImporter.GlobalSortStore, prefix, writerID) + return writer + } + + // sorted data kv storage path: /{taskID}/{subtaskID}/data/{workerID} + builder := external.NewWriterBuilder(). + SetOnCloseFunc(op.sharedVars.mergeDataSummary) + prefix := path.Join(strconv.Itoa(int(op.taskID)), strconv.Itoa(int(op.subtaskID))) + writerID := path.Join("data", workerUUID) + writer := builder.Build(op.tableImporter.GlobalSortStore, prefix, writerID) + w.dataWriter = external.NewEngineWriter(writer) + + w.indexWriter = importer.NewIndexRouteWriter(op.logger, indexWriterFn) + } + return w } func (w *chunkWorker) HandleTask(task *importStepMinimalTask, _ func(workerpool.None)) { @@ -125,10 +175,29 @@ func (w *chunkWorker) HandleTask(task *importStepMinimalTask, _ func(workerpool. // we don't use the input send function, it makes workflow more complex // we send result to errCh and handle it here. executor := newImportMinimalTaskExecutor(task) - if err := executor.Run(w.ctx); err != nil { + if err := executor.Run(w.ctx, w.dataWriter, w.indexWriter); err != nil { w.op.onError(err) } } -func (*chunkWorker) Close() { +func (w *chunkWorker) Close() { + closeCtx := w.ctx + if closeCtx.Err() != nil { + // in case of context canceled, we need to create a new context to close writers. + newCtx, cancel := context.WithTimeout(context.Background(), maxWaitDuration) + closeCtx = newCtx + defer cancel() + } + if w.dataWriter != nil { + // Note: we cannot ignore close error as we're writing to S3 or GCS. + // ignore error might cause data loss. below too. 
+ if _, err := w.dataWriter.Close(closeCtx); err != nil { + w.op.onError(errors.Trace(err)) + } + } + if w.indexWriter != nil { + if _, err := w.indexWriter.Close(closeCtx); err != nil { + w.op.onError(errors.Trace(err)) + } + } } diff --git a/disttask/importinto/encode_and_sort_operator_test.go b/disttask/importinto/encode_and_sort_operator_test.go index 3a11043750bf8..24b29e8fcff9d 100644 --- a/disttask/importinto/encode_and_sort_operator_test.go +++ b/disttask/importinto/encode_and_sort_operator_test.go @@ -24,8 +24,10 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/tidb/br/pkg/lightning/backend" "github.com/pingcap/tidb/disttask/importinto/mock" "github.com/pingcap/tidb/disttask/operator" + "github.com/pingcap/tidb/executor/importer" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" "go.uber.org/zap" @@ -54,15 +56,32 @@ func TestEncodeAndSortOperator(t *testing.T) { return executor } + executorForParam := &importStepExecutor{ + taskID: 1, + taskMeta: &TaskMeta{ + Plan: importer.Plan{ + ThreadCnt: 2, + }, + }, + tableImporter: &importer.TableImporter{ + LoadDataController: &importer.LoadDataController{ + Plan: &importer.Plan{ + CloudStorageURI: "", + }, + }, + }, + logger: logger, + } + source := operator.NewSimpleDataChannel(make(chan *importStepMinimalTask)) - op := newEncodeAndSortOperator(context.Background(), 3, logger) + op := newEncodeAndSortOperator(context.Background(), executorForParam, nil, 3) op.SetSource(source) require.NoError(t, op.Open()) require.Greater(t, len(op.String()), 0) // cancel on error mockErr := errors.New("mock err") - executor.EXPECT().Run(gomock.Any()).Return(mockErr) + executor.EXPECT().Run(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockErr) source.Channel() <- &importStepMinimalTask{} require.Eventually(t, func() bool { return op.hasError() @@ -75,7 +94,7 @@ func TestEncodeAndSortOperator(t *testing.T) { // cancel on error and log other errors mockErr2 := errors.New("mock err 2") source = operator.NewSimpleDataChannel(make(chan *importStepMinimalTask)) - op = newEncodeAndSortOperator(context.Background(), 2, logger) + op = newEncodeAndSortOperator(context.Background(), executorForParam, nil, 2) op.SetSource(source) executor1 := mock.NewMockMiniTaskExecutor(ctrl) executor2 := mock.NewMockMiniTaskExecutor(ctrl) @@ -89,20 +108,22 @@ func TestEncodeAndSortOperator(t *testing.T) { var wg sync.WaitGroup wg.Add(2) // wait until 2 executor start running, else workerpool will be cancelled. 
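For reference, the writer IDs built in newChunkWorker above compose with the {taskID}/{subtaskID} prefix into per-worker locations under the global-sort external storage, matching the path comments in that hunk. A small sketch with illustrative values:

    workerUUID := uuid.New().String()
    prefix := path.Join(strconv.Itoa(1), strconv.Itoa(42))           // {taskID}/{subtaskID} -> "1/42"
    dataWriterID := path.Join("data", workerUUID)                    // data/{workerID}
    indexWriterID := path.Join("index", strconv.Itoa(7), workerUUID) // index/{indexID}/{workerID}
    // sorted data kv  -> 1/42/data/<worker-uuid>
    // sorted index kv -> 1/42/index/7/<worker-uuid>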
- executor1.EXPECT().Run(gomock.Any()).DoAndReturn(func(context.Context) error { - wg.Done() - wg.Wait() - return mockErr2 - }) - executor2.EXPECT().Run(gomock.Any()).DoAndReturn(func(context.Context) error { - wg.Done() - wg.Wait() - // wait error in executor1 has been processed - require.Eventually(t, func() bool { - return op.hasError() - }, 3*time.Second, 300*time.Millisecond) - return errors.New("mock error should be logged") - }) + executor1.EXPECT().Run(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(context.Context, backend.EngineWriter, backend.EngineWriter) error { + wg.Done() + wg.Wait() + return mockErr2 + }) + executor2.EXPECT().Run(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(context.Context, backend.EngineWriter, backend.EngineWriter) error { + wg.Done() + wg.Wait() + // wait error in executor1 has been processed + require.Eventually(t, func() bool { + return op.hasError() + }, 3*time.Second, 300*time.Millisecond) + return errors.New("mock error should be logged") + }) require.NoError(t, op.Open()) // send 2 tasks source.Channel() <- &importStepMinimalTask{} diff --git a/disttask/importinto/job.go b/disttask/importinto/job.go index 2f6917cb4bba8..aa7c5978a20ba 100644 --- a/disttask/importinto/job.go +++ b/disttask/importinto/job.go @@ -258,24 +258,42 @@ func GetTaskImportedRows(jobID int64) (uint64, error) { return 0, err } taskKey := TaskKey(jobID) - globalTask, err := globalTaskManager.GetGlobalTaskByKey(taskKey) + task, err := globalTaskManager.GetGlobalTaskByKey(taskKey) if err != nil { return 0, err } - if globalTask == nil { + if task == nil { return 0, errors.Errorf("cannot find global task with key %s", taskKey) } - subtasks, err := globalTaskManager.GetSubtasksForImportInto(globalTask.ID, StepImport) - if err != nil { + taskMeta := TaskMeta{} + if err = json.Unmarshal(task.Meta, &taskMeta); err != nil { return 0, err } var importedRows uint64 - for _, subtask := range subtasks { - var subtaskMeta ImportStepMeta - if err2 := json.Unmarshal(subtask.Meta, &subtaskMeta); err2 != nil { - return 0, err2 + if taskMeta.Plan.CloudStorageURI == "" { + subtasks, err := globalTaskManager.GetSubtasksForImportInto(task.ID, StepImport) + if err != nil { + return 0, err + } + for _, subtask := range subtasks { + var subtaskMeta ImportStepMeta + if err2 := json.Unmarshal(subtask.Meta, &subtaskMeta); err2 != nil { + return 0, err2 + } + importedRows += subtaskMeta.Result.LoadedRowCnt + } + } else { + subtasks, err := globalTaskManager.GetSubtasksForImportInto(task.ID, StepWriteAndIngest) + if err != nil { + return 0, err + } + for _, subtask := range subtasks { + var subtaskMeta WriteIngestStepMeta + if err2 := json.Unmarshal(subtask.Meta, &subtaskMeta); err2 != nil { + return 0, err2 + } + importedRows += subtaskMeta.Result.LoadedRowCnt } - importedRows += subtaskMeta.Result.LoadedRowCnt } return importedRows, nil } diff --git a/disttask/importinto/mock/BUILD.bazel b/disttask/importinto/mock/BUILD.bazel index 9a780c155376d..902ed4332b31a 100644 --- a/disttask/importinto/mock/BUILD.bazel +++ b/disttask/importinto/mock/BUILD.bazel @@ -5,5 +5,8 @@ go_library( srcs = ["import_mock.go"], importpath = "github.com/pingcap/tidb/disttask/importinto/mock", visibility = ["//visibility:public"], - deps = ["@org_uber_go_mock//gomock"], + deps = [ + "//br/pkg/lightning/backend", + "@org_uber_go_mock//gomock", + ], ) diff --git a/disttask/importinto/mock/import_mock.go b/disttask/importinto/mock/import_mock.go index 63cce354f2db7..e5db685538d32 100644 --- 
a/disttask/importinto/mock/import_mock.go +++ b/disttask/importinto/mock/import_mock.go @@ -8,6 +8,7 @@ import ( context "context" reflect "reflect" + backend "github.com/pingcap/tidb/br/pkg/lightning/backend" gomock "go.uber.org/mock/gomock" ) @@ -35,15 +36,15 @@ func (m *MockMiniTaskExecutor) EXPECT() *MockMiniTaskExecutorMockRecorder { } // Run mocks base method. -func (m *MockMiniTaskExecutor) Run(arg0 context.Context) error { +func (m *MockMiniTaskExecutor) Run(arg0 context.Context, arg1, arg2 backend.EngineWriter) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Run", arg0) + ret := m.ctrl.Call(m, "Run", arg0, arg1, arg2) ret0, _ := ret[0].(error) return ret0 } // Run indicates an expected call of Run. -func (mr *MockMiniTaskExecutorMockRecorder) Run(arg0 interface{}) *gomock.Call { +func (mr *MockMiniTaskExecutorMockRecorder) Run(arg0, arg1, arg2 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockMiniTaskExecutor)(nil).Run), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockMiniTaskExecutor)(nil).Run), arg0, arg1, arg2) } diff --git a/disttask/importinto/planner.go b/disttask/importinto/planner.go index 5eb2483dbf188..869e22ff2f168 100644 --- a/disttask/importinto/planner.go +++ b/disttask/importinto/planner.go @@ -16,17 +16,28 @@ package importinto import ( "context" + "encoding/hex" "encoding/json" + "math" + "strconv" + "github.com/pingcap/errors" + "github.com/pingcap/tidb/br/pkg/lightning/backend/external" "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" "github.com/pingcap/tidb/br/pkg/lightning/common" + "github.com/pingcap/tidb/br/pkg/lightning/config" verify "github.com/pingcap/tidb/br/pkg/lightning/verification" + "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/disttask/framework/planner" + "github.com/pingcap/tidb/disttask/framework/proto" "github.com/pingcap/tidb/domain/infosync" "github.com/pingcap/tidb/executor/importer" + tidbkv "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/table/tables" + "github.com/pingcap/tidb/util/logutil" + "go.uber.org/zap" ) var ( @@ -76,9 +87,9 @@ func (p *LogicalPlan) ToPhysicalPlan(planCtx planner.PlanCtx) (*planner.Physical inputLinks := make([]planner.LinkSpec, 0) // physical plan only needs to be generated once. // However, our current implementation requires generating it for each step. - // Only the first step needs to generate import specs. - // This is a fast path to bypass generating import spec multiple times (as we need to access the source data). - if len(planCtx.PreviousSubtaskMetas) == 0 { + // we only generate needed plans for the next step. 
+ switch planCtx.CurrTaskStep { + case proto.StepInit: importSpecs, err := generateImportSpecs(planCtx.Ctx, p) if err != nil { return nil, err @@ -95,29 +106,53 @@ func (p *LogicalPlan) ToPhysicalPlan(planCtx planner.PlanCtx) (*planner.Physical }, }, }, - Step: StepImport, + Step: planCtx.NextTaskStep, }) inputLinks = append(inputLinks, planner.LinkSpec{ ProcessorID: i, }) } - } + case StepEncodeAndSort: + specs, err := generateWriteIngestSpecs(planCtx, p) + if err != nil { + return nil, err + } - physicalPlan.AddProcessor(planner.ProcessorSpec{ - ID: len(inputLinks), - Input: planner.InputSpec{ - ColumnTypes: []byte{ - // Checksum_crc64_xor, Total_kvs, Total_bytes, ReadRowCnt, LoadedRowCnt, ColSizeMap - mysql.TypeLonglong, mysql.TypeLonglong, mysql.TypeLonglong, mysql.TypeLonglong, mysql.TypeLonglong, mysql.TypeJSON, + for i, spec := range specs { + physicalPlan.AddProcessor(planner.ProcessorSpec{ + ID: i, + Pipeline: spec, + Output: planner.OutputSpec{ + Links: []planner.LinkSpec{ + { + ProcessorID: len(specs), + }, + }, + }, + Step: planCtx.NextTaskStep, + }) + inputLinks = append(inputLinks, planner.LinkSpec{ + ProcessorID: i, + }) + } + case StepImport, StepWriteAndIngest: + physicalPlan.AddProcessor(planner.ProcessorSpec{ + ID: len(inputLinks), + Input: planner.InputSpec{ + ColumnTypes: []byte{ + // Checksum_crc64_xor, Total_kvs, Total_bytes, ReadRowCnt, LoadedRowCnt, ColSizeMap + mysql.TypeLonglong, mysql.TypeLonglong, mysql.TypeLonglong, mysql.TypeLonglong, mysql.TypeLonglong, mysql.TypeJSON, + }, + Links: inputLinks, }, - Links: inputLinks, - }, - Pipeline: &PostProcessSpec{ - Schema: p.Plan.DBName, - Table: p.Plan.TableInfo.Name.L, - }, - Step: StepPostProcess, - }) + Pipeline: &PostProcessSpec{ + Schema: p.Plan.DBName, + Table: p.Plan.TableInfo.Name.L, + }, + Step: planCtx.NextTaskStep, + }) + } + return physicalPlan, nil } @@ -137,6 +172,16 @@ func (s *ImportSpec) ToSubtaskMeta(planner.PlanCtx) ([]byte, error) { return json.Marshal(importStepMeta) } +// WriteIngestSpec is the specification of a write-ingest pipeline. +type WriteIngestSpec struct { + *WriteIngestStepMeta +} + +// ToSubtaskMeta converts the write-ingest spec to subtask meta. +func (s *WriteIngestSpec) ToSubtaskMeta(planner.PlanCtx) ([]byte, error) { + return json.Marshal(s.WriteIngestStepMeta) +} + // PostProcessSpec is the specification of a post process pipeline. type PostProcessSpec struct { // for checksum request @@ -228,3 +273,126 @@ func generateImportSpecs(ctx context.Context, p *LogicalPlan) ([]*ImportSpec, er } return importSpecs, nil } + +func generateWriteIngestSpecs(planCtx planner.PlanCtx, p *LogicalPlan) ([]*WriteIngestSpec, error) { + ctx := planCtx.Ctx + controller, err2 := buildController(p) + if err2 != nil { + return nil, err2 + } + if err2 = controller.InitDataStore(ctx); err2 != nil { + return nil, err2 + } + // kvMetas contains data kv meta and all index kv metas. + // each kvMeta will be split into multiple range group individually, + // i.e. data and index kv will NOT be in the same subtask. 
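getSortedKVMetas (below) keys the merged metas by kv group: the data kv under the dataKVGroup constant ("data", defined in proto.go later in this diff) and each index kv under its index ID rendered in decimal. A small sketch of the shape it returns, assuming one data group and two indexes:

    kvMetas := map[string]*external.SortedKVMeta{
        "data": dataKVMeta,      // dataKVGroup
        "1":    indexKVMetas[1], // index ID 1
        "3":    indexKVMetas[3], // index ID 3
    }
    // each group is range-split independently into WriteIngestStepMeta subtasks,
    // so data kv and index kv never end up in the same subtask.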
+ kvMetas, err := getSortedKVMetas(planCtx.PreviousSubtaskMetas) + if err != nil { + return nil, err + } + specs := make([]*WriteIngestSpec, 0, 16) + for kvGroup, kvMeta := range kvMetas { + splitter, err1 := getRangeSplitter(ctx, controller.GlobalSortStore, kvMeta) + if err1 != nil { + return nil, err1 + } + + err1 = func() error { + defer func() { + err2 := splitter.Close() + if err2 != nil { + logutil.Logger(ctx).Warn("close range splitter failed", zap.Error(err2)) + } + }() + startKey := tidbkv.Key(kvMeta.MinKey) + var endKey tidbkv.Key + for { + endKeyOfGroup, dataFiles, statFiles, rangeSplitKeys, err2 := splitter.SplitOneRangesGroup() + if err2 != nil { + return err2 + } + if len(endKeyOfGroup) == 0 { + endKey = tidbkv.Key(kvMeta.MaxKey).Next() + } else { + endKey = tidbkv.Key(endKeyOfGroup).Clone() + } + logutil.Logger(ctx).Info("kv range as subtask", + zap.String("startKey", hex.EncodeToString(startKey)), + zap.String("endKey", hex.EncodeToString(endKey))) + if startKey.Cmp(endKey) >= 0 { + return errors.Errorf("invalid kv range, startKey: %s, endKey: %s", + hex.EncodeToString(startKey), hex.EncodeToString(endKey)) + } + // each subtask will write and ingest one range group + m := &WriteIngestStepMeta{ + KVGroup: kvGroup, + SortedKVMeta: external.SortedKVMeta{ + MinKey: startKey, + MaxKey: endKey, + DataFiles: dataFiles, + StatFiles: statFiles, + // this is actually an estimate, we don't know the exact size of the data + TotalKVSize: uint64(config.DefaultBatchSize), + }, + RangeSplitKeys: rangeSplitKeys, + } + specs = append(specs, &WriteIngestSpec{m}) + + startKey = endKey + if len(endKeyOfGroup) == 0 { + break + } + } + return nil + }() + if err1 != nil { + return nil, err1 + } + } + return specs, nil +} + +func getSortedKVMetas(subTaskMetas [][]byte) (map[string]*external.SortedKVMeta, error) { + dataKVMeta := &external.SortedKVMeta{} + indexKVMetas := make(map[int64]*external.SortedKVMeta) + for _, subTaskMeta := range subTaskMetas { + var stepMeta ImportStepMeta + err := json.Unmarshal(subTaskMeta, &stepMeta) + if err != nil { + return nil, errors.Trace(err) + } + dataKVMeta.Merge(stepMeta.SortedDataMeta) + for indexID, sortedIndexMeta := range stepMeta.SortedIndexMetas { + if item, ok := indexKVMetas[indexID]; !ok { + indexKVMetas[indexID] = sortedIndexMeta + } else { + item.Merge(sortedIndexMeta) + } + } + } + res := make(map[string]*external.SortedKVMeta, 1+len(indexKVMetas)) + res[dataKVGroup] = dataKVMeta + for indexID, item := range indexKVMetas { + res[strconv.Itoa(int(indexID))] = item + } + return res, nil +} + +func getRangeSplitter(ctx context.Context, store storage.ExternalStorage, kvMeta *external.SortedKVMeta) ( + *external.RangeSplitter, error) { + regionSplitSize, regionSplitKeys, err := importer.GetRegionSplitSizeKeys(ctx) + if err != nil { + logutil.Logger(ctx).Warn("fail to get region split size and keys", zap.Error(err)) + } + regionSplitSize = max(regionSplitSize, int64(config.SplitRegionSize)) + regionSplitKeys = max(regionSplitKeys, int64(config.SplitRegionKeys)) + logutil.Logger(ctx).Info("split kv range with split size and keys", + zap.Int64("region-split-size", regionSplitSize), + zap.Int64("region-split-keys", regionSplitKeys)) + + return external.NewRangeSplitter( + ctx, kvMeta.DataFiles, kvMeta.StatFiles, store, + int64(config.DefaultBatchSize), int64(math.MaxInt64), + regionSplitSize, regionSplitKeys, + ) +} diff --git a/disttask/importinto/planner_test.go b/disttask/importinto/planner_test.go index 7ce4733b4d2ff..2c69e02c740fd 100644 --- 
a/disttask/importinto/planner_test.go +++ b/disttask/importinto/planner_test.go @@ -19,11 +19,11 @@ import ( "testing" "github.com/pingcap/tidb/disttask/framework/planner" + "github.com/pingcap/tidb/disttask/framework/proto" "github.com/pingcap/tidb/domain/infosync" "github.com/pingcap/tidb/executor/importer" "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/parser/mysql" "github.com/stretchr/testify/require" ) @@ -56,7 +56,10 @@ func TestToPhysicalPlan(t *testing.T) { EligibleInstances: []*infosync.ServerInfo{{ID: "1"}}, ChunkMap: map[int32][]Chunk{chunkID: {{Path: "gs://test-load/1.csv"}}}, } - planCtx := planner.PlanCtx{} + planCtx := planner.PlanCtx{ + CurrTaskStep: proto.StepInit, + NextTaskStep: StepImport, + } physicalPlan, err := logicalPlan.ToPhysicalPlan(planCtx) require.NoError(t, err) plan := &planner.PhysicalPlan{ @@ -77,24 +80,6 @@ func TestToPhysicalPlan(t *testing.T) { }, Step: StepImport, }, - { - ID: 1, - Input: planner.InputSpec{ - ColumnTypes: []byte{ - mysql.TypeLonglong, mysql.TypeLonglong, mysql.TypeLonglong, mysql.TypeLonglong, mysql.TypeLonglong, mysql.TypeJSON, - }, - Links: []planner.LinkSpec{ - { - ProcessorID: 0, - }, - }, - }, - Pipeline: &PostProcessSpec{ - Schema: "db", - Table: "tb", - }, - Step: StepPostProcess, - }, }, } require.Equal(t, plan, physicalPlan) @@ -112,6 +97,12 @@ func TestToPhysicalPlan(t *testing.T) { subtaskMeta1.Checksum = Checksum{Size: 1, KVs: 2, Sum: 3} bs, err = json.Marshal(subtaskMeta1) require.NoError(t, err) + planCtx = planner.PlanCtx{ + CurrTaskStep: StepImport, + NextTaskStep: StepPostProcess, + } + physicalPlan, err = logicalPlan.ToPhysicalPlan(planCtx) + require.NoError(t, err) subtaskMetas2, err := physicalPlan.ToSubtaskMetas(planner.PlanCtx{ PreviousSubtaskMetas: [][]byte{bs}, }, StepPostProcess) diff --git a/disttask/importinto/proto.go b/disttask/importinto/proto.go index 86fd16da44ca1..8beb15dc44f1d 100644 --- a/disttask/importinto/proto.go +++ b/disttask/importinto/proto.go @@ -19,6 +19,7 @@ import ( "sync" "github.com/pingcap/tidb/br/pkg/lightning/backend" + "github.com/pingcap/tidb/br/pkg/lightning/backend/external" "github.com/pingcap/tidb/br/pkg/lightning/mydump" "github.com/pingcap/tidb/br/pkg/lightning/verification" "github.com/pingcap/tidb/domain/infosync" @@ -29,12 +30,18 @@ import ( // Steps of IMPORT INTO, each step is represented by one or multiple subtasks. // the initial step is StepInit(-1) -// steps are processed in the following order: StepInit -> StepImport -> StepPostProcess +// steps are processed in the following order: +// - local sort: StepInit -> StepImport -> StepPostProcess -> StepDone +// - global sort: StepInit -> StepEncodeAndSort -> StepWriteAndIngest -> StepPostProcess -> StepDone const ( // StepImport we sort source data and ingest it into TiKV in this step. StepImport int64 = 1 // StepPostProcess we verify checksum and add index in this step. StepPostProcess int64 = 2 + // StepEncodeAndSort encode source data and write sorted kv into global storage. + StepEncodeAndSort int64 = 3 + // StepWriteAndIngest write sorted kv into TiKV and ingest it. + StepWriteAndIngest int64 = 4 ) // TaskMeta is the task of IMPORT INTO. @@ -69,6 +76,24 @@ type ImportStepMeta struct { // the max id is same among all allocator types for now, since we're using same base, see // NewPanickingAllocators for more info. MaxIDs map[autoid.AllocatorType]int64 + + SortedDataMeta *external.SortedKVMeta + // SortedIndexMetas is a map from index id to its sorted kv meta. 
+ SortedIndexMetas map[int64]*external.SortedKVMeta +} + +const ( + dataKVGroup = "data" +) + +// WriteIngestStepMeta is the meta of write and ingest step. +// only used when global sort is enabled. +type WriteIngestStepMeta struct { + KVGroup string `json:"kv_group"` + external.SortedKVMeta `json:",inline"` + RangeSplitKeys [][]byte `json:"range_split_keys"` + + Result Result } // PostProcessStepMeta is the meta of post process step. @@ -79,7 +104,7 @@ type PostProcessStepMeta struct { MaxIDs map[autoid.AllocatorType]int64 } -// SharedVars is the shared variables between subtask and minimal tasks. +// SharedVars is the shared variables of all minimal tasks in a subtask. // This is because subtasks cannot directly obtain the results of the minimal subtask. // All the fields should be concurrent safe. type SharedVars struct { @@ -90,6 +115,28 @@ type SharedVars struct { mu sync.Mutex Checksum *verification.KVChecksum + + SortedDataMeta *external.SortedKVMeta + // SortedIndexMetas is a map from index id to its sorted kv meta. + SortedIndexMetas map[int64]*external.SortedKVMeta +} + +func (sv *SharedVars) mergeDataSummary(summary *external.WriterSummary) { + sv.mu.Lock() + defer sv.mu.Unlock() + sv.SortedDataMeta.MergeSummary(summary) +} + +func (sv *SharedVars) mergeIndexSummary(indexID int64, summary *external.WriterSummary) { + sv.mu.Lock() + defer sv.mu.Unlock() + meta, ok := sv.SortedIndexMetas[indexID] + if !ok { + meta = external.NewSortedKVMeta(summary) + sv.SortedIndexMetas[indexID] = meta + return + } + meta.MergeSummary(summary) } // importStepMinimalTask is the minimal task of IMPORT INTO. diff --git a/disttask/importinto/scheduler.go b/disttask/importinto/scheduler.go index ef65e1524fe4c..0c98c8c4f9d4e 100644 --- a/disttask/importinto/scheduler.go +++ b/disttask/importinto/scheduler.go @@ -22,8 +22,11 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/br/pkg/lightning/backend" + "github.com/pingcap/tidb/br/pkg/lightning/backend/external" "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" "github.com/pingcap/tidb/br/pkg/lightning/common" + "github.com/pingcap/tidb/br/pkg/lightning/config" "github.com/pingcap/tidb/br/pkg/lightning/metric" "github.com/pingcap/tidb/br/pkg/lightning/verification" "github.com/pingcap/tidb/disttask/framework/proto" @@ -52,32 +55,34 @@ type importStepExecutor struct { wg sync.WaitGroup } -func (s *importStepExecutor) Init(ctx context.Context) error { - s.logger.Info("init subtask env") - +func getTableImporter(ctx context.Context, taskID int64, taskMeta *TaskMeta) (*importer.TableImporter, error) { idAlloc := kv.NewPanickingAllocators(0) - tbl, err := tables.TableFromMeta(idAlloc, s.taskMeta.Plan.TableInfo) + tbl, err := tables.TableFromMeta(idAlloc, taskMeta.Plan.TableInfo) if err != nil { - return err + return nil, err } - astArgs, err := importer.ASTArgsFromStmt(s.taskMeta.Stmt) + astArgs, err := importer.ASTArgsFromStmt(taskMeta.Stmt) if err != nil { - return err + return nil, err } - controller, err := importer.NewLoadDataController(&s.taskMeta.Plan, tbl, astArgs) + controller, err := importer.NewLoadDataController(&taskMeta.Plan, tbl, astArgs) if err != nil { - return err + return nil, err } - // todo: this method will load all files, but we only import files related to current subtask. 
- if err := controller.InitDataFiles(ctx); err != nil { - return err + if err = controller.InitDataStore(ctx); err != nil { + return nil, err } - tableImporter, err := importer.NewTableImporter(&importer.JobImportParam{ + return importer.NewTableImporter(&importer.JobImportParam{ GroupCtx: ctx, Progress: asyncloaddata.NewProgress(false), Job: &asyncloaddata.Job{}, - }, controller, s.taskID) + }, controller, taskID) +} + +func (s *importStepExecutor) Init(ctx context.Context) error { + s.logger.Info("init subtask env") + tableImporter, err := getTableImporter(ctx, s.taskID, s.taskMeta) if err != nil { return err } @@ -86,11 +91,14 @@ func (s *importStepExecutor) Init(ctx context.Context) error { // we need this sub context since Cleanup which wait on this routine is called // before parent context is canceled in normal flow. s.importCtx, s.importCancel = context.WithCancel(ctx) - s.wg.Add(1) - go func() { - defer s.wg.Done() - s.tableImporter.CheckDiskQuota(s.importCtx) - }() + // only need to check disk quota when we are using local sort. + if s.tableImporter.IsLocalSort() { + s.wg.Add(1) + go func() { + defer s.wg.Done() + s.tableImporter.CheckDiskQuota(s.importCtx) + }() + } return nil } @@ -103,31 +111,37 @@ func (s *importStepExecutor) RunSubtask(ctx context.Context, subtask *proto.Subt } s.logger.Info("split and run subtask", zap.Int32("engine-id", subtaskMeta.ID)) - dataEngine, err := s.tableImporter.OpenDataEngine(ctx, subtaskMeta.ID) - if err != nil { - return err - } - // Unlike in Lightning, we start an index engine for each subtask, whereas previously there was only a single index engine globally. - // This is because the scheduler currently does not have a post-processing mechanism. - // If we import the index in `cleanupSubtaskEnv`, the dispatcher will not wait for the import to complete. - // Multiple index engines may suffer performance degradation due to range overlap. - // These issues will be alleviated after we integrate s3 sorter. - // engineID = -1, -2, -3, ... - indexEngine, err := s.tableImporter.OpenIndexEngine(ctx, common.IndexEngineID-subtaskMeta.ID) - if err != nil { - return err + var dataEngine, indexEngine *backend.OpenedEngine + if s.tableImporter.IsLocalSort() { + dataEngine, err = s.tableImporter.OpenDataEngine(ctx, subtaskMeta.ID) + if err != nil { + return err + } + // Unlike in Lightning, we start an index engine for each subtask, + // whereas previously there was only a single index engine globally. + // This is because the scheduler currently does not have a post-processing mechanism. + // If we import the index in `cleanupSubtaskEnv`, the dispatcher will not wait for the import to complete. + // Multiple index engines may suffer performance degradation due to range overlap. + // These issues will be alleviated after we integrate s3 sorter. + // engineID = -1, -2, -3, ... 
+ indexEngine, err = s.tableImporter.OpenIndexEngine(ctx, common.IndexEngineID-subtaskMeta.ID) + if err != nil { + return err + } } sharedVars := &SharedVars{ - TableImporter: s.tableImporter, - DataEngine: dataEngine, - IndexEngine: indexEngine, - Progress: asyncloaddata.NewProgress(false), - Checksum: &verification.KVChecksum{}, + TableImporter: s.tableImporter, + DataEngine: dataEngine, + IndexEngine: indexEngine, + Progress: asyncloaddata.NewProgress(false), + Checksum: &verification.KVChecksum{}, + SortedDataMeta: &external.SortedKVMeta{}, + SortedIndexMetas: make(map[int64]*external.SortedKVMeta), } s.sharedVars.Store(subtaskMeta.ID, sharedVars) source := operator.NewSimpleDataChannel(make(chan *importStepMinimalTask)) - op := newEncodeAndSortOperator(ctx, int(s.taskMeta.Plan.ThreadCnt), s.logger) + op := newEncodeAndSortOperator(ctx, s, sharedVars, subtask.ID) op.SetSource(source) pipeline := operator.NewAsyncPipeline(op) if err = pipeline.Execute(); err != nil { @@ -169,23 +183,27 @@ func (s *importStepExecutor) OnFinished(ctx context.Context, subtask *proto.Subt return errors.Errorf("sharedVars %d not found", subtaskMeta.ID) } - // TODO: we should close and cleanup engine in all case, since there's no checkpoint. - s.logger.Info("import data engine", zap.Int32("engine-id", subtaskMeta.ID)) - closedDataEngine, err := sharedVars.DataEngine.Close(ctx) - if err != nil { - return err - } - dataKVCount, err := s.tableImporter.ImportAndCleanup(ctx, closedDataEngine) - if err != nil { - return err - } + var dataKVCount int64 + if s.tableImporter.IsLocalSort() { + // TODO: we should close and cleanup engine in all case, since there's no checkpoint. + s.logger.Info("import data engine", zap.Int32("engine-id", subtaskMeta.ID)) + closedDataEngine, err := sharedVars.DataEngine.Close(ctx) + if err != nil { + return err + } + dataKVCount, err = s.tableImporter.ImportAndCleanup(ctx, closedDataEngine) + if err != nil { + return err + } - s.logger.Info("import index engine", zap.Int32("engine-id", subtaskMeta.ID)) - if closedEngine, err := sharedVars.IndexEngine.Close(ctx); err != nil { - return err - } else if _, err := s.tableImporter.ImportAndCleanup(ctx, closedEngine); err != nil { - return err + s.logger.Info("import index engine", zap.Int32("engine-id", subtaskMeta.ID)) + if closedEngine, err := sharedVars.IndexEngine.Close(ctx); err != nil { + return err + } else if _, err := s.tableImporter.ImportAndCleanup(ctx, closedEngine); err != nil { + return err + } } + // there's no imported dataKVCount on this stage when using global sort. 
sharedVars.mu.Lock() defer sharedVars.mu.Unlock() @@ -203,6 +221,8 @@ func (s *importStepExecutor) OnFinished(ctx context.Context, subtask *proto.Subt autoid.AutoIncrementType: allocators.Get(autoid.AutoIncrementType).Base(), autoid.AutoRandomType: allocators.Get(autoid.AutoRandomType).Base(), } + subtaskMeta.SortedDataMeta = sharedVars.SortedDataMeta + subtaskMeta.SortedIndexMetas = sharedVars.SortedIndexMetas s.sharedVars.Delete(subtaskMeta.ID) newMeta, err := json.Marshal(subtaskMeta) if err != nil { @@ -225,6 +245,86 @@ func (s *importStepExecutor) Rollback(context.Context) error { return nil } +type writeAndIngestStepExecutor struct { + scheduler.EmptySubtaskExecutor + taskID int64 + taskMeta *TaskMeta + logger *zap.Logger + tableImporter *importer.TableImporter +} + +var _ execute.SubtaskExecutor = &writeAndIngestStepExecutor{} + +func (e *writeAndIngestStepExecutor) Init(ctx context.Context) error { + tableImporter, err := getTableImporter(ctx, e.taskID, e.taskMeta) + if err != nil { + return err + } + e.tableImporter = tableImporter + return nil +} + +func (e *writeAndIngestStepExecutor) RunSubtask(ctx context.Context, subtask *proto.Subtask) error { + e.logger.Info("write and ingest kv", zap.Int64("subtask-id", subtask.ID)) + + sm := &WriteIngestStepMeta{} + err := json.Unmarshal(subtask.Meta, sm) + if err != nil { + return errors.Trace(err) + } + _, engineUUID := backend.MakeUUID("", subtask.ID) + localBackend := e.tableImporter.Backend() + err = localBackend.CloseEngine(ctx, &backend.EngineConfig{ + External: &backend.ExternalEngineConfig{ + StorageURI: e.taskMeta.Plan.CloudStorageURI, + DataFiles: sm.DataFiles, + StatFiles: sm.StatFiles, + MinKey: sm.MinKey, + MaxKey: sm.MaxKey, + SplitKeys: sm.RangeSplitKeys, + TotalFileSize: int64(sm.TotalKVSize), + TotalKVCount: 0, + }, + }, engineUUID) + if err != nil { + return err + } + return localBackend.ImportEngine(ctx, engineUUID, int64(config.SplitRegionSize), int64(config.SplitRegionKeys)) +} + +func (e *writeAndIngestStepExecutor) OnFinished(_ context.Context, subtask *proto.Subtask) error { + var subtaskMeta WriteIngestStepMeta + if err := json.Unmarshal(subtask.Meta, &subtaskMeta); err != nil { + return err + } + if subtaskMeta.KVGroup != dataKVGroup { + return nil + } + + // only data kv group has loaded row count + _, engineUUID := backend.MakeUUID("", subtask.ID) + localBackend := e.tableImporter.Backend() + _, kvCount := localBackend.GetExternalEngineKVStatistics(engineUUID) + subtaskMeta.Result.LoadedRowCnt = uint64(kvCount) + + newMeta, err := json.Marshal(subtaskMeta) + if err != nil { + return err + } + subtask.Meta = newMeta + return nil +} + +func (e *writeAndIngestStepExecutor) Cleanup(_ context.Context) (err error) { + e.logger.Info("cleanup subtask env") + return e.tableImporter.Close() +} + +func (e *writeAndIngestStepExecutor) Rollback(context.Context) error { + e.logger.Info("rollback") + return nil +} + type postStepExecutor struct { scheduler.EmptySubtaskExecutor taskID int64 @@ -277,12 +377,18 @@ func (*importScheduler) GetSubtaskExecutor(_ context.Context, task *proto.Task, logger.Info("create step scheduler") switch task.Step { - case StepImport: + case StepImport, StepEncodeAndSort: return &importStepExecutor{ taskID: task.ID, taskMeta: &taskMeta, logger: logger, }, nil + case StepWriteAndIngest: + return &writeAndIngestStepExecutor{ + taskID: task.ID, + taskMeta: &taskMeta, + logger: logger, + }, nil case StepPostProcess: return &postStepExecutor{ taskID: task.ID, diff --git 
a/disttask/importinto/subtask_executor.go b/disttask/importinto/subtask_executor.go index b24e3ab72706c..4a4d7b497bfe8 100644 --- a/disttask/importinto/subtask_executor.go +++ b/disttask/importinto/subtask_executor.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/br/pkg/lightning/backend" "github.com/pingcap/tidb/br/pkg/lightning/backend/local" "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/br/pkg/lightning/config" @@ -46,7 +47,7 @@ var TestSyncChan = make(chan struct{}) // MiniTaskExecutor is the interface for a minimal task executor. // exported for testing. type MiniTaskExecutor interface { - Run(ctx context.Context) error + Run(ctx context.Context, dataWriter, indexWriter backend.EngineWriter) error } // importMinimalTaskExecutor is a minimal task executor for IMPORT INTO. @@ -62,7 +63,7 @@ func newImportMinimalTaskExecutor0(t *importStepMinimalTask) MiniTaskExecutor { } } -func (e *importMinimalTaskExecutor) Run(ctx context.Context) error { +func (e *importMinimalTaskExecutor) Run(ctx context.Context, dataWriter, indexWriter backend.EngineWriter) error { logger := logutil.BgLogger().With(zap.String("type", proto.ImportInto), zap.Int64("table-id", e.mTtask.Plan.TableInfo.ID)) logger.Info("run minimal task") failpoint.Inject("waitBeforeSortChunk", func() { @@ -77,8 +78,14 @@ func (e *importMinimalTaskExecutor) Run(ctx context.Context) error { }) chunkCheckpoint := toChunkCheckpoint(e.mTtask.Chunk) sharedVars := e.mTtask.SharedVars - if err := importer.ProcessChunk(ctx, &chunkCheckpoint, sharedVars.TableImporter, sharedVars.DataEngine, sharedVars.IndexEngine, sharedVars.Progress, logger); err != nil { - return err + if sharedVars.TableImporter.IsLocalSort() { + if err := importer.ProcessChunk(ctx, &chunkCheckpoint, sharedVars.TableImporter, sharedVars.DataEngine, sharedVars.IndexEngine, sharedVars.Progress, logger); err != nil { + return err + } + } else { + if err := importer.ProcessChunkWith(ctx, &chunkCheckpoint, sharedVars.TableImporter, dataWriter, indexWriter, sharedVars.Progress, logger); err != nil { + return err + } } sharedVars.mu.Lock() diff --git a/disttask/operator/operator.go b/disttask/operator/operator.go index 2e3bb5a68cc25..c1a3d43bb805c 100644 --- a/disttask/operator/operator.go +++ b/disttask/operator/operator.go @@ -70,7 +70,7 @@ func (c *AsyncOperator[T, R]) Open() error { func (c *AsyncOperator[T, R]) Close() error { // Wait all tasks done. // We don't need to close the task channel because - // it is closed by the workerpool. + // it is maintained outside this operator, see SetSource. c.pool.Wait() c.pool.Release() return nil diff --git a/errno/errname.go b/errno/errname.go index 648310616d4e8..bb1fa58be9ef3 100644 --- a/errno/errname.go +++ b/errno/errname.go @@ -1059,8 +1059,8 @@ var MySQLErrName = map[uint16]*mysql.ErrMessage{ ErrLoadParquetFromLocal: mysql.Message("Do not support loading parquet files from local. Please try to load the parquet files from the cloud storage", nil), ErrLoadDataEmptyPath: mysql.Message("The value of INFILE must not be empty when LOAD DATA from LOCAL", nil), ErrLoadDataUnsupportedFormat: mysql.Message("The FORMAT '%s' is not supported", nil), - ErrLoadDataInvalidURI: mysql.Message("The URI of file location is invalid. Reason: %s. 
Please provide a valid URI, such as 's3://import/test.csv?access_key_id={your_access_key_id ID}&secret_access_key={your_secret_access_key}&session_token={your_session_token}'", nil), - ErrLoadDataCantAccess: mysql.Message("Access to the source file has been denied. Reason: %s. Please check the URI, access key and secret access key are correct", nil), + ErrLoadDataInvalidURI: mysql.Message("The URI of %s is invalid. Reason: %s. Please provide a valid URI, such as 's3://import/test.csv?access_key_id={your_access_key_id ID}&secret_access_key={your_secret_access_key}&session_token={your_session_token}'", nil), + ErrLoadDataCantAccess: mysql.Message("Access to the %s has been denied. Reason: %s. Please check the URI, access key and secret access key are correct", nil), ErrLoadDataCantRead: mysql.Message("Failed to read source files. Reason: %s. %s", nil), ErrLoadDataWrongFormatConfig: mysql.Message("", nil), ErrUnknownOption: mysql.Message("Unknown option %s", nil), diff --git a/errors.toml b/errors.toml index 0ffcb48e051a7..8d37b77363cbc 100644 --- a/errors.toml +++ b/errors.toml @@ -1803,12 +1803,12 @@ The FORMAT '%s' is not supported ["executor:8158"] error = ''' -The URI of file location is invalid. Reason: %s. Please provide a valid URI, such as 's3://import/test.csv?access_key_id={your_access_key_id ID}&secret_access_key={your_secret_access_key}&session_token={your_session_token}' +The URI of %s is invalid. Reason: %s. Please provide a valid URI, such as 's3://import/test.csv?access_key_id={your_access_key_id ID}&secret_access_key={your_secret_access_key}&session_token={your_session_token}' ''' ["executor:8159"] error = ''' -Access to the source file has been denied. Reason: %s. Please check the URI, access key and secret access key are correct +Access to the %s has been denied. Reason: %s. Please check the URI, access key and secret access key are correct ''' ["executor:8160"] diff --git a/executor/import_into.go b/executor/import_into.go index cd0a444b92dfb..a02a3bb12d232 100644 --- a/executor/import_into.go +++ b/executor/import_into.go @@ -194,7 +194,7 @@ func (e *ImportIntoExec) getJobImporter(ctx context.Context, param *importer.Job importFromServer, err := storage.IsLocalPath(e.controller.Path) if err != nil { // since we have checked this during creating controller, this should not happen. 
- return nil, exeerrors.ErrLoadDataInvalidURI.FastGenByArgs(err.Error()) + return nil, exeerrors.ErrLoadDataInvalidURI.FastGenByArgs(plannercore.ImportIntoDataSource, err.Error()) } logutil.Logger(ctx).Info("get job importer", zap.Stringer("param", e.controller.Parameters), zap.Bool("dist-task-enabled", variable.EnableDistTask.Load())) diff --git a/executor/import_into_test.go b/executor/import_into_test.go index 81812add2269b..53ffdded23176 100644 --- a/executor/import_into_test.go +++ b/executor/import_into_test.go @@ -132,6 +132,11 @@ func TestImportIntoOptionsNegativeCase(t *testing.T) { {OptionStr: "record_errors=-123", Err: exeerrors.ErrInvalidOptionVal}, {OptionStr: "record_errors=null", Err: exeerrors.ErrInvalidOptionVal}, {OptionStr: "record_errors=true", Err: exeerrors.ErrInvalidOptionVal}, + + {OptionStr: "cloud_storage_uri=123", Err: exeerrors.ErrInvalidOptionVal}, + {OptionStr: "cloud_storage_uri=':'", Err: exeerrors.ErrInvalidOptionVal}, + {OptionStr: "cloud_storage_uri='sdsd'", Err: exeerrors.ErrInvalidOptionVal}, + {OptionStr: "cloud_storage_uri='http://sdsd'", Err: exeerrors.ErrInvalidOptionVal}, } sqlTemplate := "import into t from '/file.csv' with %s" diff --git a/executor/importer/BUILD.bazel b/executor/importer/BUILD.bazel index 88693c1ad8206..652e1deeee855 100644 --- a/executor/importer/BUILD.bazel +++ b/executor/importer/BUILD.bazel @@ -16,6 +16,7 @@ go_library( deps = [ "//br/pkg/lightning/backend", "//br/pkg/lightning/backend/encode", + "//br/pkg/lightning/backend/external", "//br/pkg/lightning/backend/kv", "//br/pkg/lightning/backend/local", "//br/pkg/lightning/checkpoints", @@ -85,7 +86,7 @@ go_test( embed = [":importer"], flaky = True, race = "on", - shard_count = 14, + shard_count = 15, deps = [ "//br/pkg/errors", "//br/pkg/lightning/config", @@ -100,7 +101,9 @@ go_test( "//parser/ast", "//parser/model", "//planner/core", + "//sessionctx/variable", "//testkit", + "//types", "//util/dbterror/exeerrors", "//util/etcd", "//util/logutil", diff --git a/executor/importer/chunk_process.go b/executor/importer/chunk_process.go index 8f5b92c0565af..de28449d4933b 100644 --- a/executor/importer/chunk_process.go +++ b/executor/importer/chunk_process.go @@ -22,6 +22,8 @@ import ( "github.com/docker/go-units", "github.com/pingcap/errors", "github.com/pingcap/tidb/br/pkg/lightning/backend" + "github.com/pingcap/tidb/br/pkg/lightning/backend/encode" + "github.com/pingcap/tidb/br/pkg/lightning/backend/external" "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" "github.com/pingcap/tidb/br/pkg/lightning/checkpoints" "github.com/pingcap/tidb/br/pkg/lightning/common" @@ -335,3 +337,68 @@ func (p *chunkProcessor) deliverLoop(ctx context.Context) error { return nil } + +// IndexRouteWriter is a writer for index KVs when using global sort. +// we route the KVs of different indexes to different writers to make the later +// merge sort easier; otherwise the KV data of all subtasks would overlap. +// +// the drawback is that the number of writers we need to open becomes +// index-count * encode-concurrency when the table has many indexes, and each +// writer takes a 256MiB buffer by default. +// this can consume a lot of memory, or even cause OOM. +type IndexRouteWriter struct { + writers map[int64]*external.Writer + logger *zap.Logger + writerFactory func(int64) *external.Writer +} + +// NewIndexRouteWriter creates a new IndexRouteWriter.
+func NewIndexRouteWriter(logger *zap.Logger, writerFactory func(int64) *external.Writer) *IndexRouteWriter { + return &IndexRouteWriter{ + writers: make(map[int64]*external.Writer), + logger: logger, + writerFactory: writerFactory, + } +} + +// AppendRows implements backend.EngineWriter interface. +func (w *IndexRouteWriter) AppendRows(ctx context.Context, _ []string, rows encode.Rows) error { + kvs := kv.Rows2KvPairs(rows) + if len(kvs) == 0 { + return nil + } + for _, item := range kvs { + indexID, err := tablecodec.DecodeIndexID(item.Key) + if err != nil { + return errors.Trace(err) + } + writer, ok := w.writers[indexID] + if !ok { + writer = w.writerFactory(indexID) + w.writers[indexID] = writer + } + if err = writer.WriteRow(ctx, item.Key, item.Val, nil); err != nil { + return errors.Trace(err) + } + } + return nil +} + +// IsSynced implements backend.EngineWriter interface. +func (*IndexRouteWriter) IsSynced() bool { + return true +} + +// Close implements backend.EngineWriter interface. +func (w *IndexRouteWriter) Close(ctx context.Context) (backend.ChunkFlushStatus, error) { + var firstErr error + for _, writer := range w.writers { + if err := writer.Close(ctx); err != nil { + if firstErr == nil { + firstErr = err + } + w.logger.Error("close index writer failed", zap.Error(err)) + } + } + return nil, firstErr +} diff --git a/executor/importer/engine_process.go b/executor/importer/engine_process.go index 58ffca7e6c880..b5c44de1c762a 100644 --- a/executor/importer/engine_process.go +++ b/executor/importer/engine_process.go @@ -47,40 +47,53 @@ func ProcessChunk( dataWriterCfg := &backend.LocalWriterConfig{ IsKVSorted: hasAutoIncrementAutoID, } - parser, err := tableImporter.getParser(ctx, chunk) + dataWriter, err := dataEngine.LocalWriter(ctx, dataWriterCfg) if err != nil { return err } defer func() { - if err2 := parser.Close(); err2 != nil { - logger.Warn("close parser failed", zap.Error(err2)) + if _, err2 := dataWriter.Close(ctx); err2 != nil { + logger.Warn("close data writer failed", zap.Error(err2)) } }() - encoder, err := tableImporter.getKVEncoder(chunk) + indexWriter, err := indexEngine.LocalWriter(ctx, &backend.LocalWriterConfig{}) if err != nil { return err } defer func() { - if err2 := encoder.Close(); err2 != nil { - logger.Warn("close encoder failed", zap.Error(err2)) + if _, err2 := indexWriter.Close(ctx); err2 != nil { + logger.Warn("close index writer failed", zap.Error(err2)) } }() - dataWriter, err := dataEngine.LocalWriter(ctx, dataWriterCfg) + + return ProcessChunkWith(ctx, chunk, tableImporter, dataWriter, indexWriter, progress, logger) +} + +// ProcessChunkWith processes a chunk, and write kv pairs to dataWriter and indexWriter. 
+func ProcessChunkWith( + ctx context.Context, + chunk *checkpoints.ChunkCheckpoint, + tableImporter *TableImporter, + dataWriter, indexWriter backend.EngineWriter, + progress *asyncloaddata.Progress, + logger *zap.Logger, +) error { + parser, err := tableImporter.getParser(ctx, chunk) if err != nil { return err } defer func() { - if _, err2 := dataWriter.Close(ctx); err2 != nil { - logger.Warn("close data writer failed", zap.Error(err2)) + if err2 := parser.Close(); err2 != nil { + logger.Warn("close parser failed", zap.Error(err2)) } }() - indexWriter, err := indexEngine.LocalWriter(ctx, &backend.LocalWriterConfig{}) + encoder, err := tableImporter.getKVEncoder(chunk) if err != nil { return err } defer func() { - if _, err2 := indexWriter.Close(ctx); err2 != nil { - logger.Warn("close index writer failed", zap.Error(err2)) + if err2 := encoder.Close(); err2 != nil { + logger.Warn("close encoder failed", zap.Error(err2)) } }() diff --git a/executor/importer/import.go b/executor/importer/import.go index e2ee585ce61cd..a441b6e9dae0a 100644 --- a/executor/importer/import.go +++ b/executor/importer/import.go @@ -18,6 +18,7 @@ import ( "context" "io" "math" + "net/url" "os" "path/filepath" "runtime" @@ -92,6 +93,7 @@ const ( recordErrorsOption = "record_errors" detachedOption = "detached" disableTiKVImportModeOption = "disable_tikv_import_mode" + cloudStorageURIOption = "cloud_storage_uri" // used for test maxEngineSizeOption = "__max_engine_size" ) @@ -115,6 +117,7 @@ var ( detachedOption: false, disableTiKVImportModeOption: false, maxEngineSizeOption: true, + cloudStorageURIOption: true, } csvOnlyOptions = map[string]struct{}{ @@ -195,6 +198,7 @@ type Plan struct { Detached bool DisableTiKVImportMode bool MaxEngineSize config.ByteSize + CloudStorageURI string // used for checksum in physical mode DistSQLScanConcurrency int @@ -248,6 +252,8 @@ type LoadDataController struct { logger *zap.Logger dataStore storage.ExternalStorage dataFiles []*mydump.SourceFileMeta + // GlobalSortStore is used to store sorted data when using global sort. + GlobalSortStore storage.ExternalStorage } func getImportantSysVars(sctx sessionctx.Context) map[string]string { @@ -497,6 +503,7 @@ func (p *Plan) initDefaultOptions() { p.Detached = false p.DisableTiKVImportMode = false p.MaxEngineSize = config.ByteSize(defaultMaxEngineSize) + p.CloudStorageURI = variable.CloudStorageURI.Load() v := "utf8mb4" p.Charset = &v @@ -654,6 +661,25 @@ func (p *Plan) initOptions(seCtx sessionctx.Context, options []*plannercore.Load if _, ok := specifiedOptions[disableTiKVImportModeOption]; ok { p.DisableTiKVImportMode = true } + if opt, ok := specifiedOptions[cloudStorageURIOption]; ok { + v, err := optAsString(opt) + if err != nil { + return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name) + } + // set cloud storage uri to an empty string to force local sort when + // the global variable is set. + if v != "" { + b, err := storage.ParseBackend(v, nil) + if err != nil { + return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name) + } + // only support s3 and gcs now.
+ if b.GetS3() == nil && b.GetGcs() == nil { + return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name) + } + } + p.CloudStorageURI = v + } if opt, ok := specifiedOptions[maxEngineSizeOption]; ok { v, err := optAsString(opt) if err != nil { @@ -715,7 +741,11 @@ func (p *Plan) initParameters(plan *plannercore.ImportInto) error { optionMap := make(map[string]interface{}, len(plan.Options)) for _, opt := range plan.Options { if opt.Value != nil { - optionMap[opt.Name] = opt.Value.String() + val := opt.Value.String() + if opt.Name == cloudStorageURIOption { + val = ast.RedactURL(val) + } + optionMap[opt.Name] = val } else { optionMap[opt.Name] = nil } @@ -863,11 +893,64 @@ func (e *LoadDataController) GenerateCSVConfig() *config.CSVConfig { return csvConfig } +// InitDataStore initializes the data store. +func (e *LoadDataController) InitDataStore(ctx context.Context) error { + u, err2 := storage.ParseRawURL(e.Path) + if err2 != nil { + return exeerrors.ErrLoadDataInvalidURI.GenWithStackByArgs(plannercore.ImportIntoDataSource, + err2.Error()) + } + + if storage.IsLocal(u) { + u.Path = filepath.Dir(e.Path) + } else { + u.Path = "" + } + s, err := e.initExternalStore(ctx, u, plannercore.ImportIntoDataSource) + if err != nil { + return err + } + e.dataStore = s + + if e.IsGlobalSort() { + target := "cloud storage" + cloudStorageURL, err3 := storage.ParseRawURL(e.Plan.CloudStorageURI) + if err3 != nil { + return exeerrors.ErrLoadDataInvalidURI.GenWithStackByArgs(target, + err3.Error()) + } + s, err = e.initExternalStore(ctx, cloudStorageURL, target) + if err != nil { + return err + } + e.GlobalSortStore = s + } + return nil +} +func (*LoadDataController) initExternalStore(ctx context.Context, u *url.URL, target string) (storage.ExternalStorage, error) { + b, err2 := storage.ParseBackendFromURL(u, nil) + if err2 != nil { + return nil, exeerrors.ErrLoadDataInvalidURI.GenWithStackByArgs(target, GetMsgFromBRError(err2)) + } + + opt := &storage.ExternalStorageOptions{} + if intest.InTest { + opt.NoCredentials = true + } + s, err := storage.New(ctx, b, opt) + if err != nil { + return nil, exeerrors.ErrLoadDataCantAccess.GenWithStackByArgs(target, GetMsgFromBRError(err)) + } + return s, nil +} + // InitDataFiles initializes the data store and files. +// it will call InitDataStore internally. 
func (e *LoadDataController) InitDataFiles(ctx context.Context) error { u, err2 := storage.ParseRawURL(e.Path) if err2 != nil { - return exeerrors.ErrLoadDataInvalidURI.GenWithStackByArgs(err2.Error()) + return exeerrors.ErrLoadDataInvalidURI.GenWithStackByArgs(plannercore.ImportIntoDataSource, + err2.Error()) } var fileNameKey string @@ -878,45 +961,39 @@ func (e *LoadDataController) InitDataFiles(ctx context.Context) error { } if !filepath.IsAbs(e.Path) { - return exeerrors.ErrLoadDataInvalidURI.GenWithStackByArgs("file location should be absolute path when import from server disk") + return exeerrors.ErrLoadDataInvalidURI.GenWithStackByArgs(plannercore.ImportIntoDataSource, + "file location should be absolute path when import from server disk") } // we add this check for security, we don't want user import any sensitive system files, // most of which is readable text file and don't have a suffix, such as /etc/passwd if !slices.Contains([]string{".csv", ".sql", ".parquet"}, strings.ToLower(filepath.Ext(e.Path))) { - return exeerrors.ErrLoadDataInvalidURI.GenWithStackByArgs("the file suffix is not supported when import from server disk") + return exeerrors.ErrLoadDataInvalidURI.GenWithStackByArgs(plannercore.ImportIntoDataSource, + "the file suffix is not supported when import from server disk") } dir := filepath.Dir(e.Path) _, err := os.Stat(dir) if err != nil { // permission denied / file not exist error, etc. - return exeerrors.ErrLoadDataInvalidURI.GenWithStackByArgs(err.Error()) + return exeerrors.ErrLoadDataInvalidURI.GenWithStackByArgs(plannercore.ImportIntoDataSource, + err.Error()) } fileNameKey = filepath.Base(e.Path) - u.Path = dir } else { fileNameKey = strings.Trim(u.Path, "/") - u.Path = "" - } - b, err2 := storage.ParseBackendFromURL(u, nil) - if err2 != nil { - return exeerrors.ErrLoadDataInvalidURI.GenWithStackByArgs(GetMsgFromBRError(err2)) } // try to find pattern error in advance _, err2 = filepath.Match(stringutil.EscapeGlobExceptAsterisk(fileNameKey), "") if err2 != nil { - return exeerrors.ErrLoadDataInvalidURI.GenWithStackByArgs("Glob pattern error: " + err2.Error()) + return exeerrors.ErrLoadDataInvalidURI.GenWithStackByArgs(plannercore.ImportIntoDataSource, + "Glob pattern error: "+err2.Error()) } - opt := &storage.ExternalStorageOptions{} - if intest.InTest { - opt.NoCredentials = true - } - s, err := storage.New(ctx, b, opt) - if err != nil { - return exeerrors.ErrLoadDataCantAccess.GenWithStackByArgs(GetMsgFromBRError(err)) + if err2 = e.InitDataStore(ctx); err2 != nil { + return err2 } + s := e.dataStore var totalSize int64 dataFiles := []*mydump.SourceFileMeta{} idx := strings.IndexByte(fileNameKey, '*') @@ -955,7 +1032,7 @@ func (e *LoadDataController) InitDataFiles(ctx context.Context) error { // access, else walkDir will fail // we only support '*', in order to reuse glob library manually escape the path escapedPath := stringutil.EscapeGlobExceptAsterisk(fileNameKey) - err = s.WalkDir(ctx, &storage.WalkOption{ObjPrefix: commonPrefix, SkipSubDir: true}, + err := s.WalkDir(ctx, &storage.WalkOption{ObjPrefix: commonPrefix, SkipSubDir: true}, func(remotePath string, size int64) error { // we have checked in LoadDataExec.Next //nolint: errcheck @@ -980,7 +1057,6 @@ func (e *LoadDataController) InitDataFiles(ctx context.Context) error { } } - e.dataStore = s e.dataFiles = dataFiles e.TotalFileSize = totalSize return nil @@ -1131,6 +1207,16 @@ func (e *LoadDataController) toMyDumpFiles() []mydump.FileInfo { return res } +// IsLocalSort returns true if we sort 
data on local disk. +func (e *LoadDataController) IsLocalSort() bool { + return e.Plan.CloudStorageURI == "" +} + +// IsGlobalSort returns true if we sort data on global storage. +func (e *LoadDataController) IsGlobalSort() bool { + return !e.IsLocalSort() +} + // CreateColAssignExprs creates the column assignment expressions using session context. // RewriteAstExpr will write ast node in place(due to xxNode.Accept), but it doesn't change node content, // so we sync it. diff --git a/executor/importer/import_test.go b/executor/importer/import_test.go index b98592be88595..d141d61629416 100644 --- a/executor/importer/import_test.go +++ b/executor/importer/import_test.go @@ -17,6 +17,7 @@ package importer import ( "context" "fmt" + "net/url" "runtime" "testing" @@ -30,6 +31,8 @@ import ( "github.com/pingcap/tidb/parser" "github.com/pingcap/tidb/parser/ast" plannercore "github.com/pingcap/tidb/planner/core" + "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/mock" "github.com/stretchr/testify/require" @@ -38,6 +41,10 @@ import ( func TestInitDefaultOptions(t *testing.T) { plan := &Plan{} require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/importer/mockNumCpu", "return(1)")) + variable.CloudStorageURI.Store("s3://bucket/path") + t.Cleanup(func() { + variable.CloudStorageURI.Store("") + }) plan.initDefaultOptions() require.Equal(t, config.ByteSize(0), plan.DiskQuota) require.Equal(t, config.OpLevelRequired, plan.Checksum) @@ -49,6 +56,7 @@ func TestInitDefaultOptions(t *testing.T) { require.Equal(t, "utf8mb4", *plan.Charset) require.Equal(t, false, plan.DisableTiKVImportMode) require.Equal(t, config.ByteSize(defaultMaxEngineSize), plan.MaxEngineSize) + require.Equal(t, "s3://bucket/path", plan.CloudStorageURI) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/importer/mockNumCpu", "return(10)")) plan.initDefaultOptions() @@ -76,7 +84,6 @@ func TestInitOptionsPositiveCase(t *testing.T) { sqlTemplate := "import into t from '/file.csv' with %s" p := parser.New() - plan := &Plan{Format: DataFormatCSV} sql := fmt.Sprintf(sqlTemplate, characterSetOption+"='utf8', "+ fieldsTerminatedByOption+"='aaa', "+ fieldsEnclosedByOption+"='|', "+ @@ -96,6 +103,7 @@ func TestInitOptionsPositiveCase(t *testing.T) { ) stmt, err := p.ParseOneStmt(sql, "", "") require.NoError(t, err, sql) + plan := &Plan{Format: DataFormatCSV} err = plan.initOptions(ctx, convertOptions(stmt.(*ast.ImportIntoStmt).Options)) require.NoError(t, err, sql) require.Equal(t, "utf8", *plan.Charset, sql) @@ -114,6 +122,42 @@ func TestInitOptionsPositiveCase(t *testing.T) { require.True(t, plan.Detached, sql) require.True(t, plan.DisableTiKVImportMode, sql) require.Equal(t, config.ByteSize(100<<30), plan.MaxEngineSize, sql) + require.Empty(t, plan.CloudStorageURI, sql) + + // set cloud storage uri + variable.CloudStorageURI.Store("s3://bucket/path") + t.Cleanup(func() { + variable.CloudStorageURI.Store("") + }) + plan = &Plan{Format: DataFormatCSV} + err = plan.initOptions(ctx, convertOptions(stmt.(*ast.ImportIntoStmt).Options)) + require.NoError(t, err, sql) + require.Equal(t, "s3://bucket/path", plan.CloudStorageURI, sql) + + // override cloud storage uri using option + sql2 := sql + ", " + cloudStorageURIOption + "='s3://bucket/path2'" + stmt, err = p.ParseOneStmt(sql2, "", "") + require.NoError(t, err, sql2) + plan = &Plan{Format: DataFormatCSV} + err = plan.initOptions(ctx, 
convertOptions(stmt.(*ast.ImportIntoStmt).Options)) + require.NoError(t, err, sql2) + require.Equal(t, "s3://bucket/path2", plan.CloudStorageURI, sql2) + // override with gs + sql3 := sql + ", " + cloudStorageURIOption + "='gs://bucket/path2'" + stmt, err = p.ParseOneStmt(sql3, "", "") + require.NoError(t, err, sql3) + plan = &Plan{Format: DataFormatCSV} + err = plan.initOptions(ctx, convertOptions(stmt.(*ast.ImportIntoStmt).Options)) + require.NoError(t, err, sql3) + require.Equal(t, "gs://bucket/path2", plan.CloudStorageURI, sql3) + // override with empty string, force use local sort + sql4 := sql + ", " + cloudStorageURIOption + "=''" + stmt, err = p.ParseOneStmt(sql4, "", "") + require.NoError(t, err, sql4) + plan = &Plan{Format: DataFormatCSV} + err = plan.initOptions(ctx, convertOptions(stmt.(*ast.ImportIntoStmt).Options)) + require.NoError(t, err, sql4) + require.Equal(t, "", plan.CloudStorageURI, sql4) } func TestAdjustOptions(t *testing.T) { @@ -176,3 +220,54 @@ func TestGetFileRealSize(t *testing.T) { require.NoError(t, err) require.Equal(t, int64(100), c.getFileRealSize(context.Background(), fileMeta, nil)) } + +func urlEqual(t *testing.T, expected, actual string) { + urlExpected, err := url.Parse(expected) + require.NoError(t, err) + urlGot, err := url.Parse(actual) + require.NoError(t, err) + // order of query parameters might change + require.Equal(t, urlExpected.Query(), urlGot.Query()) + urlExpected.RawQuery, urlGot.RawQuery = "", "" + require.Equal(t, urlExpected.String(), urlGot.String()) +} + +func TestInitParameters(t *testing.T) { + // test redacted + p := &Plan{ + Format: DataFormatCSV, + Path: "s3://bucket/path?access-key=111111&secret-access-key=222222", + } + require.NoError(t, p.initParameters(&plannercore.ImportInto{ + Options: []*plannercore.LoadDataOpt{ + { + Name: cloudStorageURIOption, + Value: &expression.Constant{ + Value: types.NewStringDatum("s3://this-is-for-storage/path?access-key=aaaaaa&secret-access-key=bbbbbb"), + }, + }, + }, + })) + urlEqual(t, "s3://bucket/path?access-key=xxxxxx&secret-access-key=xxxxxx", p.Parameters.FileLocation) + require.Len(t, p.Parameters.Options, 1) + urlEqual(t, "s3://this-is-for-storage/path?access-key=xxxxxx&secret-access-key=xxxxxx", + p.Parameters.Options[cloudStorageURIOption].(string)) + + // test other options + require.NoError(t, p.initParameters(&plannercore.ImportInto{ + Options: []*plannercore.LoadDataOpt{ + { + Name: detachedOption, + }, + { + Name: threadOption, + Value: &expression.Constant{ + Value: types.NewIntDatum(3), + }, + }, + }, + })) + require.Len(t, p.Parameters.Options, 2) + require.Contains(t, p.Parameters.Options, detachedOption) + require.Equal(t, "3", p.Parameters.Options[threadOption]) +} diff --git a/executor/importer/job.go b/executor/importer/job.go index b35e427d0e9d6..b658d42efb067 100644 --- a/executor/importer/job.go +++ b/executor/importer/job.go @@ -58,7 +58,14 @@ const ( jobStatusFinished = "finished" // when the job is finished, step will be set to none. - jobStepNone = "" + jobStepNone = "" + // JobStepGlobalSorting is the first step when using global sort, + // step goes from none -> global-sorting -> importing -> validating -> none. + JobStepGlobalSorting = "global-sorting" + // JobStepImporting is the first step when using local sort, + // step goes from none -> importing -> validating -> none. + // when used in global sort, it means importing the sorted data. + // when used in local sort, it means encode&sort data and then importing the data. 
JobStepImporting = "importing" JobStepValidating = "validating" @@ -216,14 +223,14 @@ func CreateJob( return rows[0].GetInt64(0), nil } -// StartJob tries to start a pending job with jobID, change its status/step to running/importing. +// StartJob tries to start a pending job with jobID, change its status/step to running/input step. // It will not return error when there's no matched job or the job has already started. -func StartJob(ctx context.Context, conn sqlexec.SQLExecutor, jobID int64) error { +func StartJob(ctx context.Context, conn sqlexec.SQLExecutor, jobID int64, step string) error { ctx = util.WithInternalSourceType(ctx, kv.InternalImportInto) _, err := conn.ExecuteInternal(ctx, `UPDATE mysql.tidb_import_jobs SET update_time = CURRENT_TIMESTAMP(6), start_time = CURRENT_TIMESTAMP(6), status = %?, step = %? WHERE id = %? AND status = %?;`, - JobStatusRunning, JobStepImporting, jobID, jobStatusPending) + JobStatusRunning, step, jobID, jobStatusPending) return err } diff --git a/executor/importer/job_test.go b/executor/importer/job_test.go index 3bccceb864a54..1b1b4b002f312 100644 --- a/executor/importer/job_test.go +++ b/executor/importer/job_test.go @@ -105,7 +105,7 @@ func TestJobHappyPath(t *testing.T) { jobInfoEqual(t, jobInfo, gotJobInfo) // start job - require.NoError(t, importer.StartJob(ctx, conn, jobID)) + require.NoError(t, importer.StartJob(ctx, conn, jobID, importer.JobStepImporting)) gotJobInfo, err = importer.GetJob(ctx, conn, jobID, jobInfo.CreatedBy, false) require.NoError(t, err) require.False(t, gotJobInfo.CreateTime.IsZero()) @@ -222,7 +222,7 @@ func TestGetAndCancelJob(t *testing.T) { jobInfoEqual(t, jobInfo, gotJobInfo) // start job - require.NoError(t, importer.StartJob(ctx, conn, jobID2)) + require.NoError(t, importer.StartJob(ctx, conn, jobID2, importer.JobStepImporting)) gotJobInfo, err = importer.GetJob(ctx, conn, jobID2, jobInfo.CreatedBy, false) require.NoError(t, err) require.False(t, gotJobInfo.CreateTime.IsZero()) @@ -306,7 +306,7 @@ func TestGetJobInfoNullField(t *testing.T) { jobID1, err := importer.CreateJob(ctx, conn, jobInfo.TableSchema, jobInfo.TableName, jobInfo.TableID, jobInfo.CreatedBy, &jobInfo.Parameters, jobInfo.SourceFileSize) require.NoError(t, err) - require.NoError(t, importer.StartJob(ctx, conn, jobID1)) + require.NoError(t, importer.StartJob(ctx, conn, jobID1, importer.JobStepImporting)) require.NoError(t, importer.FailJob(ctx, conn, jobID1, "failed")) jobID2, err := importer.CreateJob(ctx, conn, jobInfo.TableSchema, jobInfo.TableName, jobInfo.TableID, jobInfo.CreatedBy, &jobInfo.Parameters, jobInfo.SourceFileSize) diff --git a/executor/importer/table_import.go b/executor/importer/table_import.go index 9cf3ffb9bc539..cf441346c67db 100644 --- a/executor/importer/table_import.go +++ b/executor/importer/table_import.go @@ -147,6 +147,28 @@ func GetCachedKVStoreFrom(pdAddr string, tls *common.TLS) (tidbkv.Storage, error return kvStore, nil } +// GetRegionSplitSizeKeys gets the region split size and keys from PD. 
+func GetRegionSplitSizeKeys(ctx context.Context) (regionSplitSize int64, regionSplitKeys int64, err error) { + tidbCfg := tidb.GetGlobalConfig() + tls, err := common.NewTLS( + tidbCfg.Security.ClusterSSLCA, + tidbCfg.Security.ClusterSSLCert, + tidbCfg.Security.ClusterSSLKey, + "", + nil, nil, nil, + ) + if err != nil { + return 0, 0, err + } + tlsOpt := tls.ToPDSecurityOption() + pdCli, err := pd.NewClientWithContext(ctx, []string{tidbCfg.Path}, tlsOpt) + if err != nil { + return 0, 0, errors.Trace(err) + } + defer pdCli.Close() + return local.GetRegionSplitSizeKeys(ctx, pdCli, tls) +} + // NewTableImporter creates a new table importer. func NewTableImporter(param *JobImportParam, e *LoadDataController, taskID int64) (ti *TableImporter, err error) { idAlloc := kv.NewPanickingAllocators(0) @@ -223,7 +245,6 @@ func NewTableImporter(param *JobImportParam, e *LoadDataController, taskID int64 }, encTable: tbl, dbID: e.DBID, - store: e.dataStore, kvStore: kvStore, logger: e.logger, // this is the value we use for 50TiB data parallel import. @@ -246,7 +267,6 @@ type TableImporter struct { encTable table.Table dbID int64 - store storage.ExternalStorage // the kv store we get is a cached store, so we can't close it. kvStore tidbkv.Storage logger *zap.Logger @@ -475,6 +495,11 @@ func (ti *TableImporter) fullTableName() string { return common.UniqueTable(ti.DBName, ti.Table.Meta().Name.O) } +// Backend returns the backend of the importer. +func (ti *TableImporter) Backend() *local.Backend { + return ti.backend +} + // Close implements the io.Closer interface. func (ti *TableImporter) Close() error { ti.backend.Close() diff --git a/executor/test/loadremotetest/error_test.go b/executor/test/loadremotetest/error_test.go index 330486049a97a..beb941ed96470 100644 --- a/executor/test/loadremotetest/error_test.go +++ b/executor/test/loadremotetest/error_test.go @@ -54,10 +54,10 @@ func (s *mockGCSSuite) TestErrorMessage() { checkClientErrorMessage(s.T(), err, "ERROR 1054 (42S22): Unknown column 'wrong' in 'field list'") err = s.tk.ExecToErr("LOAD DATA INFILE 'abc://1' INTO TABLE t;") checkClientErrorMessage(s.T(), err, - "ERROR 8158 (HY000): The URI of file location is invalid. Reason: storage abc not support yet. Please provide a valid URI, such as 's3://import/test.csv?access_key_id={your_access_key_id ID}&secret_access_key={your_secret_access_key}&session_token={your_session_token}'") + "ERROR 8158 (HY000): The URI of data source is invalid. Reason: storage abc not support yet. Please provide a valid URI, such as 's3://import/test.csv?access_key_id={your_access_key_id ID}&secret_access_key={your_secret_access_key}&session_token={your_session_token}'") err = s.tk.ExecToErr("LOAD DATA INFILE 's3://no-network' INTO TABLE t;") checkClientErrorMessage(s.T(), err, - "ERROR 8159 (HY000): Access to the source file has been denied. Reason: failed to get region of bucket no-network. Please check the URI, access key and secret access key are correct") + "ERROR 8159 (HY000): Access to the data source has been denied. Reason: failed to get region of bucket no-network. 
Please check the URI, access key and secret access key are correct") err = s.tk.ExecToErr(fmt.Sprintf(`LOAD DATA INFILE 'gs://wrong-bucket/p?endpoint=%s' INTO TABLE t;`, gcsEndpoint)) checkClientErrorMessage(s.T(), err, diff --git a/parser/ast/dml.go b/parser/ast/dml.go index fb09e84e77780..602e6960ee126 100644 --- a/parser/ast/dml.go +++ b/parser/ast/dml.go @@ -1969,6 +1969,7 @@ func (n *LoadDataStmt) Accept(v Visitor) (Node, bool) { } type LoadDataOpt struct { + // Name is the name of the option, will be converted to lower case during parse. Name string // only literal is allowed, we use ExprNode to support negative number Value ExprNode diff --git a/planner/core/common_plans.go b/planner/core/common_plans.go index 2050119f03c49..48c080e117647 100644 --- a/planner/core/common_plans.go +++ b/planner/core/common_plans.go @@ -578,6 +578,7 @@ type LoadData struct { // LoadDataOpt represents load data option. type LoadDataOpt struct { + // Name is the name of the option, converted to lower case during parse. Name string Value expression.Expression } diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index 2dace03dc980e..2a0ee40987f8b 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -4446,6 +4446,9 @@ var ( importIntoSchemaFTypes = []byte{mysql.TypeLonglong, mysql.TypeString, mysql.TypeString, mysql.TypeLonglong, mysql.TypeString, mysql.TypeString, mysql.TypeString, mysql.TypeLonglong, mysql.TypeString, mysql.TypeTimestamp, mysql.TypeTimestamp, mysql.TypeTimestamp, mysql.TypeString} + + // ImportIntoDataSource used in ErrLoadDataInvalidURI. + ImportIntoDataSource = "data source" ) func (b *PlanBuilder) buildImportInto(ctx context.Context, ld *ast.ImportIntoStmt) (Plan, error) { @@ -4458,7 +4461,7 @@ func (b *PlanBuilder) buildImportInto(ctx context.Context, ld *ast.ImportIntoStm importFromServer, err = storage.IsLocalPath(ld.Path) if err != nil { - return nil, exeerrors.ErrLoadDataInvalidURI.FastGenByArgs(err.Error()) + return nil, exeerrors.ErrLoadDataInvalidURI.FastGenByArgs(ImportIntoDataSource, err.Error()) } if importFromServer && sem.IsEnabled() { diff --git a/tablecodec/tablecodec.go b/tablecodec/tablecodec.go index 2af5fb22d70f2..9ec296256eed5 100644 --- a/tablecodec/tablecodec.go +++ b/tablecodec/tablecodec.go @@ -276,6 +276,19 @@ func DecodeKeyHead(key kv.Key) (tableID int64, indexID int64, isRecordKey bool, return } +// DecodeIndexID decodes indexID from the key. +// this method simply extract index id part, and no other checking. +// Caller should make sure the key is an index key. +func DecodeIndexID(key kv.Key) (int64, error) { + key = key[len(tablePrefix)+8+len(indexPrefixSep):] + + _, indexID, err := codec.DecodeInt(key) + if err != nil { + return 0, errors.Trace(err) + } + return indexID, nil +} + // DecodeTableID decodes the table ID of the key, if the key is not table key, returns 0. 
func DecodeTableID(key kv.Key) int64 { if !key.HasPrefix(tablePrefix) { diff --git a/tablecodec/tablecodec_test.go b/tablecodec/tablecodec_test.go index acdef44e81fb9..116971fc36273 100644 --- a/tablecodec/tablecodec_test.go +++ b/tablecodec/tablecodec_test.go @@ -610,12 +610,18 @@ func TestTempIndexKey(t *testing.T) { require.Equal(t, tid, tableID) require.NotEqual(t, indexID, iid) require.Equal(t, indexID, iid&IndexIDMask) + iid2, err := DecodeIndexID(indexKey) + require.NoError(t, err) + require.Equal(t, iid, iid2) TempIndexKey2IndexKey(indexKey) tid, iid, _, err = DecodeKeyHead(indexKey) require.NoError(t, err) require.Equal(t, tid, tableID) require.Equal(t, indexID, iid) + iid2, err = DecodeIndexID(indexKey) + require.NoError(t, err) + require.Equal(t, iid, iid2) } func TestTempIndexValueCodec(t *testing.T) { diff --git a/tests/realtikvtest/importintotest/from_server_test.go b/tests/realtikvtest/importintotest/from_server_test.go index 8ba96d53cacdf..787ca5a9aa0c9 100644 --- a/tests/realtikvtest/importintotest/from_server_test.go +++ b/tests/realtikvtest/importintotest/from_server_test.go @@ -52,7 +52,9 @@ func (s *mockGCSSuite) TestImportFromServer() { s.tk.MustExec("create table t (a bigint, b varchar(100));") // relative path - s.ErrorIs(s.tk.QueryToErr("IMPORT INTO t FROM '~/file.csv'"), exeerrors.ErrLoadDataInvalidURI) + err2 := s.tk.QueryToErr("IMPORT INTO t FROM '~/file.csv'") + s.ErrorIs(err2, exeerrors.ErrLoadDataInvalidURI) + s.ErrorContains(err2, "URI of data source is invalid") // no suffix or wrong suffix s.ErrorIs(s.tk.QueryToErr("IMPORT INTO t FROM '/file'"), exeerrors.ErrLoadDataInvalidURI) s.ErrorIs(s.tk.QueryToErr("IMPORT INTO t FROM '/file.txt'"), exeerrors.ErrLoadDataInvalidURI)
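
The IndexRouteWriter added in chunk_process.go above routes each encoded KV pair to one writer per index (the index ID is recovered from the key, see DecodeIndexID in tablecodec.go), so that every index's sorted data stays separate and is easier to merge-sort later. The standalone Go sketch below illustrates only that routing idea; the writer type, factory, and all identifiers in it are hypothetical stand-ins for external.Writer and its factory, not part of this patch.

package main

import "fmt"

// kvPair is a toy stand-in for an encoded index KV pair; in the patch the
// index ID is recovered from the key via tablecodec.DecodeIndexID.
type kvPair struct {
	indexID int64
	key     []byte
	val     []byte
}

// sortWriter is a hypothetical stand-in for external.Writer.
type sortWriter struct{ indexID int64 }

func (w *sortWriter) writeRow(key, val []byte) error {
	fmt.Printf("index %d: %q -> %q\n", w.indexID, key, val)
	return nil
}

func (w *sortWriter) close() error { return nil }

// indexRouter mirrors the shape of IndexRouteWriter: a map of per-index
// writers plus a factory that opens a writer on first use.
type indexRouter struct {
	writers map[int64]*sortWriter
	factory func(indexID int64) *sortWriter
}

func (r *indexRouter) appendRows(pairs []kvPair) error {
	for _, p := range pairs {
		w, ok := r.writers[p.indexID]
		if !ok {
			// lazily open one writer per index; with many indexes this
			// multiplies the number of open writers (and their buffers).
			w = r.factory(p.indexID)
			r.writers[p.indexID] = w
		}
		if err := w.writeRow(p.key, p.val); err != nil {
			return err
		}
	}
	return nil
}

// close closes every per-index writer and returns the first error seen,
// similar to what IndexRouteWriter.Close does in the patch.
func (r *indexRouter) close() error {
	var firstErr error
	for _, w := range r.writers {
		if err := w.close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}

func main() {
	r := &indexRouter{
		writers: map[int64]*sortWriter{},
		factory: func(id int64) *sortWriter { return &sortWriter{indexID: id} },
	}
	pairs := []kvPair{
		{indexID: 1, key: []byte("k1"), val: []byte("v1")},
		{indexID: 2, key: []byte("k2"), val: []byte("v2")},
		{indexID: 1, key: []byte("k3"), val: []byte("v3")},
	}
	if err := r.appendRows(pairs); err != nil {
		panic(err)
	}
	if err := r.close(); err != nil {
		panic(err)
	}
}

Opening writers lazily, one per index, is exactly the trade-off the IndexRouteWriter comment calls out: the number of open writers grows with index-count * encode-concurrency, each holding its own buffer.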