From da379fc8d62dcf8349e4d8f0a40337e6288b1a2c Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Thu, 16 Feb 2023 13:40:01 +0800 Subject: [PATCH] This is an automated cherry-pick of #41463 Signed-off-by: ti-chi-bot --- br/pkg/lightning/backend/kv/sql2kv.go | 58 +- br/pkg/lightning/common/BUILD.bazel | 118 ++ br/pkg/lightning/common/util.go | 20 + br/pkg/lightning/common/util_test.go | 27 + br/pkg/lightning/restore/precheck_impl.go | 1429 +++++++++++++++++ br/pkg/lightning/restore/restore.go | 4 +- br/pkg/lightning/restore/table_restore.go | 8 + .../data2/auto_random.t.000000001.csv | 3 + br/tests/lightning_auto_random_default/run.sh | 27 + 9 files changed, 1684 insertions(+), 10 deletions(-) create mode 100644 br/pkg/lightning/common/BUILD.bazel create mode 100644 br/pkg/lightning/restore/precheck_impl.go create mode 100644 br/tests/lightning_auto_random_default/data2/auto_random.t.000000001.csv diff --git a/br/pkg/lightning/backend/kv/sql2kv.go b/br/pkg/lightning/backend/kv/sql2kv.go index 4dc80e0c17ce2..13ec3c7325f7c 100644 --- a/br/pkg/lightning/backend/kv/sql2kv.go +++ b/br/pkg/lightning/backend/kv/sql2kv.go @@ -57,10 +57,11 @@ type genCol struct { type autoIDConverter func(int64) int64 type tableKVEncoder struct { - tbl table.Table - se *session - recordCache []types.Datum - genCols []genCol + tbl table.Table + autoRandomColID int64 + se *session + recordCache []types.Datum + genCols []genCol // convert auto id for shard rowid or auto random id base on row id generated by lightning autoIDFn autoIDConverter } @@ -74,7 +75,9 @@ func NewTableKVEncoder(tbl table.Table, options *SessionOptions) (Encoder, error recordCtx := tables.NewCommonAddRecordCtx(len(cols)) tables.SetAddRecordCtx(se, recordCtx) + var autoRandomColID int64 autoIDFn := func(id int64) int64 { return id } +<<<<<<< HEAD if meta.PKIsHandle && meta.ContainsAutoRandomBits() { for _, col := range cols { if mysql.HasPriKeyFlag(col.GetFlag()) { @@ -85,6 +88,16 @@ func NewTableKVEncoder(tbl table.Table, options *SessionOptions) (Encoder, error } break } +======= + if meta.ContainsAutoRandomBits() { + col := common.GetAutoRandomColumn(meta) + autoRandomColID = col.ID + + shardFmt := autoid.NewShardIDFormat(&col.FieldType, meta.AutoRandomBits, meta.AutoRandomRangeBits) + shard := rand.New(rand.NewSource(options.AutoRandomSeed)).Int63() + autoIDFn = func(id int64) int64 { + return shardFmt.Compose(shard, id) +>>>>>>> 6837bd588d7 (lightning: support auto_random column in composite primary key (#41463)) } } else if meta.ShardRowIDBits > 0 { rd := rand.New(rand.NewSource(options.AutoRandomSeed)) // nolint:gosec @@ -104,10 +117,19 @@ func NewTableKVEncoder(tbl table.Table, options *SessionOptions) (Encoder, error } return &tableKVEncoder{ +<<<<<<< HEAD tbl: tbl, se: se, genCols: genCols, autoIDFn: autoIDFn, +======= + tbl: tbl, + autoRandomColID: autoRandomColID, + se: se, + genCols: genCols, + autoIDFn: autoIDFn, + metrics: metrics, +>>>>>>> 6837bd588d7 (lightning: support auto_random column in composite primary key (#41463)) }, nil } @@ -369,8 +391,13 @@ func (kvcodec *tableKVEncoder) Encode( record = append(record, value) +<<<<<<< HEAD if isTableAutoRandom(meta) && isPKCol(col.ToInfo()) { incrementalBits := autoRandomIncrementBits(col, int(meta.AutoRandomBits)) +======= + if kvcodec.isAutoRandomCol(col.ToInfo()) { + shardFmt := autoid.NewShardIDFormat(&col.FieldType, meta.AutoRandomBits, meta.AutoRandomRangeBits) +>>>>>>> 6837bd588d7 (lightning: support auto_random column in composite primary key (#41463)) alloc := 
kvcodec.tbl.Allocators(kvcodec.se).Get(autoid.AutoRandomType) if err := alloc.Rebase(context.Background(), value.GetInt64()&((1<>>>>>> 6837bd588d7 (lightning: support auto_random column in composite primary key (#41463)) } func (kvcodec *tableKVEncoder) getActualDatum(rowID int64, colIndex int, inputDatum *types.Datum) (types.Datum, error) { @@ -445,7 +488,6 @@ func (kvcodec *tableKVEncoder) getActualDatum(rowID int64, colIndex int, inputDa err error ) - tblMeta := kvcodec.tbl.Meta() cols := kvcodec.tbl.Cols() // Since this method is only called when iterating the columns in the `Encode()` method, @@ -468,7 +510,7 @@ func (kvcodec *tableKVEncoder) getActualDatum(rowID int64, colIndex int, inputDa case isAutoIncCol(col.ToInfo()): // we still need a conversion, e.g. to catch overflow with a TINYINT column. value, err = table.CastValue(kvcodec.se, types.NewIntDatum(rowID), col.ToInfo(), false, false) - case isTableAutoRandom(tblMeta) && isPKCol(col.ToInfo()): + case kvcodec.isAutoRandomCol(col.ToInfo()): var val types.Datum realRowID := kvcodec.autoIDFn(rowID) if mysql.HasUnsignedFlag(col.GetFlag()) { diff --git a/br/pkg/lightning/common/BUILD.bazel b/br/pkg/lightning/common/BUILD.bazel new file mode 100644 index 0000000000000..808832cd7e5a8 --- /dev/null +++ b/br/pkg/lightning/common/BUILD.bazel @@ -0,0 +1,118 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "common", + srcs = [ + "conn.go", + "errors.go", + "once_error.go", + "pause.go", + "retry.go", + "security.go", + "storage.go", + "storage_unix.go", + "storage_windows.go", + "util.go", + ], + importpath = "github.com/pingcap/tidb/br/pkg/lightning/common", + visibility = ["//visibility:public"], + deps = [ + "//br/pkg/errors", + "//br/pkg/httputil", + "//br/pkg/lightning/log", + "//br/pkg/utils", + "//errno", + "//parser/model", + "//store/driver/error", + "//table/tables", + "//util", + "@com_github_go_sql_driver_mysql//:mysql", + "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", + "@com_github_tikv_client_go_v2//config", + "@com_github_tikv_pd_client//:client", + "@org_golang_google_grpc//:grpc", + "@org_golang_google_grpc//codes", + "@org_golang_google_grpc//credentials", + "@org_golang_google_grpc//credentials/insecure", + "@org_golang_google_grpc//status", + "@org_uber_go_zap//:zap", + ] + select({ + "@io_bazel_rules_go//go/platform:aix": [ + "@org_golang_x_sys//unix", + ], + "@io_bazel_rules_go//go/platform:android": [ + "@org_golang_x_sys//unix", + ], + "@io_bazel_rules_go//go/platform:darwin": [ + "@org_golang_x_sys//unix", + ], + "@io_bazel_rules_go//go/platform:dragonfly": [ + "@org_golang_x_sys//unix", + ], + "@io_bazel_rules_go//go/platform:freebsd": [ + "@org_golang_x_sys//unix", + ], + "@io_bazel_rules_go//go/platform:illumos": [ + "@org_golang_x_sys//unix", + ], + "@io_bazel_rules_go//go/platform:ios": [ + "@org_golang_x_sys//unix", + ], + "@io_bazel_rules_go//go/platform:js": [ + "@org_golang_x_sys//unix", + ], + "@io_bazel_rules_go//go/platform:linux": [ + "@org_golang_x_sys//unix", + ], + "@io_bazel_rules_go//go/platform:netbsd": [ + "@org_golang_x_sys//unix", + ], + "@io_bazel_rules_go//go/platform:openbsd": [ + "@org_golang_x_sys//unix", + ], + "@io_bazel_rules_go//go/platform:plan9": [ + "@org_golang_x_sys//unix", + ], + "@io_bazel_rules_go//go/platform:solaris": [ + "@org_golang_x_sys//unix", + ], + "//conditions:default": [], + }), +) + +go_test( + name = "common_test", + timeout = "short", + srcs = [ + "errors_test.go", + 
"main_test.go", + "once_error_test.go", + "pause_test.go", + "retry_test.go", + "security_test.go", + "storage_test.go", + "util_test.go", + ], + embed = [":common"], + flaky = True, + deps = [ + "//br/pkg/errors", + "//br/pkg/lightning/log", + "//errno", + "//parser", + "//store/driver/error", + "//testkit/testsetup", + "//util/dbutil", + "@com_github_data_dog_go_sqlmock//:go-sqlmock", + "@com_github_go_sql_driver_mysql//:mysql", + "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@org_golang_google_grpc//codes", + "@org_golang_google_grpc//status", + "@org_uber_go_goleak//:goleak", + "@org_uber_go_multierr//:multierr", + ], +) diff --git a/br/pkg/lightning/common/util.go b/br/pkg/lightning/common/util.go index 57afc1fb7eac0..87dec1994c703 100644 --- a/br/pkg/lightning/common/util.go +++ b/br/pkg/lightning/common/util.go @@ -36,6 +36,7 @@ import ( "github.com/pingcap/tidb/br/pkg/utils" tmysql "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/table/tables" "go.uber.org/zap" ) @@ -388,3 +389,22 @@ func StringSliceEqual(a, b []string) bool { } return true } + +// GetAutoRandomColumn return the column with auto_random, return nil if the table doesn't have it. +// todo: better put in ddl package, but this will cause import cycle since ddl package import lightning +func GetAutoRandomColumn(tblInfo *model.TableInfo) *model.ColumnInfo { + if !tblInfo.ContainsAutoRandomBits() { + return nil + } + if tblInfo.PKIsHandle { + return tblInfo.GetPkColInfo() + } else if tblInfo.IsCommonHandle { + pk := tables.FindPrimaryIndex(tblInfo) + if pk == nil { + return nil + } + offset := pk.Columns[0].Offset + return tblInfo.Columns[offset] + } + return nil +} diff --git a/br/pkg/lightning/common/util_test.go b/br/pkg/lightning/common/util_test.go index a192ecea11906..781ee046c5fdc 100644 --- a/br/pkg/lightning/common/util_test.go +++ b/br/pkg/lightning/common/util_test.go @@ -31,6 +31,8 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/br/pkg/lightning/log" + "github.com/pingcap/tidb/parser" + "github.com/pingcap/tidb/util/dbutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -179,3 +181,28 @@ func TestInterpolateMySQLString(t *testing.T) { assert.Equal(t, "'1''23'", common.InterpolateMySQLString("1'23")) assert.Equal(t, "'1''2''''3'", common.InterpolateMySQLString("1'2''3")) } + +func TestGetAutoRandomColumn(t *testing.T) { + tests := []struct { + ddl string + colName string + }{ + {"create table t(c int)", ""}, + {"create table t(c int auto_increment)", ""}, + {"create table t(c bigint auto_random primary key)", "c"}, + {"create table t(a int, c bigint auto_random primary key)", "c"}, + {"create table t(c bigint auto_random, a int, primary key(c,a))", "c"}, + {"create table t(a int, c bigint auto_random, primary key(c,a))", "c"}, + } + p := parser.New() + for _, tt := range tests { + tableInfo, err := dbutil.GetTableInfoBySQL(tt.ddl, p) + require.NoError(t, err) + col := common.GetAutoRandomColumn(tableInfo) + if tt.colName == "" { + require.Nil(t, col, tt.ddl) + } else { + require.Equal(t, tt.colName, col.Name.L, tt.ddl) + } + } +} diff --git a/br/pkg/lightning/restore/precheck_impl.go b/br/pkg/lightning/restore/precheck_impl.go new file mode 100644 index 0000000000000..aff5dc7ae012b --- /dev/null +++ 
b/br/pkg/lightning/restore/precheck_impl.go @@ -0,0 +1,1429 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package restore + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "path/filepath" + "reflect" + "strconv" + "strings" + "sync" + "time" + + "github.com/docker/go-units" + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" + "github.com/pingcap/tidb/br/pkg/lightning/checkpoints" + "github.com/pingcap/tidb/br/pkg/lightning/common" + "github.com/pingcap/tidb/br/pkg/lightning/config" + "github.com/pingcap/tidb/br/pkg/lightning/log" + "github.com/pingcap/tidb/br/pkg/lightning/mydump" + "github.com/pingcap/tidb/br/pkg/storage" + "github.com/pingcap/tidb/br/pkg/streamhelper" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tidb/store/pdtypes" + "github.com/pingcap/tidb/table" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/engine" + "github.com/pingcap/tidb/util/mathutil" + "github.com/pingcap/tidb/util/set" + clientv3 "go.etcd.io/etcd/client/v3" + "go.uber.org/zap" + "golang.org/x/exp/slices" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc" +) + +type clusterResourceCheckItem struct { + preInfoGetter PreRestoreInfoGetter +} + +func NewClusterResourceCheckItem(preInfoGetter PreRestoreInfoGetter) PrecheckItem { + return &clusterResourceCheckItem{ + preInfoGetter: preInfoGetter, + } +} + +func (ci *clusterResourceCheckItem) GetCheckItemID() CheckItemID { + return CheckTargetClusterSize +} + +func (ci *clusterResourceCheckItem) getClusterAvail(ctx context.Context) (uint64, error) { + storeInfo, err := ci.preInfoGetter.GetStorageInfo(ctx) + if err != nil { + return 0, errors.Trace(err) + } + clusterAvail := uint64(0) + for _, store := range storeInfo.Stores { + clusterAvail += uint64(store.Status.Available) + } + return clusterAvail, nil +} + +func (ci *clusterResourceCheckItem) getReplicaCount(ctx context.Context) (uint64, error) { + replConfig, err := ci.preInfoGetter.GetReplicationConfig(ctx) + if err != nil { + return 0, errors.Trace(err) + } + return replConfig.MaxReplicas, nil +} + +func (ci *clusterResourceCheckItem) Check(ctx context.Context) (*CheckResult, error) { + theResult := &CheckResult{ + Item: ci.GetCheckItemID(), + Severity: Warn, + Passed: true, + Message: "Cluster resources are rich for this import task", + } + + var ( + err error + clusterAvail uint64 + clusterSource uint64 + taskMgr taskMetaMgr + ) + taskMgrVal := ctx.Value(taskManagerKey) + if taskMgrVal != nil { + if mgr, ok := taskMgrVal.(taskMetaMgr); ok { + taskMgr = mgr + } + } + if taskMgr == nil { + var err error + estimatedDataSizeResult, err := ci.preInfoGetter.EstimateSourceDataSize(ctx) + if err != nil { + return nil, errors.Trace(err) + } + clusterSource = uint64(estimatedDataSizeResult.SizeWithIndex) + clusterAvail, err = ci.getClusterAvail(ctx) + if err != nil { + return nil, errors.Trace(err) + } + } else { + if err := taskMgr.CheckTasksExclusively(ctx, func(tasks 
[]taskMeta) ([]taskMeta, error) { + clusterAvail = 0 + clusterSource = 0 + restoreStarted := false + for _, task := range tasks { + if task.status > taskMetaStatusInitial { + restoreStarted = true + } + clusterSource += task.sourceBytes + if task.clusterAvail > 0 { + clusterAvail = task.clusterAvail + } + } + if restoreStarted || clusterAvail > 0 { + return nil, nil + } + + clusterAvail, err = ci.getClusterAvail(ctx) + if err != nil { + return nil, errors.Trace(err) + } + newTasks := append([]taskMeta(nil), tasks...) + for i := 0; i < len(newTasks); i++ { + newTasks[i].clusterAvail = clusterAvail + } + return newTasks, nil + }); err != nil { + return nil, errors.Trace(err) + } + } + + replicaCount, err := ci.getReplicaCount(ctx) + if err != nil { + return nil, errors.Trace(err) + } + estimateSize := clusterSource * replicaCount + if estimateSize > clusterAvail { + theResult.Passed = false + theResult.Message = fmt.Sprintf("Cluster doesn't have enough space, available is %s, but we need %s", + units.BytesSize(float64(clusterAvail)), units.BytesSize(float64(estimateSize))) + } else { + theResult.Message = fmt.Sprintf("Cluster available is rich, available is %s, we need %s", + units.BytesSize(float64(clusterAvail)), units.BytesSize(float64(estimateSize))) + } + return theResult, nil +} + +type clusterVersionCheckItem struct { + preInfoGetter PreRestoreInfoGetter + dbMetas []*mydump.MDDatabaseMeta +} + +func NewClusterVersionCheckItem(preInfoGetter PreRestoreInfoGetter, dbMetas []*mydump.MDDatabaseMeta) PrecheckItem { + return &clusterVersionCheckItem{ + preInfoGetter: preInfoGetter, + dbMetas: dbMetas, + } +} + +func (ci *clusterVersionCheckItem) GetCheckItemID() CheckItemID { + return CheckTargetClusterVersion +} + +func (ci *clusterVersionCheckItem) Check(ctx context.Context) (*CheckResult, error) { + theResult := &CheckResult{ + Item: ci.GetCheckItemID(), + Severity: Critical, + Passed: true, + Message: "Cluster version check passed", + } + checkCtx := WithPreInfoGetterDBMetas(ctx, ci.dbMetas) + if err := ci.preInfoGetter.CheckVersionRequirements(checkCtx); err != nil { + err := common.NormalizeError(err) + theResult.Passed = false + theResult.Message = fmt.Sprintf("Cluster version check failed: %s", err.Error()) + } + return theResult, nil +} + +type emptyRegionCheckItem struct { + preInfoGetter PreRestoreInfoGetter + dbMetas []*mydump.MDDatabaseMeta +} + +func NewEmptyRegionCheckItem(preInfoGetter PreRestoreInfoGetter, dbMetas []*mydump.MDDatabaseMeta) PrecheckItem { + return &emptyRegionCheckItem{ + preInfoGetter: preInfoGetter, + dbMetas: dbMetas, + } +} + +func (ci *emptyRegionCheckItem) GetCheckItemID() CheckItemID { + return CheckTargetClusterEmptyRegion +} + +func (ci *emptyRegionCheckItem) Check(ctx context.Context) (*CheckResult, error) { + theResult := &CheckResult{ + Item: ci.GetCheckItemID(), + Severity: Warn, + Passed: true, + Message: "Cluster doesn't have too many empty regions", + } + dbInfos, err := ci.preInfoGetter.GetAllTableStructures(ctx) + if err != nil { + return nil, errors.Trace(err) + } + storeInfo, err := ci.preInfoGetter.GetStorageInfo(ctx) + if err != nil { + return nil, errors.Trace(err) + } + if len(storeInfo.Stores) <= 1 { + return theResult, nil + } + emptyRegionsInfo, err := ci.preInfoGetter.GetEmptyRegionsInfo(ctx) + if err != nil { + return nil, errors.Trace(err) + } + regions := make(map[uint64]int) + stores := make(map[uint64]*pdtypes.StoreInfo) + for _, region := range emptyRegionsInfo.Regions { + for _, peer := range region.Peers { + 
regions[peer.StoreId]++ + } + } + for _, store := range storeInfo.Stores { + stores[store.Store.GetId()] = store + } + tableCount := 0 + for _, db := range ci.dbMetas { + info, ok := dbInfos[db.Name] + if !ok { + continue + } + tableCount += len(info.Tables) + } + errorThrehold := mathutil.Max(errorEmptyRegionCntPerStore, tableCount*3) + warnThrehold := mathutil.Max(warnEmptyRegionCntPerStore, tableCount) + var ( + errStores []string + warnStores []string + ) + for storeID, regionCnt := range regions { + if store, ok := stores[storeID]; ok { + if metapb.StoreState(metapb.StoreState_value[store.Store.StateName]) != metapb.StoreState_Up { + continue + } + if engine.IsTiFlash(store.Store.Store) { + continue + } + if regionCnt > errorThrehold { + errStores = append(errStores, strconv.Itoa(int(storeID))) + } else if regionCnt > warnThrehold { + warnStores = append(warnStores, strconv.Itoa(int(storeID))) + } + } + } + + var messages []string + if len(errStores) > 0 { + theResult.Passed = false + messages = append(messages, fmt.Sprintf("TiKV stores (%s) contains more than %v empty regions respectively, "+ + "which will greatly affect the import speed and success rate", strings.Join(errStores, ", "), errorEmptyRegionCntPerStore)) + } + if len(warnStores) > 0 { + messages = append(messages, fmt.Sprintf("TiKV stores (%s) contains more than %v empty regions respectively, "+ + "which will affect the import speed and success rate", strings.Join(warnStores, ", "), warnEmptyRegionCntPerStore)) + } + if len(messages) > 0 { + theResult.Message = strings.Join(messages, "\n") + } + return theResult, nil +} + +type regionDistributionCheckItem struct { + preInfoGetter PreRestoreInfoGetter + dbMetas []*mydump.MDDatabaseMeta +} + +func NewRegionDistributionCheckItem(preInfoGetter PreRestoreInfoGetter, dbMetas []*mydump.MDDatabaseMeta) PrecheckItem { + return ®ionDistributionCheckItem{ + preInfoGetter: preInfoGetter, + dbMetas: dbMetas, + } +} + +func (ci *regionDistributionCheckItem) GetCheckItemID() CheckItemID { + return CheckTargetClusterRegionDist +} + +func (ci *regionDistributionCheckItem) Check(ctx context.Context) (*CheckResult, error) { + theResult := &CheckResult{ + Item: ci.GetCheckItemID(), + Severity: Critical, + Passed: true, + Message: "Cluster region distribution is balanced", + } + + storesInfo, err := ci.preInfoGetter.GetStorageInfo(ctx) + if err != nil { + return nil, errors.Trace(err) + } + stores := make([]*pdtypes.StoreInfo, 0, len(storesInfo.Stores)) + for _, store := range storesInfo.Stores { + if metapb.StoreState(metapb.StoreState_value[store.Store.StateName]) != metapb.StoreState_Up { + continue + } + if engine.IsTiFlash(store.Store.Store) { + continue + } + stores = append(stores, store) + } + if len(stores) <= 1 { + return theResult, nil + } + slices.SortFunc(stores, func(i, j *pdtypes.StoreInfo) bool { + return i.Status.RegionCount < j.Status.RegionCount + }) + minStore := stores[0] + maxStore := stores[len(stores)-1] + + dbInfos, err := ci.preInfoGetter.GetAllTableStructures(ctx) + if err != nil { + return nil, errors.Trace(err) + } + tableCount := 0 + for _, db := range ci.dbMetas { + info, ok := dbInfos[db.Name] + if !ok { + continue + } + tableCount += len(info.Tables) + } + threhold := mathutil.Max(checkRegionCntRatioThreshold, tableCount) + if maxStore.Status.RegionCount <= threhold { + return theResult, nil + } + ratio := float64(minStore.Status.RegionCount) / float64(maxStore.Status.RegionCount) + if ratio < errorRegionCntMinMaxRatio { + theResult.Passed = false + 
theResult.Message = fmt.Sprintf("Region distribution is unbalanced, the ratio of the regions count of the store(%v) "+ + "with least regions(%v) to the store(%v) with most regions(%v) is %v, but we expect it must not be less than %v", + minStore.Store.GetId(), minStore.Status.RegionCount, maxStore.Store.GetId(), maxStore.Status.RegionCount, ratio, errorRegionCntMinMaxRatio) + } else if ratio < warnRegionCntMinMaxRatio { + theResult.Message = fmt.Sprintf("Region distribution is unbalanced, the ratio of the regions count of the store(%v) "+ + "with least regions(%v) to the store(%v) with most regions(%v) is %v, but we expect it should not be less than %v", + minStore.Store.GetId(), minStore.Status.RegionCount, maxStore.Store.GetId(), maxStore.Status.RegionCount, ratio, warnRegionCntMinMaxRatio) + } + return theResult, nil +} + +type storagePermissionCheckItem struct { + cfg *config.Config +} + +func NewStoragePermissionCheckItem(cfg *config.Config) PrecheckItem { + return &storagePermissionCheckItem{ + cfg: cfg, + } +} + +func (ci *storagePermissionCheckItem) GetCheckItemID() CheckItemID { + return CheckSourcePermission +} + +func (ci *storagePermissionCheckItem) Check(ctx context.Context) (*CheckResult, error) { + theResult := &CheckResult{ + Item: ci.GetCheckItemID(), + Severity: Critical, + Passed: true, + Message: "Lightning has the correct storage permission", + } + + u, err := storage.ParseBackend(ci.cfg.Mydumper.SourceDir, nil) + if err != nil { + return nil, common.NormalizeError(err) + } + _, err = storage.New(ctx, u, &storage.ExternalStorageOptions{ + CheckPermissions: []storage.Permission{ + storage.ListObjects, + storage.GetObject, + }, + }) + if err != nil { + theResult.Passed = false + theResult.Message = err.Error() + } + return theResult, nil +} + +type largeFileCheckItem struct { + cfg *config.Config + dbMetas []*mydump.MDDatabaseMeta +} + +func NewLargeFileCheckItem(cfg *config.Config, dbMetas []*mydump.MDDatabaseMeta) PrecheckItem { + return &largeFileCheckItem{ + cfg: cfg, + dbMetas: dbMetas, + } +} + +func (ci *largeFileCheckItem) GetCheckItemID() CheckItemID { + return CheckLargeDataFile +} + +func (ci *largeFileCheckItem) Check(ctx context.Context) (*CheckResult, error) { + theResult := &CheckResult{ + Item: ci.GetCheckItemID(), + Severity: Warn, + Passed: true, + Message: "Source csv files size is proper", + } + + if !ci.cfg.Mydumper.StrictFormat { + for _, db := range ci.dbMetas { + for _, t := range db.Tables { + for _, f := range t.DataFiles { + if f.FileMeta.RealSize > defaultCSVSize { + theResult.Message = fmt.Sprintf("large csv: %s file exists and it will slow down import performance", f.FileMeta.Path) + theResult.Passed = false + } + } + } + } + } else { + theResult.Message = "Skip the csv size check, because config.StrictFormat is true" + } + return theResult, nil +} + +type localDiskPlacementCheckItem struct { + cfg *config.Config +} + +func NewLocalDiskPlacementCheckItem(cfg *config.Config) PrecheckItem { + return &localDiskPlacementCheckItem{ + cfg: cfg, + } +} + +func (ci *localDiskPlacementCheckItem) GetCheckItemID() CheckItemID { + return CheckLocalDiskPlacement +} + +func (ci *localDiskPlacementCheckItem) Check(ctx context.Context) (*CheckResult, error) { + theResult := &CheckResult{ + Item: ci.GetCheckItemID(), + Severity: Warn, + Passed: true, + Message: "local source dir and temp-kv dir are in different disks", + } + sourceDir := strings.TrimPrefix(ci.cfg.Mydumper.SourceDir, storage.LocalURIPrefix) + same, err := common.SameDisk(sourceDir, 
ci.cfg.TikvImporter.SortedKVDir) + if err != nil { + return nil, errors.Trace(err) + } + if same { + theResult.Passed = false + theResult.Message = fmt.Sprintf("sorted-kv-dir:%s and data-source-dir:%s are in the same disk, may slow down performance", + ci.cfg.TikvImporter.SortedKVDir, sourceDir) + } + return theResult, nil +} + +type localTempKVDirCheckItem struct { + cfg *config.Config + preInfoGetter PreRestoreInfoGetter + dbMetas []*mydump.MDDatabaseMeta +} + +func NewLocalTempKVDirCheckItem(cfg *config.Config, preInfoGetter PreRestoreInfoGetter, dbMetas []*mydump.MDDatabaseMeta) PrecheckItem { + return &localTempKVDirCheckItem{ + cfg: cfg, + preInfoGetter: preInfoGetter, + dbMetas: dbMetas, + } +} + +func (ci *localTempKVDirCheckItem) GetCheckItemID() CheckItemID { + return CheckLocalTempKVDir +} + +func (ci *localTempKVDirCheckItem) hasCompressedFiles() bool { + for _, dbMeta := range ci.dbMetas { + for _, tbMeta := range dbMeta.Tables { + for _, file := range tbMeta.DataFiles { + if file.FileMeta.Compression != mydump.CompressionNone { + return true + } + } + } + } + return false +} + +func (ci *localTempKVDirCheckItem) Check(ctx context.Context) (*CheckResult, error) { + severity := Critical + // for cases that have compressed files, the estimated size may not be accurate, set severity to Warn to avoid failure + if ci.hasCompressedFiles() { + severity = Warn + } + theResult := &CheckResult{ + Item: ci.GetCheckItemID(), + Severity: severity, + } + storageSize, err := common.GetStorageSize(ci.cfg.TikvImporter.SortedKVDir) + if err != nil { + return nil, errors.Trace(err) + } + localAvailable := int64(storageSize.Available) + estimatedDataSizeResult, err := ci.preInfoGetter.EstimateSourceDataSize(ctx) + if err != nil { + return nil, errors.Trace(err) + } + estimatedDataSizeWithIndex := estimatedDataSizeResult.SizeWithIndex + + switch { + case localAvailable > estimatedDataSizeWithIndex: + theResult.Message = fmt.Sprintf("local disk resources are rich, estimate sorted data size %s, local available is %s", + units.BytesSize(float64(estimatedDataSizeWithIndex)), units.BytesSize(float64(localAvailable))) + theResult.Passed = true + case int64(ci.cfg.TikvImporter.DiskQuota) > localAvailable: + theResult.Message = fmt.Sprintf("local disk space may not enough to finish import, estimate sorted data size is %s,"+ + " but local available is %s, please set `tikv-importer.disk-quota` to a smaller value than %s"+ + " or change `mydumper.sorted-kv-dir` to another disk with enough space to finish imports", + units.BytesSize(float64(estimatedDataSizeWithIndex)), + units.BytesSize(float64(localAvailable)), units.BytesSize(float64(localAvailable))) + theResult.Passed = false + log.FromContext(ctx).Error(theResult.Message) + default: + theResult.Message = fmt.Sprintf("local disk space may not enough to finish import, "+ + "estimate sorted data size is %s, but local available is %s,"+ + "we will use disk-quota (size: %s) to finish imports, which may slow down import", + units.BytesSize(float64(estimatedDataSizeWithIndex)), + units.BytesSize(float64(localAvailable)), units.BytesSize(float64(ci.cfg.TikvImporter.DiskQuota))) + theResult.Passed = true + log.FromContext(ctx).Warn(theResult.Message) + } + return theResult, nil +} + +type checkpointCheckItem struct { + cfg *config.Config + preInfoGetter PreRestoreInfoGetter + dbMetas []*mydump.MDDatabaseMeta + checkpointsDB checkpoints.DB +} + +func NewCheckpointCheckItem(cfg *config.Config, preInfoGetter PreRestoreInfoGetter, dbMetas []*mydump.MDDatabaseMeta, 
checkpointsDB checkpoints.DB) PrecheckItem { + return &checkpointCheckItem{ + cfg: cfg, + preInfoGetter: preInfoGetter, + dbMetas: dbMetas, + checkpointsDB: checkpointsDB, + } +} + +func (ci *checkpointCheckItem) GetCheckItemID() CheckItemID { + return CheckCheckpoints +} + +func (ci *checkpointCheckItem) Check(ctx context.Context) (*CheckResult, error) { + if !ci.cfg.Checkpoint.Enable || ci.checkpointsDB == nil { + return nil, nil + } + theResult := &CheckResult{ + Item: ci.GetCheckItemID(), + Severity: Critical, + Passed: true, + Message: "the checkpoints are valid", + } + + checkMsgs := []string{} + for _, dbInfo := range ci.dbMetas { + for _, tableInfo := range dbInfo.Tables { + msgs, err := ci.checkpointIsValid(ctx, tableInfo) + if err != nil { + return nil, errors.Trace(err) + } + checkMsgs = append(checkMsgs, msgs...) + } + } + if len(checkMsgs) > 0 { + theResult.Passed = false + theResult.Message = strings.Join(checkMsgs, "\n") + } + return theResult, nil +} + +// checkpointIsValid checks whether we can start this import with this checkpoint. +func (ci *checkpointCheckItem) checkpointIsValid(ctx context.Context, tableInfo *mydump.MDTableMeta) ([]string, error) { + msgs := make([]string, 0) + uniqueName := common.UniqueTable(tableInfo.DB, tableInfo.Name) + tableCheckPoint, err := ci.checkpointsDB.Get(ctx, uniqueName) + if err != nil { + if errors.IsNotFound(err) { + // there is no checkpoint + log.FromContext(ctx).Debug("no checkpoint detected", zap.String("table", uniqueName)) + return nil, nil + } + return nil, errors.Trace(err) + } + // if checkpoint enable and not missing, we skip the check table empty progress. + if tableCheckPoint.Status <= checkpoints.CheckpointStatusMissing { + return nil, nil + } + + if tableCheckPoint.Status <= checkpoints.CheckpointStatusMaxInvalid { + failedStep := tableCheckPoint.Status * 10 + var action strings.Builder + action.WriteString("./tidb-lightning-ctl --checkpoint-error-") + switch failedStep { + case checkpoints.CheckpointStatusAlteredAutoInc, checkpoints.CheckpointStatusAnalyzed: + action.WriteString("ignore") + default: + action.WriteString("destroy") + } + action.WriteString("='") + action.WriteString(uniqueName) + action.WriteString("' --config=...") + + msgs = append(msgs, fmt.Sprintf("TiDB Lightning has failed last time. To prevent data loss, this run will stop now, "+ + "%s failed in step(%s), please run command %s,"+ + "You may also run `./tidb-lightning-ctl --checkpoint-error-destroy=all --config=...` to start from scratch,"+ + "For details of this failure, read the log file from the PREVIOUS run", + uniqueName, failedStep.MetricName(), action.String())) + return msgs, nil + } + + dbInfos, err := ci.preInfoGetter.GetAllTableStructures(ctx) + if err != nil { + return nil, errors.Trace(err) + } + dbInfo, ok := dbInfos[tableInfo.DB] + if ok { + t, ok := dbInfo.Tables[tableInfo.Name] + if ok { + if tableCheckPoint.TableID > 0 && tableCheckPoint.TableID != t.ID { + msgs = append(msgs, fmt.Sprintf("TiDB Lightning has detected tables with illegal checkpoints. 
To prevent data loss, this run will stop now,"+ + "please run command \"./tidb-lightning-ctl --checkpoint-remove='%s' --config=...\""+ + "You may also run `./tidb-lightning-ctl --checkpoint-error-destroy=all --config=...` to start from scratch,"+ + "For details of this failure, read the log file from the PREVIOUS run", + uniqueName)) + return msgs, nil + } + } + } + + var permFromCheckpoint []int + var columns []string + for _, eng := range tableCheckPoint.Engines { + if len(eng.Chunks) > 0 { + chunk := eng.Chunks[0] + permFromCheckpoint = chunk.ColumnPermutation + columns = chunk.Chunk.Columns + if filepath.Dir(chunk.FileMeta.Path) != ci.cfg.Mydumper.SourceDir { + message := fmt.Sprintf("chunk checkpoints path is not equal to config"+ + "checkpoint is %s, config source dir is %s", chunk.FileMeta.Path, ci.cfg.Mydumper.SourceDir) + msgs = append(msgs, message) + } + } + } + if len(columns) == 0 { + log.FromContext(ctx).Debug("no valid checkpoint detected", zap.String("table", uniqueName)) + return nil, nil + } + info := dbInfos[tableInfo.DB].Tables[tableInfo.Name] + if info != nil { + permFromTiDB, err := parseColumnPermutations(info.Core, columns, nil, log.FromContext(ctx)) + if err != nil { + msgs = append(msgs, fmt.Sprintf("failed to calculate columns %s, table %s's info has changed,"+ + "consider remove this checkpoint, and start import again.", err.Error(), uniqueName)) + } + if !reflect.DeepEqual(permFromCheckpoint, permFromTiDB) { + msgs = append(msgs, fmt.Sprintf("compare columns perm failed. table %s's info has changed,"+ + "consider remove this checkpoint, and start import again.", uniqueName)) + } + } + return msgs, nil +} + +// CDCPITRCheckItem check downstream has enabled CDC or PiTR. It's exposed to let +// caller override the Instruction message. +type CDCPITRCheckItem struct { + cfg *config.Config + Instruction string + // used in test + etcdCli *clientv3.Client +} + +// NewCDCPITRCheckItem creates a checker to check downstream has enabled CDC or PiTR. +func NewCDCPITRCheckItem(cfg *config.Config) PrecheckItem { + return &CDCPITRCheckItem{ + cfg: cfg, + Instruction: "local backend is not compatible with them. Please switch to tidb backend then try again.", + } +} + +// GetCheckItemID implements PrecheckItem interface. +func (ci *CDCPITRCheckItem) GetCheckItemID() CheckItemID { + return CheckTargetUsingCDCPITR +} + +func dialEtcdWithCfg(ctx context.Context, cfg *config.Config) (*clientv3.Client, error) { + cfg2, err := cfg.ToTLS() + if err != nil { + return nil, err + } + tlsConfig := cfg2.TLSConfig() + + return clientv3.New(clientv3.Config{ + TLS: tlsConfig, + Endpoints: []string{cfg.TiDB.PdAddr}, + AutoSyncInterval: 30 * time.Second, + DialTimeout: 5 * time.Second, + DialOptions: []grpc.DialOption{ + config.DefaultGrpcKeepaliveParams, + grpc.WithBlock(), + grpc.WithReturnConnectionError(), + }, + Context: ctx, + }) +} + +// Check implements PrecheckItem interface. 
+func (ci *CDCPITRCheckItem) Check(ctx context.Context) (*CheckResult, error) { + theResult := &CheckResult{ + Item: ci.GetCheckItemID(), + Severity: Critical, + } + + if ci.cfg.TikvImporter.Backend != config.BackendLocal { + theResult.Passed = true + theResult.Message = "TiDB Lightning is not using local backend, skip this check" + return theResult, nil + } + + if ci.etcdCli == nil { + var err error + ci.etcdCli, err = dialEtcdWithCfg(ctx, ci.cfg) + if err != nil { + return nil, errors.Trace(err) + } + //nolint: errcheck + defer ci.etcdCli.Close() + } + + errorMsg := make([]string, 0, 2) + + pitrCli := streamhelper.NewMetaDataClient(ci.etcdCli) + tasks, err := pitrCli.GetAllTasks(ctx) + if err != nil { + return nil, errors.Trace(err) + } + if len(tasks) > 0 { + names := make([]string, 0, len(tasks)) + for _, task := range tasks { + names = append(names, task.Info.GetName()) + } + errorMsg = append(errorMsg, fmt.Sprintf("found PiTR log streaming task(s): %v,", names)) + } + + // check etcd KV of CDC >= v6.2 + cdcPrefix := "/tidb/cdc/" + changefeedPath := []byte("/changefeed/info/") + + nameSet := make(map[string][]string, 1) + resp, err := ci.etcdCli.Get(ctx, cdcPrefix, clientv3.WithPrefix()) + if err != nil { + return nil, errors.Trace(err) + } + for _, kv := range resp.Kvs { + // example: /tidb/cdc///changefeed/info/ + k := kv.Key[len(cdcPrefix):] + clusterAndNamespace, changefeedID, found := bytes.Cut(k, changefeedPath) + if !found { + continue + } + if !isActiveCDCChangefeed(kv.Value) { + continue + } + + nameSet[string(clusterAndNamespace)] = append(nameSet[string(clusterAndNamespace)], string(changefeedID)) + } + if len(nameSet) == 0 { + // check etcd KV of CDC <= v6.1 + cdcPrefixV61 := "/tidb/cdc/changefeed/info/" + resp, err = ci.etcdCli.Get(ctx, cdcPrefixV61, clientv3.WithPrefix()) + if err != nil { + return nil, errors.Trace(err) + } + for _, kv := range resp.Kvs { + // example: /tidb/cdc/changefeed/info/ + k := kv.Key[len(cdcPrefixV61):] + if len(k) == 0 { + continue + } + if !isActiveCDCChangefeed(kv.Value) { + continue + } + + nameSet[""] = append(nameSet[""], string(k)) + } + } + + if len(nameSet) > 0 { + var changefeedMsgBuf strings.Builder + changefeedMsgBuf.WriteString("found CDC changefeed(s): ") + isFirst := true + for clusterID, captureIDs := range nameSet { + if !isFirst { + changefeedMsgBuf.WriteString(", ") + } + isFirst = false + changefeedMsgBuf.WriteString("cluster/namespace: ") + changefeedMsgBuf.WriteString(clusterID) + changefeedMsgBuf.WriteString(" changefeed(s): ") + changefeedMsgBuf.WriteString(fmt.Sprintf("%v", captureIDs)) + } + changefeedMsgBuf.WriteString(",") + errorMsg = append(errorMsg, changefeedMsgBuf.String()) + } + + if len(errorMsg) > 0 { + errorMsg = append(errorMsg, ci.Instruction) + theResult.Passed = false + theResult.Message = strings.Join(errorMsg, "\n") + } else { + theResult.Passed = true + theResult.Message = "no CDC or PiTR task found" + } + + return theResult, nil +} + +type onlyState struct { + State string `json:"state"` +} + +func isActiveCDCChangefeed(jsonBytes []byte) bool { + s := onlyState{} + err := json.Unmarshal(jsonBytes, &s) + if err != nil { + // maybe a compatible issue, skip this key + log.L().Error("unmarshal etcd value failed when check CDC changefeed, will skip this key", + zap.ByteString("value", jsonBytes), + zap.Error(err)) + return false + } + switch s.State { + case "normal", "stopped", "error": + return true + default: + return false + } +} + +type schemaCheckItem struct { + cfg *config.Config + preInfoGetter 
PreRestoreInfoGetter + dbMetas []*mydump.MDDatabaseMeta + checkpointsDB checkpoints.DB +} + +func NewSchemaCheckItem(cfg *config.Config, preInfoGetter PreRestoreInfoGetter, dbMetas []*mydump.MDDatabaseMeta, cpdb checkpoints.DB) PrecheckItem { + return &schemaCheckItem{ + cfg: cfg, + preInfoGetter: preInfoGetter, + dbMetas: dbMetas, + checkpointsDB: cpdb, + } +} + +func (ci *schemaCheckItem) GetCheckItemID() CheckItemID { + return CheckSourceSchemaValid +} + +func (ci *schemaCheckItem) Check(ctx context.Context) (*CheckResult, error) { + theResult := &CheckResult{ + Item: ci.GetCheckItemID(), + Severity: Critical, + Passed: true, + Message: "table schemas are valid", + } + + checkMsgs := []string{} + for _, dbInfo := range ci.dbMetas { + for _, tableInfo := range dbInfo.Tables { + if ci.cfg.Checkpoint.Enable && ci.checkpointsDB != nil { + uniqueName := common.UniqueTable(tableInfo.DB, tableInfo.Name) + if _, err := ci.checkpointsDB.Get(ctx, uniqueName); err == nil { + // there is a checkpoint + log.L().Debug("checkpoint detected, skip the schema check", zap.String("table", uniqueName)) + continue + } + } + msgs, err := ci.SchemaIsValid(ctx, tableInfo) + if err != nil { + return nil, errors.Trace(err) + } + checkMsgs = append(checkMsgs, msgs...) + } + } + if len(checkMsgs) > 0 { + theResult.Passed = false + theResult.Message = strings.Join(checkMsgs, "\n") + } + return theResult, nil +} + +// SchemaIsValid checks the import file and cluster schema is match. +func (ci *schemaCheckItem) SchemaIsValid(ctx context.Context, tableInfo *mydump.MDTableMeta) ([]string, error) { + if len(tableInfo.DataFiles) == 0 { + log.FromContext(ctx).Info("no data files detected", zap.String("db", tableInfo.DB), zap.String("table", tableInfo.Name)) + return nil, nil + } + + msgs := make([]string, 0) + dbInfos, err := ci.preInfoGetter.GetAllTableStructures(ctx) + if err != nil { + return nil, errors.Trace(err) + } + info, ok := dbInfos[tableInfo.DB].Tables[tableInfo.Name] + if !ok { + msgs = append(msgs, fmt.Sprintf("TiDB schema `%s`.`%s` doesn't exists,"+ + "please give a schema file in source dir or create table manually", tableInfo.DB, tableInfo.Name)) + return msgs, nil + } + + igCol, err := ci.cfg.Mydumper.IgnoreColumns.GetIgnoreColumns(tableInfo.DB, tableInfo.Name, ci.cfg.Mydumper.CaseSensitive) + if err != nil { + return nil, errors.Trace(err) + } + igCols := igCol.ColumnsMap() + + fullExtendColsSet := make(set.StringSet) + for _, fileInfo := range tableInfo.DataFiles { + for _, col := range fileInfo.FileMeta.ExtendData.Columns { + if _, ok = igCols[col]; ok { + msgs = append(msgs, fmt.Sprintf("extend column %s is also assigned in ignore-column for table `%s`.`%s`, "+ + "please keep only either one of them", col, tableInfo.DB, tableInfo.Name)) + } + fullExtendColsSet.Insert(col) + } + } + if len(msgs) > 0 { + return msgs, nil + } + + colCountFromTiDB := len(info.Core.Columns) + if len(fullExtendColsSet) > 0 { + log.FromContext(ctx).Info("check extend column count through data files", zap.String("db", tableInfo.DB), + zap.String("table", tableInfo.Name)) + igColCnt := 0 + for _, col := range info.Core.Columns { + if _, ok = igCols[col.Name.L]; ok { + igColCnt++ + } + } + for _, f := range tableInfo.DataFiles { + cols, previewRows, err := ci.preInfoGetter.ReadFirstNRowsByFileMeta(ctx, f.FileMeta, 1) + if err != nil { + return nil, errors.Trace(err) + } + if len(cols) > 0 { + colsSet := set.NewStringSet(cols...) 
+ for _, extendCol := range f.FileMeta.ExtendData.Columns { + if colsSet.Exist(strings.ToLower(extendCol)) { + msgs = append(msgs, fmt.Sprintf("extend column %s is contained in table `%s`.`%s`'s header, "+ + "please remove this column in data or remove this extend rule", extendCol, tableInfo.DB, tableInfo.Name)) + } + } + } else if len(previewRows) > 0 && len(previewRows[0])+len(f.FileMeta.ExtendData.Columns) > colCountFromTiDB+igColCnt { + msgs = append(msgs, fmt.Sprintf("row count %d adding with extend column length %d is larger than columnCount %d plus ignore column count %d for table `%s`.`%s`, "+ + "please make sure your source data don't have extend columns and target schema has all of them", len(previewRows[0]), len(f.FileMeta.ExtendData.Columns), colCountFromTiDB, igColCnt, tableInfo.DB, tableInfo.Name)) + } + } + } + if len(msgs) > 0 { + return msgs, nil + } + + core := info.Core + defaultCols := make(map[string]struct{}) + autoRandomCol := common.GetAutoRandomColumn(core) + for _, col := range core.Columns { + // we can extend column the same with columns with default values + if _, isExtendCol := fullExtendColsSet[col.Name.O]; isExtendCol || hasDefault(col) || (autoRandomCol != nil && autoRandomCol.ID == col.ID) { + // this column has default value or it's auto random id, so we can ignore it + defaultCols[col.Name.L] = struct{}{} + } + delete(fullExtendColsSet, col.Name.O) + } + if len(fullExtendColsSet) > 0 { + extendCols := make([]string, 0, len(fullExtendColsSet)) + for col := range fullExtendColsSet { + extendCols = append(extendCols, col) + } + msgs = append(msgs, fmt.Sprintf("extend column [%s] don't exist in target table `%s`.`%s` schema, "+ + "please add these extend columns manually in downstream database/schema file", strings.Join(extendCols, ","), tableInfo.DB, tableInfo.Name)) + return msgs, nil + } + + // tidb_rowid have a default value. + defaultCols[model.ExtraHandleName.String()] = struct{}{} + + // only check the first file of this table. + dataFile := tableInfo.DataFiles[0] + log.FromContext(ctx).Info("datafile to check", zap.String("db", tableInfo.DB), + zap.String("table", tableInfo.Name), zap.String("path", dataFile.FileMeta.Path)) + // get columns name from data file. + dataFileMeta := dataFile.FileMeta + + if tp := dataFileMeta.Type; tp != mydump.SourceTypeCSV && tp != mydump.SourceTypeSQL && tp != mydump.SourceTypeParquet { + msgs = append(msgs, fmt.Sprintf("file '%s' with unknown source type '%s'", dataFileMeta.Path, dataFileMeta.Type.String())) + return msgs, nil + } + row := []types.Datum{} + colsFromDataFile, rows, err := ci.preInfoGetter.ReadFirstNRowsByFileMeta(ctx, dataFileMeta, 1) + if err != nil { + return nil, errors.Trace(err) + } + if len(rows) > 0 { + row = rows[0] + } + if colsFromDataFile == nil && len(row) == 0 { + log.FromContext(ctx).Info("file contains no data, skip checking against schema validity", zap.String("path", dataFileMeta.Path)) + return msgs, nil + } + + if colsFromDataFile == nil { + // when there is no columns name in data file. we must insert data in order. + // so the last several columns either can be ignored or has a default value. 
+ for i := len(row); i < colCountFromTiDB; i++ { + if _, ok := defaultCols[core.Columns[i].Name.L]; !ok { + msgs = append(msgs, fmt.Sprintf("TiDB schema `%s`.`%s` has %d columns,"+ + "and data file has %d columns, but column %s are missing the default value,"+ + "please give column a default value to skip this check", + tableInfo.DB, tableInfo.Name, colCountFromTiDB, len(row), core.Columns[i].Name.L)) + } + } + return msgs, nil + } + + // compare column names and make sure + // 1. TiDB table info has data file's all columns(besides ignore columns) + // 2. Those columns not introduced in data file always have a default value. + colMap := make(map[string]struct{}) + for col := range igCols { + colMap[col] = struct{}{} + } + for _, col := range core.Columns { + if _, ok := colMap[col.Name.L]; ok { + // tidb's column is ignored + // we need ensure this column has the default value. + if _, hasDefault := defaultCols[col.Name.L]; !hasDefault { + msgs = append(msgs, fmt.Sprintf("TiDB schema `%s`.`%s`'s column %s cannot be ignored,"+ + "because it doesn't have a default value, please set tables.ignoreColumns properly", + tableInfo.DB, tableInfo.Name, col.Name.L)) + } + } else { + colMap[col.Name.L] = struct{}{} + } + } + // tidb_rowid can be ignored in check + colMap[model.ExtraHandleName.String()] = struct{}{} + for _, col := range colsFromDataFile { + if _, ok := colMap[col]; !ok { + checkMsg := "please check table schema" + if dataFileMeta.Type == mydump.SourceTypeCSV && ci.cfg.Mydumper.CSV.Header { + checkMsg += " and csv file header" + } + msgs = append(msgs, fmt.Sprintf("TiDB schema `%s`.`%s` doesn't have column %s, "+ + "%s or use tables.ignoreColumns to ignore %s", + tableInfo.DB, tableInfo.Name, col, checkMsg, col)) + } else { + // remove column for next iteration + delete(colMap, col) + } + } + // if theses rest columns don't have a default value. + for col := range colMap { + if _, ok := defaultCols[col]; ok { + continue + } + msgs = append(msgs, fmt.Sprintf("TiDB schema `%s`.`%s` doesn't have the default value for %s. "+ + "Please add default value for column '%s' or choose another column to ignore or add this column in data file", + tableInfo.DB, tableInfo.Name, col, col)) + } + return msgs, nil +} + +type csvHeaderCheckItem struct { + cfg *config.Config + preInfoGetter PreRestoreInfoGetter + dbMetas []*mydump.MDDatabaseMeta +} + +func NewCSVHeaderCheckItem(cfg *config.Config, preInfoGetter PreRestoreInfoGetter, dbMetas []*mydump.MDDatabaseMeta) PrecheckItem { + return &csvHeaderCheckItem{ + cfg: cfg, + preInfoGetter: preInfoGetter, + dbMetas: dbMetas, + } +} + +func (ci *csvHeaderCheckItem) GetCheckItemID() CheckItemID { + return CheckCSVHeader +} + +// Check tries to check whether the csv header config is consistent with the source csv files by: +// 1. pick one table with two CSV files and a unique/primary key +// 2. read the first row of those two CSV files +// 3. checks if the content of those first rows are compatible with the table schema, and whether the +// two rows are identical, to determine if the first rows are a header rows. +func (ci *csvHeaderCheckItem) Check(ctx context.Context) (*CheckResult, error) { + // if cfg set header = true but source files actually contain not header, former SchemaCheck should + // return error in this situation, so we need do it again. 
+ if ci.cfg.Mydumper.CSV.Header { + return nil, nil + } + theResult := &CheckResult{ + Item: ci.GetCheckItemID(), + Severity: Critical, + Passed: true, + Message: "the config [mydumper.csv.header] is set to false, and CSV header lines are really not detected in the data files", + } + var ( + tableMeta *mydump.MDTableMeta + csvCount int + hasUniqueIdx bool + ) + dbInfos, err := ci.preInfoGetter.GetAllTableStructures(ctx) + if err != nil { + return nil, errors.Trace(err) + } + // only check one table source files for better performance. The checked table is chosen based on following two factor: + // 1. contains at least 1 csv source file, 2 is preferable + // 2. table schema contains primary key or unique key + // if the two factors can't be both satisfied, the first one has a higher priority +outer: + for _, dbMeta := range ci.dbMetas { + for _, tblMeta := range dbMeta.Tables { + if len(tblMeta.DataFiles) == 0 { + continue + } + tableHasUniqueIdx := false + tableCSVCount := 0 + for _, f := range tblMeta.DataFiles { + if f.FileMeta.Type == mydump.SourceTypeCSV { + tableCSVCount++ + if tableCSVCount >= 2 { + break + } + } + } + if tableCSVCount == 0 { + continue + } + + info := dbInfos[tblMeta.DB].Tables[tblMeta.Name] + for _, idx := range info.Core.Indices { + if idx.Primary || idx.Unique { + tableHasUniqueIdx = true + } + } + + if tableCSVCount >= 2 && hasUniqueIdx { + tableMeta = tblMeta + // if a perfect table source is found, we can stop check more tables + break outer + } + if tableCSVCount > csvCount || (tableCSVCount == csvCount && !hasUniqueIdx && tableHasUniqueIdx) { + tableMeta = tblMeta + csvCount = tableCSVCount + hasUniqueIdx = tableHasUniqueIdx + } + } + } + + if tableMeta == nil { + return theResult, nil + } + + var rows [][]types.Datum + for _, f := range tableMeta.DataFiles { + if f.FileMeta.Type != mydump.SourceTypeCSV { + continue + } + + row := []types.Datum{} + _, previewRows, err := ci.preInfoGetter.ReadFirstNRowsByFileMeta(ctx, f.FileMeta, 1) + if err != nil { + return nil, errors.Trace(err) + } + if len(previewRows) > 0 { + row = previewRows[0] + } + if len(row) > 0 { + rows = append(rows, row) + } + // only check at most two of all the files + if len(rows) >= 2 { + break + } + } + if len(rows) == 0 { + return theResult, nil + } else if len(rows) >= 2 { + // if the first row in two source files are not the same, they should not be the header line + // NOTE: though lightning's logic allows different source files contains different columns or the + // order is difference, here we only check if they are exactly the same because this is the common case. + if len(rows[0]) != len(rows[1]) { + return theResult, nil + } + + for i := 0; i < len(rows[0]); i++ { + if rows[0][i].GetString() != rows[1][i].GetString() { + return theResult, nil + } + } + } + + // check if some fields are unique and not ignored + // if at least one field appears in a unique key, we can sure there is something wrong, + // they should be either the header line or the data is duplicated. 
+ tableInfo := dbInfos[tableMeta.DB].Tables[tableMeta.Name] + tableFields := make(map[string]struct{}) + uniqueIdxFields := make(map[string]struct{}) + ignoreColumns, err := ci.cfg.Mydumper.IgnoreColumns.GetIgnoreColumns(tableMeta.DB, tableMeta.Name, ci.cfg.Mydumper.CaseSensitive) + if err != nil { + return nil, errors.Trace(err) + } + ignoreColsSet := make(map[string]struct{}) + for _, col := range ignoreColumns.Columns { + ignoreColsSet[col] = struct{}{} + } + for _, idx := range tableInfo.Core.Indices { + if !idx.Unique && !idx.Primary { + continue + } + for _, col := range idx.Columns { + if _, ok := ignoreColsSet[col.Name.L]; !ok { + uniqueIdxFields[col.Name.L] = struct{}{} + } + } + } + for _, f := range tableInfo.Core.Columns { + tableFields[f.Name.L] = struct{}{} + } + if common.TableHasAutoRowID(tableInfo.Core) { + tableFields[model.ExtraHandleName.L] = struct{}{} + } + hasUniqueField := false + for _, d := range rows[0] { + val := strings.ToLower(d.GetString()) + if _, ok := tableFields[val]; !ok { + return theResult, nil + } + if _, ok := uniqueIdxFields[val]; ok { + hasUniqueField = true + break + } + } + + theResult.Passed = false + theResult.Message = fmt.Sprintf("source csv files contains header row but `mydumper.csv.header` is false, checked table is `%s`.`%s`", + tableMeta.DB, tableMeta.Name) + theResult.Severity = Warn + if hasUniqueField && len(rows) > 1 { + theResult.Severity = Critical + } else if !checkFieldCompatibility(tableInfo.Core, ignoreColsSet, rows[0], log.FromContext(ctx)) { + // if there are only 1 csv file or there is not unique key, try to check if all columns are compatible with string value + theResult.Severity = Critical + } + return theResult, nil +} + +func checkFieldCompatibility( + tbl *model.TableInfo, + ignoreCols map[string]struct{}, + values []types.Datum, + logger log.Logger, +) bool { + se := kv.NewSession(&kv.SessionOptions{ + SQLMode: mysql.ModeStrictTransTables, + }, logger) + for i, col := range tbl.Columns { + // do not check ignored columns + if _, ok := ignoreCols[col.Name.L]; ok { + continue + } + if i >= len(values) { + break + } + _, err := table.CastValue(se, values[i], col, true, false) + if err != nil { + logger.Error("field value is not consistent with column type", zap.String("value", values[i].GetString()), + zap.Any("column_info", col), zap.Error(err)) + return false + } + } + + return true +} + +type tableEmptyCheckItem struct { + cfg *config.Config + preInfoGetter PreRestoreInfoGetter + dbMetas []*mydump.MDDatabaseMeta + checkpointsDB checkpoints.DB +} + +func NewTableEmptyCheckItem(cfg *config.Config, preInfoGetter PreRestoreInfoGetter, dbMetas []*mydump.MDDatabaseMeta, cpdb checkpoints.DB) PrecheckItem { + return &tableEmptyCheckItem{ + cfg: cfg, + preInfoGetter: preInfoGetter, + dbMetas: dbMetas, + checkpointsDB: cpdb, + } +} + +func (ci *tableEmptyCheckItem) GetCheckItemID() CheckItemID { + return CheckTargetTableEmpty +} + +func (ci *tableEmptyCheckItem) Check(ctx context.Context) (*CheckResult, error) { + theResult := &CheckResult{ + Item: ci.GetCheckItemID(), + Severity: Critical, + Passed: true, + Message: "all importing tables on the target are empty", + } + + tableCount := 0 + for _, db := range ci.dbMetas { + tableCount += len(db.Tables) + } + + var lock sync.Mutex + tableNames := make([]string, 0) + concurrency := mathutil.Min(tableCount, ci.cfg.App.RegionConcurrency) + type tableNameComponents struct { + DBName string + TableName string + } + ch := make(chan tableNameComponents, concurrency) + eg, gCtx := 
errgroup.WithContext(ctx) + + for i := 0; i < concurrency; i++ { + eg.Go(func() error { + for tblNameComp := range ch { + fullTableName := common.UniqueTable(tblNameComp.DBName, tblNameComp.TableName) + // skip tables that have checkpoint + if ci.cfg.Checkpoint.Enable && ci.checkpointsDB != nil { + _, err := ci.checkpointsDB.Get(gCtx, fullTableName) + switch { + case err == nil: + continue + case errors.IsNotFound(err): + default: + return errors.Trace(err) + } + } + + isEmptyPtr, err1 := ci.preInfoGetter.IsTableEmpty(gCtx, tblNameComp.DBName, tblNameComp.TableName) + if err1 != nil { + return err1 + } + if !(*isEmptyPtr) { + lock.Lock() + tableNames = append(tableNames, fullTableName) + lock.Unlock() + } + } + return nil + }) + } +loop: + for _, db := range ci.dbMetas { + for _, tbl := range db.Tables { + select { + case ch <- tableNameComponents{tbl.DB, tbl.Name}: + case <-gCtx.Done(): + break loop + } + } + } + close(ch) + if err := eg.Wait(); err != nil { + if common.IsContextCanceledError(err) { + return nil, nil + } + return nil, errors.Annotate(err, "check table contains data failed") + } + + if len(tableNames) > 0 { + // sort the failed names + slices.Sort(tableNames) + theResult.Passed = false + theResult.Message = fmt.Sprintf("table(s) [%s] are not empty", strings.Join(tableNames, ", ")) + } + return theResult, nil +} + +// hasDefault represents col has default value. +func hasDefault(col *model.ColumnInfo) bool { + return col.DefaultIsExpr || col.DefaultValue != nil || !mysql.HasNotNullFlag(col.GetFlag()) || + col.IsGenerated() || mysql.HasAutoIncrementFlag(col.GetFlag()) +} diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go index 92e02e3c4cd67..44c2c9c41a69e 100644 --- a/br/pkg/lightning/restore/restore.go +++ b/br/pkg/lightning/restore/restore.go @@ -1623,7 +1623,7 @@ func (tr *TableRestore) restoreTable( web.BroadcastTableCheckpoint(tr.tableName, cp) // rebase the allocator so it exceeds the number of rows. - if tr.tableInfo.Core.PKIsHandle && tr.tableInfo.Core.ContainsAutoRandomBits() { + if tr.tableInfo.Core.ContainsAutoRandomBits() { cp.AllocBase = mathutil.Max(cp.AllocBase, tr.tableInfo.Core.AutoRandID) if err := tr.alloc.Get(autoid.AutoRandomType).Rebase(context.Background(), cp.AllocBase, false); err != nil { return false, err @@ -2303,7 +2303,7 @@ func saveCheckpoint(rc *Controller, t *TableRestore, engineID int32, chunk *chec // or integer primary key), which can only be obtained by reading all data. 
var base int64 - if t.tableInfo.Core.PKIsHandle && t.tableInfo.Core.ContainsAutoRandomBits() { + if t.tableInfo.Core.ContainsAutoRandomBits() { base = t.alloc.Get(autoid.AutoRandomType).Base() + 1 } else { base = t.alloc.Get(autoid.RowIDAllocType).Base() + 1 diff --git a/br/pkg/lightning/restore/table_restore.go b/br/pkg/lightning/restore/table_restore.go index d875cbd0af87d..d8c5bb2a1f1c7 100644 --- a/br/pkg/lightning/restore/table_restore.go +++ b/br/pkg/lightning/restore/table_restore.go @@ -688,6 +688,7 @@ func (tr *TableRestore) postProcess( rc.alterTableLock.Lock() tblInfo := tr.tableInfo.Core var err error +<<<<<<< HEAD if tblInfo.PKIsHandle && tblInfo.ContainsAutoRandomBits() { var maxAutoRandom, autoRandomTotalBits uint64 autoRandomTotalBits = 64 @@ -698,6 +699,13 @@ func (tr *TableRestore) postProcess( } maxAutoRandom = 1<<(autoRandomTotalBits-autoRandomBits) - 1 err = AlterAutoRandom(ctx, rc.tidbGlue.GetSQLExecutor(), tr.tableName, uint64(tr.alloc.Get(autoid.AutoRandomType).Base())+1, maxAutoRandom) +======= + if tblInfo.ContainsAutoRandomBits() { + ft := &common.GetAutoRandomColumn(tblInfo).FieldType + shardFmt := autoid.NewShardIDFormat(ft, tblInfo.AutoRandomBits, tblInfo.AutoRandomRangeBits) + maxCap := shardFmt.IncrementalBitsCapacity() + err = AlterAutoRandom(ctx, rc.tidbGlue.GetSQLExecutor(), tr.tableName, uint64(tr.alloc.Get(autoid.AutoRandomType).Base())+1, maxCap) +>>>>>>> 6837bd588d7 (lightning: support auto_random column in composite primary key (#41463)) } else if common.TableHasAutoRowID(tblInfo) || tblInfo.GetAutoIncrementColInfo() != nil { // only alter auto increment id iff table contains auto-increment column or generated handle err = AlterAutoIncrement(ctx, rc.tidbGlue.GetSQLExecutor(), tr.tableName, uint64(tr.alloc.Get(autoid.RowIDAllocType).Base())+1) diff --git a/br/tests/lightning_auto_random_default/data2/auto_random.t.000000001.csv b/br/tests/lightning_auto_random_default/data2/auto_random.t.000000001.csv new file mode 100644 index 0000000000000..5de7daa1f86f2 --- /dev/null +++ b/br/tests/lightning_auto_random_default/data2/auto_random.t.000000001.csv @@ -0,0 +1,3 @@ +"a","b" +1,11 +2,22 diff --git a/br/tests/lightning_auto_random_default/run.sh b/br/tests/lightning_auto_random_default/run.sh index 41b9798de4560..67f51a6518ec6 100644 --- a/br/tests/lightning_auto_random_default/run.sh +++ b/br/tests/lightning_auto_random_default/run.sh @@ -60,3 +60,30 @@ for backend in tidb local; do run_sql "SELECT max(id & b'000001111111111111111111111111111111111111111111111111111111111') >= $NEXT_AUTO_RAND_VAL as ge FROM auto_random.t" check_contains 'ge: 1' done + +function run_for_auro_random_data2() { + create_table=$1 + run_sql 'DROP DATABASE IF EXISTS auto_random;' + run_sql 'CREATE DATABASE IF NOT EXISTS auto_random;' + run_sql "$create_table" + run_lightning --backend $backend -d "tests/$TEST_NAME/data2" + run_sql 'select count(*) as count from auto_random.t where c > 0' + check_contains "count: 2" + run_sql 'select count(*) as count from auto_random.t where a=1 and b=11' + check_contains "count: 1" + run_sql 'select count(*) as count from auto_random.t where a=2 and b=22' + check_contains "count: 1" +} + +for backend in tidb local; do + if [ "$backend" = 'local' ]; then + check_cluster_version 4 0 0 'local backend' || continue + fi + + run_for_auro_random_data2 'create table auto_random.t(c bigint auto_random primary key, a int, b int)' + run_for_auro_random_data2 'create table auto_random.t(a int, b int, c bigint auto_random primary key)' + # composite key 
and auto_random is the first column + run_for_auro_random_data2 'create table auto_random.t(c bigint auto_random, a int, b int, primary key(c, a))' + # composite key and auto_random is not the first column + run_for_auro_random_data2 'create table auto_random.t(a int, b int, c bigint auto_random, primary key(c, a))' +done
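
The key idea in the change above: instead of requiring the auto_random column to be the integer primary-key handle (PKIsHandle), the encoder now looks the column up via GetAutoRandomColumn, derives its bit layout from the column's field type plus AutoRandomBits/AutoRandomRangeBits through autoid.NewShardIDFormat, and composes each generated value as shard bits placed above Lightning's incremental row ID. Below is a minimal, self-contained sketch of that composition; the helper, its constants, and the 63-bit signed layout are illustrative assumptions standing in for TiDB's autoid package, not the package itself.

package main

import (
	"fmt"
	"math/rand"
)

// composeAutoRandom is an illustrative stand-in for what
// autoid.ShardIDFormat.Compose does in the patch above: pack the shard
// bits directly above the incremental (row ID) bits of the column.
// rangeBits is the number of usable value bits (63 for a signed BIGINT,
// because the sign bit is reserved); shardBits is the AUTO_RANDOM shard
// bit count declared on the column.
func composeAutoRandom(shard, rowID int64, shardBits, rangeBits uint64) int64 {
	incrementalBits := rangeBits - shardBits
	incrementalMask := int64(1)<<incrementalBits - 1
	shardMask := int64(1)<<shardBits - 1
	return (shard&shardMask)<<incrementalBits | (rowID & incrementalMask)
}

func main() {
	const shardBits, rangeBits = 5, 63 // e.g. AUTO_RANDOM(5) on a signed BIGINT
	incrementalMask := int64(1)<<(rangeBits-shardBits) - 1

	// One shard per import session, mirroring how the encoder seeds its
	// random source from options.AutoRandomSeed.
	shard := rand.New(rand.NewSource(42)).Int63()

	// Row IDs 1 and 2, as Lightning would allocate for the two rows of
	// the data2 CSV above (which only provides columns "a" and "b").
	for rowID := int64(1); rowID <= 2; rowID++ {
		v := composeAutoRandom(shard, rowID, shardBits, rangeBits)
		fmt.Printf("rowID=%d -> auto_random=%d, incremental part=%d\n",
			rowID, v, v&incrementalMask)
	}
}

The same split is what the rest of the patch relies on: the Rebase call in Encode keeps the AUTO_RANDOM allocator ahead of the incremental part of any explicitly supplied value, and postProcess passes shardFmt.IncrementalBitsCapacity() to AlterAutoRandom as the new upper bound for the target table's base.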