Skip to content

Commit

Permalink
Merge pull request #30 from guowei-gong/feature/interleave-weight-rou…
Browse files Browse the repository at this point in the history
…nd-robin

added iwrr dispatch feature
  • Loading branch information
dobyte authored Dec 29, 2024
2 parents 00baed9 + 9275eda commit cb881c5
Show file tree
Hide file tree
Showing 14 changed files with 411 additions and 1 deletion.
1 change: 1 addition & 0 deletions cluster/gate/gate.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ func (g *Gate) registerServiceInstance() {
Kind: cluster.Gate.String(),
Alias: g.opts.name,
State: g.getState().String(),
Weight: g.opts.weight,
Endpoint: g.linker.Endpoint().String(),
}

Expand Down
13 changes: 13 additions & 0 deletions cluster/gate/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ const (
defaultName = "gate" // 默认名称
defaultAddr = ":0" // 连接器监听地址
defaultTimeout = 3 * time.Second // 默认超时时间
defaultWeight = 1 // 默认权重
)

const (
defaultIDKey = "etc.cluster.gate.id"
defaultNameKey = "etc.cluster.gate.name"
defaultAddrKey = "etc.cluster.gate.addr"
defaultTimeoutKey = "etc.cluster.gate.timeout"
defaultWeightKey = "etc.cluster.gate.weight"
)

