From 19d1da0a066bd135369eb2ce8611444fb3ad6514 Mon Sep 17 00:00:00 2001 From: dongjiang Date: Fri, 29 Dec 2023 10:49:04 +0800 Subject: [PATCH] feat: interleaved weighted round-robin load balance (#2405) * dongjiang, add interleaved weighted roundrobin loadbalance Signed-off-by: dongjiang1989 * add benchmarks test case Signed-off-by: dongjiang1989 * fix unittest case name Signed-off-by: dongjiang1989 --------- Signed-off-by: dongjiang1989 --- .../interleavedweightedroundrobin/doc.go | 19 +++ .../interleavedweightedroundrobin/iwrr.go | 130 ++++++++++++++++++ .../loadbalance.go | 50 +++++++ .../loadbalance_test.go | 73 ++++++++++ .../loadbalance_benchmarks_test.go | 80 +++++++++++ common/constant/loadbalance.go | 13 +- 6 files changed, 359 insertions(+), 6 deletions(-) create mode 100644 cluster/loadbalance/interleavedweightedroundrobin/doc.go create mode 100644 cluster/loadbalance/interleavedweightedroundrobin/iwrr.go create mode 100644 cluster/loadbalance/interleavedweightedroundrobin/loadbalance.go create mode 100644 cluster/loadbalance/interleavedweightedroundrobin/loadbalance_test.go create mode 100644 cluster/loadbalance/loadbalance_benchmarks_test.go diff --git a/cluster/loadbalance/interleavedweightedroundrobin/doc.go b/cluster/loadbalance/interleavedweightedroundrobin/doc.go new file mode 100644 index 0000000000..2f5433c135 --- /dev/null +++ b/cluster/loadbalance/interleavedweightedroundrobin/doc.go @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Package leastactive implements LeastActive load balance strategy. +package iwrr diff --git a/cluster/loadbalance/interleavedweightedroundrobin/iwrr.go b/cluster/loadbalance/interleavedweightedroundrobin/iwrr.go new file mode 100644 index 0000000000..e1d78a3704 --- /dev/null +++ b/cluster/loadbalance/interleavedweightedroundrobin/iwrr.go @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package iwrr + +import ( + "math/rand" + "sync" + + "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance" + "dubbo.apache.org/dubbo-go/v3/protocol" +) + +type iwrrEntry struct { + weight int64 + invoker protocol.Invoker + + next *iwrrEntry +} + +type iwrrQueue struct { + head *iwrrEntry + tail *iwrrEntry +} + +func NewIwrrQueue() *iwrrQueue { + return &iwrrQueue{} +} + +func (item *iwrrQueue) push(entry *iwrrEntry) { + entry.next = nil + tail := item.tail + item.tail = entry + if tail == nil { + item.head = entry + } else { + tail.next = entry + } +} + +func (item *iwrrQueue) pop() *iwrrEntry { + head := item.head + next := head.next + head.next = nil + item.head = next + if next == nil { + item.tail = nil + } + return head +} + +func (item *iwrrQueue) empty() bool { + return item.head == nil +} + +// InterleavedweightedRoundRobin struct +type interleavedweightedRoundRobin struct { + current *iwrrQueue + next *iwrrQueue + step int64 + mu sync.Mutex +} + +func NewInterleavedweightedRoundRobin(invokers []protocol.Invoker, invocation protocol.Invocation) *interleavedweightedRoundRobin { + iwrrp := new(interleavedweightedRoundRobin) + iwrrp.current = NewIwrrQueue() + iwrrp.next = NewIwrrQueue() + + size := uint64(len(invokers)) + offset := rand.Uint64() % size + step := int64(0) + for idx := uint64(0); idx < size; idx++ { + invoker := invokers[(idx+offset)%size] + weight := loadbalance.GetWeight(invoker, invocation) + step = gcdInt(step, weight) + iwrrp.current.push(&iwrrEntry{ + invoker: invoker, + weight: weight, + }) + } + iwrrp.step = step + + return iwrrp +} + +func (iwrr *interleavedweightedRoundRobin) Pick(invocation protocol.Invocation) protocol.Invoker { + iwrr.mu.Lock() + defer iwrr.mu.Unlock() + + if iwrr.current.empty() { + iwrr.current, iwrr.next = iwrr.next, iwrr.current + } + + entry := iwrr.current.pop() + entry.weight -= iwrr.step + + if entry.weight > 0 { + iwrr.current.push(entry) + } else { + weight := loadbalance.GetWeight(entry.invoker, invocation) + if weight < 0 { + weight = 0 + } + entry.weight = weight + iwrr.next.push(entry) + } + + return entry.invoker +} + +func gcdInt(a, b int64) int64 { + for b != 0 { + a, b = b, a%b + } + return a +} diff --git a/cluster/loadbalance/interleavedweightedroundrobin/loadbalance.go b/cluster/loadbalance/interleavedweightedroundrobin/loadbalance.go new file mode 100644 index 0000000000..20f9c3c200 --- /dev/null +++ b/cluster/loadbalance/interleavedweightedroundrobin/loadbalance.go @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package iwrr + +import ( + "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance" + "dubbo.apache.org/dubbo-go/v3/common/constant" + "dubbo.apache.org/dubbo-go/v3/common/extension" + "dubbo.apache.org/dubbo-go/v3/protocol" +) + +func init() { + extension.SetLoadbalance(constant.LoadBalanceKeyInterleavedWeightedRoundRobin, newInterleavedWeightedRoundRobinBalance) +} + +type interleavedWeightedRoundRobinBalance struct{} + +// newInterleavedWeightedRoundRobinBalance returns a interleaved weighted round robin load balance. +func newInterleavedWeightedRoundRobinBalance() loadbalance.LoadBalance { + return &interleavedWeightedRoundRobinBalance{} +} + +// Select gets invoker based on interleaved weighted round robine load balancing strategy +func (lb *interleavedWeightedRoundRobinBalance) Select(invokers []protocol.Invoker, invocation protocol.Invocation) protocol.Invoker { + count := len(invokers) + if count == 0 { + return nil + } + if count == 1 { + return invokers[0] + } + + iwrrp := NewInterleavedweightedRoundRobin(invokers, invocation) + return iwrrp.Pick(invocation) +} diff --git a/cluster/loadbalance/interleavedweightedroundrobin/loadbalance_test.go b/cluster/loadbalance/interleavedweightedroundrobin/loadbalance_test.go new file mode 100644 index 0000000000..9e926adf38 --- /dev/null +++ b/cluster/loadbalance/interleavedweightedroundrobin/loadbalance_test.go @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package iwrr + +import ( + "fmt" + "testing" + + "dubbo.apache.org/dubbo-go/v3/common" + "dubbo.apache.org/dubbo-go/v3/common/constant" + "dubbo.apache.org/dubbo-go/v3/protocol" + "dubbo.apache.org/dubbo-go/v3/protocol/invocation" + "github.com/stretchr/testify/assert" +) + +func TestIWrrRoundRobinSelect(t *testing.T) { + loadBalance := newInterleavedWeightedRoundRobinBalance() + + var invokers []protocol.Invoker + + url, _ := common.NewURL(fmt.Sprintf("dubbo://%s:%d/org.apache.demo.HelloService", + constant.LocalHostValue, constant.DefaultPort)) + invokers = append(invokers, protocol.NewBaseInvoker(url)) + i := loadBalance.Select(invokers, &invocation.RPCInvocation{}) + assert.True(t, i.GetURL().URLEqual(url)) + + for i := 1; i < 10; i++ { + url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/org.apache.demo.HelloService", i)) + invokers = append(invokers, protocol.NewBaseInvoker(url)) + } + loadBalance.Select(invokers, &invocation.RPCInvocation{}) +} + +func TestIWrrRoundRobinByWeight(t *testing.T) { + loadBalance := newInterleavedWeightedRoundRobinBalance() + + var invokers []protocol.Invoker + loop := 10 + for i := 1; i <= loop; i++ { + url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/org.apache.demo.HelloService?weight=%v", i, i)) + invokers = append(invokers, protocol.NewBaseInvoker(url)) + } + + loop = (1 + loop) * loop / 2 + selected := make(map[protocol.Invoker]int) + + for i := 1; i <= loop; i++ { + invoker := loadBalance.Select(invokers, &invocation.RPCInvocation{}) + selected[invoker]++ + } + + sum := 0 + for _, value := range selected { + sum += value + } + + assert.Equal(t, loop, sum) +} diff --git a/cluster/loadbalance/loadbalance_benchmarks_test.go b/cluster/loadbalance/loadbalance_benchmarks_test.go new file mode 100644 index 0000000000..c1d27b7de4 --- /dev/null +++ b/cluster/loadbalance/loadbalance_benchmarks_test.go @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package loadbalance_test + +import ( + "fmt" + "testing" + + "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance" + _ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/consistenthashing" + _ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/interleavedweightedroundrobin" + _ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/leastactive" + _ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/p2c" + _ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/random" + _ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/ringhash" + _ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/roundrobin" + "dubbo.apache.org/dubbo-go/v3/common" + "dubbo.apache.org/dubbo-go/v3/common/constant" + "dubbo.apache.org/dubbo-go/v3/common/extension" + "dubbo.apache.org/dubbo-go/v3/protocol" + "dubbo.apache.org/dubbo-go/v3/protocol/invocation" +) + +func Generate() []protocol.Invoker { + var invokers []protocol.Invoker + for i := 1; i < 256; i++ { + url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/org.apache.demo.HelloService", i)) + invokers = append(invokers, protocol.NewBaseInvoker(url)) + } + return invokers +} + +func Benchloadbalace(b *testing.B, lb loadbalance.LoadBalance) { + b.Helper() + invokers := Generate() + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + lb.Select(invokers, &invocation.RPCInvocation{}) + } +} + +func BenchmarkRoudrobinLoadbalace(b *testing.B) { + Benchloadbalace(b, extension.GetLoadbalance(constant.LoadBalanceKeyRoundRobin)) +} + +func BenchmarkLeastativeLoadbalace(b *testing.B) { + Benchloadbalace(b, extension.GetLoadbalance(constant.LoadBalanceKeyLeastActive)) +} + +func BenchmarkConsistenthashingLoadbalace(b *testing.B) { + Benchloadbalace(b, extension.GetLoadbalance(constant.LoadBalanceKeyConsistentHashing)) +} + +func BenchmarkP2CLoadbalace(b *testing.B) { + Benchloadbalace(b, extension.GetLoadbalance(constant.LoadBalanceKeyP2C)) +} + +func BenchmarkInterleavedWeightedRoundRobinLoadbalace(b *testing.B) { + Benchloadbalace(b, extension.GetLoadbalance(constant.LoadBalanceKeyInterleavedWeightedRoundRobin)) +} + +func BenchmarkRandomLoadbalace(b *testing.B) { + Benchloadbalace(b, extension.GetLoadbalance(constant.LoadBalanceKeyRandom)) +} diff --git a/common/constant/loadbalance.go b/common/constant/loadbalance.go index dde8443379..09af051b45 100644 --- a/common/constant/loadbalance.go +++ b/common/constant/loadbalance.go @@ -18,10 +18,11 @@ package constant const ( - LoadBalanceKeyConsistentHashing = "consistenthashing" - LoadBalanceKeyLeastActive = "leastactive" - LoadBalanceKeyRandom = "random" - LoadBalanceKeyRoundRobin = "roundrobin" - LoadBalanceKeyP2C = "p2c" - LoadXDSRingHash = "xdsringhash" + LoadBalanceKeyConsistentHashing = "consistenthashing" + LoadBalanceKeyLeastActive = "leastactive" + LoadBalanceKeyRandom = "random" + LoadBalanceKeyRoundRobin = "roundrobin" + LoadBalanceKeyP2C = "p2c" + LoadXDSRingHash = "xdsringhash" + LoadBalanceKeyInterleavedWeightedRoundRobin = "interleavedweightedroundrobin" )