Skip to content

Commit

Permalink
wait region split when creating keyspace (tikv#47)
Browse files Browse the repository at this point in the history
* wait region split

Signed-off-by: zeminzhou <zhouzemin@pingcap.com>

---------

Signed-off-by: zeminzhou <zhouzemin@pingcap.com>
Signed-off-by: lhy1024 <admin@liudos.us>
  • Loading branch information
zeminzhou authored and lhy1024 committed May 8, 2023
1 parent 076ce4a commit 1bc1006
Show file tree
Hide file tree
Showing 8 changed files with 232 additions and 17 deletions.
99 changes: 84 additions & 15 deletions pkg/keyspace/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package keyspace

import (
"bytes"
"context"
"strconv"
"time"
Expand Down Expand Up @@ -55,6 +56,9 @@ const (
// Config is the interface for keyspace config.
type Config interface {
GetPreAlloc() []string
ToWaitRegionSplit() bool
GetWaitRegionSplitTimeout() time.Duration
GetCheckRegionSplitInterval() time.Duration
}

// Manager manages keyspace related data.
Expand Down Expand Up @@ -86,6 +90,8 @@ type CreateKeyspaceRequest struct {
Config map[string]string
// CreateTime is the timestamp used to record creation time.
CreateTime int64
// IsPreAlloc indicates whether the keyspace is pre-allocated when the cluster starts.
IsPreAlloc bool
}

// NewKeyspaceManager creates a Manager of keyspace related data.
Expand All @@ -112,7 +118,7 @@ func NewKeyspaceManager(
// Bootstrap saves default keyspace info.
func (manager *Manager) Bootstrap() error {
// Split Keyspace Region for default keyspace.
if err := manager.splitKeyspaceRegion(utils.DefaultKeyspaceID); err != nil {
if err := manager.splitKeyspaceRegion(utils.DefaultKeyspaceID, false); err != nil {
return err
}
now := time.Now().Unix()
Expand Down Expand Up @@ -148,6 +154,7 @@ func (manager *Manager) Bootstrap() error {
req := &CreateKeyspaceRequest{
Name: keyspaceName,
CreateTime: now,
IsPreAlloc: true,
Config: config,
}
keyspace, err := manager.CreateKeyspace(req)
Expand All @@ -162,6 +169,11 @@ func (manager *Manager) Bootstrap() error {
return nil
}

// UpdateConfig update keyspace manager's config.
func (manager *Manager) UpdateConfig(cfg Config) {
manager.config = cfg
}

// CreateKeyspace create a keyspace meta with given config and save it to storage.
func (manager *Manager) CreateKeyspace(request *CreateKeyspaceRequest) (*keyspacepb.KeyspaceMeta, error) {
// Validate purposed name's legality.
Expand All @@ -173,8 +185,11 @@ func (manager *Manager) CreateKeyspace(request *CreateKeyspaceRequest) (*keyspac
if err != nil {
return nil, err
}
// If the request to create a keyspace is pre-allocated when the PD starts,
// there is no need to wait for the region split, because TiKV has not started.
waitRegionSplit := !request.IsPreAlloc && manager.config.ToWaitRegionSplit()
// Split keyspace region.
err = manager.splitKeyspaceRegion(newID)
err = manager.splitKeyspaceRegion(newID, waitRegionSplit)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -252,27 +267,81 @@ func (manager *Manager) saveNewKeyspace(keyspace *keyspacepb.KeyspaceMeta) error

// splitKeyspaceRegion add keyspace's boundaries to region label. The corresponding
// region will then be split by Coordinator's patrolRegion.
func (manager *Manager) splitKeyspaceRegion(id uint32) error {
func (manager *Manager) splitKeyspaceRegion(id uint32, waitRegionSplit bool) (err error) {
failpoint.Inject("skipSplitRegion", func() {
failpoint.Return(nil)
})

start := time.Now()
keyspaceRule := makeLabelRule(id)
if cl, ok := manager.cluster.(interface{ GetRegionLabeler() *labeler.RegionLabeler }); ok {
err := cl.GetRegionLabeler().SetLabelRule(keyspaceRule)
if err != nil {
log.Warn("[keyspace] failed to add region label for keyspace",
zap.Uint32("keyspaceID", id),
zap.Error(err),
)
}
log.Info("[keyspace] added region label for keyspace",
cl, ok := manager.cluster.(interface{ GetRegionLabeler() *labeler.RegionLabeler })
if !ok {
return errors.New("cluster does not support region label")
}
err = cl.GetRegionLabeler().SetLabelRule(keyspaceRule)
if err != nil {
log.Warn("[keyspace] failed to add region label for keyspace",
zap.Uint32("keyspaceID", id),
zap.Any("LabelRule", keyspaceRule),
zap.Error(err),
)
return nil
return err
}
return errors.New("cluster does not support region label")
defer func() {
if err != nil {
cl.GetRegionLabeler().DeleteLabelRule(keyspaceRule.ID)
}
}()

if waitRegionSplit {
ranges := keyspaceRule.Data.([]*labeler.KeyRangeRule)
rawLeftBound, rawRightBound := ranges[0].StartKey, ranges[0].EndKey
txnLeftBound, txnRightBound := ranges[1].StartKey, ranges[1].EndKey

ticker := time.NewTicker(manager.config.GetCheckRegionSplitInterval())
timer := time.NewTimer(manager.config.GetWaitRegionSplitTimeout())
defer func() {
ticker.Stop()
timer.Stop()
}()
for {
select {
case <-ticker.C:
regionsInfo := manager.cluster.GetBasicCluster().RegionsInfo
region := regionsInfo.GetRegionByKey(rawLeftBound)
if region == nil || !bytes.Equal(region.GetStartKey(), rawLeftBound) {
continue
}
region = regionsInfo.GetRegionByKey(rawRightBound)
if region == nil || !bytes.Equal(region.GetStartKey(), rawRightBound) {
continue
}
region = regionsInfo.GetRegionByKey(txnLeftBound)
if region == nil || !bytes.Equal(region.GetStartKey(), txnLeftBound) {
continue
}
region = regionsInfo.GetRegionByKey(txnRightBound)
if region == nil || !bytes.Equal(region.GetStartKey(), txnRightBound) {
continue
}
case <-timer.C:
log.Warn("[keyspace] wait region split timeout",
zap.Uint32("keyspaceID", id),
zap.Error(err),
)
err = ErrRegionSplitTimeout
return
}
log.Info("[keyspace] wait reigon split successfully", zap.Uint32("keyspaceID", id))
break
}
}

log.Info("[keyspace] added region label for keyspace",
zap.Uint32("keyspaceID", id),
zap.Any("LabelRule", keyspaceRule),
zap.Duration("takes", time.Since(start)),
)
return
}

// LoadKeyspace returns the keyspace specified by name.
Expand Down
23 changes: 21 additions & 2 deletions pkg/keyspace/keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/tikv/pd/pkg/mock/mockid"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/typeutil"
)

const (
Expand All @@ -51,10 +52,27 @@ func TestKeyspaceTestSuite(t *testing.T) {
}

type mockConfig struct {
PreAlloc []string
PreAlloc []string
WaitRegionSplit bool
WaitRegionSplitTimeout typeutil.Duration
CheckRegionSplitInterval typeutil.Duration
}

func (m *mockConfig) GetPreAlloc() []string { return m.PreAlloc }
func (m *mockConfig) GetPreAlloc() []string {
return m.PreAlloc
}

func (m *mockConfig) ToWaitRegionSplit() bool {
return m.WaitRegionSplit
}

func (m *mockConfig) GetWaitRegionSplitTimeout() time.Duration {
return m.WaitRegionSplitTimeout.Duration
}

func (m *mockConfig) GetCheckRegionSplitInterval() time.Duration {
return m.CheckRegionSplitInterval.Duration
}

func (suite *keyspaceTestSuite) SetupTest() {
suite.ctx, suite.cancel = context.WithCancel(context.Background())
Expand Down Expand Up @@ -89,6 +107,7 @@ func makeCreateKeyspaceRequests(count int) []*CreateKeyspaceRequest {
testConfig2: "200",
},
CreateTime: now,
IsPreAlloc: true,
}
}
return requests
Expand Down
2 changes: 2 additions & 0 deletions pkg/keyspace/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ const (
var (
// ErrKeyspaceNotFound is used to indicate target keyspace does not exist.
ErrKeyspaceNotFound = errors.New("keyspace does not exist")
// ErrRegionSplitTimeout indices to split region timeout
ErrRegionSplitTimeout = errors.New("region split timeout")
// ErrKeyspaceExists indicates target keyspace already exists.
// It's used when creating a new keyspace.
ErrKeyspaceExists = errors.New("keyspace already exists")
Expand Down
18 changes: 18 additions & 0 deletions server/api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,28 @@ func (h *confHandler) updateConfig(cfg *config.Config, key string, value interfa
case "cluster-version":
return h.updateClusterVersion(value)
case "label-property": // TODO: support changing label-property
case "keyspace":
return h.updateKeyspaceConfig(cfg, kp[len(kp)-1], value)
}
return errors.Errorf("config prefix %s not found", kp[0])
}

func (h *confHandler) updateKeyspaceConfig(config *config.Config, key string, value interface{}) error {
updated, found, err := jsonutil.AddKeyValue(&config.Keyspace, key, value)
if err != nil {
return err
}

if !found {
return errors.Errorf("config item %s not found", key)
}

if updated {
err = h.svr.SetKeyspaceConfig(config.Keyspace)
}
return err
}

func (h *confHandler) updateSchedule(config *config.Config, key string, value interface{}) error {
updated, found, err := jsonutil.AddKeyValue(&config.Schedule, key, value)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions server/apiv2/handlers/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func CreateKeyspace(c *gin.Context) {
Name: createParams.Name,
Config: createParams.Config,
CreateTime: time.Now().Unix(),
IsPreAlloc: false,
}
meta, err := manager.CreateKeyspace(req)
if err != nil {
Expand Down
60 changes: 60 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,11 @@ const (
defaultGCTunerThreshold = 0.6
minGCTunerThreshold = 0
maxGCTunerThreshold = 0.9

defaultWaitRegionSplitTimeout = 30 * time.Second
defaultCheckRegionSplitInterval = 50 * time.Millisecond
minCheckRegionSplitInterval = 1 * time.Millisecond
maxCheckRegionSplitInterval = 100 * time.Millisecond
)

// Special keys for Labels
Expand Down Expand Up @@ -496,6 +501,8 @@ func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error {

c.ReplicationMode.adjust(configMetaData.Child("replication-mode"))

c.Keyspace.adjust(configMetaData.Child("keyspace"))

c.Security.Encryption.Adjust()

if len(c.Log.Format) == 0 {
Expand Down Expand Up @@ -1399,9 +1406,62 @@ func (c *DRAutoSyncReplicationConfig) adjust(meta *configutil.ConfigMetaData) {
type KeyspaceConfig struct {
// PreAlloc contains the keyspace to be allocated during keyspace manager initialization.
PreAlloc []string `toml:"pre-alloc" json:"pre-alloc"`
// WaitRegionSplit indicates whether to wait for the region split to complete
WaitRegionSplit bool `toml:"wait-region-split" json:"wait-region-split"`
// WaitRegionSplitTimeout indicates the max duration to wait region split.
WaitRegionSplitTimeout typeutil.Duration `toml:"wait-region-split-timeout" json:"wait-region-split-timeout"`
// CheckRegionSplitInterval indicates the interval to check whether the region split is complete
CheckRegionSplitInterval typeutil.Duration `toml:"check-region-split-interval" json:"check-region-split-interval"`
}

// Validate checks if keyspace config falls within acceptable range.
func (c *KeyspaceConfig) Validate() error {
if c.CheckRegionSplitInterval.Duration > maxCheckRegionSplitInterval || c.CheckRegionSplitInterval.Duration < minCheckRegionSplitInterval {
return errors.New(fmt.Sprintf("[keyspace] check-region-split-interval should between %v and %v",
minCheckRegionSplitInterval, maxCheckRegionSplitInterval))
}
if c.CheckRegionSplitInterval.Duration >= c.WaitRegionSplitTimeout.Duration {
return errors.New(fmt.Sprintf("[keyspace] check-region-split-interval should be less than wait-region-split-timeout"))
}
return nil
}

func (c *KeyspaceConfig) adjust(meta *configutil.ConfigMetaData) {
if !meta.IsDefined("wait-region-split") {
c.WaitRegionSplit = true
}
if !meta.IsDefined("wait-region-split-timeout") {
c.WaitRegionSplitTimeout = typeutil.NewDuration(defaultWaitRegionSplitTimeout)
}
if !meta.IsDefined("check-region-split-interval") {
c.CheckRegionSplitInterval = typeutil.NewDuration(defaultCheckRegionSplitInterval)
}
}

// Clone makes a deep copy of the keyspace config.
func (c *KeyspaceConfig) Clone() *KeyspaceConfig {
preAlloc := append(c.PreAlloc[:0:0], c.PreAlloc...)
cfg := *c
cfg.PreAlloc = preAlloc
return &cfg
}

// GetPreAlloc returns the keyspace to be allocated during keyspace manager initialization.
func (c *KeyspaceConfig) GetPreAlloc() []string {
return c.PreAlloc
}

// ToWaitRegionSplit returns whether to wait for the region split to complete.
func (c *KeyspaceConfig) ToWaitRegionSplit() bool {
return c.WaitRegionSplit
}

// GetWaitRegionSplitTimeout returns the max duration to wait region split.
func (c *KeyspaceConfig) GetWaitRegionSplitTimeout() time.Duration {
return c.WaitRegionSplitTimeout.Duration
}

// GetCheckRegionSplitInterval returns the interval to check whether the region split is complete.
func (c *KeyspaceConfig) GetCheckRegionSplitInterval() time.Duration {
return c.CheckRegionSplitInterval.Duration
}
14 changes: 14 additions & 0 deletions server/config/persist_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type PersistOptions struct {
pdServerConfig atomic.Value
replicationMode atomic.Value
labelProperty atomic.Value
keyspace atomic.Value
clusterVersion unsafe.Pointer
}

Expand All @@ -62,6 +63,7 @@ func NewPersistOptions(cfg *Config) *PersistOptions {
o.pdServerConfig.Store(&cfg.PDServerCfg)
o.replicationMode.Store(&cfg.ReplicationMode)
o.labelProperty.Store(cfg.LabelProperty)
o.keyspace.Store(&cfg.Keyspace)
o.SetClusterVersion(&cfg.ClusterVersion)
o.ttl = nil
return o
Expand Down Expand Up @@ -117,6 +119,16 @@ func (o *PersistOptions) SetLabelPropertyConfig(cfg LabelPropertyConfig) {
o.labelProperty.Store(cfg)
}

// GetKeyspaceConfig returns the keyspace config.
func (o *PersistOptions) GetKeyspaceConfig() *KeyspaceConfig {
return o.keyspace.Load().(*KeyspaceConfig)
}

// SetKeyspaceConfig sets the keyspace configuration.
func (o *PersistOptions) SetKeyspaceConfig(cfg *KeyspaceConfig) {
o.keyspace.Store(cfg)
}

// GetClusterVersion returns the cluster version.
func (o *PersistOptions) GetClusterVersion() *semver.Version {
return (*semver.Version)(atomic.LoadPointer(&o.clusterVersion))
Expand Down Expand Up @@ -736,6 +748,7 @@ func (o *PersistOptions) Persist(storage endpoint.ConfigStorage) error {
PDServerCfg: *o.GetPDServerConfig(),
ReplicationMode: *o.GetReplicationModeConfig(),
LabelProperty: o.GetLabelPropertyConfig(),
Keyspace: *o.GetKeyspaceConfig(),
ClusterVersion: *o.GetClusterVersion(),
}
err := storage.SaveConfig(cfg)
Expand Down Expand Up @@ -763,6 +776,7 @@ func (o *PersistOptions) Reload(storage endpoint.ConfigStorage) error {
o.pdServerConfig.Store(&cfg.PDServerCfg)
o.replicationMode.Store(&cfg.ReplicationMode)
o.labelProperty.Store(cfg.LabelProperty)
o.keyspace.Store(&cfg.Keyspace)
o.SetClusterVersion(&cfg.ClusterVersion)
}
return nil
Expand Down
Loading

0 comments on commit 1bc1006

Please sign in to comment.