domain: set placement rule of partition table in batch (#43087)
close #43070
Lloyd-Pottiger authored Apr 19, 2023
1 parent d6bfa0e commit 8eae738
Showing 3 changed files with 153 additions and 11 deletions.
17 changes: 10 additions & 7 deletions domain/infosync/info.go
@@ -1279,18 +1279,21 @@ func ConfigureTiFlashPDForPartitions(accel bool, definitions *[]model.PartitionDefinition, count uint64, locationLabels *[]string, tableID int64) error {
 		return errors.Trace(err)
 	}
 	ctx := context.Background()
+	rules := make([]placement.TiFlashRule, 0, len(*definitions))
+	pids := make([]int64, 0, len(*definitions))
 	for _, p := range *definitions {
 		logutil.BgLogger().Info("ConfigureTiFlashPDForPartitions", zap.Int64("tableID", tableID), zap.Int64("partID", p.ID), zap.Bool("accel", accel), zap.Uint64("count", count))
 		ruleNew := MakeNewRule(p.ID, count, *locationLabels)
-		if e := is.tiflashReplicaManager.SetPlacementRule(ctx, ruleNew); e != nil {
+		rules = append(rules, ruleNew)
+		pids = append(pids, p.ID)
+	}
+	if e := is.tiflashReplicaManager.SetPlacementRuleBatch(ctx, rules); e != nil {
+		return errors.Trace(e)
+	}
+	if accel {
+		if e := is.tiflashReplicaManager.PostAccelerateScheduleBatch(ctx, pids); e != nil {
 			return errors.Trace(e)
 		}
-		if accel {
-			e := is.tiflashReplicaManager.PostAccelerateSchedule(ctx, p.ID)
-			if e != nil {
-				return errors.Trace(e)
-			}
-		}
 	}
 	return nil
 }
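
For N partitions, the old loop issued one SetPlacementRule call and, with accel, one PostAccelerateSchedule POST per partition; the rewrite collects everything first and talks to PD twice in total. A minimal sketch of driving the new path, mirroring the updated test below (IDs and arguments are illustrative):

	defs := []model.PartitionDefinition{
		{ID: 2, Name: model.NewCIStr("p1"), LessThan: []string{}},
		{ID: 3, Name: model.NewCIStr("p2"), LessThan: []string{}},
	}
	// One SetPlacementRuleBatch covers both partitions; with accel=true a single
	// PostAccelerateScheduleBatch follows instead of one POST per partition.
	if err := ConfigureTiFlashPDForPartitions(true, &defs, 3, &[]string{}, 100); err != nil {
		return errors.Trace(err)
	}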
14 changes: 11 additions & 3 deletions domain/infosync/info_test.go
@@ -262,17 +262,25 @@ func TestTiFlashManager(t *testing.T) {
 	ConfigureTiFlashPDForPartitions(true, &[]model.PartitionDefinition{
 		{
 			ID:       2,
-			Name:     model.NewCIStr("p"),
+			Name:     model.NewCIStr("p1"),
 			LessThan: []string{},
 		},
+		{
+			ID:       3,
+			Name:     model.NewCIStr("p2"),
+			LessThan: []string{},
+		},
 	}, 3, &[]string{}, 100)
 	rules, err = GetTiFlashGroupRules(ctx, "tiflash")
 	require.NoError(t, err)
-	// Have table 1 and 2
-	require.Equal(t, 2, len(rules))
+	// Have table a and partitions p1, p2
+	require.Equal(t, 3, len(rules))
 	z, ok = tiflash.SyncStatus[2]
 	require.Equal(t, true, ok)
 	require.Equal(t, true, z.Accel)
+	z, ok = tiflash.SyncStatus[3]
+	require.Equal(t, true, ok)
+	require.Equal(t, true, z.Accel)
 
 	CloseTiFlashManager(ctx)
 }
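
To exercise just this test locally, something like the following should work from the repository root:

	go test ./domain/infosync/ -run TestTiFlashManager -v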
133 changes: 132 additions & 1 deletion domain/infosync/tiflash_manager.go
@@ -56,12 +56,16 @@ type TiFlashReplicaManager interface {
 	SetTiFlashGroupConfig(ctx context.Context) error
 	// SetPlacementRule is a helper function to set placement rule.
 	SetPlacementRule(ctx context.Context, rule placement.TiFlashRule) error
+	// SetPlacementRuleBatch is a helper function to set a batch of placement rules.
+	SetPlacementRuleBatch(ctx context.Context, rules []placement.TiFlashRule) error
 	// DeletePlacementRule is to delete placement rule for certain group.
 	DeletePlacementRule(ctx context.Context, group string, ruleID string) error
 	// GetGroupRules to get all placement rule in a certain group.
 	GetGroupRules(ctx context.Context, group string) ([]placement.TiFlashRule, error)
 	// PostAccelerateSchedule sends `regions/accelerate-schedule` request.
 	PostAccelerateSchedule(ctx context.Context, tableID int64) error
+	// PostAccelerateScheduleBatch sends `regions/accelerate-schedule/batch` request.
+	PostAccelerateScheduleBatch(ctx context.Context, tableIDs []int64) error
 	// GetRegionCountFromPD is a helper function calling `/stats/region`.
 	GetRegionCountFromPD(ctx context.Context, tableID int64, regionCount *int) error
 	// GetStoresStat gets the TiKV store information by accessing PD's api.
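
The two new methods mirror the single-item methods directly above them: SetPlacementRuleBatch maps to PD's config/rules/batch endpoint, PostAccelerateScheduleBatch to regions/accelerate-schedule/batch. A minimal sketch of calling them through the interface (mgr and the IDs are illustrative; any implementation, including the mock further down, satisfies it):

	rules := []placement.TiFlashRule{
		MakeNewRule(2, 1, []string{}), // one rule per table/partition ID
		MakeNewRule(3, 1, []string{}),
	}
	if err := mgr.SetPlacementRuleBatch(ctx, rules); err != nil {
		return errors.Trace(err)
	}
	if err := mgr.PostAccelerateScheduleBatch(ctx, []int64{2, 3}); err != nil {
		return errors.Trace(err)
	}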
@@ -284,6 +288,66 @@ func (m *TiFlashReplicaManagerCtx) doSetPlacementRule(ctx context.Context, rule placement.TiFlashRule) error {
 	return nil
 }
 
+// SetPlacementRuleBatch is a helper function to set a batch of placement rules.
+func (m *TiFlashReplicaManagerCtx) SetPlacementRuleBatch(ctx context.Context, rules []placement.TiFlashRule) error {
+	r := make([]placement.TiFlashRule, 0, len(rules))
+	for _, rule := range rules {
+		r = append(r, encodeRule(m.codec, &rule))
+	}
+	return m.doSetPlacementRuleBatch(ctx, r)
+}
+
+// RuleOpType indicates the operation type
+type RuleOpType string
+
+const (
+	// RuleOpAdd a placement rule, only need to specify the field *Rule
+	RuleOpAdd RuleOpType = "add"
+	// RuleOpDel a placement rule, only need to specify the field `GroupID`, `ID`, `MatchID`
+	RuleOpDel RuleOpType = "del"
+)
+
+// RuleOp is for batching placement rule actions. The action type is
+// distinguished by the field `Action`.
+type RuleOp struct {
+	*placement.TiFlashRule                // information of the placement rule to add/delete
+	Action           RuleOpType `json:"action"`              // the operation type
+	DeleteByIDPrefix bool       `json:"delete_by_id_prefix"` // if action == delete, delete by the prefix of id
+}
+
+func (m *TiFlashReplicaManagerCtx) doSetPlacementRuleBatch(ctx context.Context, rules []placement.TiFlashRule) error {
+	if err := m.SetTiFlashGroupConfig(ctx); err != nil {
+		return err
+	}
+	ruleOps := make([]RuleOp, 0, len(rules))
+	for i, r := range rules {
+		if r.Count == 0 {
+			ruleOps = append(ruleOps, RuleOp{
+				TiFlashRule: &rules[i],
+				Action:      RuleOpDel,
+			})
+		} else {
+			ruleOps = append(ruleOps, RuleOp{
+				TiFlashRule: &rules[i],
+				Action:      RuleOpAdd,
+			})
+		}
+	}
+	j, err := json.Marshal(ruleOps)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	buf := bytes.NewBuffer(j)
+	res, err := doRequest(ctx, "SetPlacementRuleBatch", m.etcdCli.Endpoints(), path.Join(pdapi.Config, "rules", "batch"), "POST", buf)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	if res == nil {
+		return fmt.Errorf("TiFlashReplicaManagerCtx returns error in SetPlacementRuleBatch")
+	}
+	return nil
+}
+
 // DeletePlacementRule is to delete placement rule for certain group.
 func (m *TiFlashReplicaManagerCtx) DeletePlacementRule(ctx context.Context, group string, ruleID string) error {
 	ruleID = encodeRuleID(m.codec, ruleID)
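
Because RuleOp embeds *placement.TiFlashRule, json.Marshal flattens the rule's own fields inline and appends the action tag, which is the shape PD's config/rules/batch endpoint accepts. The payload for one added and one deleted rule looks roughly like this (field names and values abridged and illustrative):

	[
	  {"group_id": "tiflash", "id": "table-2-r", "count": 1, ..., "action": "add"},
	  {"group_id": "tiflash", "id": "table-3-r", "count": 0, ..., "action": "del"}
	]

A rule with Count == 0 is turned into a RuleOpDel, so a single batch request can both install and clear placement rules.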
@@ -335,7 +399,7 @@ func (m *TiFlashReplicaManagerCtx) PostAccelerateSchedule(ctx context.Context, tableID int64) error {
 		return errors.Trace(err)
 	}
 	buf := bytes.NewBuffer(j)
-	res, err := doRequest(ctx, "PostAccelerateSchedule", m.etcdCli.Endpoints(), "/pd/api/v1/regions/accelerate-schedule", "POST", buf)
+	res, err := doRequest(ctx, "PostAccelerateSchedule", m.etcdCli.Endpoints(), path.Join(pdapi.Regions, "accelerate-schedule"), "POST", buf)
 	if err != nil {
 		return errors.Trace(err)
 	}
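
This is the file's only deletion: the hardcoded URL becomes a join on the pdapi constant. Assuming pdapi.Regions is the "/pd/api/v1/regions" prefix (by analogy with pdapi.Config used above), the result is byte-for-byte the old path:

	path.Join("/pd/api/v1/regions", "accelerate-schedule") // "/pd/api/v1/regions/accelerate-schedule"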
@@ -345,6 +409,36 @@ func (m *TiFlashReplicaManagerCtx) PostAccelerateSchedule(ctx context.Context, tableID int64) error {
 	return nil
 }
 
+// PostAccelerateScheduleBatch sends `regions/accelerate-schedule/batch` request.
+func (m *TiFlashReplicaManagerCtx) PostAccelerateScheduleBatch(ctx context.Context, tableIDs []int64) error {
+	if len(tableIDs) == 0 {
+		return nil
+	}
+	input := make([]map[string]string, 0, len(tableIDs))
+	for _, tableID := range tableIDs {
+		startKey := tablecodec.GenTableRecordPrefix(tableID)
+		endKey := tablecodec.EncodeTablePrefix(tableID + 1)
+		startKey, endKey = m.codec.EncodeRegionRange(startKey, endKey)
+		input = append(input, map[string]string{
+			"start_key": hex.EncodeToString(startKey),
+			"end_key":   hex.EncodeToString(endKey),
+		})
+	}
+	j, err := json.Marshal(input)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	buf := bytes.NewBuffer(j)
+	res, err := doRequest(ctx, "PostAccelerateScheduleBatch", m.etcdCli.Endpoints(), path.Join(pdapi.Regions, "accelerate-schedule", "batch"), "POST", buf)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	if res == nil {
+		return fmt.Errorf("TiFlashReplicaManagerCtx returns error in PostAccelerateScheduleBatch")
+	}
+	return nil
+}
+
 // GetRegionCountFromPD is a helper function calling `/stats/region`.
 func (m *TiFlashReplicaManagerCtx) GetRegionCountFromPD(ctx context.Context, tableID int64, regionCount *int) error {
 	startKey := tablecodec.GenTableRecordPrefix(tableID)
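
Each entry spans one table's data: from the table's record prefix to the prefix of the next table ID, wrapped by the codec and hex-encoded. For tableIDs {2, 3} the request body looks roughly like this (key bytes abridged; exact values depend on the codec wrapping):

	[
	  {"start_key": "7480000000000000025f72...", "end_key": "748000000000000003..."},
	  {"start_key": "7480000000000000035f72...", "end_key": "748000000000000004..."}
	]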
@@ -574,6 +668,16 @@ func (tiflash *MockTiFlash) HandleSetPlacementRule(rule placement.TiFlashRule) error {
 	return nil
 }
 
+// HandleSetPlacementRuleBatch is mock function for batch SetTiFlashPlacementRule.
+func (tiflash *MockTiFlash) HandleSetPlacementRuleBatch(rules []placement.TiFlashRule) error {
+	for _, r := range rules {
+		if err := tiflash.HandleSetPlacementRule(r); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
 // ResetSyncStatus is mock function for reset sync status.
 func (tiflash *MockTiFlash) ResetSyncStatus(tableID int, canAvailable bool) {
 	tiflash.Lock()
@@ -863,6 +967,16 @@ func (m *mockTiFlashReplicaManagerCtx) SetPlacementRule(ctx context.Context, rule placement.TiFlashRule) error {
 	return m.tiflash.HandleSetPlacementRule(rule)
 }
 
+// SetPlacementRuleBatch is a helper function to set a batch of placement rules.
+func (m *mockTiFlashReplicaManagerCtx) SetPlacementRuleBatch(ctx context.Context, rules []placement.TiFlashRule) error {
+	m.Lock()
+	defer m.Unlock()
+	if m.tiflash == nil {
+		return nil
+	}
+	return m.tiflash.HandleSetPlacementRuleBatch(rules)
+}
+
 // DeletePlacementRule is to delete placement rule for certain group.
 func (m *mockTiFlashReplicaManagerCtx) DeletePlacementRule(ctx context.Context, group string, ruleID string) error {
 	m.Lock()
@@ -897,6 +1011,23 @@ func (m *mockTiFlashReplicaManagerCtx) PostAccelerateSchedule(ctx context.Context, tableID int64) error {
 	return m.tiflash.HandlePostAccelerateSchedule(hex.EncodeToString(endKey))
 }
 
+// PostAccelerateScheduleBatch sends `regions/accelerate-schedule/batch` request.
+func (m *mockTiFlashReplicaManagerCtx) PostAccelerateScheduleBatch(ctx context.Context, tableIDs []int64) error {
+	m.Lock()
+	defer m.Unlock()
+	if m.tiflash == nil {
+		return nil
+	}
+	for _, tableID := range tableIDs {
+		endKey := tablecodec.EncodeTablePrefix(tableID + 1)
+		endKey = codec.EncodeBytes([]byte{}, endKey)
+		if err := m.tiflash.HandlePostAccelerateSchedule(hex.EncodeToString(endKey)); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
 // GetRegionCountFromPD is a helper function calling `/stats/region`.
 func (m *mockTiFlashReplicaManagerCtx) GetRegionCountFromPD(ctx context.Context, tableID int64, regionCount *int) error {
 	m.Lock()
