Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mcs: add balancer for keyspace group #6274

Merged
merged 17 commits into from
Apr 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions pkg/balancer/balancer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package balancer

// Policy is the policy of balancer.
type Policy int

const (
// PolicyRoundRobin is the round robin policy.
PolicyRoundRobin Policy = iota
// PolicyLeast is the policy to return the least used node.
// TODO: move indexed heap to pkg and use it.
PolicyLeast
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where is it used?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it will do at anotherr pr

Copy link
Contributor

@bufferflies bufferflies Apr 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not add it in another pr? maybe you need replace it by adding todo?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to remove PolicyLeast if we don't use it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't worry. It will be added after #6268 is merged.

)

func (p Policy) String() string {
switch p {
case PolicyRoundRobin:
return "round-robin"
case PolicyLeast:
return "least"
default:
return "unknown"
}
}

// Balancer is the interface for balancer.
type Balancer[T uint32 | string] interface {
// Next returns next one.
Next() T
// Put puts one into balancer.
Put(T)
// Delete deletes one from balancer.
Delete(T)
// GetAll returns all nodes.
GetAll() []T
// Len returns the length of nodes.
Len() int
}

// GenByPolicy generates a balancer by policy.
func GenByPolicy[T uint32 | string](policy Policy) Balancer[T] {
switch policy {
case PolicyRoundRobin:
return NewRoundRobin[T]()
default: // only round-robin is supported now.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not return round robin directly?

return NewRoundRobin[T]()
}
}
102 changes: 102 additions & 0 deletions pkg/balancer/balancer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package balancer

import (
"math/rand"
"testing"

"github.com/stretchr/testify/require"
)

func TestBalancerPutAndDelete(t *testing.T) {
t.Parallel()
re := require.New(t)
balancers := []Balancer[uint32]{
NewRoundRobin[uint32](),
}
for _, balancer := range balancers {
re.Equal(uint32(0), balancer.Next())
// test put
exists := make(map[uint32]struct{})
for i := 0; i < 100; i++ {
num := rand.Uint32()
balancer.Put(num)
exists[num] = struct{}{}
re.Equal(len(balancer.GetAll()), len(exists))
t := balancer.Next()
re.Contains(exists, t)
}
// test delete
for num := range exists {
balancer.Delete(num)
delete(exists, num)
re.Equal(len(balancer.GetAll()), len(exists))
if len(exists) == 0 {
break
}
t := balancer.Next()
re.NotEqual(t, num)
re.Contains(exists, t)
}
re.Equal(uint32(0), balancer.Next())
}
}

func TestBalancerDuplicate(t *testing.T) {
t.Parallel()
re := require.New(t)
balancers := []Balancer[uint32]{
NewRoundRobin[uint32](),
}
for _, balancer := range balancers {
re.Len(balancer.GetAll(), 0)
// test duplicate put
balancer.Put(1)
re.Len(balancer.GetAll(), 1)
balancer.Put(1)
re.Len(balancer.GetAll(), 1)
// test duplicate delete
balancer.Delete(1)
re.Len(balancer.GetAll(), 0)
balancer.Delete(1)
re.Len(balancer.GetAll(), 0)
}
}

func TestRoundRobin(t *testing.T) {
t.Parallel()
re := require.New(t)
balancer := NewRoundRobin[uint32]()
for i := 0; i < 100; i++ {
num := rand.Uint32()
balancer.Put(num)
}
statistics := make(map[uint32]int)
for i := 0; i < 1000; i++ {
statistics[balancer.Next()]++
}
min := 1000
max := 0
for _, v := range statistics {
if v < min {
min = v
}
if v > max {
max = v
}
}
re.LessOrEqual(max-min, 10)
}
87 changes: 87 additions & 0 deletions pkg/balancer/round_robin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package balancer

import (
"sync"
"sync/atomic"
)

// RoundRobin is a balancer that selects nodes in a round-robin fashion.
type RoundRobin[T uint32 | string] struct {
sync.RWMutex
nodes []T
bufferflies marked this conversation as resolved.
Show resolved Hide resolved
exists map[T]struct{}
next uint32
}

// NewRoundRobin creates a balancer that selects nodes in a round-robin fashion.
func NewRoundRobin[T uint32 | string]() *RoundRobin[T] {
return &RoundRobin[T]{
nodes: make([]T, 0),
exists: make(map[T]struct{}),
}
}

