Skip to content

Commit

Permalink
feat: interleaved weighted round-robin load balance (#2405)
Browse files Browse the repository at this point in the history
* dongjiang, add interleaved weighted roundrobin loadbalance

Signed-off-by: dongjiang1989 <dongjiang1989@126.com>

* add benchmarks test case

Signed-off-by: dongjiang1989 <dongjiang1989@126.com>

* fix unittest case name

Signed-off-by: dongjiang1989 <dongjiang1989@126.com>

---------

Signed-off-by: dongjiang1989 <dongjiang1989@126.com>
  • Loading branch information
dongjiang1989 authored Dec 29, 2023
1 parent 5465486 commit 19d1da0
Show file tree
Hide file tree
Showing 6 changed files with 359 additions and 6 deletions.
19 changes: 19 additions & 0 deletions cluster/loadbalance/interleavedweightedroundrobin/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Package leastactive implements LeastActive load balance strategy.
package iwrr
130 changes: 130 additions & 0 deletions cluster/loadbalance/interleavedweightedroundrobin/iwrr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package iwrr

import (
"math/rand"
"sync"

"dubbo.apache.org/dubbo-go/v3/cluster/loadbalance"
"dubbo.apache.org/dubbo-go/v3/protocol"
)

type iwrrEntry struct {
weight int64
invoker protocol.Invoker

next *iwrrEntry
}

type iwrrQueue struct {
head *iwrrEntry
tail *iwrrEntry
}

func NewIwrrQueue() *iwrrQueue {
return &iwrrQueue{}
}

func (item *iwrrQueue) push(entry *iwrrEntry) {
entry.next = nil
tail := item.tail
item.tail = entry
if tail == nil {
item.head = entry
} else {
tail.next = entry
}
}

func (item *iwrrQueue) pop() *iwrrEntry {
head := item.head
next := head.next
head.next = nil
item.head = next
if next == nil {
item.tail = nil
}
return head
}

func (item *iwrrQueue) empty() bool {
return item.head == nil
}

// InterleavedweightedRoundRobin struct
type interleavedweightedRoundRobin struct {
current *iwrrQueue
next *iwrrQueue
step int64
mu sync.Mutex
}

func NewInterleavedweightedRoundRobin(invokers []protocol.Invoker, invocation protocol.Invocation) *interleavedweightedRoundRobin {
iwrrp := new(interleavedweightedRoundRobin)
iwrrp.current = NewIwrrQueue()
iwrrp.next = NewIwrrQueue()

size := uint64(len(invokers))
offset := rand.Uint64() % size
step := int64(0)
for idx := uint64(0); idx < size; idx++ {
invoker := invokers[(idx+offset)%size]
weight := loadbalance.GetWeight(invoker, invocation)
step = gcdInt(step, weight)
iwrrp.current.push(&iwrrEntry{
invoker: invoker,
weight: weight,
})
}
iwrrp.step = step

return iwrrp
}

func (iwrr *interleavedweightedRoundRobin) Pick(invocation protocol.Invocation) protocol.Invoker {
iwrr.mu.Lock()
defer iwrr.mu.Unlock()

if iwrr.current.empty() {
iwrr.current, iwrr.next = iwrr.next, iwrr.current
}

entry := iwrr.current.pop()
entry.weight -= iwrr.step

if entry.weight > 0 {
iwrr.current.push(entry)
} else {
weight := loadbalance.GetWeight(entry.invoker, invocation)
if weight < 0 {
weight = 0
}
entry.weight = weight
iwrr.next.push(entry)
}

return entry.invoker
}

func gcdInt(a, b int64) int64 {
for b != 0 {
a, b = b, a%b
}
return a
}
50 changes: 50 additions & 0 deletions cluster/loadbalance/interleavedweightedroundrobin/loadbalance.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package iwrr

import (
"dubbo.apache.org/dubbo-go/v3/cluster/loadbalance"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/protocol"
)

func init() {
extension.SetLoadbalance(constant.LoadBalanceKeyInterleavedWeightedRoundRobin, newInterleavedWeightedRoundRobinBalance)
}

type interleavedWeightedRoundRobinBalance struct{}

// newInterleavedWeightedRoundRobinBalance returns a interleaved weighted round robin load balance.
func newInterleavedWeightedRoundRobinBalance() loadbalance.LoadBalance {
return &interleavedWeightedRoundRobinBalance{}
}

// Select gets invoker based on interleaved weighted round robine load balancing strategy
func (lb *interleavedWeightedRoundRobinBalance) Select(invokers []protocol.Invoker, invocation protocol.Invocation) protocol.Invoker {
count := len(invokers)
if count == 0 {
return nil
}
if count == 1 {
return invokers[0]
}

iwrrp := NewInterleavedweightedRoundRobin(invokers, invocation)
return iwrrp.Pick(invocation)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package iwrr

import (
"fmt"
"testing"

"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/protocol/invocation"
"github.com/stretchr/testify/assert"
)

func TestIWrrRoundRobinSelect(t *testing.T) {
loadBalance := newInterleavedWeightedRoundRobinBalance()

var invokers []protocol.Invoker

url, _ := common.NewURL(fmt.Sprintf("dubbo://%s:%d/org.apache.demo.HelloService",
constant.LocalHostValue, constant.DefaultPort))
invokers = append(invokers, protocol.NewBaseInvoker(url))
i := loadBalance.Select(invokers, &invocation.RPCInvocation{})
assert.True(t, i.GetURL().URLEqual(url))

for i := 1; i < 10; i++ {
url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/org.apache.demo.HelloService", i))
invokers = append(invokers, protocol.NewBaseInvoker(url))
}
loadBalance.Select(invokers, &invocation.RPCInvocation{})
}

func TestIWrrRoundRobinByWeight(t *testing.T) {
loadBalance := newInterleavedWeightedRoundRobinBalance()

var invokers []protocol.Invoker
loop := 10
for i := 1; i <= loop; i++ {
url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/org.apache.demo.HelloService?weight=%v", i, i))
invokers = append(invokers, protocol.NewBaseInvoker(url))
}

loop = (1 + loop) * loop / 2
selected := make(map[protocol.Invoker]int)

for i := 1; i <= loop; i++ {
invoker := loadBalance.Select(invokers, &invocation.RPCInvocation{})
selected[invoker]++
}

sum := 0
for _, value := range selected {
sum += value
}

assert.Equal(t, loop, sum)
}
80 changes: 80 additions & 0 deletions cluster/loadbalance/loadbalance_benchmarks_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package loadbalance_test

import (
"fmt"
"testing"

"dubbo.apache.org/dubbo-go/v3/cluster/loadbalance"
_ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/consistenthashing"
_ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/interleavedweightedroundrobin"
_ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/leastactive"
_ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/p2c"
_ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/random"
_ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/ringhash"
_ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/roundrobin"
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/protocol/invocation"
)

func Generate() []protocol.Invoker {
var invokers []protocol.Invoker
for i := 1; i < 256; i++ {
url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/org.apache.demo.HelloService", i))
invokers = append(invokers, protocol.NewBaseInvoker(url))
}
return invokers
}

func Benchloadbalace(b *testing.B, lb loadbalance.LoadBalance) {
b.Helper()
invokers := Generate()
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
lb.Select(invokers, &invocation.RPCInvocation{})
}
}

func BenchmarkRoudrobinLoadbalace(b *testing.B) {
Benchloadbalace(b, extension.GetLoadbalance(constant.LoadBalanceKeyRoundRobin))
}

func BenchmarkLeastativeLoadbalace(b *testing.B) {
Benchloadbalace(b, extension.GetLoadbalance(constant.LoadBalanceKeyLeastActive))
}

func BenchmarkConsistenthashingLoadbalace(b *testing.B) {
Benchloadbalace(b, extension.GetLoadbalance(constant.LoadBalanceKeyConsistentHashing))
}

func BenchmarkP2CLoadbalace(b *testing.B) {
Benchloadbalace(b, extension.GetLoadbalance(constant.LoadBalanceKeyP2C))
}

func BenchmarkInterleavedWeightedRoundRobinLoadbalace(b *testing.B) {
Benchloadbalace(b, extension.GetLoadbalance(constant.LoadBalanceKeyInterleavedWeightedRoundRobin))
}

func BenchmarkRandomLoadbalace(b *testing.B) {
Benchloadbalace(b, extension.GetLoadbalance(constant.LoadBalanceKeyRandom))
}
13 changes: 7 additions & 6 deletions common/constant/loadbalance.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@
package constant

const (
LoadBalanceKeyConsistentHashing = "consistenthashing"
LoadBalanceKeyLeastActive = "leastactive"
LoadBalanceKeyRandom = "random"
LoadBalanceKeyRoundRobin = "roundrobin"
LoadBalanceKeyP2C = "p2c"
LoadXDSRingHash = "xdsringhash"
LoadBalanceKeyConsistentHashing = "consistenthashing"
LoadBalanceKeyLeastActive = "leastactive"
LoadBalanceKeyRandom = "random"
LoadBalanceKeyRoundRobin = "roundrobin"
LoadBalanceKeyP2C = "p2c"
LoadXDSRingHash = "xdsringhash"
LoadBalanceKeyInterleavedWeightedRoundRobin = "interleavedweightedroundrobin"
)

0 comments on commit 19d1da0

Please sign in to comment.