Skip to content

Commit

Permalink
Merge branch 'master' into remove_concurrent_ddl
Browse files Browse the repository at this point in the history
  • Loading branch information
wjhuang2016 authored Dec 30, 2022
2 parents e4d1ab3 + 91adaaf commit b10a7d8
Show file tree
Hide file tree
Showing 230 changed files with 8,060 additions and 2,249 deletions.
72 changes: 40 additions & 32 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -390,8 +390,8 @@ def go_deps():
name = "com_github_cespare_xxhash_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/cespare/xxhash/v2",
sum = "h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=",
version = "v2.1.2",
sum = "h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=",
version = "v2.2.0",
)
go_repository(
name = "com_github_charithe_durationcheck",
Expand Down Expand Up @@ -1143,8 +1143,8 @@ def go_deps():
name = "com_github_go_kit_log",
build_file_proto_mode = "disable_global",
importpath = "github.com/go-kit/log",
sum = "h1:7i2K3eKTos3Vc0enKCfnVcgHh2olr/MyfboYq7cAcFw=",
version = "v0.2.0",
sum = "h1:MRVx0/zhvdseW+Gza6N9rVzU/IVzaeE1SFI4raAhmBU=",
version = "v0.2.1",
)
go_repository(
name = "com_github_go_logfmt_logfmt",
Expand Down Expand Up @@ -2182,8 +2182,8 @@ def go_deps():
name = "com_github_klauspost_compress",
build_file_proto_mode = "disable_global",
importpath = "github.com/klauspost/compress",
sum = "h1:y9FcTHGyrebwfP0ZZqFiaxTaiDnUrGkJkI+f583BL1A=",
version = "v1.15.1",
sum = "h1:NFn1Wr8cfnenSJSA46lLq4wHCcBzKTSjnBIexDMMOV0=",
version = "v1.15.13",
)
go_repository(
name = "com_github_klauspost_cpuid",
Expand Down Expand Up @@ -2435,8 +2435,8 @@ def go_deps():
name = "com_github_matttproud_golang_protobuf_extensions",
build_file_proto_mode = "disable_global",
importpath = "github.com/matttproud/golang_protobuf_extensions",
sum = "h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=",
version = "v1.0.1",
sum = "h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=",
version = "v1.0.4",
)
go_repository(
name = "com_github_maxatome_go_testdeep",
Expand Down Expand Up @@ -2881,8 +2881,8 @@ def go_deps():
name = "com_github_pingcap_badger",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/badger",
sum = "h1:MKVFZuqFvAMiDtv3AbihOQ6rY5IE8LWflI1BuZ/hF0Y=",
version = "v1.5.1-0.20220314162537-ab58fbf40580",
sum = "h1:QB16qn8wx5X4SRn3/5axrjPMNS3WRt87+5Bfrnmt6IA=",
version = "v1.5.1-0.20221229114011-ddffaa0fff7a",
)
go_repository(
name = "com_github_pingcap_check",
Expand All @@ -2895,8 +2895,8 @@ def go_deps():
name = "com_github_pingcap_errors",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/errors",
sum = "h1:3Dm0DWeQlwV8LbpQxP2tojHhxd9aY59KI+QN0ns6bBo=",
version = "v0.11.5-0.20220729040631-518f63d66278",
sum = "h1:m5ZsBa5o/0CkzZXfXLaThzKuR85SnHHetqBCpzQ30h8=",
version = "v0.11.5-0.20221009092201-b66cddb77c32",
)
go_repository(
name = "com_github_pingcap_failpoint",
Expand All @@ -2923,8 +2923,8 @@ def go_deps():
name = "com_github_pingcap_kvproto",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/kvproto",
sum = "h1:46ZD6xzQWJ8Jkeal/U7SqkX030Mgs8DAn6QV/9zbqOQ=",
version = "v0.0.0-20221130022225-6c56ac56fe5f",
sum = "h1:v0Z0nC0knwWHn3e9br8EMNfLBB14QDULn142UGjiTMQ=",
version = "v0.0.0-20221213093948-9ccc6beaf0aa",
)
go_repository(
name = "com_github_pingcap_log",
Expand Down Expand Up @@ -3001,29 +3001,29 @@ def go_deps():
name = "com_github_prometheus_client_golang",
build_file_proto_mode = "disable_global",
importpath = "github.com/prometheus/client_golang",
sum = "h1:b71QUfeo5M8gq2+evJdTPfZhYMAU0uKPkyPJ7TPsloU=",
version = "v1.13.0",
sum = "h1:nJdhIvne2eSX/XRAFV9PcvFFRbrjbcTUj0VP62TMhnw=",
version = "v1.14.0",
)
go_repository(
name = "com_github_prometheus_client_model",
build_file_proto_mode = "disable_global",
importpath = "github.com/prometheus/client_model",
sum = "h1:uq5h0d+GuxiXLJLNABMgp2qUWDPiLvgCzz2dUR+/W/M=",
version = "v0.2.0",
sum = "h1:UBgGFHqYdG/TPFD1B1ogZywDqEkwp3fBMvqdiQ7Xew4=",
version = "v0.3.0",
)
go_repository(
name = "com_github_prometheus_common",
build_file_proto_mode = "disable_global",
importpath = "github.com/prometheus/common",
sum = "h1:ccBbHCgIiT9uSoFY0vX8H3zsNR5eLt17/RQLUvn8pXE=",
version = "v0.37.0",
sum = "h1:oOyhkDq05hPZKItWVBkJ6g6AtGxi+fy7F4JvUV8uhsI=",
version = "v0.39.0",
)
go_repository(
name = "com_github_prometheus_procfs",
build_file_proto_mode = "disable_global",
importpath = "github.com/prometheus/procfs",
sum = "h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5mo=",
version = "v0.8.0",
sum = "h1:wzCHvIvM5SxWqYvwgVL7yJY8Lz3PKn49KQtpgMYJfhI=",
version = "v0.9.0",
)
go_repository(
name = "com_github_prometheus_prometheus",
Expand Down Expand Up @@ -3523,12 +3523,20 @@ def go_deps():
sum = "h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ=",
version = "v0.0.0-20181126055449-889f96f722a2",
)
go_repository(
name = "com_github_tiancaiamao_gp",
build_file_proto_mode = "disable",
importpath = "github.com/tiancaiamao/gp",
sum = "h1:4RNtqw1/tW67qP9fFgfQpTVd7DrfkaAWu4vsC18QmBo=",
version = "v0.0.0-20221221095600-1a473d1f9b4b",
)

