Skip to content

Commit

Permalink
router, factor: fix the speed control of session migration (#550)
Browse files Browse the repository at this point in the history
  • Loading branch information
djshow832 authored Jun 7, 2024
1 parent c0f8f82 commit 3b03bf6
Show file tree
Hide file tree
Showing 17 changed files with 414 additions and 71 deletions.
4 changes: 2 additions & 2 deletions pkg/balance/factor/factor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ type Factor interface {
UpdateScore(backends []scoredBackend)
// ScoreBitNum returns the bit number of the score.
ScoreBitNum() int
// BalanceCount returns the count of connections to balance in this round.
// 0 indicates balanced (within the threshold) or the migration speed is limited.
// BalanceCount returns the count of connections to balance per second.
// 0 indicates the factor is already balanced.
BalanceCount(from, to scoredBackend) int
SetConfig(cfg *config.Config)
}
4 changes: 2 additions & 2 deletions pkg/balance/factor/factor_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ const (
// connBalancedRatio is the threshold of the ratio between the highest and lowest connection counts.
// If the ratio exceeds the threshold, we migrate connections.
connBalancedRatio = 1.2
// balanceCount4Conn indicates how many connections to balance in each round.
// Always migrate 1 connection because we don't know the CPU usage here and we may migrate too many connections.
// balanceCount4Conn indicates how many connections to balance per second.
// Migrate slowly because we don't know the CPU usage here and we may migrate too many connections.
balanceCount4Conn = 1
)

Expand Down
2 changes: 1 addition & 1 deletion pkg/balance/factor/factor_cpu.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const (
// 0.001 represents 0.1%
minCpuPerConn = 0.001
cpuBalancedRatio = 1.2
// If the CPU difference of 2 backends is 30% and we're narrowing it to 20% in 30 seconds(rounds),
// If the CPU difference of 2 backends is 30% and we're narrowing it to 20% in 30 seconds,
// then in each round, we migrate ((30% - 20%) / 2) / usagePerConn / 30 = 1 / usagePerConn / 600 connections.
balanceRatio4Cpu = 600
)
Expand Down
12 changes: 10 additions & 2 deletions pkg/balance/factor/factor_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ import (

const (
errMetricExpDuration = 1 * time.Minute
balanceCount4Err = 1000
// balanceSeconds4Error indicates the time (in seconds) to migrate all the connections.
balanceSeconds4Error = 10
)

type valueRange int
Expand Down Expand Up @@ -242,7 +243,14 @@ func (fe *FactorError) BalanceCount(from, to scoredBackend) int {
fromScore := fe.caclErrScore(from.Addr())
toScore := fe.caclErrScore(to.Addr())
if fromScore-toScore > 1 {
return balanceCount4Err
// Assuming that the source and target backends have similar connections at first.
// We wish the connections to be migrated in 10 seconds but only a few are migrated in each round.
// If we use from.ConnScore() / 10, the migration will be slower and slower.
conns := (from.ConnScore() + to.ConnScore()) / (balanceSeconds4Error * 2)
if conns > 0 {
return conns
}
return 1
}
return 0
}
Expand Down
74 changes: 73 additions & 1 deletion pkg/balance/factor/factor_error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func TestErrorBalance(t *testing.T) {
{float64(errDefinitions[0].failThreshold + 1), float64(errDefinitions[1].recoverThreshold + 1)},
{float64(errDefinitions[0].recoverThreshold - 1), float64(errDefinitions[1].recoverThreshold - 1)}},
scores: []uint64{2, 0},
balanceCount: balanceCount4Err,
balanceCount: 1,
},
}

Expand Down Expand Up @@ -214,3 +214,75 @@ func TestNoErrorMetrics(t *testing.T) {
require.Equal(t, backends[0].score(), backends[1].score(), "test index %d", i)
}
}

// TestErrorBalanceCount verifies that FactorError.BalanceCount scales with the
// total connection count of the two backends and never drops below 1 when the
// error scores differ by more than 1.
func TestErrorBalanceCount(t *testing.T) {
	cases := []struct {
		conns    []int
		minCount int
		maxCount int
	}{
		{conns: []int{1, 0}, minCount: 1, maxCount: 1},
		{conns: []int{10, 0}, minCount: 1, maxCount: 4},
		{conns: []int{10, 10}, minCount: 1, maxCount: 4},
		{conns: []int{100, 10}, minCount: 5, maxCount: 20},
		{conns: []int{1000, 100}, minCount: 50, maxCount: 100},
		{conns: []int{100, 1000}, minCount: 50, maxCount: 100},
		{conns: []int{10000, 10000}, minCount: 500, maxCount: 1000},
	}

	// Backend 0 reports a huge error count, backend 1 reports none, so the
	// error scores always differ and BalanceCount returns a positive count.
	samples := []*model.Sample{
		createSample(99999999, 0),
		createSample(0, 1),
	}
	mmr := &mockMetricsReader{
		qrs: map[uint64]metricsreader.QueryResult{
			1: {
				UpdateTime: monotime.Now(),
				Value:      model.Vector(samples),
			},
			2: {
				UpdateTime: monotime.Now(),
				Value:      model.Vector(samples),
			},
		},
	}
	fe := NewFactorError(mmr)
	for idx, ca := range cases {
		backends := []scoredBackend{
			createBackend(0, ca.conns[0], ca.conns[0]),
			createBackend(1, ca.conns[1], ca.conns[1]),
		}
		fe.UpdateScore(backends)
		count := fe.BalanceCount(backends[0], backends[1])
		require.GreaterOrEqual(t, count, ca.minCount, "test idx: %d", idx)
		require.LessOrEqual(t, count, ca.maxCount, "test idx: %d", idx)
	}
}
13 changes: 10 additions & 3 deletions pkg/balance/factor/factor_health.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ package factor
import "github.com/pingcap/tiproxy/lib/config"

