Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
Merge branch 'release-4.0' into release-4.0-264059b76873
Browse files Browse the repository at this point in the history
  • Loading branch information
3pointer authored May 12, 2021
2 parents fa5714e + bfeea3f commit 14bed3f
Show file tree
Hide file tree
Showing 21 changed files with 482 additions and 58 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,8 @@ static: prepare tools
grep -vE "Normalize|Annotate|Trace|Cause|RedactLogEnabled|Find" 2>&1 | $(CHECKER)
# The package name of "github.com/pingcap/kvproto/pkg/backup" collides
# "github.com/pingcap/br/pkg/backup", so we rename kvproto to backuppb.
grep -Rn --include="*.go" -E '"github.com/pingcap/kvproto/pkg/backup"' | \
grep -Rn --include="*.go" -E '"github.com/pingcap/kvproto/pkg/backup"' \
$$($(PACKAGE_DIRECTORIES)) | \
grep -vE "backuppb" | $(CHECKER)

lint: prepare tools
Expand Down
2 changes: 1 addition & 1 deletion cmd/br/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func newFullBackupCommand() *cobra.Command {
return runBackupCommand(command, "Full backup")
},
}
task.DefineFilterFlags(command)
task.DefineFilterFlags(command, acceptAllTables)
return command
}

Expand Down
14 changes: 14 additions & 0 deletions cmd/br/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,20 @@ var (
hasLogFile uint64
tidbGlue = gluetidb.New()
envLogToTermKey = "BR_LOG_TO_TERM"

filterOutSysAndMemTables = []string{
"*.*",
fmt.Sprintf("!%s.*", utils.TemporaryDBName("*")),
"!mysql.*",
"!sys.*",
"!INFORMATION_SCHEMA.*",
"!PERFORMANCE_SCHEMA.*",
"!METRICS_SCHEMA.*",
"!INSPECTION_SCHEMA.*",
}
acceptAllTables = []string{
"*.*",
}
)

const (
Expand Down
4 changes: 2 additions & 2 deletions cmd/br/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func newFullRestoreCommand() *cobra.Command {
return runRestoreCommand(cmd, "Full restore")
},
}
task.DefineFilterFlags(command)
task.DefineFilterFlags(command, filterOutSysAndMemTables)
return command
}

Expand Down Expand Up @@ -136,7 +136,7 @@ func newLogRestoreCommand() *cobra.Command {
return runLogRestoreCommand(cmd)
},
}
task.DefineFilterFlags(command)
task.DefineFilterFlags(command, filterOutSysAndMemTables)
task.DefineLogRestoreFlags(command)
return command
}
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -186,3 +186,8 @@ error = '''
failed to write and ingest
'''

["BR:Restore:ErrUnsupportedSysTable"]
error = '''
the system table isn't supported for restoring yet
'''

