Skip to content

config: add region-max-size property #4635

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
2 changes: 2 additions & 0 deletions conf/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

## Join to an existing cluster. The value should be cluster's ${advertise-client-urls}
# join = ""
# max-region-size = 96
# max-split-size = 144

[security]
## Path of file that contains list of trusted SSL CAs. if set, following four settings shouldn't be empty
Expand Down
19 changes: 13 additions & 6 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import (
const (
defaultStoreCapacity = 100 * (1 << 30) // 100GiB
defaultRegionSize = 96 * (1 << 20) // 96MiB
mb = (1 << 20) // 1MiB
mb = 1 << 20 // 1MiB
)

// Cluster is used to mock a cluster for test purpose.
Expand All @@ -51,18 +51,20 @@ type Cluster struct {
*labeler.RegionLabeler
*statistics.HotStat
*config.PersistOptions
*config.ImmutableConfig
ID uint64
suspectRegions map[uint64]struct{}
}

// NewCluster creates a new Cluster
func NewCluster(ctx context.Context, opts *config.PersistOptions) *Cluster {
clus := &Cluster{
BasicCluster: core.NewBasicCluster(),
IDAllocator: mockid.NewIDAllocator(),
HotStat: statistics.NewHotStat(ctx),
PersistOptions: opts,
suspectRegions: map[uint64]struct{}{},
BasicCluster: core.NewBasicCluster(),
IDAllocator: mockid.NewIDAllocator(),
HotStat: statistics.NewHotStat(ctx),
PersistOptions: opts,
ImmutableConfig: config.NewTestImmutableOptions(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does immutable means that it cannot be updated by pd-ctl?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, pd will reload config after restarting.

suspectRegions: map[uint64]struct{}{},
}
if clus.PersistOptions.GetReplicationConfig().EnablePlacementRules {
clus.initRuleManager()
Expand All @@ -78,6 +80,11 @@ func (mc *Cluster) GetOpts() *config.PersistOptions {
return mc.PersistOptions
}

// GetImmutableCfg returns the cluster immutable configuration.
func (mc *Cluster) GetImmutableCfg() *config.ImmutableConfig {
return mc.ImmutableConfig
}

// GetAllocator returns the ID allocator.
func (mc *Cluster) GetAllocator() id.Allocator {
return mc.IDAllocator
Expand Down
22 changes: 15 additions & 7 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,13 @@ type RaftCluster struct {
clusterID uint64

// cached cluster info
core *core.BasicCluster
meta *metapb.Cluster
opt *config.PersistOptions
storage storage.Storage
id id.Allocator
limiter *StoreLimiter
core *core.BasicCluster
meta *metapb.Cluster
opt *config.PersistOptions
immutableCfg *config.ImmutableConfig
storage storage.Storage
id id.Allocator
limiter *StoreLimiter

changedRegions chan *core.RegionInfo

Expand Down Expand Up @@ -132,14 +133,16 @@ type Status struct {
}

// NewRaftCluster create a new cluster.
func NewRaftCluster(ctx context.Context, clusterID uint64, regionSyncer *syncer.RegionSyncer, etcdClient *clientv3.Client, httpClient *http.Client) *RaftCluster {
func NewRaftCluster(ctx context.Context, clusterID uint64, regionSyncer *syncer.RegionSyncer, etcdClient *clientv3.Client,
httpClient *http.Client, cfg *config.Config) *RaftCluster {
return &RaftCluster{
serverCtx: ctx,
running: false,
clusterID: clusterID,
regionSyncer: regionSyncer,
httpClient: httpClient,
etcdClient: etcdClient,
immutableCfg: config.NewImmutableConfig(cfg),
}
}

Expand Down Expand Up @@ -479,6 +482,11 @@ func (c *RaftCluster) GetOpts() *config.PersistOptions {
return c.opt
}

// GetImmutableCfg gets the cluster Immutable configuration.
func (c *RaftCluster) GetImmutableCfg() *config.ImmutableConfig {
return c.immutableCfg
}

// AddSuspectRegions adds regions to suspect list.
func (c *RaftCluster) AddSuspectRegions(regionIDs ...uint64) {
c.coordinator.checkers.AddSuspectRegions(regionIDs...)
Expand Down
11 changes: 10 additions & 1 deletion server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ type Config struct {
ReplicationMode ReplicationModeConfig `toml:"replication-mode" json:"replication-mode"`

EnableAuditMiddleware bool

MaxRegionSize uint64 `toml:"max-region-size" json:"max-region-size"`

MaxSplitSize uint64 `toml:"max-split-size" json:"max-split-size"`
}

// NewConfig creates a new config.
Expand Down Expand Up @@ -251,6 +255,9 @@ const (
DefaultTSOUpdatePhysicalInterval = 50 * time.Millisecond
maxTSOUpdatePhysicalInterval = 10 * time.Second
minTSOUpdatePhysicalInterval = 50 * time.Millisecond

defaultMaxRegionSize = 96
defaultMaxSplitSize = 144
)

// Special keys for Labels
Expand Down Expand Up @@ -441,7 +448,6 @@ func (c *Config) Validate() error {
if !strings.HasPrefix(rel, "..") {
return errors.New("log directory shouldn't be the subdirectory of data directory")
}

return nil
}

Expand Down Expand Up @@ -594,6 +600,9 @@ func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error {

c.Security.Encryption.Adjust()

adjustUint64(&c.MaxRegionSize, defaultMaxRegionSize)
adjustUint64(&c.MaxSplitSize, defaultMaxSplitSize)

return nil
}

Expand Down
17 changes: 17 additions & 0 deletions server/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ func (s *testConfigSuite) TestAdjust(c *C) {
name = ""
lease = 0
max-request-bytes = 20000000
max-region-size = 10000
max-split-size = 14400

[pd-server]
metric-storage = "http://127.0.0.1:9090"
Expand All @@ -187,6 +189,8 @@ leader-schedule-limit = 0
c.Assert(cfg.Name, Equals, fmt.Sprintf("%s-%s", defaultName, host))
c.Assert(cfg.LeaderLease, Equals, defaultLeaderLease)
c.Assert(cfg.MaxRequestBytes, Equals, uint(20000000))
c.Assert(cfg.MaxRegionSize, Equals, uint64(10000))
c.Assert(cfg.MaxSplitSize, Equals, uint64(14400))
// When defined, use values from config file.
c.Assert(cfg.Schedule.MaxMergeRegionSize, Equals, uint64(0))
c.Assert(cfg.Schedule.EnableOneWayMerge, IsTrue)
Expand Down Expand Up @@ -508,3 +512,16 @@ func (s *testConfigSuite) TestConfigClone(c *C) {
replicationMode.adjust(emptyConfigMetaData)
c.Assert(replicationMode.Clone(), DeepEquals, replicationMode)
}

func (s *testConfigSuite) TestImmutableConfig(c *C) {
config := NewConfig()
config.Adjust(nil, false)
iconfig := NewImmutableConfig(config)
c.Assert(iconfig.maxRegionSize, Equals, uint64(defaultMaxRegionSize))
c.Assert(iconfig.maxSplitSize, Equals, uint64(defaultMaxSplitSize))
config.MaxRegionSize = 200
config.MaxSplitSize = 300
iconfig = NewImmutableConfig(config)
c.Assert(iconfig.GetMaxRegionSize(), Equals, uint64(200))
c.Assert(iconfig.GetMaxSplitSize(), Equals, uint64(300))
}
40 changes: 40 additions & 0 deletions server/config/immutable_options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2022 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package config

// ImmutableConfig is a readonly config.
type ImmutableConfig struct {
maxRegionSize uint64
maxSplitSize uint64
}

// NewImmutableConfig creates a new immutable config.
func NewImmutableConfig(cfg *Config) *ImmutableConfig {
config := &ImmutableConfig{
maxRegionSize: cfg.MaxRegionSize,
maxSplitSize: cfg.MaxSplitSize,
}
return config
}

// GetMaxRegionSize returns the max size of every region.
func (ic *ImmutableConfig) GetMaxRegionSize() uint64 {
return ic.maxRegionSize
}

// GetMaxSplitSize returns the max split size of every region.
func (ic *ImmutableConfig) GetMaxSplitSize() uint64 {
return ic.maxSplitSize
}
7 changes: 7 additions & 0 deletions server/config/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,13 @@ func NewTestOptions() *PersistOptions {
return NewPersistOptions(c)
}

// NewTestImmutableOptions creates default immutable options for testing.
func NewTestImmutableOptions() *ImmutableConfig {
c := NewConfig()
c.Adjust(nil, false)
return NewImmutableConfig(c)
}

// parseUrls parse a string into multiple urls.
func parseUrls(s string) ([]url.URL, error) {
items := strings.Split(s, ",")
Expand Down
14 changes: 11 additions & 3 deletions server/schedule/checker/merge_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ import (
"github.com/tikv/pd/server/schedule/placement"
)

const maxTargetRegionSize = 500
const (
maxTargetRegionSize = 500
maxRegionSizeFactor = 5
)

// When a region has label `merge_option=deny`, skip merging the region.
// If label value is `allow` or other value, it will be treated as `allow`.
Expand All @@ -46,6 +49,7 @@ type MergeChecker struct {
PauseController
cluster schedule.Cluster
opts *config.PersistOptions
config *config.ImmutableConfig
splitCache *cache.TTLUint64
startTime time.Time // it's used to judge whether server recently start.
}
Expand All @@ -57,6 +61,7 @@ func NewMergeChecker(ctx context.Context, cluster schedule.Cluster) *MergeChecke
return &MergeChecker{
cluster: cluster,
opts: opts,
config: cluster.GetImmutableCfg(),
splitCache: splitCache,
startTime: time.Now(),
}
Expand Down Expand Up @@ -145,8 +150,11 @@ func (m *MergeChecker) Check(region *core.RegionInfo) []*operator.Operator {
checkerCounter.WithLabelValues("merge_checker", "no-target").Inc()
return nil
}

if target.GetApproximateSize() > maxTargetRegionSize {
maxSize := int64(maxRegionSizeFactor * m.config.GetMaxRegionSize())
if maxSize < maxTargetRegionSize {
maxSize = maxTargetRegionSize
}
if target.GetApproximateSize() > maxSize {
checkerCounter.WithLabelValues("merge_checker", "target-too-large").Inc()
return nil
}
Expand Down
9 changes: 9 additions & 0 deletions server/schedule/checker/merge_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,15 @@ func (s *testMergeCheckerSuite) TestBasic(c *C) {
c.Assert(ops[0].RegionID(), Equals, s.regions[2].GetID())
c.Assert(ops[1].RegionID(), Equals, s.regions[1].GetID())

// change the max region size
cfg := config.NewConfig()
cfg.MaxRegionSize = 200
s.mc.config = config.NewImmutableConfig(cfg)
s.cluster.PutRegion(s.regions[1].Clone(core.SetApproximateSize(200)))
ops = s.mc.Check(s.regions[2])
c.Assert(ops, NotNil)
cfg.MaxRegionSize = 96
s.mc.config = config.NewImmutableConfig(cfg)
// Test the peer store check.
store := s.cluster.GetStore(1)
c.Assert(store, NotNil)
Expand Down
2 changes: 2 additions & 0 deletions server/schedule/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package schedule

import (
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/schedule/operator"
"github.com/tikv/pd/server/statistics"
Expand All @@ -33,4 +34,5 @@ type Cluster interface {

RemoveScheduler(name string) error
AddSuspectRegions(ids ...uint64)
GetImmutableCfg() *config.ImmutableConfig
}
4 changes: 3 additions & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ type Server struct {
cfg *config.Config
etcdCfg *embed.Config
persistOptions *config.PersistOptions
immutableCfg *config.ImmutableConfig
handler *Handler

ctx context.Context
Expand Down Expand Up @@ -238,6 +239,7 @@ func CreateServer(ctx context.Context, cfg *config.Config, serviceBuilders ...Ha
s := &Server{
cfg: cfg,
persistOptions: config.NewPersistOptions(cfg),
immutableCfg: config.NewImmutableConfig(cfg),
member: &member.Member{},
ctx: ctx,
startTimestamp: time.Now().Unix(),
Expand Down Expand Up @@ -401,7 +403,7 @@ func (s *Server) startServer(ctx context.Context) error {
defaultStorage := storage.NewStorageWithEtcdBackend(s.client, s.rootPath)
s.storage = storage.NewCoreStorage(defaultStorage, regionStorage)
s.basicCluster = core.NewBasicCluster()
s.cluster = cluster.NewRaftCluster(ctx, s.clusterID, syncer.NewRegionSyncer(s), s.client, s.httpClient)
s.cluster = cluster.NewRaftCluster(ctx, s.clusterID, syncer.NewRegionSyncer(s), s.client, s.httpClient, s.GetConfig())
s.hbStreams = hbstream.NewHeartbeatStreams(ctx, s.clusterID, s.cluster)
// initial hot_region_storage in here.
s.hotRegionStorage, err = storage.NewHotRegionsStorage(
Expand Down
4 changes: 2 additions & 2 deletions tests/server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,7 @@ func (s *clusterTestSuite) TestLoadClusterInfo(c *C) {
tc.WaitLeader()
leaderServer := tc.GetServer(tc.GetLeader())
svr := leaderServer.GetServer()
rc := cluster.NewRaftCluster(s.ctx, svr.ClusterID(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient())
rc := cluster.NewRaftCluster(s.ctx, svr.ClusterID(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient(), svr.GetConfig())

// Cluster is not bootstrapped.
rc.InitCluster(svr.GetAllocator(), svr.GetPersistOptions(), svr.GetStorage(), svr.GetBasicCluster())
Expand Down Expand Up @@ -686,7 +686,7 @@ func (s *clusterTestSuite) TestLoadClusterInfo(c *C) {
}
c.Assert(storage.Flush(), IsNil)

raftCluster = cluster.NewRaftCluster(s.ctx, svr.ClusterID(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient())
raftCluster = cluster.NewRaftCluster(s.ctx, svr.ClusterID(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient(), svr.GetConfig())
raftCluster.InitCluster(mockid.NewIDAllocator(), opt, storage, basicCluster)
raftCluster, err = raftCluster.LoadClusterInfo()
c.Assert(err, IsNil)
Expand Down