Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

brie: support batch ddl for sql restore (#49089) #49851

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions br/pkg/gluetidb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ go_library(
"//domain",
"//executor",
"//kv",
"//meta/autoid",
"//parser/model",
"//parser/mysql",
"//session",
"//sessionctx",
"@com_github_pingcap_errors//:errors",
Expand All @@ -34,14 +32,9 @@ go_test(
flaky = True,
deps = [
"//br/pkg/glue",
"//ddl",
"//kv",
"//meta",
"//parser/model",
"//sessionctx",
"//testkit",
"//types",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//require",
],
)
134 changes: 8 additions & 126 deletions br/pkg/gluetidb/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@
package gluetidb

import (
"bytes"
"context"
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/log"
Expand All @@ -17,9 +15,7 @@ import (
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx"
pd "github.com/tikv/pd/client"
Expand All @@ -32,13 +28,8 @@ var (
_ glue.Glue = Glue{}
)

const (
defaultCapOfCreateTable = 512
defaultCapOfCreateDatabase = 64
brComment = `/*from(br)*/`
)
const brComment = `/*from(br)*/`

// New makes a new tidb glue.
func New() Glue {
log.Debug("enabling no register config")
config.UpdateGlobal(func(conf *config.Config) {
Expand Down Expand Up @@ -192,17 +183,7 @@ func (gs *tidbSession) ExecuteInternal(ctx context.Context, sql string, args ...

// CreateDatabase implements glue.Session.
func (gs *tidbSession) CreateDatabase(ctx context.Context, schema *model.DBInfo) error {
d := domain.GetDomain(gs.se).DDL()
query, err := gs.showCreateDatabase(schema)
if err != nil {
return errors.Trace(err)
}
gs.se.SetValue(sessionctx.QueryString, query)
schema = schema.Clone()
if len(schema.Charset) == 0 {
schema.Charset = mysql.DefaultCharset
}
return d.CreateSchemaWithInfo(gs.se, schema, ddl.OnExistIgnore)
return errors.Trace(executor.BRIECreateDatabase(gs.se, schema, brComment))
}

// CreatePlacementPolicy implements glue.Session.
Expand All @@ -213,91 +194,16 @@ func (gs *tidbSession) CreatePlacementPolicy(ctx context.Context, policy *model.
return d.CreatePlacementPolicyWithInfo(gs.se, policy, ddl.OnExistIgnore)
}

// SplitBatchCreateTable provide a way to split batch into small batch when batch size is large than 6 MB.
// The raft entry has limit size of 6 MB, a batch of CreateTables may hit this limitation
// TODO: shall query string be set for each split batch create, it looks does not matter if we set once for all.
func (gs *tidbSession) SplitBatchCreateTable(schema model.CIStr, infos []*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
var err error
d := domain.GetDomain(gs.se).DDL()

if err = d.BatchCreateTableWithInfo(gs.se, schema, infos, append(cs, ddl.OnExistIgnore)...); kv.ErrEntryTooLarge.Equal(err) {
log.Info("entry too large, split batch create table", zap.Int("num table", len(infos)))
if len(infos) == 1 {
return err
}
mid := len(infos) / 2
err = gs.SplitBatchCreateTable(schema, infos[:mid], cs...)
if err != nil {
return err
}
err = gs.SplitBatchCreateTable(schema, infos[mid:], cs...)
if err != nil {
return err
}
return nil
}
return err
}

// CreateTables implements glue.BatchCreateTableSession.
func (gs *tidbSession) CreateTables(ctx context.Context, tables map[string][]*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
var dbName model.CIStr

// Disable foreign key check when batch create tables.
gs.se.GetSessionVars().ForeignKeyChecks = false
for db, tablesInDB := range tables {
dbName = model.NewCIStr(db)
queryBuilder := strings.Builder{}
cloneTables := make([]*model.TableInfo, 0, len(tablesInDB))
for _, table := range tablesInDB {
query, err := gs.showCreateTable(table)
if err != nil {
return errors.Trace(err)
}

queryBuilder.WriteString(query)
queryBuilder.WriteString(";")

table = table.Clone()
// Clone() does not clone partitions yet :(
if table.Partition != nil {
newPartition := *table.Partition
newPartition.Definitions = append([]model.PartitionDefinition{}, table.Partition.Definitions...)
table.Partition = &newPartition
}
cloneTables = append(cloneTables, table)
}
gs.se.SetValue(sessionctx.QueryString, queryBuilder.String())

if err := gs.SplitBatchCreateTable(dbName, cloneTables, cs...); err != nil {
//It is possible to failure when TiDB does not support model.ActionCreateTables.
//In this circumstance, BatchCreateTableWithInfo returns errno.ErrInvalidDDLJob,
//we fall back to old way that creating table one by one
log.Warn("batch create table from tidb failure", zap.Error(err))
return err
}
}

return nil
func (gs *tidbSession) CreateTables(_ context.Context,
tables map[string][]*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
return errors.Trace(executor.BRIECreateTables(gs.se, tables, brComment, cs...))
}

// CreateTable implements glue.Session.
func (gs *tidbSession) CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
d := domain.GetDomain(gs.se).DDL()
query, err := gs.showCreateTable(table)
if err != nil {
return errors.Trace(err)
}
gs.se.SetValue(sessionctx.QueryString, query)
// Clone() does not clone partitions yet :(
table = table.Clone()
if table.Partition != nil {
newPartition := *table.Partition
newPartition.Definitions = append([]model.PartitionDefinition{}, table.Partition.Definitions...)
table.Partition = &newPartition
}

return d.CreateTableWithInfo(gs.se, dbName, table, append(cs, ddl.OnExistIgnore)...)
func (gs *tidbSession) CreateTable(_ context.Context, dbName model.CIStr,
table *model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
return errors.Trace(executor.BRIECreateTable(gs.se, dbName, table, brComment, cs...))
}

// Close implements glue.Session.
Expand All @@ -310,30 +216,6 @@ func (gs *tidbSession) GetGlobalVariable(name string) (string, error) {
return gs.se.GetSessionVars().GlobalVarsAccessor.GetTiDBTableValue(name)
}

// showCreateTable shows the result of SHOW CREATE TABLE from a TableInfo.
func (gs *tidbSession) showCreateTable(tbl *model.TableInfo) (string, error) {
table := tbl.Clone()
table.AutoIncID = 0
result := bytes.NewBuffer(make([]byte, 0, defaultCapOfCreateTable))
// this can never fail.
_, _ = result.WriteString(brComment)
if err := executor.ConstructResultOfShowCreateTable(gs.se, tbl, autoid.Allocators{}, result); err != nil {
return "", errors.Trace(err)
}
return result.String(), nil
}

// showCreateDatabase shows the result of SHOW CREATE DATABASE from a dbInfo.
func (gs *tidbSession) showCreateDatabase(db *model.DBInfo) (string, error) {
result := bytes.NewBuffer(make([]byte, 0, defaultCapOfCreateDatabase))
// this can never fail.
_, _ = result.WriteString(brComment)
if err := executor.ConstructResultOfShowCreateDatabase(gs.se, db, true, result); err != nil {
return "", errors.Trace(err)
}
return result.String(), nil
}

func (gs *tidbSession) showCreatePlacementPolicy(policy *model.PolicyInfo) string {
return executor.ConstructResultOfShowCreatePlacementPolicy(policy)
}
Expand Down
Loading