Commit
Merge branch 'master' into ttl_tirgger_command
lcwangchao committed Jan 10, 2023
2 parents 75f03ab + b912237 commit 460f925
Showing 24 changed files with 455 additions and 73 deletions.
92 changes: 92 additions & 0 deletions br/pkg/restore/client.go
@@ -2646,6 +2646,74 @@ func (rc *Client) SetWithSysTable(withSysTable bool) {
rc.withSysTable = withSysTable
}

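// ResetTiFlashReplicas restores the TiFlash replica settings of all tables:
// it waits for enough TiFlash stores to come up, then resets each table's
// replica count to 0 before setting it back to the recorded value.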
func (rc *Client) ResetTiFlashReplicas(ctx context.Context, g glue.Glue, storage kv.Storage) error {
dom, err := g.GetDomain(storage)
if err != nil {
return errors.Trace(err)
}
info := dom.InfoSchema()
allSchema := info.AllSchemas()
recorder := tiflashrec.New()

expectTiFlashStoreCount := uint64(0)
needTiFlash := false
for _, s := range allSchema {
for _, t := range s.Tables {
if t.TiFlashReplica != nil {
expectTiFlashStoreCount = mathutil.Max(expectTiFlashStoreCount, t.TiFlashReplica.Count)
recorder.AddTable(t.ID, *t.TiFlashReplica)
needTiFlash = true
}
}
}
if !needTiFlash {
log.Info("no need to set tiflash replica, since there is no tables enable tiflash replica")
return nil
}
// Wait up to ten minutes for TiFlash to start,
// since TiFlash only starts once recovery mode has been unmarked.
timeoutCtx, cancel := context.WithTimeout(ctx, 10*time.Minute)
defer cancel()
err = utils.WithRetry(timeoutCtx, func() error {
tiFlashStoreCount, err := rc.getTiFlashNodeCount(ctx)
if err != nil {
return errors.Trace(err)
}
log.Info("get tiflash store count for resetting TiFlash Replica",
zap.Uint64("count", tiFlashStoreCount))
if tiFlashStoreCount < expectTiFlashStoreCount {
log.Info("still waiting for enough tiflash store start",
zap.Uint64("expect", expectTiFlashStoreCount),
zap.Uint64("actual", tiFlashStoreCount),
)
return errors.New("tiflash store count is less than expected")
}
return nil
}, &waitTiFlashBackoffer{
Attempts: 30,
BaseBackoff: 4 * time.Second,
})
if err != nil {
return err
}

sqls := recorder.GenerateResetAlterTableDDLs(info)
log.Info("Generating SQLs for resetting tiflash replica",
zap.Strings("sqls", sqls))

return g.UseOneShotSession(storage, false, func(se glue.Session) error {
for _, sql := range sqls {
if errExec := se.ExecuteInternal(ctx, sql); errExec != nil {
logutil.WarnTerm("Failed to restore tiflash replica config, you may execute the sql restore it manually.",
logutil.ShortError(errExec),
zap.String("sql", sql),
)
}
}
return nil
})
}

// MockClient creates a fake client used for testing.
func MockClient(dbs map[string]*utils.Database) *Client {
return &Client{databases: dbs}
@@ -2721,3 +2789,27 @@ func CheckNewCollationEnable(
log.Info("set new_collation_enabled", zap.Bool("new_collation_enabled", enabled))
return nil
}

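// waitTiFlashBackoffer doubles its backoff on every retry, starting from
// BaseBackoff and capped at 32 seconds.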
type waitTiFlashBackoffer struct {
Attempts int
BaseBackoff time.Duration
}

// NextBackoff returns a duration to wait before retrying again
func (b *waitTiFlashBackoffer) NextBackoff(error) time.Duration {
bo := b.BaseBackoff
b.Attempts--
if b.Attempts == 0 {
return 0
}
b.BaseBackoff *= 2
if b.BaseBackoff > 32*time.Second {
b.BaseBackoff = 32 * time.Second
}
return bo
}

// Attempt returns the number of remaining attempts
func (b *waitTiFlashBackoffer) Attempt() int {
return b.Attempts
}
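For a sanity check of the schedule this backoffer produces, here is a standalone sketch (not part of this commit) that copies the backoffer verbatim and prints the total wait. With 30 attempts and a 4-second base, the waits run 4s, 8s, 16s, then 32s for the remaining attempts, about 14 minutes in total, so the 10-minute context timeout above is what actually bounds the wait:

    package main

    import (
        "fmt"
        "time"
    )

    // Standalone copy of waitTiFlashBackoffer, used here only to print its schedule.
    type waitTiFlashBackoffer struct {
        Attempts    int
        BaseBackoff time.Duration
    }

    func (b *waitTiFlashBackoffer) NextBackoff(error) time.Duration {
        bo := b.BaseBackoff
        b.Attempts--
        if b.Attempts == 0 {
            return 0
        }
        b.BaseBackoff *= 2
        if b.BaseBackoff > 32*time.Second {
            b.BaseBackoff = 32 * time.Second
        }
        return bo
    }

    func main() {
        b := &waitTiFlashBackoffer{Attempts: 30, BaseBackoff: 4 * time.Second}
        var total time.Duration
        for b.Attempts > 0 {
            total += b.NextBackoff(nil)
        }
        fmt.Println("total backoff:", total) // total backoff: 14m20s
    }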
52 changes: 50 additions & 2 deletions br/pkg/restore/tiflashrec/tiflash_recorder.go
@@ -79,6 +79,46 @@ func (r *TiFlashRecorder) Rewrite(oldID int64, newID int64) {
}
}

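// GenerateResetAlterTableDDLs generates, for each recorded table, a pair of
// DDLs that first reset the TiFlash replica count to 0 and then restore the
// recorded replica setting.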
func (r *TiFlashRecorder) GenerateResetAlterTableDDLs(info infoschema.InfoSchema) []string {
items := make([]string, 0, len(r.items))
r.Iterate(func(id int64, replica model.TiFlashReplicaInfo) {
table, ok := info.TableByID(id)
if !ok {
log.Warn("Table do not exist, skipping", zap.Int64("id", id))
return
}
schema, ok := info.SchemaByTable(table.Meta())
if !ok {
log.Warn("Schema do not exist, skipping", zap.Int64("id", id), zap.Stringer("table", table.Meta().Name))
return
}
// Currently we don't back up the TiFlash cluster volume during volume snapshot backup,
// but the restored tables still carry their TiFlash replica info.
// We must reset the replica count to 0 and then set it back; otherwise
// altering the TiFlash replica returns an error.
altTableSpec, err := alterTableSpecOf(replica, true)
if err != nil {
log.Warn("Failed to generate the alter table spec", logutil.ShortError(err), zap.Any("replica", replica))
return
}
items = append(items, fmt.Sprintf(
"ALTER TABLE %s %s",
utils.EncloseDBAndTable(schema.Name.O, table.Meta().Name.O),
altTableSpec),
)
altTableSpec, err = alterTableSpecOf(replica, false)
if err != nil {
log.Warn("Failed to generate the alter table spec", logutil.ShortError(err), zap.Any("replica", replica))
return
}
items = append(items, fmt.Sprintf(
"ALTER TABLE %s %s",
utils.EncloseDBAndTable(schema.Name.O, table.Meta().Name.O),
altTableSpec),
)
})
return items
}

func (r *TiFlashRecorder) GenerateAlterTableDDLs(info infoschema.InfoSchema) []string {
items := make([]string, 0, len(r.items))
r.Iterate(func(id int64, replica model.TiFlashReplicaInfo) {
@@ -92,7 +132,7 @@ func (r *TiFlashRecorder) GenerateAlterTableDDLs(info infoschema.InfoSchema) []s
log.Warn("Schema do not exist, skipping", zap.Int64("id", id), zap.Stringer("table", table.Meta().Name))
return
}
altTableSpec, err := alterTableSpecOf(replica)
altTableSpec, err := alterTableSpecOf(replica, false)
if err != nil {
log.Warn("Failed to generate the alter table spec", logutil.ShortError(err), zap.Any("replica", replica))
return
@@ -106,14 +146,22 @@
return items
}

func alterTableSpecOf(replica model.TiFlashReplicaInfo) (string, error) {
func alterTableSpecOf(replica model.TiFlashReplicaInfo, reset bool) (string, error) {
spec := &ast.AlterTableSpec{
Tp: ast.AlterTableSetTiFlashReplica,
TiFlashReplica: &ast.TiFlashReplicaSpec{
Count: replica.Count,
Labels: replica.LocationLabels,
},
}
if reset {
spec = &ast.AlterTableSpec{
Tp: ast.AlterTableSetTiFlashReplica,
TiFlashReplica: &ast.TiFlashReplicaSpec{
Count: 0,
},
}
}

buf := bytes.NewBuffer(make([]byte, 0, 32))
restoreCx := format.NewRestoreCtx(
29 changes: 29 additions & 0 deletions br/pkg/restore/tiflashrec/tiflash_recorder_test.go
@@ -170,3 +170,32 @@ func TestGenSql(t *testing.T) {
"ALTER TABLE `test`.`evils` SET TIFLASH REPLICA 1 LOCATION LABELS 'kIll''; OR DROP DATABASE test --', 'dEaTh with " + `\\"quoting\\"` + "'",
})
}

func TestGenResetSql(t *testing.T) {
tInfo := func(id int, name string) *model.TableInfo {
return &model.TableInfo{
ID: int64(id),
Name: model.NewCIStr(name),
}
}
fakeInfo := infoschema.MockInfoSchema([]*model.TableInfo{
tInfo(1, "fruits"),
tInfo(2, "whisper"),
})
rec := tiflashrec.New()
rec.AddTable(1, model.TiFlashReplicaInfo{
Count: 1,
})
rec.AddTable(2, model.TiFlashReplicaInfo{
Count: 2,
LocationLabels: []string{"climate"},
})

sqls := rec.GenerateResetAlterTableDDLs(fakeInfo)
require.ElementsMatch(t, sqls, []string{
"ALTER TABLE `test`.`whisper` SET TIFLASH REPLICA 0",
"ALTER TABLE `test`.`whisper` SET TIFLASH REPLICA 2 LOCATION LABELS 'climate'",
"ALTER TABLE `test`.`fruits` SET TIFLASH REPLICA 0",
"ALTER TABLE `test`.`fruits` SET TIFLASH REPLICA 1",
})
}
4 changes: 4 additions & 0 deletions br/pkg/task/restore_data.go
@@ -154,6 +154,10 @@ func RunResolveKvData(c context.Context, g glue.Glue, cmdName string, cfg *Resto
// TODO: restore the volume to its original type
// ModifyVolume(*ec2.ModifyVolumeInput) (*ec2.ModifyVolumeOutput, error) by backupmeta

// Since we cannot reset TiFlash automatically, we reset it manually here.
if err = client.ResetTiFlashReplicas(ctx, g, mgr.GetStorage()); err != nil {
return errors.Trace(err)
}
progress.Close()
summary.CollectDuration("restore duration", time.Since(startAll))
summary.SetSuccessStatus(true)
14 changes: 11 additions & 3 deletions domain/domain.go
@@ -898,7 +898,7 @@ func (do *Domain) Close() {
const resourceIdleTimeout = 3 * time.Minute // resources in the ResourcePool will be recycled after idleTimeout

// NewDomain creates a new domain. Should not create multiple domains for the same store.
func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duration, idxUsageSyncLease time.Duration, dumpFileGcLease time.Duration, factory pools.Factory, onClose func()) *Domain {
func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duration, idxUsageSyncLease time.Duration, dumpFileGcLease time.Duration, factory pools.Factory) *Domain {
capacity := 200 // capacity of the sysSessionPool size
do := &Domain{
store: store,
@@ -909,7 +909,6 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio
slowQuery: newTopNSlowQueries(30, time.Hour*24*7, 500),
indexUsageSyncLease: idxUsageSyncLease,
dumpFileGcChecker: &dumpFileGcChecker{gcLease: dumpFileGcLease, paths: []string{replayer.GetPlanReplayerDirName(), GetOptimizerTraceDirName()}},
onClose: onClose,
expiredTimeStamp4PC: types.NewTime(types.ZeroCoreTime, mysql.TypeTimestamp, types.DefaultFsp),
mdlCheckTableInfo: &mdlCheckTableInfo{
mu: sync.Mutex{},
@@ -1082,6 +1081,11 @@ func (do *Domain) Init(
return nil
}

// SetOnClose sets the do.onClose func.
func (do *Domain) SetOnClose(onClose func()) {
do.onClose = onClose
}

func (do *Domain) initLogBackup(ctx context.Context, pdClient pd.Client) error {
cfg := config.GetGlobalConfig()
if pdClient == nil || do.etcdClient == nil {
@@ -2475,7 +2479,11 @@ func (do *Domain) serverIDKeeper() {
// StartTTLJobManager creates and starts the ttl job manager
func (do *Domain) StartTTLJobManager() {
do.wg.Run(func() {
ttlJobManager := ttlworker.NewJobManager(do.ddl.GetID(), do.sysSessionPool, do.store, do.GetEtcdClient())
defer func() {
logutil.BgLogger().Info("ttlJobManager exited.")
}()

ttlJobManager := ttlworker.NewJobManager(do.ddl.GetID(), do.sysSessionPool, do.store, do.etcdClient)
do.ttlJobManager = ttlJobManager
ttlJobManager.Start()

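A minimal sketch of the call pattern this change produces (a hypothetical call site; the variable names are assumptions). The onClose hook is now attached after construction instead of being passed to NewDomain:

    // Hypothetical caller, e.g. in session bootstrap code:
    dom := domain.NewDomain(store, ddlLease, statsLease, idxUsageSyncLease, dumpFileGcLease, factory)
    dom.SetOnClose(func() {
        // release caller-owned resources once the domain closes
    })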
6 changes: 3 additions & 3 deletions domain/domain_test.go
@@ -68,7 +68,7 @@ func TestInfo(t *testing.T) {
Storage: s,
pdAddrs: []string{cluster.Members[0].GRPCURL()}}
ddlLease := 80 * time.Millisecond
dom := NewDomain(mockStore, ddlLease, 0, 0, 0, mockFactory, nil)
dom := NewDomain(mockStore, ddlLease, 0, 0, 0, mockFactory)
defer func() {
dom.Close()
err := s.Close()
@@ -171,7 +171,7 @@ func TestStatWorkRecoverFromPanic(t *testing.T) {
require.NoError(t, err)

ddlLease := 80 * time.Millisecond
dom := NewDomain(store, ddlLease, 0, 0, 0, mockFactory, nil)
dom := NewDomain(store, ddlLease, 0, 0, 0, mockFactory)

metrics.PanicCounter.Reset()
// Since the stats lease is 0 now, so create a new ticker will panic.
@@ -238,7 +238,7 @@ func TestClosestReplicaReadChecker(t *testing.T) {
require.NoError(t, err)

ddlLease := 80 * time.Millisecond
dom := NewDomain(store, ddlLease, 0, 0, 0, mockFactory, nil)
dom := NewDomain(store, ddlLease, 0, 0, 0, mockFactory)
defer func() {
dom.Close()
require.Nil(t, store.Close())
44 changes: 23 additions & 21 deletions executor/batch_checker.go
@@ -190,28 +190,30 @@ func getKeysNeedCheckOneRow(ctx sessionctx.Context, t table.Table, row []types.D
}
// Pass handle = 0 to GenIndexKVIter,
// because we only care about distinct keys.
key, distinct, err1 := v.GenIndexKey(ctx.GetSessionVars().StmtCtx,
colVals, kv.IntHandle(0), nil)
if err1 != nil {
return nil, err1
}
// Skip the non-distinct keys.
if !distinct {
continue
}
// If the index is being backfilled in ingest mode, check the key from the temp index.
if v.Meta().State != model.StatePublic && v.Meta().BackfillState != model.BackfillStateInapplicable {
_, key, _ = tables.GenTempIdxKeyByState(v.Meta(), key)
}
colValStr, err1 := formatDataForDupError(colVals)
if err1 != nil {
return nil, err1
iter := v.GenIndexKVIter(ctx.GetSessionVars().StmtCtx, colVals, kv.IntHandle(0), nil)
for iter.Valid() {
key, _, distinct, err1 := iter.Next(nil)
if err1 != nil {
return nil, err1
}
// Skip the non-distinct keys.
if !distinct {
continue
}
// If the index is being backfilled in ingest mode, check the key from the temp index.
if v.Meta().State != model.StatePublic && v.Meta().BackfillState != model.BackfillStateInapplicable {
_, key, _ = tables.GenTempIdxKeyByState(v.Meta(), key)
}
colValStr, err1 := formatDataForDupError(colVals)
if err1 != nil {
return nil, err1
}
uniqueKeys = append(uniqueKeys, &keyValueWithDupInfo{
newKey: key,
dupErr: kv.ErrKeyExists.FastGenByArgs(colValStr, fmt.Sprintf("%s.%s", v.TableMeta().Name.String(), v.Meta().Name.String())),
commonHandle: t.Meta().IsCommonHandle,
})
}
uniqueKeys = append(uniqueKeys, &keyValueWithDupInfo{
newKey: key,
dupErr: kv.ErrKeyExists.FastGenByArgs(colValStr, fmt.Sprintf("%s.%s", v.TableMeta().Name.String(), v.Meta().Name.String())),
commonHandle: t.Meta().IsCommonHandle,
})
}
row = row[:len(row)-extraColumns]
result = append(result, toBeCheckedRow{
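The switch from a single GenIndexKey call to a GenIndexKVIter loop matters because a multi-valued index (for example, a unique index on cast(a as signed array)) emits one index entry per array element, so one row can contribute several distinct keys that all need duplicate checks. A toy sketch of the pattern, using made-up types rather than TiDB's real iterator:

    package main

    import "fmt"

    // indexKVIter is a toy stand-in for an index-KV iterator: one row of a
    // multi-valued index yields one key per array element, not a single key.
    type indexKVIter struct {
        keys []string
        pos  int
    }

    func (it *indexKVIter) Valid() bool { return it.pos < len(it.keys) }

    func (it *indexKVIter) Next() string {
        k := it.keys[it.pos]
        it.pos++
        return k
    }

    func main() {
        // The row ('[1,11]', 1) under a unique multi-valued index produces
        // two keys to check for duplicates, one per array element.
        it := &indexKVIter{keys: []string{"idx/1", "idx/11"}}
        var uniqueKeys []string
        for it.Valid() {
            uniqueKeys = append(uniqueKeys, it.Next())
        }
        fmt.Println(uniqueKeys) // [idx/1 idx/11]
    }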
28 changes: 28 additions & 0 deletions executor/insert_test.go
@@ -419,6 +419,34 @@ func TestInsertValueForCastDecimalField(t *testing.T) {
tk.MustQuery(`select cast(a as decimal) from t1;`).Check(testkit.Rows(`9999999999`))
}

func TestInsertForMultiValuedIndex(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec(`drop table if exists t1;`)
tk.MustExec(`create table t1(a json, b int, unique index idx((cast(a as signed array))));`)
tk.MustExec(`insert into t1 values ('[1,11]', 1);`)
tk.MustExec(`insert into t1 values ('[2, 22]', 2);`)
tk.MustQuery(`select * from t1;`).Check(testkit.Rows(`[1, 11] 1`, `[2, 22] 2`))
tk.MustGetErrMsg(`insert into t1 values ('[2, 222]', 2);`, "[kv:1062]Duplicate entry '2' for key 't1.idx'")
tk.MustExec(`replace into t1 values ('[1, 10]', 10)`)
tk.MustQuery(`select * from t1;`).Check(testkit.Rows(`[2, 22] 2`, `[1, 10] 10`))
tk.MustExec(`replace into t1 values ('[1, 2]', 1)`)
tk.MustQuery(`select * from t1;`).Check(testkit.Rows(`[1, 2] 1`))
tk.MustExec(`replace into t1 values ('[1, 11]', 1)`)
tk.MustExec(`insert into t1 values ('[2, 22]', 2);`)
tk.MustQuery(`select * from t1;`).Check(testkit.Rows(`[1, 11] 1`, `[2, 22] 2`))
tk.MustExec(`insert ignore into t1 values ('[1]', 2);`)
tk.MustQuery(`select * from t1;`).Check(testkit.Rows(`[1, 11] 1`, `[2, 22] 2`))
tk.MustExec(`insert ignore into t1 values ('[1, 2]', 2);`)
tk.MustQuery(`select * from t1;`).Check(testkit.Rows(`[1, 11] 1`, `[2, 22] 2`))
tk.MustExec(`insert into t1 values ('[2]', 2) on duplicate key update b = 10;`)
tk.MustQuery(`select * from t1;`).Check(testkit.Rows(`[1, 11] 1`, `[2, 22] 10`))
tk.MustGetErrMsg(`insert into t1 values ('[2, 1]', 2) on duplicate key update a = '[1,2]';`, "[kv:1062]Duplicate entry '[1, 2]' for key 't1.idx'")
tk.MustGetErrMsg(`insert into t1 values ('[1,2]', 2) on duplicate key update a = '[1,2]';`, "[kv:1062]Duplicate entry '[1, 2]' for key 't1.idx'")
tk.MustGetErrMsg(`insert into t1 values ('[11, 22]', 2) on duplicate key update a = '[1,2]';`, "[kv:1062]Duplicate entry '[1, 2]' for key 't1.idx'")
}

func TestInsertDateTimeWithTimeZone(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
