This is an automated cherry-pick of pingcap#44803
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
GMHDBJD authored and ti-chi-bot committed Jun 29, 2023
1 parent d68abe5 commit 3a13785
Showing 23 changed files with 2,618 additions and 17 deletions.
20 changes: 16 additions & 4 deletions br/pkg/checksum/executor.go
@@ -28,7 +28,8 @@ type ExecutorBuilder struct {

oldTable *metautil.Table

concurrency uint
concurrency uint
backoffWeight int

oldKeyspace []byte
newKeyspace []byte
@@ -56,6 +57,12 @@ func (builder *ExecutorBuilder) SetConcurrency(conc uint) *ExecutorBuilder {
return builder
}

// SetBackoffWeight sets the backoff weight used when executing the checksum.
func (builder *ExecutorBuilder) SetBackoffWeight(backoffWeight int) *ExecutorBuilder {
builder.backoffWeight = backoffWeight
return builder
}

func (builder *ExecutorBuilder) SetOldKeyspace(keyspace []byte) *ExecutorBuilder {
builder.oldKeyspace = keyspace
return builder
@@ -79,7 +86,7 @@ func (builder *ExecutorBuilder) Build() (*Executor, error) {
if err != nil {
return nil, errors.Trace(err)
}
return &Executor{reqs: reqs}, nil
return &Executor{reqs: reqs, backoffWeight: builder.backoffWeight}, nil
}

func buildChecksumRequest(
@@ -294,7 +301,8 @@ func updateChecksumResponse(resp, update *tipb.ChecksumResponse) {

// Executor is a checksum executor.
type Executor struct {
reqs []*kv.Request
reqs []*kv.Request
backoffWeight int
}

// Len returns the total number of checksum requests.
@@ -347,7 +355,11 @@ func (exec *Executor) Execute(
err error
)
err = utils.WithRetry(ctx, func() error {
resp, err = sendChecksumRequest(ctx, client, req, kv.NewVariables(&killed))
vars := kv.NewVariables(&killed)
if exec.backoffWeight > 0 {
vars.BackOffWeight = exec.backoffWeight
}
resp, err = sendChecksumRequest(ctx, client, req, vars)
failpoint.Inject("checksumRetryErr", func(val failpoint.Value) {
// first time reach here. return error
if val.(bool) {
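For readers skimming the diff: the new option flows through the builder and is only applied when it is positive. Below is a minimal sketch of a caller; only NewExecutorBuilder, SetConcurrency, SetBackoffWeight, and Build come from this file, while the wrapper function, package name, and argument plumbing are illustrative assumptions.

```go
// Sketch under assumptions: buildExecutor and its arguments are illustrative,
// not part of this commit.
package example

import (
	"github.com/pingcap/errors"
	"github.com/pingcap/tidb/br/pkg/checksum"
	"github.com/pingcap/tidb/parser/model"
)

func buildExecutor(tblInfo *model.TableInfo, startTS uint64) (*checksum.Executor, error) {
	exec, err := checksum.NewExecutorBuilder(tblInfo, startTS).
		SetConcurrency(4).   // distsql scan concurrency for the checksum requests
		SetBackoffWeight(6). // values > 0 override vars.BackOffWeight in Execute (see hunk above)
		Build()
	if err != nil {
		return nil, errors.Trace(err)
	}
	return exec, nil
}
```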
2 changes: 2 additions & 0 deletions br/pkg/lightning/backend/local/BUILD.bazel
@@ -47,6 +47,7 @@ go_library(
"//kv",
"//parser/model",
"//parser/mysql",
"//sessionctx/variable",
"//store/pdtypes",
"//table",
"//tablecodec",
@@ -72,6 +73,7 @@ go_library(
"@com_github_pingcap_kvproto//pkg/pdpb",
"@com_github_pingcap_tipb//go-tipb",
"@com_github_tikv_client_go_v2//error",
"@com_github_tikv_client_go_v2//kv",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_pd_client//:client",
45 changes: 40 additions & 5 deletions br/pkg/lightning/backend/local/checksum.go
@@ -32,8 +32,10 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/metric"
"github.com/pingcap/tidb/br/pkg/lightning/verification"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tipb/go-tipb"
tikvstore "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
"go.uber.org/atomic"
@@ -49,7 +51,14 @@ const (
var (
serviceSafePointTTL int64 = 10 * 60 // 10 min in seconds

minDistSQLScanConcurrency = 4
// MinDistSQLScanConcurrency is the minimum value of tidb_distsql_scan_concurrency.
MinDistSQLScanConcurrency = 4

// DefaultBackoffWeight is the default value of tidb_backoff_weight for checksum.
// When the TiKV client encounters a "region not leader" error, it keeps retrying every 500 ms.
// If it still fails after 2 * 20 = 40 seconds, it returns "region unavailable".
// Raising the backoff weight to 6 lets the TiKV client keep retrying for up to 120 seconds.
DefaultBackoffWeight = 3 * tikvstore.DefBackOffWeight
)

// RemoteChecksum represents a checksum result got from tidb.
@@ -102,16 +111,39 @@ func (e *tidbChecksumExecutor) Checksum(ctx context.Context, tableInfo *checkpoi

task := log.FromContext(ctx).With(zap.String("table", tableName)).Begin(zap.InfoLevel, "remote checksum")

conn, err := e.db.Conn(ctx)
if err != nil {
return nil, errors.Trace(err)
}
defer func() {
if err := conn.Close(); err != nil {
task.Warn("close connection failed", zap.Error(err))
}
}()
// ADMIN CHECKSUM TABLE <table>,<table> example.
// mysql> admin checksum table test.t;
// +---------+------------+---------------------+-----------+-------------+
// | Db_name | Table_name | Checksum_crc64_xor | Total_kvs | Total_bytes |
// +---------+------------+---------------------+-----------+-------------+
// | test | t | 8520875019404689597 | 7296873 | 357601387 |
// +---------+------------+---------------------+-----------+-------------+
backoffWeight, err := common.GetBackoffWeightFromDB(ctx, e.db)
if err == nil && backoffWeight < DefaultBackoffWeight {
task.Info("increase tidb_backoff_weight", zap.Int("original", backoffWeight), zap.Int("new", DefaultBackoffWeight))
// increase backoff weight
if _, err := conn.ExecContext(ctx, fmt.Sprintf("SET SESSION %s = '%d';", variable.TiDBBackOffWeight, DefaultBackoffWeight)); err != nil {
task.Warn("set tidb_backoff_weight failed", zap.Error(err))
} else {
defer func() {
if _, err := conn.ExecContext(ctx, fmt.Sprintf("SET SESSION %s = '%d';", variable.TiDBBackOffWeight, backoffWeight)); err != nil {
task.Warn("recover tidb_backoff_weight failed", zap.Error(err))
}
}()
}
}

cs := RemoteChecksum{}
err = common.SQLWithRetry{DB: e.db, Logger: task.Logger}.QueryRow(ctx, "compute remote checksum",
err = common.SQLWithRetry{DB: conn, Logger: task.Logger}.QueryRow(ctx, "compute remote checksum",
"ADMIN CHECKSUM TABLE "+tableName, &cs.Schema, &cs.Table, &cs.Checksum, &cs.TotalKVs, &cs.TotalBytes,
)
dur := task.End(zap.ErrorLevel, err)
@@ -239,22 +271,25 @@ type TiKVChecksumManager struct {
client kv.Client
manager gcTTLManager
distSQLScanConcurrency uint
backoffWeight int
}

var _ ChecksumManager = &TiKVChecksumManager{}

// NewTiKVChecksumManager returns a new TiKV checksum manager
func NewTiKVChecksumManager(client kv.Client, pdClient pd.Client, distSQLScanConcurrency uint) *TiKVChecksumManager {
func NewTiKVChecksumManager(client kv.Client, pdClient pd.Client, distSQLScanConcurrency uint, backoffWeight int) *TiKVChecksumManager {
return &TiKVChecksumManager{
client: client,
manager: newGCTTLManager(pdClient),
distSQLScanConcurrency: distSQLScanConcurrency,
backoffWeight: backoffWeight,
}
}

func (e *TiKVChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpoints.TidbTableInfo, ts uint64) (*RemoteChecksum, error) {
executor, err := checksum.NewExecutorBuilder(tableInfo.Core, ts).
SetConcurrency(e.distSQLScanConcurrency).
SetBackoffWeight(e.backoffWeight).
Build()
if err != nil {
return nil, errors.Trace(err)
@@ -286,8 +321,8 @@ func (e *TiKVChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpo
if !common.IsRetryableError(err) {
break
}
if distSQLScanConcurrency > minDistSQLScanConcurrency {
distSQLScanConcurrency = mathutil.Max(distSQLScanConcurrency/2, minDistSQLScanConcurrency)
if distSQLScanConcurrency > MinDistSQLScanConcurrency {
distSQLScanConcurrency = mathutil.Max(distSQLScanConcurrency/2, MinDistSQLScanConcurrency)
}
}

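The constructor now takes the backoff weight explicitly, so callers can pass DefaultBackoffWeight (or a user-configured value) alongside the scan concurrency. A minimal sketch follows; only NewTiKVChecksumManager and DefaultBackoffWeight come from this file, while the helper name, the concurrency value of 15, and how the caller obtains the kv/PD clients are assumptions.

```go
// Sketch under assumptions: newManager and the concurrency value are
// illustrative, not part of this commit.
package example

import (
	"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
	"github.com/pingcap/tidb/kv"
	pd "github.com/tikv/pd/client"
)

func newManager(kvClient kv.Client, pdClient pd.Client) *local.TiKVChecksumManager {
	// DefaultBackoffWeight (3 * client-go's DefBackOffWeight) stretches the
	// "region not leader" retry window from roughly 40s to roughly 120s.
	return local.NewTiKVChecksumManager(kvClient, pdClient, 15, local.DefaultBackoffWeight)
}
```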
2 changes: 2 additions & 0 deletions br/pkg/lightning/backend/local/checksum_test.go
@@ -48,6 +48,7 @@ func TestDoChecksum(t *testing.T) {
WithArgs("10m").
WillReturnResult(sqlmock.NewResult(2, 1))
mock.ExpectClose()
mock.ExpectClose()

manager := NewTiDBChecksumExecutor(db)
checksum, err := manager.Checksum(context.Background(), &TidbTableInfo{DB: "test", Name: "t"})
@@ -215,6 +216,7 @@ func TestDoChecksumWithErrorAndLongOriginalLifetime(t *testing.T) {
WithArgs("300h").
WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectClose()
mock.ExpectClose()

manager := NewTiDBChecksumExecutor(db)
_, err = manager.Checksum(context.Background(), &TidbTableInfo{DB: "test", Name: "t"})
5 changes: 5 additions & 0 deletions br/pkg/lightning/common/BUILD.bazel
@@ -26,6 +26,11 @@ go_library(
"//kv",
"//meta/autoid",
"//parser/model",
<<<<<<< HEAD
=======
"//parser/mysql",
"//sessionctx/variable",
>>>>>>> 89bf7432279 (importinto/lightning: do remote checksum via sql (#44803))
"//store/driver/error",
"//table/tables",
"//util",
1 change: 1 addition & 0 deletions br/pkg/lightning/common/common.go
@@ -39,6 +39,7 @@ var DefaultImportantVariables = map[string]string{
"default_week_format": "0",
"block_encryption_mode": "aes-128-ecb",
"group_concat_max_len": "1024",
"tidb_backoff_weight": "6",
}

// DefaultImportVariablesTiDB is used in ObtainImportantVariables to retrieve the system
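The new entry means that, when the target cluster does not report a value, Lightning falls back to a session-level tidb_backoff_weight of 6. The sketch below mirrors the SET SESSION pattern used by tidbChecksumExecutor in this commit; the helper name and connection plumbing are assumptions, while variable.TiDBBackOffWeight comes from the diff itself.

```go
// Sketch under assumptions: applyBackoffWeight is illustrative; only
// variable.TiDBBackOffWeight and the SET SESSION pattern come from this commit.
package example

import (
	"context"
	"database/sql"
	"fmt"

	"github.com/pingcap/tidb/sessionctx/variable"
)

func applyBackoffWeight(ctx context.Context, conn *sql.Conn, weight int) error {
	_, err := conn.ExecContext(ctx,
		fmt.Sprintf("SET SESSION %s = '%d';", variable.TiDBBackOffWeight, weight))
	return err
}
```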
163 changes: 163 additions & 0 deletions br/pkg/lightning/common/util.go
@@ -37,6 +37,11 @@ import (
"github.com/pingcap/tidb/br/pkg/utils"
tmysql "github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/parser/model"
<<<<<<< HEAD
=======
tmysql "github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/sessionctx/variable"
>>>>>>> 89bf7432279 (importinto/lightning: do remote checksum via sql (#44803))
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/util/codec"
"go.uber.org/zap"
@@ -488,3 +493,161 @@ func GetAutoRandomColumn(tblInfo *model.TableInfo) *model.ColumnInfo {
}
return nil
}
<<<<<<< HEAD
=======

// GetDropIndexInfos returns the index infos that need to be dropped and the remain indexes.
func GetDropIndexInfos(
tblInfo *model.TableInfo,
) (remainIndexes []*model.IndexInfo, dropIndexes []*model.IndexInfo) {
cols := tblInfo.Columns
loop:
for _, idxInfo := range tblInfo.Indices {
if idxInfo.State != model.StatePublic {
remainIndexes = append(remainIndexes, idxInfo)
continue
}
// Primary key is a cluster index.
if idxInfo.Primary && tblInfo.HasClusteredIndex() {
remainIndexes = append(remainIndexes, idxInfo)
continue
}
// Skip indexes that contain an auto-increment column,
// because an auto-increment column must be defined as a key.
for _, idxCol := range idxInfo.Columns {
flag := cols[idxCol.Offset].GetFlag()
if tmysql.HasAutoIncrementFlag(flag) {
remainIndexes = append(remainIndexes, idxInfo)
continue loop
}
}
dropIndexes = append(dropIndexes, idxInfo)
}
return remainIndexes, dropIndexes
}

// BuildDropIndexSQL builds the SQL statement to drop index.
func BuildDropIndexSQL(tableName string, idxInfo *model.IndexInfo) string {
if idxInfo.Primary {
return fmt.Sprintf("ALTER TABLE %s DROP PRIMARY KEY", tableName)
}
return fmt.Sprintf("ALTER TABLE %s DROP INDEX %s", tableName, EscapeIdentifier(idxInfo.Name.O))
}

// BuildAddIndexSQL builds the SQL statement to create missing indexes.
// It returns both a single SQL statement that creates all indexes at once,
// and a list of SQL statements that creates each index individually.
func BuildAddIndexSQL(
tableName string,
curTblInfo,
desiredTblInfo *model.TableInfo,
) (singleSQL string, multiSQLs []string) {
addIndexSpecs := make([]string, 0, len(desiredTblInfo.Indices))
loop:
for _, desiredIdxInfo := range desiredTblInfo.Indices {
for _, curIdxInfo := range curTblInfo.Indices {
if curIdxInfo.Name.L == desiredIdxInfo.Name.L {
continue loop
}
}

var buf bytes.Buffer
if desiredIdxInfo.Primary {
buf.WriteString("ADD PRIMARY KEY ")
} else if desiredIdxInfo.Unique {
buf.WriteString("ADD UNIQUE KEY ")
} else {
buf.WriteString("ADD KEY ")
}
// "primary" is a special name for primary key, we should not use it as index name.
if desiredIdxInfo.Name.L != "primary" {
buf.WriteString(EscapeIdentifier(desiredIdxInfo.Name.O))
}

colStrs := make([]string, 0, len(desiredIdxInfo.Columns))
for _, col := range desiredIdxInfo.Columns {
var colStr string
if desiredTblInfo.Columns[col.Offset].Hidden {
colStr = fmt.Sprintf("(%s)", desiredTblInfo.Columns[col.Offset].GeneratedExprString)
} else {
colStr = EscapeIdentifier(col.Name.O)
if col.Length != types.UnspecifiedLength {
colStr = fmt.Sprintf("%s(%s)", colStr, strconv.Itoa(col.Length))
}
}
colStrs = append(colStrs, colStr)
}
fmt.Fprintf(&buf, "(%s)", strings.Join(colStrs, ","))

if desiredIdxInfo.Invisible {
fmt.Fprint(&buf, " INVISIBLE")
}
if desiredIdxInfo.Comment != "" {
fmt.Fprintf(&buf, ` COMMENT '%s'`, format.OutputFormat(desiredIdxInfo.Comment))
}
addIndexSpecs = append(addIndexSpecs, buf.String())
}
if len(addIndexSpecs) == 0 {
return "", nil
}

singleSQL = fmt.Sprintf("ALTER TABLE %s %s", tableName, strings.Join(addIndexSpecs, ", "))
for _, spec := range addIndexSpecs {
multiSQLs = append(multiSQLs, fmt.Sprintf("ALTER TABLE %s %s", tableName, spec))
}
return singleSQL, multiSQLs
}

// IsDupKeyError checks if err is a duplicate index error.
func IsDupKeyError(err error) bool {
if merr, ok := errors.Cause(err).(*mysql.MySQLError); ok {
switch merr.Number {
case errno.ErrDupKeyName, errno.ErrMultiplePriKey, errno.ErrDupUnique:
return true
}
}
return false
}

// GetBackoffWeightFromDB gets the backoff weight from database.
func GetBackoffWeightFromDB(ctx context.Context, db *sql.DB) (int, error) {
val, err := getSessionVariable(ctx, db, variable.TiDBBackOffWeight)
if err != nil {
return 0, err
}
return strconv.Atoi(val)
}

// copied from dbutil to avoid an import cycle
func getSessionVariable(ctx context.Context, db *sql.DB, variable string) (value string, err error) {
query := fmt.Sprintf("SHOW VARIABLES LIKE '%s'", variable)
rows, err := db.QueryContext(ctx, query)

if err != nil {
return "", errors.Trace(err)
}
defer rows.Close()

// Show an example.
/*
mysql> SHOW VARIABLES LIKE "binlog_format";
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW |
+---------------+-------+
*/

for rows.Next() {
if err = rows.Scan(&variable, &value); err != nil {
return "", errors.Trace(err)
}
}

if err := rows.Err(); err != nil {
return "", errors.Trace(err)
}

return value, nil
}
>>>>>>> 89bf7432279 (importinto/lightning: do remote checksum via sql (#44803))
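Taken together, the new helpers in this file support the SQL-based flow introduced in pingcap#44803: read the session backoff weight, drop the indexes that are safe to drop before ingest, and rebuild the missing ones afterwards. One plausible combination is sketched below; only GetDropIndexInfos, BuildDropIndexSQL, and BuildAddIndexSQL come from this file, while the wrapper function, the ingest step, and the table-info plumbing are assumptions.

```go
// Sketch under assumptions: dropAndRebuildIndexes and everything around the
// helpers is illustrative, not part of this commit.
package example

import (
	"context"
	"database/sql"

	"github.com/pingcap/tidb/br/pkg/lightning/common"
	"github.com/pingcap/tidb/parser/model"
)

func dropAndRebuildIndexes(ctx context.Context, db *sql.DB, tableName string, cur, desired *model.TableInfo) error {
	// Drop every index that is safe to drop (clustered primary keys and
	// indexes on auto-increment columns are kept, per GetDropIndexInfos).
	_, dropIndexes := common.GetDropIndexInfos(cur)
	for _, idx := range dropIndexes {
		if _, err := db.ExecContext(ctx, common.BuildDropIndexSQL(tableName, idx)); err != nil {
			return err
		}
	}

	// ... ingest data here ...

	// Recreate whatever the desired schema defines that the current table lacks.
	singleSQL, _ := common.BuildAddIndexSQL(tableName, cur, desired)
	if singleSQL == "" {
		return nil
	}
	_, err := db.ExecContext(ctx, singleSQL)
	return err
}
```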