Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

importinto/lightning: do remote checksum via sql (#44803) #45195

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions br/pkg/checksum/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ type ExecutorBuilder struct {

oldTable *metautil.Table

concurrency uint
concurrency uint
backoffWeight int

oldKeyspace []byte
newKeyspace []byte
Expand Down Expand Up @@ -56,6 +57,12 @@ func (builder *ExecutorBuilder) SetConcurrency(conc uint) *ExecutorBuilder {
return builder
}

// SetBackoffWeight set the backoffWeight of the checksum executing.
func (builder *ExecutorBuilder) SetBackoffWeight(backoffWeight int) *ExecutorBuilder {
builder.backoffWeight = backoffWeight
return builder
}

func (builder *ExecutorBuilder) SetOldKeyspace(keyspace []byte) *ExecutorBuilder {
builder.oldKeyspace = keyspace
return builder
Expand All @@ -79,7 +86,7 @@ func (builder *ExecutorBuilder) Build() (*Executor, error) {
if err != nil {
return nil, errors.Trace(err)
}
return &Executor{reqs: reqs}, nil
return &Executor{reqs: reqs, backoffWeight: builder.backoffWeight}, nil
}

func buildChecksumRequest(
Expand Down Expand Up @@ -294,7 +301,8 @@ func updateChecksumResponse(resp, update *tipb.ChecksumResponse) {

// Executor is a checksum executor.
type Executor struct {
reqs []*kv.Request
reqs []*kv.Request
backoffWeight int
}

// Len returns the total number of checksum requests.
Expand Down Expand Up @@ -347,7 +355,11 @@ func (exec *Executor) Execute(
err error
)
err = utils.WithRetry(ctx, func() error {
resp, err = sendChecksumRequest(ctx, client, req, kv.NewVariables(&killed))
vars := kv.NewVariables(&killed)
if exec.backoffWeight > 0 {
vars.BackOffWeight = exec.backoffWeight
}
resp, err = sendChecksumRequest(ctx, client, req, vars)
failpoint.Inject("checksumRetryErr", func(val failpoint.Value) {
// first time reach here. return error
if val.(bool) {
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/lightning/backend/local/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ go_library(
"//kv",
"//parser/model",
"//parser/mysql",
"//sessionctx/variable",
"//store/pdtypes",
"//table",
"//tablecodec",
Expand All @@ -72,6 +73,7 @@ go_library(
"@com_github_pingcap_kvproto//pkg/pdpb",
"@com_github_pingcap_tipb//go-tipb",
"@com_github_tikv_client_go_v2//error",
"@com_github_tikv_client_go_v2//kv",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_pd_client//:client",
Expand Down
45 changes: 40 additions & 5 deletions br/pkg/lightning/backend/local/checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/metric"
"github.com/pingcap/tidb/br/pkg/lightning/verification"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tipb/go-tipb"
tikvstore "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
"go.uber.org/atomic"
Expand All @@ -49,7 +51,14 @@ const (
var (
serviceSafePointTTL int64 = 10 * 60 // 10 min in seconds

minDistSQLScanConcurrency = 4
// MinDistSQLScanConcurrency is the minimum value of tidb_distsql_scan_concurrency.
MinDistSQLScanConcurrency = 4

// DefaultBackoffWeight is the default value of tidb_backoff_weight for checksum.
// when TiKV client encounters an error of "region not leader", it will keep retrying every 500 ms.
// If it still fails after 2 * 20 = 40 seconds, it will return "region unavailable".
// If we increase the BackOffWeight to 6, then the TiKV client will keep retrying for 120 seconds.
DefaultBackoffWeight = 3 * tikvstore.DefBackOffWeight
)

// RemoteChecksum represents a checksum result got from tidb.
Expand Down Expand Up @@ -102,16 +111,39 @@ func (e *tidbChecksumExecutor) Checksum(ctx context.Context, tableInfo *checkpoi

task := log.FromContext(ctx).With(zap.String("table", tableName)).Begin(zap.InfoLevel, "remote checksum")

conn, err := e.db.Conn(ctx)
if err != nil {
return nil, errors.Trace(err)
}
defer func() {
if err := conn.Close(); err != nil {
task.Warn("close connection failed", zap.Error(err))
}
}()
// ADMIN CHECKSUM TABLE <table>,<table> example.
// mysql> admin checksum table test.t;
// +---------+------------+---------------------+-----------+-------------+
// | Db_name | Table_name | Checksum_crc64_xor | Total_kvs | Total_bytes |
// +---------+------------+---------------------+-----------+-------------+
// | test | t | 8520875019404689597 | 7296873 | 357601387 |
// +---------+------------+---------------------+-----------+-------------+
backoffWeight, err := common.GetBackoffWeightFromDB(ctx, e.db)
if err == nil && backoffWeight < DefaultBackoffWeight {
task.Info("increase tidb_backoff_weight", zap.Int("original", backoffWeight), zap.Int("new", DefaultBackoffWeight))
// increase backoff weight
if _, err := conn.ExecContext(ctx, fmt.Sprintf("SET SESSION %s = '%d';", variable.TiDBBackOffWeight, DefaultBackoffWeight)); err != nil {
task.Warn("set tidb_backoff_weight failed", zap.Error(err))
} else {
defer func() {
if _, err := conn.ExecContext(ctx, fmt.Sprintf("SET SESSION %s = '%d';", variable.TiDBBackOffWeight, backoffWeight)); err != nil {
task.Warn("recover tidb_backoff_weight failed", zap.Error(err))
}
}()
}
}

cs := RemoteChecksum{}
err = common.SQLWithRetry{DB: e.db, Logger: task.Logger}.QueryRow(ctx, "compute remote checksum",
err = common.SQLWithRetry{DB: conn, Logger: task.Logger}.QueryRow(ctx, "compute remote checksum",
"ADMIN CHECKSUM TABLE "+tableName, &cs.Schema, &cs.Table, &cs.Checksum, &cs.TotalKVs, &cs.TotalBytes,
)
dur := task.End(zap.ErrorLevel, err)
Expand Down Expand Up @@ -239,22 +271,25 @@ type TiKVChecksumManager struct {
client kv.Client
manager gcTTLManager
distSQLScanConcurrency uint
backoffWeight int
}

var _ ChecksumManager = &TiKVChecksumManager{}

// NewTiKVChecksumManager return a new tikv checksum manager
func NewTiKVChecksumManager(client kv.Client, pdClient pd.Client, distSQLScanConcurrency uint) *TiKVChecksumManager {
func NewTiKVChecksumManager(client kv.Client, pdClient pd.Client, distSQLScanConcurrency uint, backoffWeight int) *TiKVChecksumManager {
return &TiKVChecksumManager{
client: client,
manager: newGCTTLManager(pdClient),
distSQLScanConcurrency: distSQLScanConcurrency,
backoffWeight: backoffWeight,
}
}

func (e *TiKVChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpoints.TidbTableInfo, ts uint64) (*RemoteChecksum, error) {
executor, err := checksum.NewExecutorBuilder(tableInfo.Core, ts).
SetConcurrency(e.distSQLScanConcurrency).
SetBackoffWeight(e.backoffWeight).
Build()
if err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -286,8 +321,8 @@ func (e *TiKVChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpo
if !common.IsRetryableError(err) {
break
}
if distSQLScanConcurrency > minDistSQLScanConcurrency {
distSQLScanConcurrency = mathutil.Max(distSQLScanConcurrency/2, minDistSQLScanConcurrency)
if distSQLScanConcurrency > MinDistSQLScanConcurrency {
distSQLScanConcurrency = mathutil.Max(distSQLScanConcurrency/2, MinDistSQLScanConcurrency)
}
}

Expand Down
2 changes: 2 additions & 0 deletions br/pkg/lightning/backend/local/checksum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func TestDoChecksum(t *testing.T) {
WithArgs("10m").
WillReturnResult(sqlmock.NewResult(2, 1))
mock.ExpectClose()
mock.ExpectClose()

manager := NewTiDBChecksumExecutor(db)
checksum, err := manager.Checksum(context.Background(), &TidbTableInfo{DB: "test", Name: "t"})
Expand Down Expand Up @@ -215,6 +216,7 @@ func TestDoChecksumWithErrorAndLongOriginalLifetime(t *testing.T) {
WithArgs("300h").
WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectClose()
mock.ExpectClose()

manager := NewTiDBChecksumExecutor(db)
_, err = manager.Checksum(context.Background(), &TidbTableInfo{DB: "test", Name: "t"})
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/common/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ go_library(
"//kv",
"//meta/autoid",
"//parser/model",
"//sessionctx/variable",
"//store/driver/error",
"//table/tables",
"//util",
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ var DefaultImportantVariables = map[string]string{
"default_week_format": "0",
"block_encryption_mode": "aes-128-ecb",
"group_concat_max_len": "1024",
"tidb_backoff_weight": "6",
}

// DefaultImportVariablesTiDB is used in ObtainImportantVariables to retrieve the system
Expand Down
43 changes: 43 additions & 0 deletions br/pkg/lightning/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/pingcap/tidb/br/pkg/utils"
tmysql "github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/util/codec"
"go.uber.org/zap"
Expand Down Expand Up @@ -488,3 +489,45 @@ func GetAutoRandomColumn(tblInfo *model.TableInfo) *model.ColumnInfo {
}
return nil
}

// GetBackoffWeightFromDB gets the backoff weight from database.
func GetBackoffWeightFromDB(ctx context.Context, db *sql.DB) (int, error) {
val, err := getSessionVariable(ctx, db, variable.TiDBBackOffWeight)
if err != nil {
return 0, err
}
return strconv.Atoi(val)
}

// copy from dbutil to avoid import cycle
func getSessionVariable(ctx context.Context, db *sql.DB, variable string) (value string, err error) {
query := fmt.Sprintf("SHOW VARIABLES LIKE '%s'", variable)
rows, err := db.QueryContext(ctx, query)

if err != nil {
return "", errors.Trace(err)
}
defer rows.Close()

// Show an example.
/*
mysql> SHOW VARIABLES LIKE "binlog_format";
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW |
+---------------+-------+
*/

for rows.Next() {
if err = rows.Scan(&variable, &value); err != nil {
return "", errors.Trace(err)
}
}

if err := rows.Err(); err != nil {
return "", errors.Trace(err)
}

return value, nil
}
2 changes: 2 additions & 0 deletions br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,7 @@ type PostRestore struct {
Level1Compact bool `toml:"level-1-compact" json:"level-1-compact"`
PostProcessAtLast bool `toml:"post-process-at-last" json:"post-process-at-last"`
Compact bool `toml:"compact" json:"compact"`
ChecksumViaSQL bool `toml:"checksum-via-sql" json:"checksum-via-sql"`
}

// StringOrStringSlice can unmarshal a TOML string as string slice with one element.
Expand Down Expand Up @@ -965,6 +966,7 @@ func NewConfig() *Config {
Checksum: OpLevelRequired,
Analyze: OpLevelOptional,
PostProcessAtLast: true,
ChecksumViaSQL: true,
},
}
}
Expand Down
13 changes: 11 additions & 2 deletions br/pkg/lightning/importer/checksum_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/lightning/metric"
Expand All @@ -44,14 +45,22 @@ func NewChecksumManager(ctx context.Context, rc *Controller, store kv.Storage) (

// for v4.0.0 or upper, we can use the gc ttl api
var manager local.ChecksumManager
if pdVersion.Major >= 4 {
if pdVersion.Major >= 4 && !rc.cfg.PostRestore.ChecksumViaSQL {
tlsOpt := rc.tls.ToPDSecurityOption()
pdCli, err := pd.NewClientWithContext(ctx, []string{pdAddr}, tlsOpt)
if err != nil {
return nil, errors.Trace(err)
}

manager = local.NewTiKVChecksumManager(store.GetClient(), pdCli, uint(rc.cfg.TiDB.DistSQLScanConcurrency))
backoffWeight, err := common.GetBackoffWeightFromDB(ctx, rc.db)
// only set backoff weight when it's smaller than default value
if err == nil && backoffWeight >= local.DefaultBackoffWeight {
log.FromContext(ctx).Info("get tidb_backoff_weight", zap.Int("backoff_weight", backoffWeight))
} else {
log.FromContext(ctx).Info("set tidb_backoff_weight to default", zap.Int("backoff_weight", local.DefaultBackoffWeight))
backoffWeight = local.DefaultBackoffWeight
}
manager = local.NewTiKVChecksumManager(store.GetClient(), pdCli, uint(rc.cfg.TiDB.DistSQLScanConcurrency), backoffWeight)
} else {
manager = local.NewTiDBChecksumExecutor(rc.db)
}
Expand Down
3 changes: 2 additions & 1 deletion br/pkg/lightning/importer/table_import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -803,6 +803,7 @@ func (s *tableRestoreSuite) TestCompareChecksumSuccess() {
WithArgs("10m").
WillReturnResult(sqlmock.NewResult(2, 1))
mock.ExpectClose()
mock.ExpectClose()

ctx := MockDoChecksumCtx(db)
remoteChecksum, err := DoChecksum(ctx, s.tr.tableInfo)
Expand Down Expand Up @@ -833,7 +834,7 @@ func (s *tableRestoreSuite) TestCompareChecksumFailure() {
WithArgs("10m").
WillReturnResult(sqlmock.NewResult(2, 1))
mock.ExpectClose()

mock.ExpectClose()
ctx := MockDoChecksumCtx(db)
remoteChecksum, err := DoChecksum(ctx, s.tr.tableInfo)
require.NoError(s.T(), err)
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/lightning/importer/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ func TestObtainRowFormatVersionSucceed(t *testing.T) {

sysVars := ObtainImportantVariables(ctx, s.db, true)
require.Equal(t, map[string]string{
"tidb_backoff_weight": "6",
"tidb_row_format_version": "2",
"max_allowed_packet": "1073741824",
"div_precision_increment": "10",
Expand All @@ -360,6 +361,7 @@ func TestObtainRowFormatVersionFailure(t *testing.T) {

sysVars := ObtainImportantVariables(ctx, s.db, true)
require.Equal(t, map[string]string{
"tidb_backoff_weight": "6",
"tidb_row_format_version": "1",
"max_allowed_packet": "67108864",
"div_precision_increment": "4",
Expand Down
3 changes: 3 additions & 0 deletions br/tests/lightning_add_index/config1.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
[tikv-importer]
backend = 'local'
add-index-by-sql = false

[post-restore]
checksum-via-sql = false
2 changes: 1 addition & 1 deletion executor/importer/table_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func (ti *TableImporter) checksumTable(ctx context.Context) error {
}
}
ti.logger.Info("local checksum", zap.Object("checksum", &localChecksum))
manager := local.NewTiKVChecksumManager(ti.kvStore.GetClient(), ti.backend.GetPDClient(), uint(ti.distSQLScanConcurrency))
manager := local.NewTiKVChecksumManager(ti.kvStore.GetClient(), ti.backend.GetPDClient(), uint(ti.distSQLScanConcurrency), local.DefaultBackoffWeight)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

physical mode on 7.1 is disabled, so here we just use the default weight

remoteChecksum, err := manager.Checksum(ctx, ti.tableInfo)
if err != nil {
return err
Expand Down