const (
// balanceCount4Health indicates how many connections to balance in each round.
// balanceSeconds4Health indicates the time (in seconds) to migrate all the connections.
// If some backends are unhealthy, migrate fast but do not put too much pressure on TiDB.
balanceCount4Health = 1000
balanceSeconds4Health = 5
)

var _ Factor = (*FactorHealth)(nil)
Expand Down Expand Up @@ -42,7 +42,14 @@ func (fh *FactorHealth) ScoreBitNum() int {
}

// BalanceCount returns how many connections to migrate per second when the
// source backend is unhealthy.
//
// It assumes the two backends start with roughly equal connection counts and
// targets draining everything within balanceSeconds4Health seconds. The speed
// is derived from the combined conn score of both backends — not just
// from.ConnScore() — so the per-second rate stays constant instead of decaying
// as connections move from the source to the target.
func (fh *FactorHealth) BalanceCount(from, to scoredBackend) int {
	total := from.ConnScore() + to.ConnScore()
	if count := total / (balanceSeconds4Health * 2); count > 0 {
		return count
	}
	// Always migrate at least one connection per second.
	return 1
}

func (fcc *FactorHealth) SetConfig(cfg *config.Config) {
Expand Down
64 changes: 64 additions & 0 deletions pkg/balance/factor/factor_health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,67 @@ func TestFactorHealth(t *testing.T) {
require.Equal(t, test.expectedScore, backends[i].score(), "test idx: %d", i)
}
}

// TestHealthBalanceCount verifies that FactorHealth.BalanceCount scales with
// the total connection count of the two backends and never returns 0.
func TestHealthBalanceCount(t *testing.T) {
	fh := NewFactorHealth()
	cases := []struct {
		conns    []int
		minCount int
		maxCount int
	}{
		{conns: []int{1, 0}, minCount: 1, maxCount: 1},
		{conns: []int{10, 0}, minCount: 1, maxCount: 5},
		{conns: []int{10, 10}, minCount: 1, maxCount: 5},
		{conns: []int{100, 10}, minCount: 10, maxCount: 40},
		{conns: []int{1000, 100}, minCount: 100, maxCount: 200},
		{conns: []int{100, 1000}, minCount: 100, maxCount: 200},
		{conns: []int{10000, 10000}, minCount: 1000, maxCount: 2000},
	}
	for idx, ca := range cases {
		// The source backend is unhealthy; the target is healthy.
		unhealthy := scoredBackend{
			BackendCtx: &mockBackend{
				healthy:   false,
				connScore: ca.conns[0],
			},
		}
		healthy := scoredBackend{
			BackendCtx: &mockBackend{
				healthy:   true,
				connScore: ca.conns[1],
			},
		}
		count := fh.BalanceCount(unhealthy, healthy)
		require.GreaterOrEqual(t, count, ca.minCount, "test idx: %d", idx)
		require.LessOrEqual(t, count, ca.maxCount, "test idx: %d", idx)
	}
}
2 changes: 1 addition & 1 deletion pkg/balance/factor/factor_label.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package factor
import "github.com/pingcap/tiproxy/lib/config"

const (
// balanceCount4Label indicates how many connections to balance in each round.
// balanceCount4Label indicates how many connections to balance per second.
balanceCount4Label = 1
)

Expand Down
2 changes: 1 addition & 1 deletion pkg/balance/factor/factor_location.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const (
// locationLabelName indicates the label name that location-based balance should be based on.
// We use `zone` because the follower read in TiDB also uses `zone` to decide location.
locationLabelName = "zone"
// balanceCount4Location indicates how many connections to balance in each round.
// balanceCount4Location indicates how many connections to balance per second.
balanceCount4Location = 1
)

