Skip to content

Commit

Permalink
feat: gateway 数据包支持像普通数据包一样处理,并且支持自定义端点健康评估函数
Browse files Browse the repository at this point in the history
  • Loading branch information
kercylan98 committed Aug 17, 2023
1 parent 6e9a578 commit 3512570
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 38 deletions.
32 changes: 31 additions & 1 deletion server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package server
import (
"github.com/gorilla/websocket"
"github.com/kercylan98/minotaur/utils/concurrent"
"github.com/kercylan98/minotaur/utils/super"
"github.com/panjf2000/gnet"
"github.com/xtaci/kcp-go/v5"
"net"
Expand Down Expand Up @@ -65,6 +66,26 @@ func newWebsocketConn(server *Server, ws *websocket.Conn, ip string) *Conn {
return c
}

// newGatewayConn 创建一个处理网关消息的连接
func newGatewayConn(conn *Conn, connId string) *Conn {
c := &Conn{
server: conn.server,
data: map[any]any{},
}
c.gw = func(packet Packet) {
var gp = GP{
C: connId,
WT: packet.WebsocketType,
D: packet.Data,
T: time.Now().UnixNano(),
}
pd := super.MarshalJSON(&gp)
packet.Data = append(pd, 0xff)
conn.Write(packet)
}
return c
}

// NewEmptyConn 创建一个适用于测试的空连接
func NewEmptyConn(server *Server) *Conn {
c := &Conn{
Expand All @@ -88,6 +109,7 @@ type Conn struct {
ws *websocket.Conn
gn gnet.Conn
kcp *kcp.UDPSession
gw func(packet Packet)
data map[any]any
mutex sync.Mutex
packetPool *concurrent.Pool[*connPacket]
Expand All @@ -96,7 +118,7 @@ type Conn struct {

// IsEmpty 是否是空连接
func (slf *Conn) IsEmpty() bool {
return slf.ws == nil && slf.gn == nil && slf.kcp == nil
return slf.ws == nil && slf.gn == nil && slf.kcp == nil && slf.gw == nil
}

// Reuse 重用连接
Expand Down Expand Up @@ -179,6 +201,10 @@ func (slf *Conn) IsWebsocket() bool {
// Write 向连接中写入数据
// - messageType: websocket模式中指定消息类型
func (slf *Conn) Write(packet Packet) {
if slf.gw != nil {
slf.gw(packet)
return
}
packet = slf.server.OnConnectionWritePacketBeforeEvent(slf, packet)
if slf.packetPool == nil {
return
Expand All @@ -194,6 +220,10 @@ func (slf *Conn) Write(packet Packet) {
// WriteWithCallback 与 Write 相同,但是会在写入完成后调用 callback
// - 当 callback 为 nil 时,与 Write 相同
func (slf *Conn) WriteWithCallback(packet Packet, callback func(err error)) {
if slf.gw != nil {
slf.gw(packet)
return
}
packet = slf.server.OnConnectionWritePacketBeforeEvent(slf, packet)
if slf.packetPool == nil {
return
Expand Down
38 changes: 26 additions & 12 deletions server/gateway/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,38 @@ package gateway
import (
"github.com/kercylan98/minotaur/server"
"github.com/kercylan98/minotaur/server/client"
"github.com/kercylan98/minotaur/utils/super"
"time"
)

// NewEndpoint 创建网关端点
func NewEndpoint(name, address string) *Endpoint {
func NewEndpoint(name, address string, options ...EndpointOption) *Endpoint {
endpoint := &Endpoint{
client: client.NewWebsocket(address),
name: name,
address: address,
}
for _, option := range options {
option(endpoint)
}
if endpoint.evaluator == nil {
endpoint.evaluator = func(costUnixNano float64) float64 {
return 1 / (1 + 1.5*time.Duration(costUnixNano).Seconds())
}
}
endpoint.client.RegConnectionClosedEvent(endpoint.onConnectionClosed)
endpoint.client.RegConnectionReceivePacketEvent(endpoint.onConnectionReceivePacket)
return endpoint
}

// Endpoint 网关端点
type Endpoint struct {
client *client.Websocket // 端点客户端
name string // 端点名称
address string // 端点地址
state float64 // 端点健康值(0为不可用,越高越优)
offline bool // 离线
client *client.Websocket // 端点客户端
name string // 端点名称
address string // 端点地址
state float64 // 端点健康值(0为不可用,越高越优)
offline bool // 离线
evaluator func(costUnixNano float64) float64 // 端点健康值评估函数
}

// Offline 离线
Expand All @@ -35,9 +45,9 @@ func (slf *Endpoint) Offline() {
// Connect 连接端点
func (slf *Endpoint) Connect() {
for {
var now = time.Now()
cur := time.Now().UnixNano()
if err := slf.client.Run(); err == nil {
slf.state = 1 - (time.Since(now).Seconds() / 10)
slf.state = slf.evaluator(float64(time.Now().UnixNano() - cur))
break
}
time.Sleep(100 * time.Millisecond)
Expand All @@ -56,9 +66,13 @@ func (slf *Endpoint) onConnectionClosed(conn *client.Websocket, err any) {
}
}

// onConnectionReceivePacket 解说到来自端点的数据包事件
// onConnectionReceivePacket 接收到来自端点的数据包事件
func (slf *Endpoint) onConnectionReceivePacket(conn *client.Websocket, packet server.Packet) {
p := UnpackGatewayPacket(packet)
packet.Data = p.Data
conn.GetData(p.ConnID).(*server.Conn).Write(packet)
var gp server.GP
if err := super.UnmarshalJSON(packet.Data[:len(packet.Data)-1], &gp); err != nil {
panic(err)
}
cur := time.Now().UnixNano()
slf.state = slf.evaluator(float64(cur - gp.T))
conn.GetData(gp.C).(*server.Conn).Write(server.NewWSPacket(gp.WT, gp.D))
}
11 changes: 11 additions & 0 deletions server/gateway/endpoint_options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package gateway

// EndpointOption 网关端点选项
type EndpointOption func(endpoint *Endpoint)

// WithEndpointStateEvaluator 设置端点健康值评估函数
func WithEndpointStateEvaluator(evaluator func(costUnixNano float64) float64) EndpointOption {
return func(endpoint *Endpoint) {
endpoint.evaluator = evaluator
}
}
29 changes: 7 additions & 22 deletions server/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,27 +48,12 @@ func (slf *Gateway) onConnectionOpened(srv *server.Server, conn *server.Conn) {

// onConnectionReceivePacket 连接接收数据包事件
func (slf *Gateway) onConnectionReceivePacket(srv *server.Server, conn *server.Conn, packet server.Packet) {
conn.GetData("endpoint").(*Endpoint).Write(PackGatewayPacket(conn.GetID(), packet.WebsocketType, packet.Data))
}

// PackGatewayPacket 打包网关数据包
func PackGatewayPacket(connID string, websocketType int, data []byte) server.Packet {
var gatewayPacket = Packet{
ConnID: connID,
WebsocketType: websocketType,
Data: data,
}
return server.Packet{
WebsocketType: websocketType,
Data: super.MarshalJSON(&gatewayPacket),
}
}

// UnpackGatewayPacket 解包网关数据包
func UnpackGatewayPacket(packet server.Packet) Packet {
var gatewayPacket Packet
if err := super.UnmarshalJSON(packet.Data, &gatewayPacket); err != nil {
panic(err)
var gp = server.GP{
C: conn.GetID(),
WT: packet.WebsocketType,
D: packet.Data,
}
return gatewayPacket
pd := super.MarshalJSON(&gp)
packet.Data = append(pd, 0xff)
conn.GetData("endpoint").(*Endpoint).Write(packet)
}
3 changes: 1 addition & 2 deletions server/gateway/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ import (
func TestGateway_RunEndpointServer(t *testing.T) {
srv := server.New(server.NetworkWebsocket)
srv.RegConnectionReceivePacketEvent(func(srv *server.Server, conn *server.Conn, packet server.Packet) {
p := gateway2.UnpackGatewayPacket(packet)
fmt.Println("endpoint receive packet", string(p.Data))
fmt.Println("endpoint receive packet", string(packet.Data))
conn.Write(packet)
})
if err := srv.Run(":8889"); err != nil {
Expand Down
8 changes: 8 additions & 0 deletions server/gp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package server

type GP struct {
C string // 连接 ID
WT int // WebSocket 类型
D []byte // 数据
T int64 // 时间戳
}
13 changes: 12 additions & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,8 +631,19 @@ func (slf *Server) dispatchMessage(msg *Message) {
var conn = attrs[0].(*Conn)
var packet = attrs[1].([]byte)
var wst = int(packet[len(packet)-1])
var ct = packet[len(packet)-2]
if ct == 0xff {
var gp GP
if err := super.UnmarshalJSON(packet[:len(packet)-2], &gp); err != nil {
panic(err)
}
packet = gp.D
conn = newGatewayConn(conn, gp.C)
} else {
packet = packet[:len(packet)-1]
}
if !slf.OnConnectionPacketPreprocessEvent(conn, packet, func(newPacket []byte) { packet = newPacket }) {
slf.OnConnectionReceivePacketEvent(conn, Packet{Data: packet[:len(packet)-1], WebsocketType: wst})
slf.OnConnectionReceivePacketEvent(conn, Packet{Data: packet, WebsocketType: wst})
}
case MessageTypeError:
err, action := attrs[0].(error), attrs[1].(MessageErrorAction)
Expand Down

0 comments on commit 3512570

Please sign in to comment.