Skip to content

Commit

Permalink
importinto: integrate global sort(without merge-sort) part 1 (#46998)
Browse files Browse the repository at this point in the history
ref #46704
  • Loading branch information
D3Hunter authored Sep 19, 2023
1 parent 4bd39b5 commit 4450ae4
Show file tree
Hide file tree
Showing 52 changed files with 1,331 additions and 292 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ mock_lightning: tools/bin/mockgen

gen_mock: tools/bin/mockgen
tools/bin/mockgen -package mock github.com/pingcap/tidb/disttask/framework/scheduler TaskTable,Pool,Scheduler,Extension > disttask/framework/mock/scheduler_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/disttask/framework/dispatcher Dispatcher > disttask/framework/mock/dispatcher_mock.go
tools/bin/mockgen -package execute github.com/pingcap/tidb/disttask/framework/scheduler/execute SubtaskExecutor > disttask/framework/mock/execute/execute_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/disttask/importinto MiniTaskExecutor > disttask/importinto/mock/import_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/disttask/framework/planner LogicalPlan,PipelineSpec > disttask/framework/mock/plan_mock.go
Expand Down
2 changes: 1 addition & 1 deletion br/cmd/tidb-lightning-ctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func checkpointErrorDestroy(ctx context.Context, cfg *config.Config, tls *common
for _, table := range targetTables {
for engineID := table.MinEngineID; engineID <= table.MaxEngineID; engineID++ {
fmt.Fprintln(os.Stderr, "Closing and cleaning up engine:", table.TableName, engineID)
_, eID := backend.MakeUUID(table.TableName, engineID)
_, eID := backend.MakeUUID(table.TableName, int64(engineID))
engine := local.Engine{UUID: eID}
err := engine.Cleanup(cfg.TikvImporter.SortedKVDir)
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions br/pkg/lightning/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const (
importMaxRetryTimes = 3 // tikv-importer has done retry internally. so we don't retry many times.
)

func makeTag(tableName string, engineID int32) string {
func makeTag(tableName string, engineID int64) string {
return fmt.Sprintf("%s:%d", tableName, engineID)
}

Expand All @@ -48,7 +48,7 @@ func makeLogger(logger log.Logger, tag string, engineUUID uuid.UUID) log.Logger
}

// MakeUUID generates a UUID for the engine and a tag for the engine.
func MakeUUID(tableName string, engineID int32) (string, uuid.UUID) {
func MakeUUID(tableName string, engineID int64) (string, uuid.UUID) {
tag := makeTag(tableName, engineID)
engineUUID := uuid.NewSHA1(engineNamespace, []byte(tag))
return tag, engineUUID
Expand Down Expand Up @@ -229,7 +229,7 @@ func MakeEngineManager(ab Backend) EngineManager {
// OpenEngine opens an engine with the given table name and engine ID.
func (be EngineManager) OpenEngine(ctx context.Context, config *EngineConfig,
tableName string, engineID int32) (*OpenedEngine, error) {
tag, engineUUID := MakeUUID(tableName, engineID)
tag, engineUUID := MakeUUID(tableName, int64(engineID))
logger := makeLogger(log.FromContext(ctx), tag, engineUUID)

if err := be.backend.OpenEngine(ctx, config, engineUUID); err != nil {
Expand Down Expand Up @@ -298,7 +298,7 @@ func (engine *OpenedEngine) LocalWriter(ctx context.Context, cfg *LocalWriterCon
// resuming from a checkpoint.
func (be EngineManager) UnsafeCloseEngine(ctx context.Context, cfg *EngineConfig,
tableName string, engineID int32) (*ClosedEngine, error) {
tag, engineUUID := MakeUUID(tableName, engineID)
tag, engineUUID := MakeUUID(tableName, int64(engineID))
return be.UnsafeCloseEngineWithUUID(ctx, cfg, tag, engineUUID, engineID)
}

Expand Down
3 changes: 3 additions & 0 deletions br/pkg/lightning/backend/external/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ go_library(
importpath = "github.com/pingcap/tidb/br/pkg/lightning/backend/external",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/lightning/backend",
"//br/pkg/lightning/backend/encode",
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/common",
"//br/pkg/lightning/log",
"//br/pkg/membuf",
Expand Down
68 changes: 68 additions & 0 deletions br/pkg/lightning/backend/external/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,3 +239,71 @@ func GetMaxOverlapping(points []Endpoint) int64 {
}
return maxWeight
}

// SortedKVMeta is the meta of sorted kv.
type SortedKVMeta struct {
MinKey []byte `json:"min_key"`
MaxKey []byte `json:"max_key"`
TotalKVSize uint64 `json:"total_kv_size"`
DataFiles []string `json:"data_files"`
StatFiles []string `json:"stat_files"`
}

// NewSortedKVMeta creates a SortedKVMeta from a WriterSummary.
func NewSortedKVMeta(summary *WriterSummary) *SortedKVMeta {
meta := &SortedKVMeta{
MinKey: summary.Min.Clone(),
MaxKey: summary.Max.Clone(),
TotalKVSize: summary.TotalSize,
}
for _, f := range summary.MultipleFilesStats {
for _, filename := range f.Filenames {
meta.DataFiles = append(meta.DataFiles, filename[0])
meta.StatFiles = append(meta.StatFiles, filename[1])
}
}
return meta
}

// Merge merges the other SortedKVMeta into this one.
func (m *SortedKVMeta) Merge(other *SortedKVMeta) {
m.MinKey = NotNilMin(m.MinKey, other.MinKey)
m.MaxKey = NotNilMax(m.MaxKey, other.MaxKey)
m.TotalKVSize += other.TotalKVSize

m.DataFiles = append(m.DataFiles, other.DataFiles...)
m.StatFiles = append(m.StatFiles, other.StatFiles...)
}

// MergeSummary merges the WriterSummary into this SortedKVMeta.
func (m *SortedKVMeta) MergeSummary(summary *WriterSummary) {
m.Merge(NewSortedKVMeta(summary))
}

// NotNilMin returns the smallest of a and b, ignoring nil values.
func NotNilMin(a, b []byte) []byte {
if len(a) == 0 {
return b
}
if len(b) == 0 {
return a
}
if bytes.Compare(a, b) < 0 {
return a
}
return b
}

// NotNilMax returns the largest of a and b, ignoring nil values.
func NotNilMax(a, b []byte) []byte {
if len(a) == 0 {
return b
}
if len(b) == 0 {
return a
}
if bytes.Compare(a, b) > 0 {
return a
}
return b
}
39 changes: 39 additions & 0 deletions br/pkg/lightning/backend/external/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/encode"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/membuf"
"github.com/pingcap/tidb/br/pkg/storage"
Expand Down Expand Up @@ -442,3 +445,39 @@ func (w *Writer) createStorageWriter(ctx context.Context) (
}
return dataPath, statPath, dataWriter, statsWriter, nil
}

// EngineWriter implements backend.EngineWriter interface.
type EngineWriter struct {
w *Writer
}

// NewEngineWriter creates a new EngineWriter.
func NewEngineWriter(w *Writer) *EngineWriter {
return &EngineWriter{w: w}
}

// AppendRows implements backend.EngineWriter interface.
func (e *EngineWriter) AppendRows(ctx context.Context, _ []string, rows encode.Rows) error {
kvs := kv.Rows2KvPairs(rows)
if len(kvs) == 0 {
return nil
}
for _, item := range kvs {
err := e.w.WriteRow(ctx, item.Key, item.Val, nil)
if err != nil {
return err
}
}
return nil
}

// IsSynced implements backend.EngineWriter interface.
func (e *EngineWriter) IsSynced() bool {
// only used when saving checkpoint
return true
}

// Close implements backend.EngineWriter interface.
func (e *EngineWriter) Close(ctx context.Context) (backend.ChunkFlushStatus, error) {
return nil, e.w.Close(ctx)
}
18 changes: 14 additions & 4 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -1469,7 +1469,7 @@ func (local *Backend) ImportEngine(
log.FromContext(ctx).Info("engine contains no kv, skip import", zap.Stringer("engine", engineUUID))
return nil
}
kvRegionSplitSize, kvRegionSplitKeys, err := getRegionSplitSizeKeys(ctx, local.pdCtl.GetPDClient(), local.tls)
kvRegionSplitSize, kvRegionSplitKeys, err := GetRegionSplitSizeKeys(ctx, local.pdCtl.GetPDClient(), local.tls)
if err == nil {
if kvRegionSplitSize > regionSplitSize {
regionSplitSize = kvRegionSplitSize
Expand Down Expand Up @@ -1549,7 +1549,7 @@ func (local *Backend) ImportEngine(

// GetRegionSplitSizeKeys gets the region split size and keys from PD.
func (local *Backend) GetRegionSplitSizeKeys(ctx context.Context) (finalSize int64, finalKeys int64, err error) {
return getRegionSplitSizeKeys(ctx, local.pdCtl.GetPDClient(), local.tls)
return GetRegionSplitSizeKeys(ctx, local.pdCtl.GetPDClient(), local.tls)
}

// expose these variables to unit test.
Expand Down Expand Up @@ -1689,6 +1689,16 @@ func (local *Backend) GetImportedKVCount(engineUUID uuid.UUID) int64 {
return e.importedKVCount.Load()
}

// GetExternalEngineKVStatistics returns kv statistics of some engine.
func (local *Backend) GetExternalEngineKVStatistics(engineUUID uuid.UUID) (
totalKVSize int64, totalKVCount int64) {
v, ok := local.externalEngine[engineUUID]
if !ok {
return 0, 0
}
return v.KVStatistics()
}

// ResetEngine reset the engine and reclaim the space.
func (local *Backend) ResetEngine(ctx context.Context, engineUUID uuid.UUID) error {
// the only way to reset the engine + reclaim the space is to delete and reopen it 🤷
Expand Down Expand Up @@ -1927,8 +1937,8 @@ func getSplitConfFromStore(ctx context.Context, host string, tls *common.TLS) (
return splitSize, nested.Coprocessor.RegionSplitKeys, nil
}

// return region split size, region split keys, error
func getRegionSplitSizeKeys(ctx context.Context, cli pd.Client, tls *common.TLS) (
// GetRegionSplitSizeKeys return region split size, region split keys, error
func GetRegionSplitSizeKeys(ctx context.Context, cli pd.Client, tls *common.TLS) (
regionSplitSize int64, regionSplitKeys int64, err error) {
stores, err := cli.GetAllStores(ctx, pd.WithExcludeTombstone())
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1123,7 +1123,7 @@ func TestGetRegionSplitSizeKeys(t *testing.T) {
}
return 0, 0, errors.New("invalid connection")
}
splitSize, splitKeys, err := getRegionSplitSizeKeys(ctx, cli, nil)
splitSize, splitKeys, err := GetRegionSplitSizeKeys(ctx, cli, nil)
require.NoError(t, err)
require.Equal(t, int64(1), splitSize)
require.Equal(t, int64(2), splitKeys)
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -978,7 +978,7 @@ func verifyLocalFile(ctx context.Context, cpdb checkpoints.DB, dir string) error
}
for tableName, engineIDs := range targetTables {
for _, engineID := range engineIDs {
_, eID := backend.MakeUUID(tableName, engineID)
_, eID := backend.MakeUUID(tableName, int64(engineID))
file := local.Engine{UUID: eID}
err := file.Exist(dir)
if err != nil {
Expand Down
48 changes: 12 additions & 36 deletions ddl/backfilling_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,12 +367,14 @@ func generateMergeSortPlan(
hex.EncodeToString(startKey), hex.EncodeToString(endKey))
}
m := &BackfillSubTaskMeta{
MinKey: startKey,
MaxKey: endKey,
DataFiles: dataFiles,
StatFiles: statFiles,
SortedKVMeta: external.SortedKVMeta{
MinKey: startKey,
MaxKey: endKey,
DataFiles: dataFiles,
StatFiles: statFiles,
TotalKVSize: totalSize / uint64(len(instanceIDs)),
},
RangeSplitKeys: rangeSplitKeys,
TotalKVSize: totalSize / uint64(len(instanceIDs)),
}
metaBytes, err := json.Marshal(m)
if err != nil {
Expand Down Expand Up @@ -404,7 +406,9 @@ func generateMergePlan(
end = len(dataFiles)
}
m := &BackfillSubTaskMeta{
DataFiles: dataFiles[start:end],
SortedKVMeta: external.SortedKVMeta{
DataFiles: dataFiles[start:end],
},
}
metaBytes, err := json.Marshal(m)
if err != nil {
Expand Down Expand Up @@ -475,8 +479,8 @@ func getSummaryFromLastStep(
}
// Skip empty subtask.MinKey/MaxKey because it means
// no records need to be written in this subtask.
minKey = notNilMin(minKey, subtask.MinKey)
maxKey = notNilMax(maxKey, subtask.MaxKey)
minKey = external.NotNilMin(minKey, subtask.MinKey)
maxKey = external.NotNilMax(maxKey, subtask.MaxKey)
totalKVSize += subtask.TotalKVSize

for _, stat := range subtask.MultipleFilesStats {
Expand All @@ -502,31 +506,3 @@ func redactCloudStorageURI(
}
gTask.Meta = metaBytes
}

// notNilMin returns the smaller of a and b, ignoring nil values.
func notNilMin(a, b []byte) []byte {
if len(a) == 0 {
return b
}
if len(b) == 0 {
return a
}
if bytes.Compare(a, b) < 0 {
return a
}
return b
}

// notNilMax returns the larger of a and b, ignoring nil values.
func notNilMax(a, b []byte) []byte {
if len(a) == 0 {
return b
}
if len(b) == 0 {
return a
}
if bytes.Compare(a, b) > 0 {
return a
}
return b
}
8 changes: 2 additions & 6 deletions ddl/backfilling_dist_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,8 @@ type BackfillSubTaskMeta struct {
StartKey []byte `json:"start_key"`
EndKey []byte `json:"end_key"`

DataFiles []string `json:"data_files"`
StatFiles []string `json:"stat_files"`
RangeSplitKeys [][]byte `json:"range_split_keys"`
MinKey []byte `json:"min_key"`
MaxKey []byte `json:"max_key"`
TotalKVSize uint64 `json:"total_kv_size"`
RangeSplitKeys [][]byte `json:"range_split_keys"`
external.SortedKVMeta `json:",inline"`
// MultipleFilesStats is the output of subtask, it will be used by the next subtask.
MultipleFilesStats []external.MultipleFilesStat `json:"multiple_files_stats"`
}
Expand Down
2 changes: 1 addition & 1 deletion ddl/backfilling_import_cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (m *cloudImportExecutor) RunSubtask(ctx context.Context, subtask *proto.Sub
if local == nil {
return errors.Errorf("local backend not found")
}
_, engineUUID := backend.MakeUUID(m.ptbl.Meta().Name.L, int32(m.index.ID))
_, engineUUID := backend.MakeUUID(m.ptbl.Meta().Name.L, m.index.ID)
err = local.CloseEngine(ctx, &backend.EngineConfig{
External: &backend.ExternalEngineConfig{
StorageURI: m.cloudStoreURI,
Expand Down
4 changes: 3 additions & 1 deletion disttask/framework/dispatcher/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ go_test(
embed = [":dispatcher"],
flaky = True,
race = "off",
shard_count = 10,
shard_count = 11,
deps = [
"//disttask/framework/mock",
"//disttask/framework/proto",
"//disttask/framework/storage",
"//domain/infosync",
Expand All @@ -52,5 +53,6 @@ go_test(
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//util",
"@org_uber_go_goleak//:goleak",
"@org_uber_go_mock//gomock",
],
)
15 changes: 12 additions & 3 deletions disttask/framework/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,13 @@ type TaskHandle interface {
// Dispatcher manages the lifetime of a task
// including submitting subtasks and updating the status of a task.
type Dispatcher interface {
// Init initializes the dispatcher, should be called before ExecuteTask.
// if Init returns error, dispatcher manager will fail the task directly,
// so the returned error should be a fatal error.
Init() error
// ExecuteTask start to schedule a task.
ExecuteTask()
// Close closes the dispatcher, not routine-safe, and should be called
// after ExecuteTask finished.
// Close closes the dispatcher, should be called if Init returns nil.
Close()
}

Expand Down Expand Up @@ -116,7 +120,12 @@ func NewBaseDispatcher(ctx context.Context, taskMgr *storage.TaskManager, server
}
}

// ExecuteTask start to schedule a task.
// Init implements the Dispatcher interface.
func (*BaseDispatcher) Init() error {
return nil
}

// ExecuteTask implements the Dispatcher interface.
func (d *BaseDispatcher) ExecuteTask() {
logutil.Logger(d.logCtx).Info("execute one task",
zap.String("state", d.Task.State), zap.Uint64("concurrency", d.Task.Concurrency))
Expand Down
Loading

0 comments on commit 4450ae4

Please sign in to comment.