14 changes: 13 additions & 1 deletion pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ func BuildBackupRangeAndSchema(

for _, dbInfo := range dbs {
// skip system databases
if util.IsMemOrSysDB(dbInfo.Name.L) {
if !tableFilter.MatchSchema(dbInfo.Name.O) || isMemDB(dbInfo.Name.L) {
continue
}

Expand Down Expand Up @@ -1079,3 +1079,15 @@ func CollectChecksums(backupMeta *backuppb.BackupMeta) ([]Checksum, error) {
func isRetryableError(err error) bool {
return status.Code(err) == codes.Unavailable
}

// isMemDB checks whether dbLowerName is memory database.
// Remove it when tidb.utils has this function
func isMemDB(dbLowerName string) bool {
switch dbLowerName {
case util.InformationSchemaName.L,
util.PerformanceSchemaName.L,
util.MetricSchemaName.L:
return true
}
return false
}
3 changes: 3 additions & 0 deletions pkg/backup/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ func (ss *Schemas) BackupSchemas(

schemas := make([]*backuppb.Schema, 0, len(ss.schemas))
for name, schema := range ss.schemas {
if utils.IsSysDB(schema.dbInfo.Name.L) {
schema.dbInfo.Name = utils.TemporaryDBName(schema.dbInfo.Name.O)
}
dbBytes, err := json.Marshal(schema.dbInfo)
if err != nil {
return nil, errors.Trace(err)
Expand Down
3 changes: 2 additions & 1 deletion pkg/backup/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ func (s *testBackupSchemaSuite) TestBuildBackupRangeAndSchema(c *C) {
c.Assert(backupSchemas, IsNil)

// Empty database.
noFilter, err := filter.Parse([]string{"*.*"})
// Filter out system tables manually.
noFilter, err := filter.Parse([]string{"*.*", "!mysql.*"})
c.Assert(err, IsNil)
_, backupSchemas, err = backup.BuildBackupRangeAndSchema(
s.mock.Storage, noFilter, math.MaxUint64)
Expand Down
2 changes: 1 addition & 1 deletion pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func NewMgr(
return nil, errors.Trace(err)
}
if checkRequirements {
err = version.CheckClusterVersion(ctx, controller.GetPDClient())
err = version.CheckClusterVersion(ctx, controller.GetPDClient(), version.CheckVersionForBR)
if err != nil {
return nil, errors.Annotate(err, "running BR in incompatible version of cluster, "+
"if you believe it's OK, use --check-requirements=false to skip.")
Expand Down
1 change: 1 addition & 0 deletions pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ var (
ErrRestoreInvalidRange = errors.Normalize("invalid restore range", errors.RFCCodeText("BR:Restore:ErrRestoreInvalidRange"))
ErrRestoreWriteAndIngest = errors.Normalize("failed to write and ingest", errors.RFCCodeText("BR:Restore:ErrRestoreWriteAndIngest"))
ErrRestoreSchemaNotExists = errors.Normalize("schema not exists", errors.RFCCodeText("BR:Restore:ErrRestoreSchemaNotExists"))
ErrUnsupportedSystemTable = errors.Normalize("the system table isn't supported for restoring yet", errors.RFCCodeText("BR:Restore:ErrUnsupportedSysTable"))

// TODO maybe it belongs to PiTR.
ErrRestoreRTsConstrain = errors.Normalize("resolved ts constrain violation", errors.RFCCodeText("BR:Restore:ErrRestoreResolvedTsConstrain"))
Expand Down
2 changes: 1 addition & 1 deletion pkg/gluetidb/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (g Glue) GetVersion() string {

// Execute implements glue.Session.
func (gs *tidbSession) Execute(ctx context.Context, sql string) error {
_, err := gs.se.Execute(ctx, sql)
_, err := gs.se.ExecuteInternal(ctx, sql)
return errors.Trace(err)
}

Expand Down
185 changes: 185 additions & 0 deletions pkg/restore/systable_restore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.

package restore

import (
"context"
"fmt"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
filter "github.com/pingcap/tidb-tools/pkg/table-filter"
"go.uber.org/multierr"
"go.uber.org/zap"

berrors "github.com/pingcap/br/pkg/errors"
"github.com/pingcap/br/pkg/logutil"
"github.com/pingcap/br/pkg/utils"
)

var statsTables = map[string]struct{}{
"stats_buckets": {},
"stats_extended": {},
"stats_feedback": {},
"stats_fm_sketch": {},
"stats_histograms": {},
"stats_meta": {},
"stats_top_n": {},
}

func isStatsTable(tableName string) bool {
_, ok := statsTables[tableName]
return ok
}

// RestoreSystemSchemas restores the system schema(i.e. the `mysql` schema).
// Detail see https://github.com/pingcap/br/issues/679#issuecomment-762592254.
func (rc *Client) RestoreSystemSchemas(ctx context.Context, f filter.Filter) {
sysDB := mysql.SystemDB

temporaryDB := utils.TemporaryDBName(sysDB)
defer rc.cleanTemporaryDatabase(ctx, sysDB)

if !f.MatchSchema(temporaryDB.O) {
log.Debug("system database filtered out", zap.String("database", sysDB))
return
}
originDatabase, ok := rc.databases[temporaryDB.O]
if !ok {
log.Info("system database not backed up, skipping", zap.String("database", sysDB))
return
}
db, ok := rc.getDatabaseByName(sysDB)
if !ok {
// Or should we create the database here?
log.Warn("target database not exist, aborting", zap.String("database", sysDB))
return
}

tablesRestored := make([]string, 0, len(originDatabase.Tables))
for _, table := range originDatabase.Tables {
tableName := table.Info.Name
if f.MatchTable(sysDB, tableName.O) {
if err := rc.replaceTemporaryTableToSystable(ctx, tableName.L, db); err != nil {
logutil.WarnTerm("error during merging temporary tables into system tables",
logutil.ShortError(err),
zap.Stringer("table", tableName),
)
}
}
tablesRestored = append(tablesRestored, tableName.L)
}
if err := rc.afterSystemTablesReplaced(ctx, tablesRestored); err != nil {
for _, e := range multierr.Errors(err) {
logutil.WarnTerm("error during reconfigurating the system tables", zap.String("database", sysDB), logutil.ShortError(e))
}
}
}

// database is a record of a database.
// For fast querying whether a table exists and the temporary database of it.
type database struct {
ExistingTables map[string]*model.TableInfo
Name model.CIStr
TemporaryName model.CIStr
}

// getDatabaseByName make a record of a database from info schema by its name.
func (rc *Client) getDatabaseByName(name string) (*database, bool) {
infoSchema := rc.dom.InfoSchema()
schema, ok := infoSchema.SchemaByName(model.NewCIStr(name))
if !ok {
return nil, false
}
db := &database{
ExistingTables: map[string]*model.TableInfo{},
Name: model.NewCIStr(name),
TemporaryName: utils.TemporaryDBName(name),
}
for _, t := range schema.Tables {
db.ExistingTables[t.Name.L] = t
}
return db, true
}

// afterSystemTablesReplaced do some extra work for special system tables.
// e.g. after inserting to the table mysql.user, we must execute `FLUSH PRIVILEGES` to allow it take effect.
func (rc *Client) afterSystemTablesReplaced(ctx context.Context, tables []string) error {
var err error
for _, table := range tables {
switch {
case table == "user":
// We cannot execute `rc.dom.NotifyUpdatePrivilege` here, because there isn't
// sessionctx.Context provided by the glue.
// TODO: update the glue type and allow we retrive a session context from it.
err = multierr.Append(err, errors.Annotatef(berrors.ErrUnsupportedSystemTable,
"restored user info may not take effect, until you should execute `FLUSH PRIVILEGES` manually"))
}
}
return err
}

// replaceTemporaryTableToSystable replaces the temporary table to real system table.
func (rc *Client) replaceTemporaryTableToSystable(ctx context.Context, tableName string, db *database) error {
execSQL := func(sql string) error {
// SQLs here only contain table name and database name, seems it is no need to redact them.
if err := rc.db.se.Execute(ctx, sql); err != nil {
log.Warn("failed to execute SQL restore system database",
zap.String("table", tableName),
zap.Stringer("database", db.Name),
zap.String("sql", sql),
zap.Error(err),
)
return berrors.ErrUnknown.Wrap(err).GenWithStack("failed to execute %s", sql)
}
log.Info("successfully restore system database",
zap.String("table", tableName),
zap.Stringer("database", db.Name),
zap.String("sql", sql),
)
return nil
}

// The newly created tables have different table IDs with original tables,
// hence the old statistics are invalid.
//
// TODO:
// 1 ) Rewrite the table IDs via `UPDATE _temporary_mysql.stats_xxx SET table_id = new_table_id WHERE table_id = old_table_id`
// BEFORE replacing into and then execute `rc.statsHandler.Update(rc.dom.InfoSchema())`.
// 1.5 ) (Optional) The UPDATE statement sometimes costs, the whole system tables restore step can be place into the restore pipeline.
// 2 ) Deprecate the origin interface for backing up statistics.
if isStatsTable(tableName) {
return berrors.ErrUnsupportedSystemTable.GenWithStack("restoring stats via `mysql` schema isn't support yet: " +
"the table ID is out-of-date and may corrupt existing statistics")
}

if db.ExistingTables[tableName] != nil {
log.Info("table existing, using replace into for restore",
zap.String("table", tableName),
zap.Stringer("schema", db.Name))
replaceIntoSQL := fmt.Sprintf("REPLACE INTO %s SELECT * FROM %s;",
utils.EncloseDBAndTable(db.Name.L, tableName),
utils.EncloseDBAndTable(db.TemporaryName.L, tableName))
return execSQL(replaceIntoSQL)
}

renameSQL := fmt.Sprintf("RENAME TABLE %s TO %s;",
utils.EncloseDBAndTable(db.TemporaryName.L, tableName),
utils.EncloseDBAndTable(db.Name.L, tableName),
)
return execSQL(renameSQL)
}

func (rc *Client) cleanTemporaryDatabase(ctx context.Context, originDB string) {
database := utils.TemporaryDBName(originDB)
log.Debug("dropping temporary database", zap.Stringer("database", database))
sql := fmt.Sprintf("DROP DATABASE IF EXISTS %s", utils.EncloseName(database.L))
if err := rc.db.se.Execute(ctx, sql); err != nil {
logutil.WarnTerm("failed to drop temporary database, it should be dropped manually",
zap.Stringer("database", database),
logutil.ShortError(err),
)
}
}
4 changes: 2 additions & 2 deletions pkg/task/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,9 @@ func DefineTableFlags(command *cobra.Command) {
}

// DefineFilterFlags defines the --filter and --case-sensitive flags for `full` subcommand.
func DefineFilterFlags(command *cobra.Command) {
func DefineFilterFlags(command *cobra.Command, defaultFilter []string) {
flags := command.Flags()
flags.StringArrayP(flagFilter, "f", []string{"*.*"}, "select tables to process")
flags.StringArrayP(flagFilter, "f", defaultFilter, "select tables to process")
flags.Bool(flagCaseSensitive, false, "whether the table names used in --filter should be case-sensitive")
}

Expand Down
13 changes: 13 additions & 0 deletions pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/br/pkg/storage"
"github.com/pingcap/br/pkg/summary"
"github.com/pingcap/br/pkg/utils"
"github.com/pingcap/br/pkg/version"
)

const (
Expand Down Expand Up @@ -139,6 +140,14 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
return errors.Trace(err)
}
g.Record("Size", utils.ArchiveSize(backupMeta))

backupVersion := version.NormalizeBackupVersion(backupMeta.ClusterVersion)
if cfg.CheckRequirements && backupVersion != nil {
if versionErr := version.CheckClusterVersion(ctx, mgr.GetPDClient(), version.CheckVersionForBackup(backupVersion)); versionErr != nil {
return errors.Trace(versionErr)
}
}

if err = client.InitBackupMeta(backupMeta, u); err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -306,6 +315,10 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
return errors.Trace(err)
}

// The cost of rename user table / replace into system table wouldn't be so high.
// So leave it out of the pipeline for easier implementation.
client.RestoreSystemSchemas(ctx, cfg.TableFilter)

// Set task summary to success status.
summary.SetSuccessStatus(true)
return nil
Expand Down
Loading

0 comments on commit 14bed3f

Please sign in to comment.