Skip to content

Commit

Permalink
importinto/lightning: do remote checksum via sql (#44803) (#45195)
Browse files Browse the repository at this point in the history
close #41941, ref #44711
  • Loading branch information
ti-chi-bot authored Jul 6, 2023
1 parent 635a436 commit 9709d8e
Show file tree
Hide file tree
Showing 13 changed files with 126 additions and 13 deletions.
20 changes: 16 additions & 4 deletions br/pkg/checksum/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ type ExecutorBuilder struct {

oldTable *metautil.Table

concurrency uint
concurrency uint
backoffWeight int

oldKeyspace []byte
newKeyspace []byte
Expand Down Expand Up @@ -56,6 +57,12 @@ func (builder *ExecutorBuilder) SetConcurrency(conc uint) *ExecutorBuilder {
return builder
}

// SetBackoffWeight sets the backoff weight used when executing checksum
// requests and returns the builder so that calls can be chained.
//
// Build copies the value into the resulting Executor, and Execute applies it
// to the request variables only when it is greater than zero; otherwise the
// variables created by kv.NewVariables are left untouched.
func (builder *ExecutorBuilder) SetBackoffWeight(backoffWeight int) *ExecutorBuilder {
builder.backoffWeight = backoffWeight
return builder
}

func (builder *ExecutorBuilder) SetOldKeyspace(keyspace []byte) *ExecutorBuilder {
builder.oldKeyspace = keyspace
return builder
Expand All @@ -79,7 +86,7 @@ func (builder *ExecutorBuilder) Build() (*Executor, error) {
if err != nil {
return nil, errors.Trace(err)
}
return &Executor{reqs: reqs}, nil
return &Executor{reqs: reqs, backoffWeight: builder.backoffWeight}, nil
}

func buildChecksumRequest(
Expand Down Expand Up @@ -294,7 +301,8 @@ func updateChecksumResponse(resp, update *tipb.ChecksumResponse) {

// Executor is a checksum executor.
type Executor struct {
reqs []*kv.Request
reqs []*kv.Request
backoffWeight int
}

// Len returns the total number of checksum requests.
Expand Down Expand Up @@ -347,7 +355,11 @@ func (exec *Executor) Execute(
err error
)
err = utils.WithRetry(ctx, func() error {
resp, err = sendChecksumRequest(ctx, client, req, kv.NewVariables(&killed))
vars := kv.NewVariables(&killed)
if exec.backoffWeight > 0 {
vars.BackOffWeight = exec.backoffWeight
}
resp, err = sendChecksumRequest(ctx, client, req, vars)
failpoint.Inject("checksumRetryErr", func(val failpoint.Value) {
// first time reach here. return error
if val.(bool) {
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/lightning/backend/local/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ go_library(
"//kv",
"//parser/model",
"//parser/mysql",
"//sessionctx/variable",
"//store/pdtypes",
"//table",
"//tablecodec",
Expand All @@ -72,6 +73,7 @@ go_library(
"@com_github_pingcap_kvproto//pkg/pdpb",
"@com_github_pingcap_tipb//go-tipb",
"@com_github_tikv_client_go_v2//error",
"@com_github_tikv_client_go_v2//kv",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_pd_client//:client",
Expand Down
45 changes: 40 additions & 5 deletions br/pkg/lightning/backend/local/checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/metric"
"github.com/pingcap/tidb/br/pkg/lightning/verification"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tipb/go-tipb"
tikvstore "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
"go.uber.org/atomic"
Expand All @@ -49,7 +51,14 @@ const (
var (
serviceSafePointTTL int64 = 10 * 60 // 10 min in seconds

minDistSQLScanConcurrency = 4
// MinDistSQLScanConcurrency is the minimum value of tidb_distsql_scan_concurrency.
MinDistSQLScanConcurrency = 4

// DefaultBackoffWeight is the default value of tidb_backoff_weight for checksum.
// When the TiKV client encounters a "region not leader" error, it keeps retrying every 500 ms.
// If it still fails after 2 * 20 = 40 seconds, it returns "region unavailable".
// If we increase the BackOffWeight to 6, the TiKV client keeps retrying for 120 seconds.
DefaultBackoffWeight = 3 * tikvstore.DefBackOffWeight
)

// RemoteChecksum represents a checksum result got from tidb.
Expand Down Expand Up @@ -102,16 +111,39 @@ func (e *tidbChecksumExecutor) Checksum(ctx context.Context, tableInfo *checkpoi

task := log.FromContext(ctx).With(zap.String("table", tableName)).Begin(zap.InfoLevel, "remote checksum")

conn, err := e.db.Conn(ctx)
if err != nil {
return nil, errors.Trace(err)
}
defer func() {
if err := conn.Close(); err != nil {
task.Warn("close connection failed", zap.Error(err))
}
}()
// ADMIN CHECKSUM TABLE <table>,<table> example.
// mysql> admin checksum table test.t;
// +---------+------------+---------------------+-----------+-------------+
// | Db_name | Table_name | Checksum_crc64_xor | Total_kvs | Total_bytes |
// +---------+------------+---------------------+-----------+-------------+
// | test | t | 8520875019404689597 | 7296873 | 357601387 |
// +---------+------------+---------------------+-----------+-------------+
backoffWeight, err := common.GetBackoffWeightFromDB(ctx, e.db)
if err == nil && backoffWeight < DefaultBackoffWeight {
task.Info("increase tidb_backoff_weight", zap.Int("original", backoffWeight), zap.Int("new", DefaultBackoffWeight))
// increase backoff weight
if _, err := conn.ExecContext(ctx, fmt.Sprintf("SET SESSION %s = '%d';", variable.TiDBBackOffWeight, DefaultBackoffWeight)); err != nil {
task.Warn("set tidb_backoff_weight failed", zap.Error(err))
} else {
defer func() {
if _, err := conn.ExecContext(ctx, fmt.Sprintf("SET SESSION %s = '%d';", variable.TiDBBackOffWeight, backoffWeight)); err != nil {
task.Warn("recover tidb_backoff_weight failed", zap.Error(err))
}
}()
}
}

cs := RemoteChecksum{}
err = common.SQLWithRetry{DB: e.db, Logger: task.Logger}.QueryRow(ctx, "compute remote checksum",
err = common.SQLWithRetry{DB: conn, Logger: task.Logger}.QueryRow(ctx, "compute remote checksum",
"ADMIN CHECKSUM TABLE "+tableName, &cs.Schema, &cs.Table, &cs.Checksum, &cs.TotalKVs, &cs.TotalBytes,
)
dur := task.End(zap.ErrorLevel, err)
Expand Down Expand Up @@ -239,22 +271,25 @@ type TiKVChecksumManager struct {
client kv.Client
manager gcTTLManager
distSQLScanConcurrency uint
backoffWeight int
}

var _ ChecksumManager = &TiKVChecksumManager{}

// NewTiKVChecksumManager return a new tikv checksum manager
func NewTiKVChecksumManager(client kv.Client, pdClient pd.Client, distSQLScanConcurrency uint) *TiKVChecksumManager {
func NewTiKVChecksumManager(client kv.Client, pdClient pd.Client, distSQLScanConcurrency uint, backoffWeight int) *TiKVChecksumManager {
return &TiKVChecksumManager{
client: client,
manager: newGCTTLManager(pdClient),
distSQLScanConcurrency: distSQLScanConcurrency,
backoffWeight: backoffWeight,
}
}

func (e *TiKVChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpoints.TidbTableInfo, ts uint64) (*RemoteChecksum, error) {
executor, err := checksum.NewExecutorBuilder(tableInfo.Core, ts).
SetConcurrency(e.distSQLScanConcurrency).
SetBackoffWeight(e.backoffWeight).
Build()
if err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -286,8 +321,8 @@ func (e *TiKVChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpo
if !common.IsRetryableError(err) {
break
}
if distSQLScanConcurrency > minDistSQLScanConcurrency {
distSQLScanConcurrency = mathutil.Max(distSQLScanConcurrency/2, minDistSQLScanConcurrency)
if distSQLScanConcurrency > MinDistSQLScanConcurrency {
distSQLScanConcurrency = mathutil.Max(distSQLScanConcurrency/2, MinDistSQLScanConcurrency)
}
}

Expand Down
2 changes: 2 additions & 0 deletions br/pkg/lightning/backend/local/checksum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func TestDoChecksum(t *testing.T) {
WithArgs("10m").
WillReturnResult(sqlmock.NewResult(2, 1))
mock.ExpectClose()
mock.ExpectClose()

manager := NewTiDBChecksumExecutor(db)
checksum, err := manager.Checksum(context.Background(), &TidbTableInfo{DB: "test", Name: "t"})
Expand Down Expand Up @@ -215,6 +216,7 @@ func TestDoChecksumWithErrorAndLongOriginalLifetime(t *testing.T) {
WithArgs("300h").
WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectClose()
mock.ExpectClose()

manager := NewTiDBChecksumExecutor(db)
_, err = manager.Checksum(context.Background(), &TidbTableInfo{DB: "test", Name: "t"})
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/common/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ go_library(
"//kv",
"//meta/autoid",
"//parser/model",
"//sessionctx/variable",
"//store/driver/error",
"//table/tables",
"//util",
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ var DefaultImportantVariables = map[string]string{
"default_week_format": "0",
"block_encryption_mode": "aes-128-ecb",
"group_concat_max_len": "1024",
"tidb_backoff_weight": "6",
}

// DefaultImportVariablesTiDB is used in ObtainImportantVariables to retrieve the system
Expand Down
43 changes: 43 additions & 0 deletions br/pkg/lightning/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/pingcap/tidb/br/pkg/utils"
tmysql "github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/util/codec"
"go.uber.org/zap"
Expand Down Expand Up @@ -488,3 +489,45 @@ func GetAutoRandomColumn(tblInfo *model.TableInfo) *model.ColumnInfo {
}
return nil
}

// GetBackoffWeightFromDB queries the database for the variable named by
// variable.TiDBBackOffWeight and returns its value as an int.
//
// An error is returned when the lookup fails or when the value cannot be
// parsed as an integer; an empty lookup result (no matching row) falls into
// the latter case.
func GetBackoffWeightFromDB(ctx context.Context, db *sql.DB) (int, error) {
	raw, err := getSessionVariable(ctx, db, variable.TiDBBackOffWeight)
	if err == nil {
		return strconv.Atoi(raw)
	}
	return 0, err
}

// getSessionVariable runs SHOW VARIABLES LIKE '<variable>' and returns the
// Value column of the result. It is copied from dbutil to avoid an import cycle.
//
// Example result set:
//
//	mysql> SHOW VARIABLES LIKE "binlog_format";
//	+---------------+-------+
//	| Variable_name | Value |
//	+---------------+-------+
//	| binlog_format | ROW   |
//	+---------------+-------+
//
// If several rows match, the value of the last row wins; if no row matches,
// an empty string and a nil error are returned.
func getSessionVariable(ctx context.Context, db *sql.DB, variable string) (value string, err error) {
	rows, err := db.QueryContext(ctx, fmt.Sprintf("SHOW VARIABLES LIKE '%s'", variable))
	if err != nil {
		return "", errors.Trace(err)
	}
	defer rows.Close()

	// Each row is (Variable_name, Value); the name is scanned back into the
	// parameter only because it is no longer needed at this point.
	for rows.Next() {
		if scanErr := rows.Scan(&variable, &value); scanErr != nil {
			return "", errors.Trace(scanErr)
		}
	}

	// Surface any error that terminated the iteration early.
	if iterErr := rows.Err(); iterErr != nil {
		return "", errors.Trace(iterErr)
	}
	return value, nil
}
2 changes: 2 additions & 0 deletions br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,7 @@ type PostRestore struct {
Level1Compact bool `toml:"level-1-compact" json:"level-1-compact"`
PostProcessAtLast bool `toml:"post-process-at-last" json:"post-process-at-last"`
Compact bool `toml:"compact" json:"compact"`
ChecksumViaSQL bool `toml:"checksum-via-sql" json:"checksum-via-sql"`
}

// StringOrStringSlice can unmarshal a TOML string as string slice with one element.
Expand Down Expand Up @@ -965,6 +966,7 @@ func NewConfig() *Config {
Checksum: OpLevelRequired,
Analyze: OpLevelOptional,
PostProcessAtLast: true,
ChecksumViaSQL: true,
},
}
}
Expand Down
13 changes: 11 additions & 2 deletions br/pkg/lightning/importer/checksum_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/lightning/metric"
Expand All @@ -44,14 +45,22 @@ func NewChecksumManager(ctx context.Context, rc *Controller, store kv.Storage) (

// for v4.0.0 or upper, we can use the gc ttl api
var manager local.ChecksumManager
if pdVersion.Major >= 4 {
if pdVersion.Major >= 4 && !rc.cfg.PostRestore.ChecksumViaSQL {
tlsOpt := rc.tls.ToPDSecurityOption()
pdCli, err := pd.NewClientWithContext(ctx, []string{pdAddr}, tlsOpt)
if err != nil {
return nil, errors.Trace(err)
}

manager = local.NewTiKVChecksumManager(store.GetClient(), pdCli, uint(rc.cfg.TiDB.DistSQLScanConcurrency))
backoffWeight, err := common.GetBackoffWeightFromDB(ctx, rc.db)
// only set backoff weight when it's smaller than default value
if err == nil && backoffWeight >= local.DefaultBackoffWeight {
log.FromContext(ctx).Info("get tidb_backoff_weight", zap.Int("backoff_weight", backoffWeight))
} else {
log.FromContext(ctx).Info("set tidb_backoff_weight to default", zap.Int("backoff_weight", local.DefaultBackoffWeight))
backoffWeight = local.DefaultBackoffWeight
}
manager = local.NewTiKVChecksumManager(store.GetClient(), pdCli, uint(rc.cfg.TiDB.DistSQLScanConcurrency), backoffWeight)
} else {
manager = local.NewTiDBChecksumExecutor(rc.db)
}
Expand Down
3 changes: 2 additions & 1 deletion br/pkg/lightning/importer/table_import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -803,6 +803,7 @@ func (s *tableRestoreSuite) TestCompareChecksumSuccess() {
WithArgs("10m").
WillReturnResult(sqlmock.NewResult(2, 1))
mock.ExpectClose()
mock.ExpectClose()

ctx := MockDoChecksumCtx(db)
remoteChecksum, err := DoChecksum(ctx, s.tr.tableInfo)
Expand Down Expand Up @@ -833,7 +834,7 @@ func (s *tableRestoreSuite) TestCompareChecksumFailure() {
WithArgs("10m").
WillReturnResult(sqlmock.NewResult(2, 1))
mock.ExpectClose()

mock.ExpectClose()
ctx := MockDoChecksumCtx(db)
remoteChecksum, err := DoChecksum(ctx, s.tr.tableInfo)
require.NoError(s.T(), err)
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/lightning/importer/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ func TestObtainRowFormatVersionSucceed(t *testing.T) {

sysVars := ObtainImportantVariables(ctx, s.db, true)
require.Equal(t, map[string]string{
"tidb_backoff_weight": "6",
"tidb_row_format_version": "2",
"max_allowed_packet": "1073741824",
"div_precision_increment": "10",
Expand All @@ -360,6 +361,7 @@ func TestObtainRowFormatVersionFailure(t *testing.T) {

sysVars := ObtainImportantVariables(ctx, s.db, true)
require.Equal(t, map[string]string{
"tidb_backoff_weight": "6",
"tidb_row_format_version": "1",
"max_allowed_packet": "67108864",
"div_precision_increment": "4",
Expand Down
3 changes: 3 additions & 0 deletions br/tests/lightning_add_index/config1.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
[tikv-importer]
backend = 'local'
add-index-by-sql = false

[post-restore]
checksum-via-sql = false
2 changes: 1 addition & 1 deletion executor/importer/table_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func (ti *TableImporter) checksumTable(ctx context.Context) error {
}
}
ti.logger.Info("local checksum", zap.Object("checksum", &localChecksum))
manager := local.NewTiKVChecksumManager(ti.kvStore.GetClient(), ti.backend.GetPDClient(), uint(ti.distSQLScanConcurrency))
manager := local.NewTiKVChecksumManager(ti.kvStore.GetClient(), ti.backend.GetPDClient(), uint(ti.distSQLScanConcurrency), local.DefaultBackoffWeight)
remoteChecksum, err := manager.Checksum(ctx, ti.tableInfo)
if err != nil {
return err
Expand Down

0 comments on commit 9709d8e

Please sign in to comment.