This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

Commit

post-process: allow run checksum at last and restrict the number of checksum jobs (#540)
glorv committed Jan 29, 2021
1 parent dc6f064 commit 1b9d7fb
Showing 3 changed files with 75 additions and 39 deletions.
40 changes: 29 additions & 11 deletions lightning/config/config.go
@@ -63,6 +63,11 @@ const (
IgnoreOnDup = "ignore"
// ErrorOnDup indicates using INSERT INTO to insert data, which would violate PK or UNIQUE constraint
ErrorOnDup = "error"

defaultDistSQLScanConcurrency = 15
defaultBuildStatsConcurrency = 20
defaultIndexSerialScanConcurrency = 20
defaultChecksumTableConcurrency = 2
)

var (
@@ -204,11 +209,11 @@ func (t PostOpLevel) String() string {

// PostRestore has some options which will be executed after kv restored.
type PostRestore struct {
Level1Compact bool `toml:"level-1-compact" json:"level-1-compact"`
Compact bool `toml:"compact" json:"compact"`
Checksum PostOpLevel `toml:"checksum" json:"checksum"`
Analyze PostOpLevel `toml:"analyze" json:"analyze"`
AnalyzeAtLast bool `toml:"analyze-at-last" json:"analyze-at-last"`
Level1Compact bool `toml:"level-1-compact" json:"level-1-compact"`
Compact bool `toml:"compact" json:"compact"`
Checksum PostOpLevel `toml:"checksum" json:"checksum"`
Analyze PostOpLevel `toml:"analyze" json:"analyze"`
PostProcessAtLast bool `toml:"post-process-at-last" json:"post-process-at-last"`
}

type CSVConfig struct {
@@ -334,10 +339,10 @@ func NewConfig() *Config {
StatusPort: 10080,
StrSQLMode: "ONLY_FULL_GROUP_BY,NO_AUTO_CREATE_USER",
MaxAllowedPacket: defaultMaxAllowedPacket,
BuildStatsConcurrency: 20,
DistSQLScanConcurrency: 100,
IndexSerialScanConcurrency: 20,
ChecksumTableConcurrency: 16,
BuildStatsConcurrency: defaultBuildStatsConcurrency,
DistSQLScanConcurrency: defaultDistSQLScanConcurrency,
IndexSerialScanConcurrency: defaultIndexSerialScanConcurrency,
ChecksumTableConcurrency: defaultChecksumTableConcurrency,
},
Cron: Cron{
SwitchMode: Duration{Duration: 5 * time.Minute},
@@ -366,8 +371,9 @@ func NewConfig() *Config {
RegionSplitSize: SplitRegionSize,
},
PostRestore: PostRestore{
Checksum: OpLevelRequired,
Analyze: OpLevelOptional,
Checksum: OpLevelRequired,
Analyze: OpLevelOptional,
PostProcessAtLast: true,
},
}
}
@@ -525,6 +531,18 @@ func (cfg *Config) Adjust(ctx context.Context) error {
if cfg.TikvImporter.RegionSplitSize == 0 {
cfg.TikvImporter.RegionSplitSize = SplitRegionSize
}
if cfg.TiDB.DistSQLScanConcurrency == 0 {
cfg.TiDB.DistSQLScanConcurrency = defaultDistSQLScanConcurrency
}
if cfg.TiDB.BuildStatsConcurrency == 0 {
cfg.TiDB.BuildStatsConcurrency = defaultBuildStatsConcurrency
}
if cfg.TiDB.IndexSerialScanConcurrency == 0 {
cfg.TiDB.IndexSerialScanConcurrency = defaultIndexSerialScanConcurrency
}
if cfg.TiDB.ChecksumTableConcurrency == 0 {
cfg.TiDB.ChecksumTableConcurrency = defaultChecksumTableConcurrency
}
default:
return errors.Errorf("invalid config: unsupported `tikv-importer.backend` (%s)", cfg.TikvImporter.Backend)
}
65 changes: 41 additions & 24 deletions lightning/restore/restore.go
@@ -136,6 +136,7 @@ type RestoreController struct {
indexWorkers *worker.Pool
regionWorkers *worker.Pool
ioWorkers *worker.Pool
checksumWorks *worker.Pool
pauser *common.Pauser
backend kv.Backend
tidbGlue glue.Glue
@@ -235,6 +236,7 @@ func NewRestoreControllerWithPauser(
indexWorkers: worker.NewPool(ctx, cfg.App.IndexConcurrency, "index"),
regionWorkers: worker.NewPool(ctx, cfg.App.RegionConcurrency, "region"),
ioWorkers: worker.NewPool(ctx, cfg.App.IOConcurrency, "io"),
checksumWorks: worker.NewPool(ctx, cfg.TiDB.ChecksumTableConcurrency, "checksum"),
pauser: pauser,
backend: backend,
tidbGlue: g,
@@ -852,7 +854,7 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error {
go func() {
for task := range postProcessTaskChan {
// force all the remain post-process tasks to be executed
_, err := task.tr.postProcess(ctx, rc, task.cp, true)
_, err := task.tr.postProcess(ctx2, rc, task.cp, true)
restoreErr.Set(err)
}
wg.Done()
@@ -1219,9 +1221,14 @@ func (t *TableRestore) importEngine(

// postProcess execute rebase-auto-id/checksum/analyze according to the task config.
//
// if the parameter forceAnalyze to true, postProcess force run analyze even if the analyze-at-last config is true.
// And if analyze phase is skipped, the first return value will be true.
func (t *TableRestore) postProcess(ctx context.Context, rc *RestoreController, cp *TableCheckpoint, forceAnalyze bool) (bool, error) {
// if the parameter forcePostProcess to true, postProcess force run checksum and analyze even if the
// post-process-at-last config is true. And if this two phases are skipped, the first return value will be true.
func (t *TableRestore) postProcess(
ctx context.Context,
rc *RestoreController,
cp *TableCheckpoint,
forcePostProcess bool,
) (bool, error) {
// there are no data in this table, no need to do post process
// this is important for tables that are just the dump table of views
// because at this stage, the table was already deleted and replaced by the related view
@@ -1255,44 +1262,54 @@ func (t *TableRestore) postProcess(ctx context.Context, rc *RestoreController, c
return false, nil
}

// 4. do table checksum
var localChecksum verify.KVChecksum
for _, engine := range cp.Engines {
for _, chunk := range engine.Chunks {
localChecksum.Add(&chunk.Checksum)
}
}
w := rc.checksumWorks.Apply()
defer rc.checksumWorks.Recycle(w)

t.logger.Info("local checksum", zap.Object("checksum", &localChecksum))
finished := true
if cp.Status < CheckpointStatusChecksummed {
if rc.cfg.PostRestore.Checksum == config.OpLevelOff {
t.logger.Info("skip checksum")
rc.saveStatusCheckpoint(t.tableName, WholeTableEngineID, nil, CheckpointStatusChecksumSkipped)
} else {
err := t.compareChecksum(ctx, localChecksum)
// witch post restore level 'optional', we will skip checksum error
if rc.cfg.PostRestore.Checksum == config.OpLevelOptional {
if forcePostProcess || !rc.cfg.PostRestore.PostProcessAtLast {
// 4. do table checksum
var localChecksum verify.KVChecksum
for _, engine := range cp.Engines {
for _, chunk := range engine.Chunks {
localChecksum.Add(&chunk.Checksum)
}
}
t.logger.Info("local checksum", zap.Object("checksum", &localChecksum))
err := t.compareChecksum(ctx, localChecksum)
// with post restore level 'optional', we will skip checksum error
if rc.cfg.PostRestore.Checksum == config.OpLevelOptional {
if err != nil {
t.logger.Warn("compare checksum failed, will skip this error and go on", log.ShortError(err))
err = nil
}
}
rc.saveStatusCheckpoint(t.tableName, WholeTableEngineID, err, CheckpointStatusChecksummed)
if err != nil {
t.logger.Warn("compare checksum failed, will skip this error and go on", log.ShortError(err))
err = nil
return false, errors.Trace(err)
}
cp.Status = CheckpointStatusChecksummed
} else {
finished = false
}
rc.saveStatusCheckpoint(t.tableName, WholeTableEngineID, err, CheckpointStatusChecksummed)
if err != nil {
return false, errors.Trace(err)
}

}
cp.Status = CheckpointStatusChecksummed
}
if !finished {
return !finished, nil
}

// 5. do table analyze
finished := true
if cp.Status < CheckpointStatusAnalyzed {
if rc.cfg.PostRestore.Analyze == config.OpLevelOff {
t.logger.Info("skip analyze")
rc.saveStatusCheckpoint(t.tableName, WholeTableEngineID, nil, CheckpointStatusAnalyzeSkipped)
cp.Status = CheckpointStatusAnalyzed
} else if forceAnalyze || !rc.cfg.PostRestore.AnalyzeAtLast {
} else if forcePostProcess || !rc.cfg.PostRestore.PostProcessAtLast {
err := t.analyzeTable(ctx, rc.tidbGlue.GetSQLExecutor())
// witch post restore level 'optional', we will skip analyze error
if rc.cfg.PostRestore.Analyze == config.OpLevelOptional {
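The restore.go change above caps how many checksum jobs run at the same time: postProcess now takes a slot from a dedicated pool sized by tidb.checksum-table-concurrency (Apply) and hands it back when the checksum finishes (Recycle). Below is a minimal, self-contained sketch of that throttling pattern using a plain token channel; it only illustrates the idea and is not the actual lightning/worker package API.

package main

import (
	"fmt"
	"sync"
	"time"
)

// tokenPool is an illustrative stand-in for a worker pool: Apply blocks until
// one of a fixed number of slots is free, Recycle returns the slot.
type tokenPool struct{ slots chan struct{} }

func newTokenPool(limit int) *tokenPool {
	p := &tokenPool{slots: make(chan struct{}, limit)}
	for i := 0; i < limit; i++ {
		p.slots <- struct{}{}
	}
	return p
}

func (p *tokenPool) Apply()   { <-p.slots }
func (p *tokenPool) Recycle() { p.slots <- struct{}{} }

func main() {
	// With checksum-table-concurrency = 2, at most two table checksums are in
	// flight even if many tables finish importing at about the same time.
	pool := newTokenPool(2)

	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(table int) {
			defer wg.Done()
			pool.Apply()
			defer pool.Recycle()
			fmt.Printf("checksumming table %d\n", table)
			time.Sleep(100 * time.Millisecond) // stand-in for the real checksum work
		}(i)
	}
	wg.Wait()
}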
9 changes: 5 additions & 4 deletions tidb-lightning.toml
@@ -213,9 +213,10 @@ log-level = "error"
# set tidb session variables to speed up checksum/analyze table.
# see https://pingcap.com/docs/sql/statistics/#control-analyze-concurrency for the meaning of each setting
build-stats-concurrency = 20
distsql-scan-concurrency = 100
distsql-scan-concurrency = 15
index-serial-scan-concurrency = 20
checksum-table-concurrency = 16
# checksum-table-concurrency controls the maximum checksum table tasks to run concurrently.
checksum-table-concurrency = 2

# specifies certificates and keys for TLS-enabled MySQL connections.
# defaults to a copy of the [security] section.
@@ -247,8 +248,8 @@ level-1-compact = false
# if set true, compact will do full compaction to tikv data.
# if this setting is missing, the default value is false.
compact = false
# if set to true, lightning will analyze all table together at last
analyze-at-last = false
# if set to true, lightning will run checksum and analyze for all tables together at last
post-process-at-last = true

# cron performs some periodic actions in background
[cron]
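For reference, here is roughly how the changed settings fit together in a user's tidb-lightning.toml. The [tidb] and [post-restore] section headers and the string forms of the op levels are assumed from the standard Lightning configuration layout; the values are the new defaults introduced by this commit.

[tidb]
# session variables used to speed up the checksum/analyze phase
build-stats-concurrency = 20
distsql-scan-concurrency = 15
index-serial-scan-concurrency = 20
# at most this many checksum jobs run concurrently
checksum-table-concurrency = 2

[post-restore]
checksum = "required"
analyze = "optional"
# run checksum and analyze for all tables together after every table has been imported
post-process-at-last = true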
