Skip to content

Commit

Permalink
Add interleaved weighted round-robin and alias-method loadbalancer (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
NX-Official authored Mar 8, 2024
1 parent c865c83 commit b645659
Show file tree
Hide file tree
Showing 10 changed files with 625 additions and 6 deletions.
113 changes: 113 additions & 0 deletions cluster/loadbalance/aliasmethod/alias_method.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Package aliasmethod implements alias-method algorithm load balance strategy.
package aliasmethod // weighted random with alias-method algorithm

import (
"math/rand"

"dubbo.apache.org/dubbo-go/v3/cluster/loadbalance"
"dubbo.apache.org/dubbo-go/v3/protocol"
)

type aliasMethodPicker struct {
invokers []protocol.Invoker // Instance

weightSum int64
alias []int
prob []float64
}

func NewAliasMethodPicker(invokers []protocol.Invoker, invocation protocol.Invocation) *aliasMethodPicker {
am := &aliasMethodPicker{
invokers: invokers,
}
am.init(invocation)
return am
}

// Alias Method: https://en.wikipedia.org/wiki/Alias_method
func (am *aliasMethodPicker) init(invocation protocol.Invocation) {
n := len(am.invokers)
weights := make([]int64, n)
am.alias = make([]int, n)
am.prob = make([]float64, n)

totalWeight := int64(0)

scaledProb := make([]float64, n)
small := make([]int, 0, n)
large := make([]int, 0, n)

for i, invoker := range am.invokers {
weight := loadbalance.GetWeight(invoker, invocation)
weights[i] = weight
totalWeight += weight
}
// when invoker weight all zero
if totalWeight <= 0 {
totalWeight = int64(1)
}
am.weightSum = totalWeight

for i, weight := range weights {
scaledProb[i] = float64(weight) * float64(n) / float64(totalWeight)
if scaledProb[i] < 1.0 {
small = append(small, i)
} else {
large = append(large, i)
}
}

for len(small) > 0 && len(large) > 0 {
l := small[len(small)-1]
small = small[:len(small)-1]
g := large[len(large)-1]
large = large[:len(large)-1]

am.prob[l] = scaledProb[l]
am.alias[l] = g

scaledProb[g] -= 1.0 - scaledProb[l]
if scaledProb[g] < 1.0 {
small = append(small, g)
} else {
large = append(large, g)
}
}

for len(large) > 0 {
g := large[len(large)-1]
large = large[:len(large)-1]
am.prob[g] = 1.0
}

for len(small) > 0 {
l := small[len(small)-1]
small = small[:len(small)-1]
am.prob[l] = 1.0
}
}

func (am *aliasMethodPicker) Pick() protocol.Invoker {
i := rand.Intn(len(am.invokers))
if rand.Float64() < am.prob[i] {
return am.invokers[i]
}
return am.invokers[am.alias[i]]
}
21 changes: 21 additions & 0 deletions cluster/loadbalance/aliasmethod/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Package aliasmethod implements alias-method algorithm load balance strategy.
// Alias Method: https://en.wikipedia.org/wiki/Alias_method
// It needs O(n) time and O(n) memory to initialize and O(1) time to generate a random number.
package aliasmethod // weighted random with alias-method algorithm
51 changes: 51 additions & 0 deletions cluster/loadbalance/aliasmethod/loadbalance.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Package aliasmethod implements alias-method algorithm load balance strategy.
package aliasmethod // weighted random with alias-method algorithm

import (
"dubbo.apache.org/dubbo-go/v3/cluster/loadbalance"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/protocol"
)

func init() {
extension.SetLoadbalance(constant.LoadBalanceKeyAliasMethod, newWeightedRandomWithAliasMethodBalance)
}

type weightedRandomWithAliasMethodBalance struct{}

// newWeightedRandomWithAliasMethodBalance returns a loadbalancer using alias-method algorithm..
func newWeightedRandomWithAliasMethodBalance() loadbalance.LoadBalance {
return &weightedRandomWithAliasMethodBalance{}
}

// Select gets invoker based on interleaved weighted round robine load balancing strategy
func (lb *weightedRandomWithAliasMethodBalance) Select(invokers []protocol.Invoker, invocation protocol.Invocation) protocol.Invoker {
count := len(invokers)
if count == 0 {
return nil
}
if count == 1 {
return invokers[0]
}

wramp := NewAliasMethodPicker(invokers, invocation)
return wramp.Pick()
}
73 changes: 73 additions & 0 deletions cluster/loadbalance/aliasmethod/loadbalance_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package aliasmethod

import (
"fmt"
"testing"

"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/protocol/invocation"
"github.com/stretchr/testify/assert"
)

func TestWRAMRoundRobinSelect(t *testing.T) {
loadBalance := newWeightedRandomWithAliasMethodBalance()

var invokers []protocol.Invoker

url, _ := common.NewURL(fmt.Sprintf("dubbo://%s:%d/org.apache.demo.HelloService",
constant.LocalHostValue, constant.DefaultPort))
invokers = append(invokers, protocol.NewBaseInvoker(url))
i := loadBalance.Select(invokers, &invocation.RPCInvocation{})
assert.True(t, i.GetURL().URLEqual(url))

for i := 1; i < 10; i++ {
url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/org.apache.demo.HelloService", i))
invokers = append(invokers, protocol.NewBaseInvoker(url))
}
loadBalance.Select(invokers, &invocation.RPCInvocation{})
}

func TestWRAMRoundRobinByWeight(t *testing.T) {
loadBalance := newWeightedRandomWithAliasMethodBalance()

var invokers []protocol.Invoker
loop := 10
for i := 1; i <= loop; i++ {
url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/org.apache.demo.HelloService?weight=%v", i, i))
invokers = append(invokers, protocol.NewBaseInvoker(url))
}

loop = (1 + loop) * loop / 2
selected := make(map[protocol.Invoker]int)

for i := 1; i <= loop; i++ {
invoker := loadBalance.Select(invokers, &invocation.RPCInvocation{})
selected[invoker]++
}

sum := 0
for _, value := range selected {
sum += value
}

assert.Equal(t, loop, sum)
}
21 changes: 21 additions & 0 deletions cluster/loadbalance/iwrr/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Package iwrr implements Interleaved Weighted Round Robin load balance strategy.
// Interleaved Weighted Round Robin: https://en.wikipedia.org/wiki/Weighted_round_robin#Interleaved_WRR
// It needs O(n log maxWeight) time and O(n) memory to initialize and O(1) time to generate a random number.
package iwrr
Loading

0 comments on commit b645659

Please sign in to comment.