Expand Down
13 changes: 10 additions & 3 deletions pkg/balance/factor/factor_memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import (
const (
// If some metrics are missing, we use the old one temporarily for no longer than memMetricExpDuration.
memMetricExpDuration = 1 * time.Minute
// balanceCount4Mem indicates how many connections to balance in each round.
balanceCount4Mem = 1000
// balanceSeconds4Memory indicates the time (in seconds) to migrate all the connections.
balanceSeconds4Memory = 10
)

var _ Factor = (*FactorCPU)(nil)
Expand Down Expand Up @@ -203,7 +203,14 @@ func (fm *FactorMemory) BalanceCount(from, to scoredBackend) int {
fromScore := fm.calcMemScore(from.Addr())
toScore := fm.calcMemScore(to.Addr())
if fromScore-toScore > 1 {
return balanceCount4Mem
// Assuming that the source and target backends have similar connections at first.
// We wish the connections to be migrated in 10 seconds but only a few are migrated in each round.
// If we use from.ConnScore() / 10, the migration will be slower and slower.
conns := (from.ConnScore() + to.ConnScore()) / (balanceSeconds4Memory * 2)
if conns > 0 {
return conns
}
return 1
}
return 0
}
Expand Down
72 changes: 70 additions & 2 deletions pkg/balance/factor/factor_memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,12 @@ func TestMemoryBalance(t *testing.T) {
{
memory: [][]float64{{0.2, 0.15}, {0.81, 0.82}},
scores: []uint64{0, 2},
balanceCount: balanceCount4Mem,
balanceCount: 1,
},
{
memory: [][]float64{{0.2, 0.15}, {0.81, 0.82}, {0.65, 0.65}},
scores: []uint64{0, 2, 1},
balanceCount: balanceCount4Mem,
balanceCount: 1,
},
}

Expand Down Expand Up @@ -213,3 +213,71 @@ func TestNoMemMetrics(t *testing.T) {
require.Equal(t, backends[0].score(), backends[1].score(), "test index %d", i)
}
}

// TestMemoryBalanceCount verifies that FactorMemory.BalanceCount scales with
// the total connection count of the two backends and never drops below 1 when
// the memory scores differ by more than 1.
func TestMemoryBalanceCount(t *testing.T) {
	cases := []struct {
		conns    []int
		minCount int
		maxCount int
	}{
		{conns: []int{1, 0}, minCount: 1, maxCount: 1},
		{conns: []int{10, 0}, minCount: 1, maxCount: 4},
		{conns: []int{10, 10}, minCount: 1, maxCount: 4},
		{conns: []int{100, 10}, minCount: 5, maxCount: 20},
		{conns: []int{1000, 100}, minCount: 50, maxCount: 100},
		{conns: []int{100, 1000}, minCount: 50, maxCount: 100},
		{conns: []int{10000, 10000}, minCount: 500, maxCount: 1000},
	}

	// Backend 0 is at full memory usage, backend 1 at zero, so the memory
	// scores always differ and BalanceCount returns a positive count.
	streams := []*model.SampleStream{
		createSampleStream([]float64{1.0, 1.0}, 0),
		createSampleStream([]float64{0, 0}, 1),
	}
	mmr := &mockMetricsReader{
		qrs: map[uint64]metricsreader.QueryResult{
			1: {
				UpdateTime: monotime.Now(),
				Value:      model.Matrix(streams),
			},
		},
	}
	fm := NewFactorMemory(mmr)
	for idx, ca := range cases {
		backends := []scoredBackend{
			createBackend(0, ca.conns[0], ca.conns[0]),
			createBackend(1, ca.conns[1], ca.conns[1]),
		}
		fm.UpdateScore(backends)
		count := fm.BalanceCount(backends[0], backends[1])
		require.GreaterOrEqual(t, count, ca.minCount, "test idx: %d", idx)
		require.LessOrEqual(t, count, ca.maxCount, "test idx: %d", idx)
	}
}
1 change: 1 addition & 0 deletions pkg/balance/policy/balance_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
type BalancePolicy interface {
Init(cfg *config.Config)
BackendToRoute(backends []BackendCtx) BackendCtx
// balanceCount is the count of connections to balance per second.
BackendsToBalance(backends []BackendCtx) (from, to BackendCtx, balanceCount int, reason []zap.Field)
SetConfig(cfg *config.Config)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/balance/policy/simple_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ const (
// ConnBalancedRatio is the threshold of the ratio between the highest and lowest connection counts.
// If the ratio exceeds the threshold, we migrate connections.
ConnBalancedRatio = 1.2
// BalanceCount4Health indicates how many connections to balance in each round.
// BalanceCount4Health indicates how many connections to balance per second.
// If some backends are unhealthy, migrate fast but do not put too much pressure on TiDB.
BalanceCount4Health = 10
BalanceCount4Health = 1000
)

var _ BalancePolicy = (*SimpleBalancePolicy)(nil)
Expand Down
Loading

0 comments on commit 3b03bf6

Please sign in to comment.