Skip to content

Commit

Permalink
mcs: add balancer for keyspace group (#6274)
Browse files Browse the repository at this point in the history
close #6233

Signed-off-by: lhy1024 <admin@liudos.us>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
lhy1024 and ti-chi-bot authored Apr 19, 2023
1 parent 858bbe0 commit 1c360b6
Show file tree
Hide file tree
Showing 19 changed files with 871 additions and 50 deletions.
61 changes: 61 additions & 0 deletions pkg/balancer/balancer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package balancer

// Policy is the policy of balancer.
type Policy int

const (
// PolicyRoundRobin is the round robin policy.
PolicyRoundRobin Policy = iota
// PolicyLeast is the policy to return the least used node.
// TODO: move indexed heap to pkg and use it.
PolicyLeast
)

func (p Policy) String() string {
switch p {
case PolicyRoundRobin:
return "round-robin"
case PolicyLeast:
return "least"
default:
return "unknown"
}
}

// Balancer is the interface for balancer.
type Balancer[T uint32 | string] interface {
// Next returns next one.
Next() T
// Put puts one into balancer.
Put(T)
// Delete deletes one from balancer.
Delete(T)
// GetAll returns all nodes.
GetAll() []T
// Len returns the length of nodes.
Len() int
}

// GenByPolicy generates a balancer by policy.
func GenByPolicy[T uint32 | string](policy Policy) Balancer[T] {
switch policy {
case PolicyRoundRobin:
return NewRoundRobin[T]()
default: // only round-robin is supported now.
return NewRoundRobin[T]()
}
}
102 changes: 102 additions & 0 deletions pkg/balancer/balancer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package balancer

import (
"math/rand"
"testing"

"github.com/stretchr/testify/require"
)

func TestBalancerPutAndDelete(t *testing.T) {
t.Parallel()
re := require.New(t)
balancers := []Balancer[uint32]{
NewRoundRobin[uint32](),
}
for _, balancer := range balancers {
re.Equal(uint32(0), balancer.Next())
// test put
exists := make(map[uint32]struct{})
for i := 0; i < 100; i++ {
num := rand.Uint32()
balancer.Put(num)
exists[num] = struct{}{}
re.Equal(len(balancer.GetAll()), len(exists))
t := balancer.Next()
re.Contains(exists, t)
}
// test delete
for num := range exists {
balancer.Delete(num)
delete(exists, num)
re.Equal(len(balancer.GetAll()), len(exists))
if len(exists) == 0 {
break
}
t := balancer.Next()
re.NotEqual(t, num)
re.Contains(exists, t)
}
re.Equal(uint32(0), balancer.Next())
}
}

func TestBalancerDuplicate(t *testing.T) {
t.Parallel()
re := require.New(t)
balancers := []Balancer[uint32]{
NewRoundRobin[uint32](),
}
for _, balancer := range balancers {
re.Len(balancer.GetAll(), 0)
// test duplicate put
balancer.Put(1)
re.Len(balancer.GetAll(), 1)
balancer.Put(1)
re.Len(balancer.GetAll(), 1)
// test duplicate delete
balancer.Delete(1)
re.Len(balancer.GetAll(), 0)
balancer.Delete(1)
re.Len(balancer.GetAll(), 0)
}
}

func TestRoundRobin(t *testing.T) {
t.Parallel()
re := require.New(t)
balancer := NewRoundRobin[uint32]()
for i := 0; i < 100; i++ {
num := rand.Uint32()
balancer.Put(num)
}
statistics := make(map[uint32]int)
for i := 0; i < 1000; i++ {
statistics[balancer.Next()]++
}
min := 1000
max := 0
for _, v := range statistics {
if v < min {
min = v
}
if v > max {
max = v
}
}
re.LessOrEqual(max-min, 10)
}
87 changes: 87 additions & 0 deletions pkg/balancer/round_robin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package balancer

import (
"sync"
"sync/atomic"
)

// RoundRobin is a balancer that selects nodes in a round-robin fashion.
type RoundRobin[T uint32 | string] struct {
sync.RWMutex
nodes []T
exists map[T]struct{}
next uint32
}

// NewRoundRobin creates a balancer that selects nodes in a round-robin fashion.
func NewRoundRobin[T uint32 | string]() *RoundRobin[T] {
return &RoundRobin[T]{
nodes: make([]T, 0),
exists: make(map[T]struct{}),
}
}

// Next returns next address
func (r *RoundRobin[T]) Next() (t T) {
r.RLock()
defer r.RUnlock()
if len(r.nodes) == 0 {
return
}
next := atomic.AddUint32(&r.next, 1)
node := r.nodes[(int(next)-1)%len(r.nodes)]
return node
}

// GetAll returns all nodes
func (r *RoundRobin[T]) GetAll() []T {
r.RLock()
defer r.RUnlock()
return r.nodes
}

// Put puts one into balancer.
func (r *RoundRobin[T]) Put(node T) {
r.Lock()
defer r.Unlock()
if _, ok := r.exists[node]; !ok {
r.nodes = append(r.nodes, node)
r.exists[node] = struct{}{}
}
}

// Delete deletes one from balancer.
func (r *RoundRobin[T]) Delete(node T) {
r.Lock()
defer r.Unlock()
if _, ok := r.exists[node]; ok {
for i, n := range r.nodes {
if n == node {
r.nodes = append(r.nodes[:i], r.nodes[i+1:]...)
delete(r.exists, node)
break
}
}
}
}

// Len returns the length of nodes.
func (r *RoundRobin[T]) Len() int {
r.RLock()
defer r.RUnlock()
return len(r.nodes)
}
14 changes: 7 additions & 7 deletions pkg/keyspace/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ type CreateKeyspaceRequest struct {
// Using an existing name will result in error.
Name string
Config map[string]string
// Now is the timestamp used to record creation time.
Now int64
// CreateTime is the timestamp used to record creation time.
CreateTime int64
}

// NewKeyspaceManager creates a Manager of keyspace related data.
Expand Down Expand Up @@ -140,9 +140,9 @@ func (manager *Manager) Bootstrap() error {
return err
}
req := &CreateKeyspaceRequest{
Name: keyspaceName,
Now: now,
Config: config,
Name: keyspaceName,
CreateTime: now,
Config: config,
}
keyspace, err := manager.CreateKeyspace(req)
// Ignore the keyspaceExists error for the same reason as saving default keyspace.
Expand Down Expand Up @@ -190,8 +190,8 @@ func (manager *Manager) CreateKeyspace(request *CreateKeyspaceRequest) (*keyspac
Id: newID,
Name: request.Name,
State: keyspacepb.KeyspaceState_ENABLED,
CreatedAt: request.Now,
StateChangedAt: request.Now,
CreatedAt: request.CreateTime,
StateChangedAt: request.CreateTime,
Config: request.Config,
}
err = manager.saveNewKeyspace(keyspace)
Expand Down
10 changes: 5 additions & 5 deletions pkg/keyspace/keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (suite *keyspaceTestSuite) SetupTest() {
suite.ctx, suite.cancel = context.WithCancel(context.Background())
store := endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
allocator := mockid.NewIDAllocator()
kgm := NewKeyspaceGroupManager(suite.ctx, store)
kgm := NewKeyspaceGroupManager(suite.ctx, store, nil, 0)
suite.manager = NewKeyspaceManager(store, nil, allocator, &mockConfig{}, kgm)
suite.NoError(kgm.Bootstrap())
suite.NoError(suite.manager.Bootstrap())
Expand All @@ -82,12 +82,12 @@ func makeCreateKeyspaceRequests(count int) []*CreateKeyspaceRequest {
requests := make([]*CreateKeyspaceRequest, count)
for i := 0; i < count; i++ {
requests[i] = &CreateKeyspaceRequest{
Name: fmt.Sprintf("test_keyspace%d", i),
Name: fmt.Sprintf("test_keyspace_%d", i),
Config: map[string]string{
testConfig1: "100",
testConfig2: "200",
},
Now: now,
CreateTime: now,
}
}
return requests
Expand Down Expand Up @@ -312,8 +312,8 @@ func (suite *keyspaceTestSuite) TestUpdateMultipleKeyspace() {
// checkCreateRequest verifies a keyspace meta matches a create request.
func checkCreateRequest(re *require.Assertions, request *CreateKeyspaceRequest, meta *keyspacepb.KeyspaceMeta) {
re.Equal(request.Name, meta.GetName())
re.Equal(request.Now, meta.GetCreatedAt())
re.Equal(request.Now, meta.GetStateChangedAt())
re.Equal(request.CreateTime, meta.GetCreatedAt())
re.Equal(request.CreateTime, meta.GetStateChangedAt())
re.Equal(keyspacepb.KeyspaceState_ENABLED, meta.GetState())
re.Equal(request.Config, meta.GetConfig())
}
Expand Down
Loading

0 comments on commit 1c360b6

Please sign in to comment.