Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backup: add table-concurrency to control backup tableInfo stats and checksum. #48570

Merged
merged 5 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion br/pkg/task/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ const (
flagUseCheckpoint = "use-checkpoint"
flagKeyspaceName = "keyspace-name"
flagReplicaReadLabel = "replica-read-label"
flagTableConcurrency = "table-concurrency"

flagGCTTL = "gcttl"

Expand Down Expand Up @@ -92,6 +93,7 @@ type BackupConfig struct {
UseBackupMetaV2 bool `json:"use-backupmeta-v2"`
UseCheckpoint bool `json:"use-checkpoint" toml:"use-checkpoint"`
ReplicaReadLabel map[string]string `json:"replica-read-label" toml:"replica-read-label"`
TableConcurrency uint `json:"table-concurrency" toml:"table-concurrency"`
CompressionConfig

// for ebs-based backup
Expand Down Expand Up @@ -123,6 +125,9 @@ func DefineBackupFlags(flags *pflag.FlagSet) {
"One task represents one table range (or one index range) according to the backup schemas. If there is one table with one index."+
"there will be two tasks to back up this table. This value should increase if you need to back up lots of tables or indices.")

flags.Uint32(flagTableConcurrency, backup.DefaultSchemaConcurrency, "The size of a BR thread pool used for backup table metas, "+
"including tableInfo/checksum and stats.")

flags.Bool(flagRemoveSchedulers, false,
"disable the balance, shuffle and region-merge schedulers in PD to speed up backup")
// This flag can impact the online cluster, so hide it in case of abuse.
Expand Down Expand Up @@ -197,6 +202,9 @@ func (cfg *BackupConfig) ParseFromFlags(flags *pflag.FlagSet) error {
if err != nil {
return errors.Trace(err)
}
if cfg.TableConcurrency, err = flags.GetUint(flagTableConcurrency); err != nil {
return errors.Trace(err)
}

compressionCfg, err := parseCompressionFlags(flags)
if err != nil {
Expand Down Expand Up @@ -718,7 +726,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
}
}
updateCh = g.StartProgress(ctx, "Checksum", checksumProgress, !cfg.LogProgress)
schemasConcurrency := uint(min(backup.DefaultSchemaConcurrency, schemas.Len()))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

backup.DefaultSchemaConcurrency can be removed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is used as the initial value for the new config.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't a default value of 64 a bit too high? Should the default setting be more conservative?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two reasons to keep it at 64:

  1. To keep compatibility.
  2. To keep checksum performance.

schemasConcurrency := min(cfg.TableConcurrency, uint(schemas.Len()))

err = schemas.BackupSchemas(
ctx, metawriter, client.GetCheckpointRunner(), mgr.GetStorage(), statsHandle, backupTS, schemasConcurrency, cfg.ChecksumConcurrency, skipChecksum, updateCh)
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/task/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ func TestBackupConfigHash(t *testing.T) {
LastBackupTS: 1,
GCTTL: 123,
RemoveSchedulers: true,
TableConcurrency: 123,
IgnoreStats: true,
UseBackupMetaV2: true,
UseCheckpoint: true,
Expand Down Expand Up @@ -197,6 +198,7 @@ func TestBackupConfigHash(t *testing.T) {
testCfg.TLS = TLSConfig{CA: "123"}
testCfg.RateLimit = 321
testCfg.ChecksumConcurrency = 321
testCfg.TableConcurrency = 321
testCfg.Concurrency = 321
testCfg.Checksum = false
testCfg.LogProgress = false
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/task/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ type Config struct {
TLS TLSConfig `json:"tls" toml:"tls"`
RateLimit uint64 `json:"rate-limit" toml:"rate-limit"`
ChecksumConcurrency uint `json:"checksum-concurrency" toml:"checksum-concurrency"`
TableConcurrency uint `json:"table-concurrency" toml:"table-concurrency"`
Concurrency uint32 `json:"concurrency" toml:"concurrency"`
Checksum bool `json:"checksum" toml:"checksum"`
SendCreds bool `json:"send-credentials-to-tikv" toml:"send-credentials-to-tikv"`
Expand Down Expand Up @@ -269,8 +270,7 @@ func DefineCommonFlags(flags *pflag.FlagSet) {
flags.String(flagCA, "", "CA certificate path for TLS connection")
flags.String(flagCert, "", "Certificate path for TLS connection")
flags.String(flagKey, "", "Private key path for TLS connection")
flags.Uint(flagChecksumConcurrency, variable.DefChecksumTableConcurrency, "The concurrency of table checksumming")
_ = flags.MarkHidden(flagChecksumConcurrency)
flags.Uint(flagChecksumConcurrency, variable.DefChecksumTableConcurrency, "The concurrency of checksumming in one table")

flags.Uint64(flagRateLimit, unlimited, "The rate limit of the task, MB/s per node")
flags.Bool(flagChecksum, true, "Run checksum at end of task")
Expand Down
Loading