type Option func(o *options)
Expand All @@ -39,6 +41,7 @@ type options struct {
name string // 实例名称
addr string // 监听地址
timeout time.Duration // RPC调用超时时间
weight int // 权重
server network.Server // 网关服务器
locator locate.Locator // 用户定位器
registry registry.Registry // 服务注册器
Expand All @@ -50,6 +53,7 @@ func defaultOptions() *options {
name: defaultName,
addr: defaultAddr,
timeout: defaultTimeout,
weight: defaultWeight,
}

if id := etc.Get(defaultIDKey).String(); id != "" {
Expand All @@ -70,6 +74,10 @@ func defaultOptions() *options {
opts.timeout = timeout
}

if weight := etc.Get(defaultWeightKey).Int(); weight > 0 {
opts.weight = weight
}

return opts
}

Expand Down Expand Up @@ -107,3 +115,8 @@ func WithLocator(locator locate.Locator) Option {
func WithRegistry(r registry.Registry) Option {
return func(o *options) { o.registry = r }
}

// WithWeight 设置权重
func WithWeight(weight int) Option {
return func(o *options) { o.weight = weight }
}
1 change: 1 addition & 0 deletions cluster/mesh/mesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ func (m *Mesh) registerServiceInstances() {
Kind: cluster.Mesh.String(),
Alias: m.opts.name,
State: m.getState().String(),
Weight: m.opts.weight,
Endpoint: m.transporter.Endpoint().String(),
Services: make([]string, 0, len(m.services)),
}
Expand Down
13 changes: 13 additions & 0 deletions cluster/mesh/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ const (
defaultName = "mesh" // 默认节点名称
defaultCodec = "proto" // 默认编解码器名称
defaultTimeout = 3 * time.Second // 默认超时时间
defaultWeight = 1 // 默认权重
)

const (
defaultIDKey = "etc.cluster.mesh.id"
defaultNameKey = "etc.cluster.mesh.name"
defaultCodecKey = "etc.cluster.mesh.codec"
defaultTimeoutKey = "etc.cluster.mesh.timeout"
defaultWeightKey = "etc.cluster.mesh.weight"
)

type Option func(o *options)
Expand All @@ -37,6 +39,7 @@ type options struct {
registry registry.Registry // 服务注册器
encryptor crypto.Encryptor // 消息加密器
transporter transport.Transporter // 消息传输器
weight int // 权重
}

func defaultOptions() *options {
Expand All @@ -45,6 +48,7 @@ func defaultOptions() *options {
name: defaultName,
codec: encoding.Invoke(defaultCodec),
timeout: defaultTimeout,
weight: defaultWeight,
}

if id := etc.Get(defaultIDKey).String(); id != "" {
Expand All @@ -65,6 +69,10 @@ func defaultOptions() *options {
opts.timeout = timeout
}

if weight := etc.Get(defaultWeightKey).Int(); weight > 0 {
opts.weight = weight
}

return opts
}

Expand Down Expand Up @@ -107,3 +115,8 @@ func WithEncryptor(encryptor crypto.Encryptor) Option {
func WithTransporter(transporter transport.Transporter) Option {
return func(o *options) { o.transporter = transporter }
}

// WithWeight 设置权重
func WithWeight(weight int) Option {
return func(o *options) { o.weight = weight }
}
2 changes: 2 additions & 0 deletions cluster/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ func (n *Node) registerServiceInstances() {
Routes: routes,
Events: events,
Endpoint: n.linker.Endpoint().String(),
Weight: n.opts.weight,
})

if n.transporter != nil {
Expand All @@ -316,6 +317,7 @@ func (n *Node) registerServiceInstances() {
State: n.getState().String(),
Services: services,
Endpoint: n.transporter.Endpoint().String(),
Weight: n.opts.weight,
})
}

Expand Down
13 changes: 13 additions & 0 deletions cluster/node/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const (
defaultAddr = ":0" // 连接器监听地址
defaultCodec = "proto" // 默认编解码器名称
defaultTimeout = 3 * time.Second // 默认超时时间
defaultWeight = 1 // 默认权重
)

const (
Expand All @@ -25,6 +26,7 @@ const (
defaultAddrKey = "etc.cluster.node.addr"
defaultCodecKey = "etc.cluster.node.codec"
defaultTimeoutKey = "etc.cluster.node.timeout"
defaultWeightKey = "etc.cluster.node.weight"
)

// SchedulingModel 调度模型
Expand All @@ -43,6 +45,7 @@ type options struct {
registry registry.Registry // 服务注册器
encryptor crypto.Encryptor // 消息加密器
transporter transport.Transporter // 消息传输器
weight int // 权重
}

func defaultOptions() *options {
Expand All @@ -52,6 +55,7 @@ func defaultOptions() *options {
addr: defaultAddr,
codec: encoding.Invoke(defaultCodec),
timeout: defaultTimeout,
weight: defaultWeight,
}

if id := etc.Get(defaultIDKey).String(); id != "" {
Expand All @@ -76,6 +80,10 @@ func defaultOptions() *options {
opts.timeout = timeout
}

if weight := etc.Get(defaultWeightKey).Int(); weight > 0 {
opts.weight = weight
}

return opts
}

Expand Down Expand Up @@ -128,3 +136,8 @@ func WithEncryptor(encryptor crypto.Encryptor) Option {
func WithTransporter(transporter transport.Transporter) Option {
return func(o *options) { o.transporter = transporter }
}

// WithWeight 设置权重
func WithWeight(weight int) Option {
return func(o *options) { o.weight = weight }
}
125 changes: 124 additions & 1 deletion internal/dispatcher/abstract.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/dobyte/due/v2/core/endpoint"
"github.com/dobyte/due/v2/errors"
"sync/atomic"
"sync"
)

type serviceEndpoint struct {
Expand All @@ -20,6 +21,25 @@ type abstract struct {
endpoints2 map[string]*serviceEndpoint // 所有端口(包含work、busy、hang、shut状态的实例)
endpoints3 []*serviceEndpoint // 所有端口(包含work、busy状态的实例)
endpoints4 map[string]*serviceEndpoint // 所有端口(包含work、busy状态的实例)
// 加权轮询相关字段
currentQueue *wrrQueue // 当前队列
nextQueue *wrrQueue // 下一个队列
step int // GCD步长
wrrMu sync.Mutex // 加权轮询锁
}

// 加权轮询队列节点
type wrrEntry struct {
weight int // 当前权重
orgWeight int // 原始权重
endpoint *serviceEndpoint
next *wrrEntry
}

// 加权轮询队列
type wrrQueue struct {
head *wrrEntry
tail *wrrEntry
}

// FindEndpoint 查询路由服务端点
Expand All @@ -29,7 +49,7 @@ func (a *abstract) FindEndpoint(insID ...string) (*endpoint.Endpoint, error) {
case RoundRobin:
return a.roundRobinDispatch()
case WeightRoundRobin:
return a.randomDispatch()
return a.weightRoundRobinDispatch()
default:
return a.randomDispatch()
}
Expand Down Expand Up @@ -111,3 +131,106 @@ func (a *abstract) roundRobinDispatch() (*endpoint.Endpoint, error) {

return a.endpoints3[index].endpoint, nil
}

// 加权轮询分配
func (a *abstract) weightRoundRobinDispatch() (*endpoint.Endpoint, error) {
a.wrrMu.Lock()
defer a.wrrMu.Unlock()

// 如果当前队列为空,交换当前队列和下一个队列
if a.currentQueue.isEmpty() {
a.currentQueue, a.nextQueue = a.nextQueue, a.currentQueue
}

// 从当前队列中取出一个节点
entry := a.currentQueue.pop()
if entry == nil {
return nil, errors.ErrNotFoundEndpoint
}

// 减少当前权重
entry.weight -= a.step

// 如果权重大于0,放回当前队列
if entry.weight > 0 {
a.currentQueue.push(entry)
} else {
// 重置权重并放入下一个队列
entry.weight = entry.orgWeight
a.nextQueue.push(entry)
}

return entry.endpoint.endpoint, nil
}

// 初始化 WRR 队列
func (a *abstract) initWRRQueue() {
a.currentQueue = &wrrQueue{}
a.nextQueue = &wrrQueue{}

// 计算最大公约数作为步长
a.step = 0
for _, sep := range a.endpoints4 {
weight := a.dispatcher.instances[sep.insID].Weight
if a.step == 0 {
a.step = weight
} else {
a.step = gcd(a.step, weight)
}

// 创建队列节点
entry := &wrrEntry{
weight: weight,
orgWeight: weight,
endpoint: sep,
}
a.currentQueue.push(entry)
}
}

// 判断队列是否为空
func (q *wrrQueue) isEmpty() bool {
return q.head == nil
}

// 将节点加入队列尾部
func (q *wrrQueue) push(entry *wrrEntry) {
entry.next = nil

if q.tail == nil {
// 空队列
q.head = entry
q.tail = entry
return
}

// 添加到队列尾部
q.tail.next = entry
q.tail = entry
}

// 从队列头部取出节点
func (q *wrrQueue) pop() *wrrEntry {
if q.head == nil {
return nil
}

entry := q.head
q.head = entry.next

if q.head == nil {
// 队列已空
q.tail = nil
}

entry.next = nil
return entry
}

// 计算最大公约数
func gcd(a, b int) int {
for b != 0 {
a, b = b, a%b
}
return a
}
13 changes: 13 additions & 0 deletions internal/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Dispatcher struct {
routes map[int32]*Route
events map[int]*Event
endpoints map[string]*endpoint.Endpoint
instances map[string]*registry.ServiceInstance
}

func NewDispatcher(strategy BalanceStrategy) *Dispatcher {
Expand Down Expand Up @@ -85,6 +86,7 @@ func (d *Dispatcher) ReplaceServices(services ...*registry.ServiceInstance) {
routes := make(map[int32]*Route, len(services))
events := make(map[int]*Event, len(services))
endpoints := make(map[string]*endpoint.Endpoint)
instances := make(map[string]*registry.ServiceInstance, len(services))

log.Debugf("services change: %v", xconv.Json(services))

Expand All @@ -97,6 +99,7 @@ func (d *Dispatcher) ReplaceServices(services ...*registry.ServiceInstance) {
}

endpoints[service.ID] = ep
instances[service.ID] = service

for _, item := range service.Routes {
route, ok := routes[item.ID]
Expand All @@ -121,5 +124,15 @@ func (d *Dispatcher) ReplaceServices(services ...*registry.ServiceInstance) {
d.routes = routes
d.events = events
d.endpoints = endpoints
d.instances = instances

if d.strategy == WeightRoundRobin {
for _, route := range routes {
route.initWRRQueue()
}
for _, event := range events {
event.initWRRQueue()
}
}
d.rw.Unlock()
}
Loading

0 comments on commit cb881c5

Please sign in to comment.