From 8eae738c8a333dcbac2f5995b1da677e6fab943e Mon Sep 17 00:00:00 2001 From: Lloyd-Pottiger <60744015+Lloyd-Pottiger@users.noreply.github.com> Date: Wed, 19 Apr 2023 21:03:20 +0800 Subject: [PATCH] domin: set placement rule of partition table in batch (#43087) close pingcap/tidb#43070 --- domain/infosync/info.go | 17 ++-- domain/infosync/info_test.go | 14 ++- domain/infosync/tiflash_manager.go | 133 ++++++++++++++++++++++++++++- 3 files changed, 153 insertions(+), 11 deletions(-) diff --git a/domain/infosync/info.go b/domain/infosync/info.go index ef0777572de82..392afdf2f06f9 100644 --- a/domain/infosync/info.go +++ b/domain/infosync/info.go @@ -1279,18 +1279,21 @@ func ConfigureTiFlashPDForPartitions(accel bool, definitions *[]model.PartitionD return errors.Trace(err) } ctx := context.Background() + rules := make([]placement.TiFlashRule, 0, len(*definitions)) + pids := make([]int64, 0, len(*definitions)) for _, p := range *definitions { logutil.BgLogger().Info("ConfigureTiFlashPDForPartitions", zap.Int64("tableID", tableID), zap.Int64("partID", p.ID), zap.Bool("accel", accel), zap.Uint64("count", count)) ruleNew := MakeNewRule(p.ID, count, *locationLabels) - if e := is.tiflashReplicaManager.SetPlacementRule(ctx, ruleNew); e != nil { + rules = append(rules, ruleNew) + pids = append(pids, p.ID) + } + if e := is.tiflashReplicaManager.SetPlacementRuleBatch(ctx, rules); e != nil { + return errors.Trace(e) + } + if accel { + if e := is.tiflashReplicaManager.PostAccelerateScheduleBatch(ctx, pids); e != nil { return errors.Trace(e) } - if accel { - e := is.tiflashReplicaManager.PostAccelerateSchedule(ctx, p.ID) - if e != nil { - return errors.Trace(e) - } - } } return nil } diff --git a/domain/infosync/info_test.go b/domain/infosync/info_test.go index c30ad265984bc..f1d0febf9b2af 100644 --- a/domain/infosync/info_test.go +++ b/domain/infosync/info_test.go @@ -262,17 +262,25 @@ func TestTiFlashManager(t *testing.T) { ConfigureTiFlashPDForPartitions(true, &[]model.PartitionDefinition{ { ID: 2, - Name: model.NewCIStr("p"), + Name: model.NewCIStr("p1"), + LessThan: []string{}, + }, + { + ID: 3, + Name: model.NewCIStr("p2"), LessThan: []string{}, }, }, 3, &[]string{}, 100) rules, err = GetTiFlashGroupRules(ctx, "tiflash") require.NoError(t, err) - // Have table 1 and 2 - require.Equal(t, 2, len(rules)) + // Have table a and partitions p1, p2 + require.Equal(t, 3, len(rules)) z, ok = tiflash.SyncStatus[2] require.Equal(t, true, ok) require.Equal(t, true, z.Accel) + z, ok = tiflash.SyncStatus[3] + require.Equal(t, true, ok) + require.Equal(t, true, z.Accel) CloseTiFlashManager(ctx) } diff --git a/domain/infosync/tiflash_manager.go b/domain/infosync/tiflash_manager.go index deb3800c33ce4..714faa666a41c 100644 --- a/domain/infosync/tiflash_manager.go +++ b/domain/infosync/tiflash_manager.go @@ -56,12 +56,16 @@ type TiFlashReplicaManager interface { SetTiFlashGroupConfig(ctx context.Context) error // SetPlacementRule is a helper function to set placement rule. SetPlacementRule(ctx context.Context, rule placement.TiFlashRule) error + // SetPlacementRuleBatch is a helper function to set a batch of placement rules. + SetPlacementRuleBatch(ctx context.Context, rules []placement.TiFlashRule) error // DeletePlacementRule is to delete placement rule for certain group. DeletePlacementRule(ctx context.Context, group string, ruleID string) error // GetGroupRules to get all placement rule in a certain group. GetGroupRules(ctx context.Context, group string) ([]placement.TiFlashRule, error) // PostAccelerateSchedule sends `regions/accelerate-schedule` request. PostAccelerateSchedule(ctx context.Context, tableID int64) error + // PostAccelerateScheduleBatch sends `regions/accelerate-schedule/batch` request. + PostAccelerateScheduleBatch(ctx context.Context, tableIDs []int64) error // GetRegionCountFromPD is a helper function calling `/stats/region`. GetRegionCountFromPD(ctx context.Context, tableID int64, regionCount *int) error // GetStoresStat gets the TiKV store information by accessing PD's api. @@ -284,6 +288,66 @@ func (m *TiFlashReplicaManagerCtx) doSetPlacementRule(ctx context.Context, rule return nil } +// SetPlacementRuleBatch is a helper function to set a batch of placement rules. +func (m *TiFlashReplicaManagerCtx) SetPlacementRuleBatch(ctx context.Context, rules []placement.TiFlashRule) error { + r := make([]placement.TiFlashRule, 0, len(rules)) + for _, rule := range rules { + r = append(r, encodeRule(m.codec, &rule)) + } + return m.doSetPlacementRuleBatch(ctx, r) +} + +// RuleOpType indicates the operation type +type RuleOpType string + +const ( + // RuleOpAdd a placement rule, only need to specify the field *Rule + RuleOpAdd RuleOpType = "add" + // RuleOpDel a placement rule, only need to specify the field `GroupID`, `ID`, `MatchID` + RuleOpDel RuleOpType = "del" +) + +// RuleOp is for batching placement rule actions. The action type is +// distinguished by the field `Action`. +type RuleOp struct { + *placement.TiFlashRule // information of the placement rule to add/delete the operation type + Action RuleOpType `json:"action"` + DeleteByIDPrefix bool `json:"delete_by_id_prefix"` // if action == delete, delete by the prefix of id +} + +func (m *TiFlashReplicaManagerCtx) doSetPlacementRuleBatch(ctx context.Context, rules []placement.TiFlashRule) error { + if err := m.SetTiFlashGroupConfig(ctx); err != nil { + return err + } + ruleOps := make([]RuleOp, 0, len(rules)) + for i, r := range rules { + if r.Count == 0 { + ruleOps = append(ruleOps, RuleOp{ + TiFlashRule: &rules[i], + Action: RuleOpDel, + }) + } else { + ruleOps = append(ruleOps, RuleOp{ + TiFlashRule: &rules[i], + Action: RuleOpAdd, + }) + } + } + j, err := json.Marshal(ruleOps) + if err != nil { + return errors.Trace(err) + } + buf := bytes.NewBuffer(j) + res, err := doRequest(ctx, "SetPlacementRuleBatch", m.etcdCli.Endpoints(), path.Join(pdapi.Config, "rules", "batch"), "POST", buf) + if err != nil { + return errors.Trace(err) + } + if res == nil { + return fmt.Errorf("TiFlashReplicaManagerCtx returns error in SetPlacementRuleBatch") + } + return nil +} + // DeletePlacementRule is to delete placement rule for certain group. func (m *TiFlashReplicaManagerCtx) DeletePlacementRule(ctx context.Context, group string, ruleID string) error { ruleID = encodeRuleID(m.codec, ruleID) @@ -335,7 +399,7 @@ func (m *TiFlashReplicaManagerCtx) PostAccelerateSchedule(ctx context.Context, t return errors.Trace(err) } buf := bytes.NewBuffer(j) - res, err := doRequest(ctx, "PostAccelerateSchedule", m.etcdCli.Endpoints(), "/pd/api/v1/regions/accelerate-schedule", "POST", buf) + res, err := doRequest(ctx, "PostAccelerateSchedule", m.etcdCli.Endpoints(), path.Join(pdapi.Regions, "accelerate-schedule"), "POST", buf) if err != nil { return errors.Trace(err) } @@ -345,6 +409,36 @@ func (m *TiFlashReplicaManagerCtx) PostAccelerateSchedule(ctx context.Context, t return nil } +// PostAccelerateScheduleBatch sends `regions/batch-accelerate-schedule` request. +func (m *TiFlashReplicaManagerCtx) PostAccelerateScheduleBatch(ctx context.Context, tableIDs []int64) error { + if len(tableIDs) == 0 { + return nil + } + input := make([]map[string]string, 0, len(tableIDs)) + for _, tableID := range tableIDs { + startKey := tablecodec.GenTableRecordPrefix(tableID) + endKey := tablecodec.EncodeTablePrefix(tableID + 1) + startKey, endKey = m.codec.EncodeRegionRange(startKey, endKey) + input = append(input, map[string]string{ + "start_key": hex.EncodeToString(startKey), + "end_key": hex.EncodeToString(endKey), + }) + } + j, err := json.Marshal(input) + if err != nil { + return errors.Trace(err) + } + buf := bytes.NewBuffer(j) + res, err := doRequest(ctx, "PostAccelerateScheduleBatch", m.etcdCli.Endpoints(), path.Join(pdapi.Regions, "accelerate-schedule", "batch"), "POST", buf) + if err != nil { + return errors.Trace(err) + } + if res == nil { + return fmt.Errorf("TiFlashReplicaManagerCtx returns error in PostAccelerateScheduleBatch") + } + return nil +} + // GetRegionCountFromPD is a helper function calling `/stats/region`. func (m *TiFlashReplicaManagerCtx) GetRegionCountFromPD(ctx context.Context, tableID int64, regionCount *int) error { startKey := tablecodec.GenTableRecordPrefix(tableID) @@ -574,6 +668,16 @@ func (tiflash *MockTiFlash) HandleSetPlacementRule(rule placement.TiFlashRule) e return nil } +// HandleSetPlacementRuleBatch is mock function for batch SetTiFlashPlacementRule. +func (tiflash *MockTiFlash) HandleSetPlacementRuleBatch(rules []placement.TiFlashRule) error { + for _, r := range rules { + if err := tiflash.HandleSetPlacementRule(r); err != nil { + return err + } + } + return nil +} + // ResetSyncStatus is mock function for reset sync status. func (tiflash *MockTiFlash) ResetSyncStatus(tableID int, canAvailable bool) { tiflash.Lock() @@ -863,6 +967,16 @@ func (m *mockTiFlashReplicaManagerCtx) SetPlacementRule(ctx context.Context, rul return m.tiflash.HandleSetPlacementRule(rule) } +// SetPlacementRuleBatch is a helper function to set a batch of placement rules. +func (m *mockTiFlashReplicaManagerCtx) SetPlacementRuleBatch(ctx context.Context, rules []placement.TiFlashRule) error { + m.Lock() + defer m.Unlock() + if m.tiflash == nil { + return nil + } + return m.tiflash.HandleSetPlacementRuleBatch(rules) +} + // DeletePlacementRule is to delete placement rule for certain group. func (m *mockTiFlashReplicaManagerCtx) DeletePlacementRule(ctx context.Context, group string, ruleID string) error { m.Lock() @@ -897,6 +1011,23 @@ func (m *mockTiFlashReplicaManagerCtx) PostAccelerateSchedule(ctx context.Contex return m.tiflash.HandlePostAccelerateSchedule(hex.EncodeToString(endKey)) } +// PostAccelerateScheduleBatch sends `regions/batch-accelerate-schedule` request. +func (m *mockTiFlashReplicaManagerCtx) PostAccelerateScheduleBatch(ctx context.Context, tableIDs []int64) error { + m.Lock() + defer m.Unlock() + if m.tiflash == nil { + return nil + } + for _, tableID := range tableIDs { + endKey := tablecodec.EncodeTablePrefix(tableID + 1) + endKey = codec.EncodeBytes([]byte{}, endKey) + if err := m.tiflash.HandlePostAccelerateSchedule(hex.EncodeToString(endKey)); err != nil { + return err + } + } + return nil +} + // GetRegionCountFromPD is a helper function calling `/stats/region`. func (m *mockTiFlashReplicaManagerCtx) GetRegionCountFromPD(ctx context.Context, tableID int64, regionCount *int) error { m.Lock()