Skip to content

Commit

Permalink
feat: server.Server 支持通过 WithShunt 函数对服务器消息进行分流
Browse files Browse the repository at this point in the history
  • Loading branch information
kercylan98 committed Aug 1, 2023
1 parent aef7740 commit c92f16c
Showing 2 changed files with 81 additions and 20 deletions.
20 changes: 20 additions & 0 deletions server/options.go
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@ package server

import (
"github.com/gin-contrib/pprof"
"github.com/kercylan98/minotaur/utils/concurrent"
"github.com/kercylan98/minotaur/utils/log"
"github.com/kercylan98/minotaur/utils/timer"
"google.golang.org/grpc"
@@ -229,3 +230,22 @@ func WithPProf(pattern ...string) Option {
pprof.Register(srv.ginServer, pattern...)
}
}

// WithShunt 通过连接数据包分流的方式创建服务器
// - 在分流的情况下,将会使用分流通道处理数据包,而不是使用系统通道,消息的执行将转移到对应的分流通道内进行串行处理,默认情况下所有消息都是串行处理的,适用于例如不同游戏房间并行处理,游戏房间内部消息串行处理的情况
// - channelGenerator:用于生成分流通道的函数
// - shuntMatcher:用于匹配连接的函数,返回值为分流通道的 GUID 和是否允许创建新的分流通道,当返回不允许创建新的分流通道时,将会使用使用默认的系统通道
//
// 将被分流的消息类型(更多类型有待斟酌):
// - MessageTypePacket
func WithShunt(channelGenerator func(guid int64) chan *Message, shuntMatcher func(conn *Conn) (guid int64, allowToCreate bool)) Option {
return func(srv *Server) {
if channelGenerator == nil || shuntMatcher == nil {
log.Warn("WithShunt", log.String("State", "Ignore"), log.String("Reason", "channelGenerator or shuntMatcher is nil"))
return
}
srv.shuntChannels = concurrent.NewBalanceMap[int64, chan *Message]()
srv.channelGenerator = channelGenerator
srv.shuntMatcher = shuntMatcher
}
}
81 changes: 61 additions & 20 deletions server/server.go
Original file line number Diff line number Diff line change
@@ -72,26 +72,29 @@ func New(network Network, options ...Option) *Server {

// Server 网络服务器
type Server struct {
*event // 事件
*runtime // 运行时
*option // 可选项
network Network // 网络类型
addr string // 侦听地址
systemSignal chan os.Signal // 系统信号
online *concurrent.BalanceMap[string, *Conn] // 在线连接
ginServer *gin.Engine // HTTP模式下的路由器
httpServer *http.Server // HTTP模式下的服务器
grpcServer *grpc.Server // GRPC模式下的服务器
gServer *gNet // TCP或UDP模式下的服务器
isRunning bool // 是否正在运行
isShutdown atomic.Bool // 是否已关闭
closeChannel chan struct{} // 关闭信号
ants *ants.Pool // 协程池
messagePool *concurrent.Pool[*Message] // 消息池
messageChannel chan *Message // 消息管道
multiple *MultipleServer // 多服务器模式下的服务器
multipleRuntimeErrorChan chan error // 多服务器模式下的运行时错误
runMode RunMode // 运行模式
*event // 事件
*runtime // 运行时
*option // 可选项
network Network // 网络类型
addr string // 侦听地址
systemSignal chan os.Signal // 系统信号
online *concurrent.BalanceMap[string, *Conn] // 在线连接
ginServer *gin.Engine // HTTP模式下的路由器
httpServer *http.Server // HTTP模式下的服务器
grpcServer *grpc.Server // GRPC模式下的服务器
gServer *gNet // TCP或UDP模式下的服务器
isRunning bool // 是否正在运行
isShutdown atomic.Bool // 是否已关闭
closeChannel chan struct{} // 关闭信号
ants *ants.Pool // 协程池
messagePool *concurrent.Pool[*Message] // 消息池
messageChannel chan *Message // 消息管道
multiple *MultipleServer // 多服务器模式下的服务器
multipleRuntimeErrorChan chan error // 多服务器模式下的运行时错误
runMode RunMode // 运行模式
shuntChannels *concurrent.BalanceMap[int64, chan *Message] // 分流管道
channelGenerator func(guid int64) chan *Message // 消息管道生成器
shuntMatcher func(conn *Conn) (guid int64, allowToCreate bool) // 分流管道匹配器
}

// Run 使用特定地址运行服务器
@@ -417,6 +420,14 @@ func (slf *Server) shutdown(err error) {
slf.messagePool.Close()
slf.messageChannel = nil
}
if slf.shuntChannels != nil {
slf.shuntChannels.Range(func(key int64, c chan *Message) bool {
close(c)
return false
})
slf.shuntChannels.Clear()
slf.shuntChannels = nil
}
if slf.grpcServer != nil && slf.isRunning {
slf.grpcServer.GracefulStop()
}
@@ -469,12 +480,42 @@ func (slf *Server) HttpRouter() gin.IRouter {
return slf.ginServer
}

// ShuntChannelFreed 释放分流通道
func (slf *Server) ShuntChannelFreed(channelGuid int64) {
if slf.shuntChannels == nil {
return
}
channel, exist := slf.shuntChannels.GetExist(channelGuid)
if exist {
close(channel)
slf.shuntChannels.Delete(channelGuid)
}
}

// pushMessage 向服务器中写入特定类型的消息,需严格遵守消息属性要求
func (slf *Server) pushMessage(message *Message) {
if slf.messagePool.IsClose() {
slf.messagePool.Release(message)
return
}
if slf.shuntChannels != nil && (message.t == MessageTypePacket) {
conn := message.attrs[0].(*Conn)
channelGuid, allowToCreate := slf.shuntMatcher(conn)
channel, exist := slf.shuntChannels.GetExist(channelGuid)
if !exist && allowToCreate {
channel = slf.channelGenerator(channelGuid)
slf.shuntChannels.Set(channelGuid, channel)
go func(channel chan *Message) {
for message := range channel {
slf.dispatchMessage(message)
}
}(channel)
}
if channel != nil {
channel <- message
return
}
}
slf.messageChannel <- message
}

0 comments on commit c92f16c

Please sign in to comment.