go_repository(
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:/glZOHs/K2pkCioDVae+aThUHFYRYQkEgY4NUTgfh+s=",
version = "v2.0.3",
sum = "h1:m6glgBGCIds9QURbk8Mn+8mjLKDcv6nWrNwYh92fydQ=",
version = "v2.0.4-0.20221226080148-018c59dbd837",
)
go_repository(
name = "com_github_tikv_pd_client",
Expand Down Expand Up @@ -4432,8 +4440,8 @@ def go_deps():
name = "org_golang_x_oauth2",
build_file_proto_mode = "disable_global",
importpath = "golang.org/x/oauth2",
sum = "h1:GtQkldQ9m7yvzCL1V+LrYow3Khe0eJH0w7RbX/VbaIU=",
version = "v0.2.0",
sum = "h1:6l90koy8/LaBLmLu8jpHeHexzMwEita0zFfYlggy2F8=",
version = "v0.3.0",
)
go_repository(
name = "org_golang_x_sync",
Expand Down Expand Up @@ -4467,8 +4475,8 @@ def go_deps():
name = "org_golang_x_time",
build_file_proto_mode = "disable_global",
importpath = "golang.org/x/time",
sum = "h1:52I/1L54xyEQAYdtcSuxtiT84KGYTBGXwayxmIpNJhE=",
version = "v0.2.0",
sum = "h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=",
version = "v0.3.0",
)
go_repository(
name = "org_golang_x_tools",
Expand Down Expand Up @@ -4600,8 +4608,8 @@ def go_deps():
name = "org_uber_go_multierr",
build_file_proto_mode = "disable_global",
importpath = "go.uber.org/multierr",
sum = "h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8=",
version = "v1.8.0",
sum = "h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI=",
version = "v1.9.0",
)
go_repository(
name = "org_uber_go_tools",
Expand All @@ -4614,6 +4622,6 @@ def go_deps():
name = "org_uber_go_zap",
build_file_proto_mode = "disable_global",
importpath = "go.uber.org/zap",
sum = "h1:OjGQ5KQDEUawVHxNwQgPpiypGHOxo2mNZsOqTak4fFY=",
version = "v1.23.0",
sum = "h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=",
version = "v1.24.0",
)
9 changes: 9 additions & 0 deletions bindinfo/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,15 @@ func (h *BindHandle) SetBindRecordStatus(originalSQL string, binding *Binding, n
return
}

