Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

importinto/lightning: do remote checksum via sql #44803

Merged
merged 27 commits into from
Jun 21, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion br/pkg/checksum/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,12 +342,14 @@ func (exec *Executor) Execute(
//
// It is useful in TiDB; however, it's a placeholder in BR.
killed := uint32(0)
vars := kv.NewVariables(&killed)
vars.BackOffWeight *= 3
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make it a configurable parameter?

var (
resp *tipb.ChecksumResponse
err error
)
err = utils.WithRetry(ctx, func() error {
resp, err = sendChecksumRequest(ctx, client, req, kv.NewVariables(&killed))
resp, err = sendChecksumRequest(ctx, client, req, vars)
failpoint.Inject("checksumRetryErr", func(val failpoint.Value) {
// first time reach here. return error
if val.(bool) {
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/local/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ go_library(
"@com_github_pingcap_kvproto//pkg/pdpb",
"@com_github_pingcap_tipb//go-tipb",
"@com_github_tikv_client_go_v2//error",
"@com_github_tikv_client_go_v2//kv",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_pd_client//:client",
Expand Down
14 changes: 10 additions & 4 deletions br/pkg/lightning/backend/local/checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tipb/go-tipb"
tikvstore "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
"go.uber.org/atomic"
Expand All @@ -49,7 +50,8 @@ const (
var (
serviceSafePointTTL int64 = 10 * 60 // 10 min in seconds

minDistSQLScanConcurrency = 4
// MinDistSQLScanConcurrency is the minimum value of tidb_distsql_scan_concurrency.
MinDistSQLScanConcurrency = 4
)

// RemoteChecksum represents a checksum result got from tidb.
Expand Down Expand Up @@ -111,7 +113,11 @@ func (e *tidbChecksumExecutor) Checksum(ctx context.Context, tableInfo *checkpoi
// +---------+------------+---------------------+-----------+-------------+

cs := RemoteChecksum{}
err = common.SQLWithRetry{DB: e.db, Logger: task.Logger}.QueryRow(ctx, "compute remote checksum",
exec := common.SQLWithRetry{DB: e.db, Logger: task.Logger}
if err := exec.Exec(ctx, "increase tidb_backoff_weight", fmt.Sprintf("SET SESSION tidb_backoff_weight = '%d';", 3*tikvstore.DefBackOffWeight)); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make the backoff weight a configurable parameter?

return nil, errors.Trace(err)
}
err = exec.QueryRow(ctx, "compute remote checksum",
"ADMIN CHECKSUM TABLE "+tableName, &cs.Schema, &cs.Table, &cs.Checksum, &cs.TotalKVs, &cs.TotalBytes,
)
dur := task.End(zap.ErrorLevel, err)
Expand Down Expand Up @@ -286,8 +292,8 @@ func (e *TiKVChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpo
if !common.IsRetryableError(err) {
break
}
if distSQLScanConcurrency > minDistSQLScanConcurrency {
distSQLScanConcurrency = mathutil.Max(distSQLScanConcurrency/2, minDistSQLScanConcurrency)
if distSQLScanConcurrency > MinDistSQLScanConcurrency {
distSQLScanConcurrency = mathutil.Max(distSQLScanConcurrency/2, MinDistSQLScanConcurrency)
}
}

Expand Down
2 changes: 2 additions & 0 deletions br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,7 @@ type PostRestore struct {
Level1Compact bool `toml:"level-1-compact" json:"level-1-compact"`
PostProcessAtLast bool `toml:"post-process-at-last" json:"post-process-at-last"`
Compact bool `toml:"compact" json:"compact"`
ChecksumViaSQL bool `toml:"checksum-via-sql" json:"checksum-via-sql"`
}

// StringOrStringSlice can unmarshal a TOML string as string slice with one element.
Expand Down Expand Up @@ -974,6 +975,7 @@ func NewConfig() *Config {
Checksum: OpLevelRequired,
Analyze: OpLevelOptional,
PostProcessAtLast: true,
ChecksumViaSQL: true,
},
}
}
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/importer/checksum_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func NewChecksumManager(ctx context.Context, rc *Controller, store kv.Storage) (

// for v4.0.0 or upper, we can use the gc ttl api
var manager local.ChecksumManager
if pdVersion.Major >= 4 {
if pdVersion.Major >= 4 && !rc.cfg.PostRestore.ChecksumViaSQL {
tlsOpt := rc.tls.ToPDSecurityOption()
pdCli, err := pd.NewClientWithContext(ctx, []string{pdAddr}, tlsOpt)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions disttask/framework/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,10 @@ func (d *dispatcher) WithNewSession(fn func(se sessionctx.Context) error) error
return d.taskMgr.WithNewSession(fn)
}

// WithNewTxn executes fn in a new transaction by delegating to the
// dispatcher's task manager, and returns whatever error that call yields.
func (d *dispatcher) WithNewTxn(fn func(se sessionctx.Context) error) error {
	return d.taskMgr.WithNewTxn(fn)
}

func (*dispatcher) checkConcurrencyOverflow(cnt int) bool {
if cnt >= DefaultDispatchConcurrency {
logutil.BgLogger().Info("dispatch task loop, running GTask cnt is more than concurrency",
Expand Down
22 changes: 12 additions & 10 deletions disttask/framework/storage/task_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ import (
type SessionExecutor interface {
// WithNewSession executes the function with a new session.
WithNewSession(fn func(se sessionctx.Context) error) error
// WithNewTxn executes the fn in a new transaction.
WithNewTxn(fn func(se sessionctx.Context) error) error
}

// TaskManager is the manager of global/sub task.
Expand Down Expand Up @@ -79,9 +81,9 @@ func SetTaskManager(is *TaskManager) {
taskManagerInstance.Store(is)
}

// execSQL executes the sql and returns the result.
// ExecSQL executes the sql and returns the result.
// TODO: consider retry.
func execSQL(ctx context.Context, se sessionctx.Context, sql string, args ...interface{}) ([]chunk.Row, error) {
func ExecSQL(ctx context.Context, se sessionctx.Context, sql string, args ...interface{}) ([]chunk.Row, error) {
rs, err := se.(sqlexec.SQLExecutor).ExecuteInternal(ctx, sql, args...)
if err != nil {
return nil, err
Expand Down Expand Up @@ -125,7 +127,7 @@ func (stm *TaskManager) WithNewSession(fn func(se sessionctx.Context) error) err
// WithNewTxn executes the fn in a new transaction.
func (stm *TaskManager) WithNewTxn(fn func(se sessionctx.Context) error) error {
return stm.WithNewSession(func(se sessionctx.Context) (err error) {
_, err = execSQL(stm.ctx, se, "begin")
_, err = ExecSQL(stm.ctx, se, "begin")
if err != nil {
return err
}
Expand All @@ -136,7 +138,7 @@ func (stm *TaskManager) WithNewTxn(fn func(se sessionctx.Context) error) error {
if success {
sql = "commit"
}
_, commitErr := execSQL(stm.ctx, se, sql)
_, commitErr := ExecSQL(stm.ctx, se, sql)
if err == nil && commitErr != nil {
err = commitErr
}
Expand All @@ -153,7 +155,7 @@ func (stm *TaskManager) WithNewTxn(fn func(se sessionctx.Context) error) error {

func (stm *TaskManager) executeSQLWithNewSession(ctx context.Context, sql string, args ...interface{}) (rs []chunk.Row, err error) {
err = stm.WithNewSession(func(se sessionctx.Context) error {
rs, err = execSQL(ctx, se, sql, args...)
rs, err = ExecSQL(ctx, se, sql, args...)
return err
})

Expand All @@ -176,15 +178,15 @@ func (stm *TaskManager) AddNewGlobalTask(key, tp string, concurrency int, meta [

// AddGlobalTaskWithSession adds a new task to global task table with session.
func (stm *TaskManager) AddGlobalTaskWithSession(se sessionctx.Context, key, tp string, concurrency int, meta []byte) (taskID int64, err error) {
_, err = execSQL(stm.ctx, se,
_, err = ExecSQL(stm.ctx, se,
`insert into mysql.tidb_global_task(task_key, type, state, concurrency, step, meta, state_update_time)
values (%?, %?, %?, %?, %?, %?, %?)`,
key, tp, proto.TaskStatePending, concurrency, proto.StepInit, meta, time.Now().UTC().String())
if err != nil {
return 0, err
}

rs, err := execSQL(stm.ctx, se, "select @@last_insert_id")
rs, err := ExecSQL(stm.ctx, se, "select @@last_insert_id")
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -406,7 +408,7 @@ func (stm *TaskManager) GetSchedulerIDsByTaskID(taskID int64) ([]string, error)
// UpdateGlobalTaskAndAddSubTasks update the global task and add new subtasks
func (stm *TaskManager) UpdateGlobalTaskAndAddSubTasks(gTask *proto.Task, subtasks []*proto.Subtask, isSubtaskRevert bool) error {
return stm.WithNewTxn(func(se sessionctx.Context) error {
_, err := execSQL(stm.ctx, se, "update mysql.tidb_global_task set state = %?, dispatcher_id = %?, step = %?, state_update_time = %?, concurrency = %?, meta = %?, error = %? where id = %?",
_, err := ExecSQL(stm.ctx, se, "update mysql.tidb_global_task set state = %?, dispatcher_id = %?, step = %?, state_update_time = %?, concurrency = %?, meta = %?, error = %? where id = %?",
gTask.State, gTask.DispatcherID, gTask.Step, gTask.StateUpdateTime.UTC().String(), gTask.Concurrency, gTask.Meta, gTask.Error, gTask.ID)
if err != nil {
return err
Expand All @@ -425,7 +427,7 @@ func (stm *TaskManager) UpdateGlobalTaskAndAddSubTasks(gTask *proto.Task, subtas

for _, subtask := range subtasks {
// TODO: insert subtasks in batch
_, err = execSQL(stm.ctx, se, "insert into mysql.tidb_background_subtask(step, task_key, exec_id, meta, state, type, checkpoint) values (%?, %?, %?, %?, %?, %?, %?)",
_, err = ExecSQL(stm.ctx, se, "insert into mysql.tidb_background_subtask(step, task_key, exec_id, meta, state, type, checkpoint) values (%?, %?, %?, %?, %?, %?, %?)",
gTask.Step, gTask.ID, subtask.SchedulerID, subtask.Meta, subtaskState, proto.Type2Int(subtask.Type), []byte{})
if err != nil {
return err
Expand All @@ -446,7 +448,7 @@ func (stm *TaskManager) CancelGlobalTask(taskID int64) error {

// CancelGlobalTaskByKeySession cancels global task by key using input session
func (stm *TaskManager) CancelGlobalTaskByKeySession(se sessionctx.Context, taskKey string) error {
_, err := execSQL(stm.ctx, se, "update mysql.tidb_global_task set state=%? where task_key=%? and state in (%?, %?)",
_, err := ExecSQL(stm.ctx, se, "update mysql.tidb_global_task set state=%? where task_key=%? and state in (%?, %?)",
proto.TaskStateCancelling, taskKey, proto.TaskStatePending, proto.TaskStateRunning)
return err
}
Expand Down
4 changes: 4 additions & 0 deletions disttask/importinto/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ go_library(
deps = [
"//br/pkg/lightning/backend",
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/backend/local",
"//br/pkg/lightning/checkpoints",
"//br/pkg/lightning/common",
"//br/pkg/lightning/config",
Expand All @@ -33,15 +34,18 @@ go_library(
"//parser/ast",
"//parser/mysql",
"//sessionctx",
"//sessionctx/variable",
"//table/tables",
"//util/dbterror/exeerrors",
"//util/etcd",
"//util/logutil",
"//util/mathutil",
"//util/sqlexec",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_google_uuid//:uuid",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_tikv_client_go_v2//kv",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
],
Expand Down
37 changes: 0 additions & 37 deletions disttask/importinto/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,43 +365,6 @@ func preProcess(_ context.Context, _ dispatcher.TaskHandle, gTask *proto.Task, t
return updateMeta(gTask, taskMeta)
}

// postProcess does the post-processing for the task.
//
// It builds a controller from taskMeta, logs that post-processing has
// started, and returns the result of verifyChecksum for the checksum
// recorded in subtaskMeta. A failure to build the controller is returned
// as-is.
func postProcess(ctx context.Context, taskMeta *TaskMeta, subtaskMeta *PostProcessStepMeta, logger *zap.Logger) (err error) {
	// Test-only hook: when the failpoint is enabled, send on TestSyncChan and
	// then block until a value is received back from it.
	failpoint.Inject("syncBeforePostProcess", func() {
		TestSyncChan <- struct{}{}
		<-TestSyncChan
	})

	// TODO: create table indexes depends on the option.
	// globalTaskManager, err := storage.GetTaskManager()
	// if err != nil {
	// 	return err
	// }
	// create table indexes even if the post process is failed.
	// defer func() {
	// 	err2 := createTableIndexes(ctx, globalTaskManager, taskMeta, logger)
	// 	err = multierr.Append(err, err2)
	// }()

	ctrl, err := buildController(taskMeta)
	if err != nil {
		return err
	}
	// no need and should not call controller.InitDataFiles, files might not exist on this instance.

	logger.Info("post process")

	return verifyChecksum(ctx, ctrl, subtaskMeta.Checksum, logger)
}

// verifyChecksum checks the locally accumulated checksum through the controller.
//
// When controller.Checksum is config.OpLevelOff the check is skipped and nil
// is returned. Otherwise a KV checksum is built from checksum.Size,
// checksum.KVs and checksum.Sum, logged, and handed to
// controller.VerifyChecksum, whose result is returned.
func verifyChecksum(ctx context.Context, controller *importer.LoadDataController, checksum Checksum, logger *zap.Logger) error {
	// Checksum verification is disabled for this import.
	if controller.Checksum == config.OpLevelOff {
		return nil
	}

	kvChecksum := verify.MakeKVChecksum(checksum.Size, checksum.KVs, checksum.Sum)
	logger.Info("local checksum", zap.Object("checksum", &kvChecksum))

	return controller.VerifyChecksum(ctx, kvChecksum)
}

// nolint:deadcode
func dropTableIndexes(ctx context.Context, handle dispatcher.TaskHandle, taskMeta *TaskMeta, logger *zap.Logger) error {
tblInfo := taskMeta.Plan.TableInfo
Expand Down
Loading