// Next returns next address
func (r *RoundRobin[T]) Next() (t T) {
r.RLock()
defer r.RUnlock()
if len(r.nodes) == 0 {
return
}
next := atomic.AddUint32(&r.next, 1)
node := r.nodes[(int(next)-1)%len(r.nodes)]
return node
}

// GetAll returns all nodes
func (r *RoundRobin[T]) GetAll() []T {
r.RLock()
defer r.RUnlock()
return r.nodes
}

// Put puts one into balancer.
func (r *RoundRobin[T]) Put(node T) {
r.Lock()
defer r.Unlock()
if _, ok := r.exists[node]; !ok {
r.nodes = append(r.nodes, node)
r.exists[node] = struct{}{}
}
}

// Delete deletes one from balancer.
func (r *RoundRobin[T]) Delete(node T) {
r.Lock()
defer r.Unlock()
if _, ok := r.exists[node]; ok {
for i, n := range r.nodes {
if n == node {
r.nodes = append(r.nodes[:i], r.nodes[i+1:]...)
delete(r.exists, node)
break
}
}
}
}

// Len returns the length of nodes.
func (r *RoundRobin[T]) Len() int {
r.RLock()
defer r.RUnlock()
return len(r.nodes)
}
14 changes: 7 additions & 7 deletions pkg/keyspace/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ type CreateKeyspaceRequest struct {
// Using an existing name will result in error.
Name string
Config map[string]string
// Now is the timestamp used to record creation time.
Now int64
// CreateTime is the timestamp used to record creation time.
CreateTime int64
}

// NewKeyspaceManager creates a Manager of keyspace related data.
Expand Down Expand Up @@ -140,9 +140,9 @@ func (manager *Manager) Bootstrap() error {
return err
}
req := &CreateKeyspaceRequest{
Name: keyspaceName,
Now: now,
Config: config,
Name: keyspaceName,
CreateTime: now,
Config: config,
}
keyspace, err := manager.CreateKeyspace(req)
// Ignore the keyspaceExists error for the same reason as saving default keyspace.
Expand Down Expand Up @@ -190,8 +190,8 @@ func (manager *Manager) CreateKeyspace(request *CreateKeyspaceRequest) (*keyspac
Id: newID,
Name: request.Name,
State: keyspacepb.KeyspaceState_ENABLED,
CreatedAt: request.Now,
StateChangedAt: request.Now,
CreatedAt: request.CreateTime,
StateChangedAt: request.CreateTime,
Config: request.Config,
}
err = manager.saveNewKeyspace(keyspace)
Expand Down
10 changes: 5 additions & 5 deletions pkg/keyspace/keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (suite *keyspaceTestSuite) SetupTest() {
suite.ctx, suite.cancel = context.WithCancel(context.Background())
store := endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
allocator := mockid.NewIDAllocator()
kgm := NewKeyspaceGroupManager(suite.ctx, store)
kgm := NewKeyspaceGroupManager(suite.ctx, store, nil, 0)
suite.manager = NewKeyspaceManager(store, nil, allocator, &mockConfig{}, kgm)
suite.NoError(kgm.Bootstrap())
suite.NoError(suite.manager.Bootstrap())
Expand All @@ -82,12 +82,12 @@ func makeCreateKeyspaceRequests(count int) []*CreateKeyspaceRequest {
requests := make([]*CreateKeyspaceRequest, count)
for i := 0; i < count; i++ {
requests[i] = &CreateKeyspaceRequest{
Name: fmt.Sprintf("test_keyspace%d", i),
Name: fmt.Sprintf("test_keyspace_%d", i),
Config: map[string]string{
testConfig1: "100",
testConfig2: "200",
},
Now: now,
CreateTime: now,
}
}
return requests
Expand Down Expand Up @@ -312,8 +312,8 @@ func (suite *keyspaceTestSuite) TestUpdateMultipleKeyspace() {
// checkCreateRequest verifies a keyspace meta matches a create request.
func checkCreateRequest(re *require.Assertions, request *CreateKeyspaceRequest, meta *keyspacepb.KeyspaceMeta) {
re.Equal(request.Name, meta.GetName())
re.Equal(request.Now, meta.GetCreatedAt())
re.Equal(request.Now, meta.GetStateChangedAt())
re.Equal(request.CreateTime, meta.GetCreatedAt())
re.Equal(request.CreateTime, meta.GetStateChangedAt())
re.Equal(keyspacepb.KeyspaceState_ENABLED, meta.GetState())
re.Equal(request.Config, meta.GetConfig())
}
Expand Down
Loading