// SetBindRecordStatusByDigest set a BindRecord's status to the storage and bind cache.
func (h *BindHandle) SetBindRecordStatusByDigest(newStatus, sqlDigest string) (ok bool, err error) {
oldRecord, err := h.GetBindRecordBySQLDigest(sqlDigest)
if err != nil {
return false, err
}
return h.SetBindRecordStatus(oldRecord.OriginalSQL, nil, newStatus)
}

// GCBindRecord physically removes the deleted bind records in mysql.bind_info.
func (h *BindHandle) GCBindRecord() (err error) {
h.bindInfo.Lock()
Expand Down
8 changes: 5 additions & 3 deletions br/pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,8 @@ func (mgr *Mgr) GetTS(ctx context.Context) (uint64, error) {
}

// GetMergeRegionSizeAndCount returns the tikv config `coprocessor.region-split-size` and `coprocessor.region-split-key`.
func (mgr *Mgr) GetMergeRegionSizeAndCount(ctx context.Context, client *http.Client) (uint64, uint64, error) {
// returns the default config when failed.
func (mgr *Mgr) GetMergeRegionSizeAndCount(ctx context.Context, client *http.Client) (uint64, uint64) {
regionSplitSize := DefaultMergeRegionSizeBytes
regionSplitKeys := DefaultMergeRegionKeyCount
type coprocessor struct {
Expand Down Expand Up @@ -309,9 +310,10 @@ func (mgr *Mgr) GetMergeRegionSizeAndCount(ctx context.Context, client *http.Cli
return nil
})
if err != nil {
return 0, 0, errors.Trace(err)
log.Warn("meet error when getting config from TiKV; using default", logutil.ShortError(err))
return DefaultMergeRegionSizeBytes, DefaultMergeRegionKeyCount
}
return regionSplitSize, regionSplitKeys, nil
return regionSplitSize, regionSplitKeys
}

// GetConfigFromTiKV get configs from all alive tikv stores.
Expand Down
35 changes: 33 additions & 2 deletions br/pkg/conn/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,38 @@ func TestGetMergeRegionSizeAndCount(t *testing.T) {
regionSplitSize: DefaultMergeRegionSizeBytes,
regionSplitKeys: DefaultMergeRegionKeyCount,
},
{
stores: []*metapb.Store{
{
Id: 1,
State: metapb.StoreState_Up,
Labels: []*metapb.StoreLabel{
{
Key: "engine",
Value: "tiflash",
},
},
},
{
Id: 2,
State: metapb.StoreState_Up,
Labels: []*metapb.StoreLabel{
{
Key: "engine",
Value: "tikv",
},
},
},
},
content: []string{
"",
// Assuming the TiKV has failed due to some reason.
"",
},
// no tikv detected in this case
regionSplitSize: DefaultMergeRegionSizeBytes,
regionSplitKeys: DefaultMergeRegionKeyCount,
},
{
stores: []*metapb.Store{
{
Expand Down Expand Up @@ -388,8 +420,7 @@ func TestGetMergeRegionSizeAndCount(t *testing.T) {
httpCli := mockServer.Client()
mgr := &Mgr{PdController: &pdutil.PdController{}}
mgr.PdController.SetPDClient(pdCli)
rs, rk, err := mgr.GetMergeRegionSizeAndCount(ctx, httpCli)
require.NoError(t, err)
rs, rk := mgr.GetMergeRegionSizeAndCount(ctx, httpCli)
require.Equal(t, ca.regionSplitSize, rs)
require.Equal(t, ca.regionSplitKeys, rk)
mockServer.Close()
Expand Down
28 changes: 25 additions & 3 deletions br/pkg/gluetidb/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,32 @@ func (gs *tidbSession) CreatePlacementPolicy(ctx context.Context, policy *model.
return d.CreatePlacementPolicyWithInfo(gs.se, policy, ddl.OnExistIgnore)
}

// SplitBatchCreateTable provide a way to split batch into small batch when batch size is large than 6 MB.
// The raft entry has limit size of 6 MB, a batch of CreateTables may hit this limitation
// TODO: shall query string be set for each split batch create, it looks does not matter if we set once for all.
func (gs *tidbSession) SplitBatchCreateTable(schema model.CIStr, info []*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
var err error
d := domain.GetDomain(gs.se).DDL()
if err = d.BatchCreateTableWithInfo(gs.se, schema, info, append(cs, ddl.OnExistIgnore)...); kv.ErrEntryTooLarge.Equal(err) {
if len(info) == 1 {
return err
}
mid := len(info) / 2
err = gs.SplitBatchCreateTable(schema, info[:mid])
if err != nil {
return err
}
err = gs.SplitBatchCreateTable(schema, info[mid:])
if err != nil {
return err
}
return nil
}
return err
}

// CreateTables implements glue.BatchCreateTableSession.
func (gs *tidbSession) CreateTables(ctx context.Context, tables map[string][]*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
d := domain.GetDomain(gs.se).DDL()
var dbName model.CIStr

// Disable foreign key check when batch create tables.
Expand Down Expand Up @@ -233,8 +256,7 @@ func (gs *tidbSession) CreateTables(ctx context.Context, tables map[string][]*mo
cloneTables = append(cloneTables, table)
}
gs.se.SetValue(sessionctx.QueryString, queryBuilder.String())
err := d.BatchCreateTableWithInfo(gs.se, dbName, cloneTables, append(cs, ddl.OnExistIgnore)...)
if err != nil {
if err := gs.SplitBatchCreateTable(dbName, cloneTables); err != nil {
//It is possible to failure when TiDB does not support model.ActionCreateTables.
//In this circumstance, BatchCreateTableWithInfo returns errno.ErrInvalidDDLJob,
//we fall back to old way that creating table one by one
Expand Down
50 changes: 7 additions & 43 deletions br/pkg/lightning/restore/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,10 @@ func DBFromConfig(ctx context.Context, dsn config.DBStore) (*sql.DB, error) {
"tidb_opt_write_row_id": "1",
// always set auto-commit to ON
"autocommit": "1",
// alway set transaction mode to optimistic
// always set transaction mode to optimistic
"tidb_txn_mode": "optimistic",
// disable foreign key checks
"foreign_key_checks": "0",
}

if dsn.Vars != nil {
Expand Down Expand Up @@ -143,62 +145,24 @@ func (timgr *TiDBManager) Close() {
timgr.db.Close()
}

func InitSchema(ctx context.Context, g glue.Glue, database string, tablesSchema map[string]string) error {
logger := log.FromContext(ctx).With(zap.String("db", database))
sqlExecutor := g.GetSQLExecutor()

var createDatabase strings.Builder
createDatabase.WriteString("CREATE DATABASE IF NOT EXISTS ")
common.WriteMySQLIdentifier(&createDatabase, database)
err := sqlExecutor.ExecuteWithLog(ctx, createDatabase.String(), "create database", logger)
if err != nil {
return errors.Trace(err)
}

task := logger.Begin(zap.InfoLevel, "create tables")
var sqlCreateStmts []string
loopCreate:
for tbl, sqlCreateTable := range tablesSchema {
task.Debug("create table", zap.String("schema", sqlCreateTable))

sqlCreateStmts, err = createIfNotExistsStmt(g.GetParser(), sqlCreateTable, database, tbl)
if err != nil {
break
}

// TODO: maybe we should put these createStems into a transaction
for _, s := range sqlCreateStmts {
err = sqlExecutor.ExecuteWithLog(
ctx,
s,
"create table",
logger.With(zap.String("table", common.UniqueTable(database, tbl))),
)
if err != nil {
break loopCreate
}
}
}
task.End(zap.ErrorLevel, err)

return errors.Trace(err)
}

func createIfNotExistsStmt(p *parser.Parser, createTable, dbName, tblName string) ([]string, error) {
stmts, _, err := p.ParseSQL(createTable)
if err != nil {
return []string{}, common.ErrInvalidSchemaStmt.Wrap(err).GenWithStackByArgs(createTable)
}

var res strings.Builder
ctx := format.NewRestoreCtx(format.DefaultRestoreFlags|format.RestoreTiDBSpecialComment, &res)
ctx := format.NewRestoreCtx(format.DefaultRestoreFlags|format.RestoreTiDBSpecialComment|format.RestoreWithTTLEnableOff, &res)

retStmts := make([]string, 0, len(stmts))
for _, stmt := range stmts {
switch node := stmt.(type) {
case *ast.CreateDatabaseStmt:
node.Name = model.NewCIStr(dbName)
node.IfNotExists = true
case *ast.DropDatabaseStmt:
node.Name = model.NewCIStr(dbName)
node.IfExists = true
case *ast.CreateTableStmt:
node.Table.Schema = model.NewCIStr(dbName)
node.Table.Name = model.NewCIStr(tblName)
Expand Down
Loading

0 comments on commit b10a7d8

Please sign in to comment.