Skip to content

Commit

Permalink
placement: fix alter placement policy cannot update the relative rang…
Browse files Browse the repository at this point in the history
…es policy (#52254) (#53077)

close #51712, close #52257
  • Loading branch information
ti-chi-bot authored May 28, 2024
1 parent a8087f7 commit 52f35c5
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 66 deletions.
16 changes: 11 additions & 5 deletions pkg/ddl/placement/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,23 +473,29 @@ func (c *constraintsGroup) MergeTransformableRoles() {
c.rules = newRules
}

// GetRangeStartAndEndKeyHex get startKeyHex and endKeyHex of range by rangeBundleID.
func GetRangeStartAndEndKeyHex(rangeBundleID string) (startKey string, endKey string) {
startKey, endKey = "", ""
if rangeBundleID == TiDBBundleRangePrefixForMeta {
startKey = hex.EncodeToString(metaPrefix)
endKey = hex.EncodeToString(codec.EncodeBytes(nil, tablecodec.GenTablePrefix(0)))
}
return startKey, endKey
}

// RebuildForRange rebuilds the bundle for system range.
func (b *Bundle) RebuildForRange(rangeName string, policyName string) *Bundle {
rule := b.Rules
startKey := ""
endKey := ""
switch rangeName {
case KeyRangeGlobal:
b.ID = TiDBBundleRangePrefixForGlobal
b.Index = RuleIndexKeyRangeForGlobal
case KeyRangeMeta:
// change range
startKey = hex.EncodeToString(metaPrefix)
endKey = hex.EncodeToString(codec.EncodeBytes(nil, tablecodec.GenTablePrefix(0)))
b.ID = TiDBBundleRangePrefixForMeta
b.Index = RuleIndexKeyRangeForMeta
}

startKey, endKey := GetRangeStartAndEndKeyHex(b.ID)
b.Override = true
newRules := make([]*Rule, 0, len(rule))
for i, r := range b.Rules {
Expand Down
101 changes: 70 additions & 31 deletions pkg/ddl/placement_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,41 +276,67 @@ func updateExistPlacementPolicy(t *meta.Meta, policy *model.PolicyInfo) error {
return errors.Trace(err)
}

dbIDs, partIDs, tblInfos, err := getPlacementPolicyDependedObjectsIDs(t, policy)
_, partIDs, tblInfos, err := getPlacementPolicyDependedObjectsIDs(t, policy)
if err != nil {
return errors.Trace(err)
}

if len(dbIDs)+len(tblInfos)+len(partIDs) != 0 {
// build bundle from new placement policy.
bundle, err := placement.NewBundleFromOptions(policy.PlacementSettings)
if err != nil {
return errors.Trace(err)
}
// Do the http request only when the rules is existed.
bundles := make([]*placement.Bundle, 0, len(tblInfos)+len(partIDs))
// Reset bundle for tables (including the default rule for partition).
for _, tbl := range tblInfos {
cp := bundle.Clone()
ids := []int64{tbl.ID}
if tbl.Partition != nil {
for _, pDef := range tbl.Partition.Definitions {
ids = append(ids, pDef.ID)
}
// build bundle from new placement policy.
bundle, err := placement.NewBundleFromOptions(policy.PlacementSettings)
if err != nil {
return errors.Trace(err)
}
// Do the http request only when the rules is existed.
bundles := make([]*placement.Bundle, 0, len(tblInfos)+len(partIDs)+2)
// Reset bundle for tables (including the default rule for partition).
for _, tbl := range tblInfos {
cp := bundle.Clone()
ids := []int64{tbl.ID}
if tbl.Partition != nil {
for _, pDef := range tbl.Partition.Definitions {
ids = append(ids, pDef.ID)
}
bundles = append(bundles, cp.Reset(placement.RuleIndexTable, ids))
}
// Reset bundle for partitions.
for _, id := range partIDs {
bundles = append(bundles, cp.Reset(placement.RuleIndexTable, ids))
}
// Reset bundle for partitions.
for _, id := range partIDs {
cp := bundle.Clone()
bundles = append(bundles, cp.Reset(placement.RuleIndexPartition, []int64{id}))
}

resetRangeFn := func(ctx context.Context, rangeName string) error {
rangeBundleID := placement.TiDBBundleRangePrefixForGlobal
if rangeName == placement.KeyRangeMeta {
rangeBundleID = placement.TiDBBundleRangePrefixForMeta
}
policyName, err := GetRangePlacementPolicyName(ctx, rangeBundleID)
if err != nil {
return err
}
if policyName == policy.Name.L {
cp := bundle.Clone()
bundles = append(bundles, cp.Reset(placement.RuleIndexPartition, []int64{id}))
bundles = append(bundles, cp.RebuildForRange(rangeName, policyName))
}
return nil
}
// Reset range "global".
err = resetRangeFn(context.TODO(), placement.KeyRangeGlobal)
if err != nil {
return err
}
// Reset range "meta".
err = resetRangeFn(context.TODO(), placement.KeyRangeMeta)
if err != nil {
return err
}

if len(bundles) > 0 {
err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles)
if err != nil {
return errors.Wrapf(err, "failed to notify PD the placement rules")
}
}

return nil
}

Expand Down Expand Up @@ -350,18 +376,13 @@ func CheckPlacementPolicyNotInUseFromInfoSchema(is infoschema.InfoSchema, policy

// checkPlacementPolicyNotInUseFromRange checks whether the placement policy is used by the special range.
func checkPlacementPolicyNotInUseFromRange(policy *model.PolicyInfo) error {
checkFn := func(rangeName string) error {
bundle, err := infosync.GetRuleBundle(context.TODO(), rangeName)
checkFn := func(rangeBundleID string) error {
policyName, err := GetRangePlacementPolicyName(context.TODO(), rangeBundleID)
if err != nil {
return err
}
if bundle == nil {
return nil
}
for _, rule := range bundle.Rules {
if strings.Contains(rule.ID, policy.Name.L) {
return dbterror.ErrPlacementPolicyInUse.GenWithStackByArgs(policy.Name)
}
if policyName == policy.Name.L {
return dbterror.ErrPlacementPolicyInUse.GenWithStackByArgs(policy.Name)
}
return nil
}
Expand Down Expand Up @@ -447,3 +468,21 @@ func checkPlacementPolicyNotUsedByTable(tblInfo *model.TableInfo, policy *model.

return nil
}

// GetRangePlacementPolicyName get the placement policy name used by range.
// rangeBundleID is limited to TiDBBundleRangePrefixForGlobal and TiDBBundleRangePrefixForMeta.
func GetRangePlacementPolicyName(ctx context.Context, rangeBundleID string) (string, error) {
bundle, err := infosync.GetRuleBundle(ctx, rangeBundleID)
if err != nil {
return "", err
}
if bundle == nil || len(bundle.Rules) == 0 {
return "", nil
}
rule := bundle.Rules[0]
pos := strings.LastIndex(rule.ID, "_rule_")
if pos > 0 {
return rule.ID[:pos], nil
}
return "", nil
}
27 changes: 27 additions & 0 deletions pkg/ddl/placement_policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -851,10 +851,37 @@ func TestAlterRangePlacementPolicy(t *testing.T) {
bundle, err := infosync.GetRuleBundle(context.TODO(), placement.TiDBBundleRangePrefixForGlobal)
require.NoError(t, err)
require.Equal(t, 1, len(bundle.Rules))
require.Equal(t, 0, len(bundle.Rules[0].LocationLabels))
tk.MustExec("alter range meta placement policy fiveReplicas")
tk.MustQuery(`show placement;`).Sort().Check(testkit.Rows(
"POLICY fiveReplicas FOLLOWERS=4 NULL",
"RANGE TiDB_GLOBAL FOLLOWERS=4 PENDING",
"RANGE TiDB_META FOLLOWERS=4 PENDING"))
bundle, err = infosync.GetRuleBundle(context.TODO(), placement.TiDBBundleRangePrefixForMeta)
require.NoError(t, err)
require.Equal(t, 1, len(bundle.Rules))
require.Equal(t, 0, len(bundle.Rules[0].LocationLabels))

// Test Issue #51712
tk.MustExec("alter placement policy fiveReplicas followers=4 SURVIVAL_PREFERENCES=\"[region]\"")
tk.MustQuery(`show placement;`).Sort().Check(testkit.Rows(
"POLICY fiveReplicas FOLLOWERS=4 SURVIVAL_PREFERENCES=\"[region]\" NULL",
"RANGE TiDB_GLOBAL FOLLOWERS=4 SURVIVAL_PREFERENCES=\"[region]\" PENDING",
"RANGE TiDB_META FOLLOWERS=4 SURVIVAL_PREFERENCES=\"[region]\" PENDING"))
bundle, err = infosync.GetRuleBundle(context.TODO(), placement.TiDBBundleRangePrefixForGlobal)
require.NoError(t, err)
require.Equal(t, 1, len(bundle.Rules))
require.Equal(t, 1, len(bundle.Rules[0].LocationLabels))
require.Equal(t, "region", bundle.Rules[0].LocationLabels[0])
bundle, err = infosync.GetRuleBundle(context.TODO(), placement.TiDBBundleRangePrefixForMeta)
require.NoError(t, err)
require.Equal(t, 1, len(bundle.Rules))
require.Equal(t, 1, len(bundle.Rules[0].LocationLabels))
require.Equal(t, "region", bundle.Rules[0].LocationLabels[0])
// Test Issue #52257
tk.MustExec("create placement policy fiveRepl followers=4 SURVIVAL_PREFERENCES=\"[region]\"")
tk.MustExec("drop placement policy fiveRepl")

err = tk.ExecToErr("drop placement policy fiveReplicas")
require.EqualError(t, err, "[ddl:8241]Placement policy 'fiveReplicas' is still in use")
tk.MustExec("alter range global placement policy default")
Expand Down
46 changes: 16 additions & 30 deletions pkg/executor/show_placement.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ import (
gjson "encoding/json"
"fmt"
"slices"
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/ddl/placement"
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tidb/pkg/infoschema"
Expand Down Expand Up @@ -269,39 +269,25 @@ func (e *ShowExec) fetchAllPlacementPolicies() error {
}

func (e *ShowExec) fetchRangesPlacementPlocy(ctx context.Context) error {
fetchFn := func(ctx context.Context, rangeName string) error {
bundle, err := infosync.GetRuleBundle(ctx, rangeName)
fetchFn := func(ctx context.Context, rangeBundleID string) error {
policyName, err := ddl.GetRangePlacementPolicyName(ctx, rangeBundleID)
if err != nil {
return err
}
if bundle == nil || len(bundle.Rules) == 0 {
return nil
}
policyName := ""
startKey := []byte("")
endKey := []byte("")
rule := bundle.Rules[0]
pos := strings.Index(rule.ID, "_rule")
if pos > 0 {
policyName = rule.ID[:pos]
}
startKey, err = hex.DecodeString(rule.StartKeyHex)
if err != nil {
return err
}
endKey, err = hex.DecodeString(rule.EndKeyHex)
if err != nil {
return err
}
state, err := infosync.GetReplicationState(ctx, startKey, endKey)
if err != nil {
return err
}
policy, ok := e.is.PolicyByName(model.NewCIStr(policyName))
if !ok {
return errors.Errorf("Policy with name '%s' not found", policyName)
if policyName != "" {
startKeyHex, endKeyHex := placement.GetRangeStartAndEndKeyHex(rangeBundleID)
startKey, _ := hex.DecodeString(startKeyHex)
endKey, _ := hex.DecodeString(endKeyHex)
state, err := infosync.GetReplicationState(ctx, startKey, endKey)
if err != nil {
return err
}
policy, ok := e.is.PolicyByName(model.NewCIStr(policyName))
if !ok {
return errors.Errorf("Policy with name '%s' not found", policyName)
}
e.appendRow([]any{"RANGE " + rangeBundleID, policy.PlacementSettings.String(), state.String()})
}
e.appendRow([]interface{}{"RANGE " + rangeName, policy.PlacementSettings.String(), state.String()})
return nil
}
// try fetch ranges placement policy
Expand Down

0 comments on commit 52f35c5

Please sign in to comment.