diff --git a/server/conn.go b/server/conn.go index d6d947ae..b4911c52 100644 --- a/server/conn.go +++ b/server/conn.go @@ -16,7 +16,7 @@ func newKcpConn(server *Server, session *kcp.UDPSession) *Conn { c := &Conn{ ctx: server.ctx, connection: &connection{ - packets: make(chan *connPacket, 1024*10), + packets: make(chan *connPacket, DefaultConnectionChannelSize), server: server, remoteAddr: session.RemoteAddr(), ip: session.RemoteAddr().String(), @@ -39,7 +39,7 @@ func newGNetConn(server *Server, conn gnet.Conn) *Conn { c := &Conn{ ctx: server.ctx, connection: &connection{ - packets: make(chan *connPacket, 1024*10), + packets: make(chan *connPacket, DefaultConnectionChannelSize), server: server, remoteAddr: conn.RemoteAddr(), ip: conn.RemoteAddr().String(), @@ -62,7 +62,7 @@ func newWebsocketConn(server *Server, ws *websocket.Conn, ip string) *Conn { c := &Conn{ ctx: server.ctx, connection: &connection{ - packets: make(chan *connPacket, 1024*10), + packets: make(chan *connPacket, DefaultConnectionChannelSize), server: server, remoteAddr: ws.RemoteAddr(), ip: ip, @@ -82,7 +82,7 @@ func newGatewayConn(conn *Conn, connId string) *Conn { c := &Conn{ //ctx: server.ctx, connection: &connection{ - packets: make(chan *connPacket, 1024*10), + packets: make(chan *connPacket, DefaultConnectionChannelSize), server: conn.server, data: map[any]any{}, }, @@ -98,7 +98,7 @@ func NewEmptyConn(server *Server) *Conn { c := &Conn{ ctx: server.ctx, connection: &connection{ - packets: make(chan *connPacket, 1024*10), + packets: make(chan *connPacket, DefaultConnectionChannelSize), server: server, remoteAddr: &net.TCPAddr{}, ip: "0.0.0.0:0", diff --git a/server/constants.go b/server/constants.go index f2119c8b..3ef2107a 100644 --- a/server/constants.go +++ b/server/constants.go @@ -25,6 +25,7 @@ const ( DefaultMessageChannelSize = 1024 * 1024 DefaultAsyncPoolSize = 256 DefaultWebsocketReadDeadline = 30 * time.Second + DefaultConnectionChannelSize = 1024 * 10 ) const ( diff --git a/server/event.go b/server/event.go index e81e988a..3191413b 100644 --- a/server/event.go +++ b/server/event.go @@ -171,6 +171,12 @@ func (slf *event) OnStartFinishEvent() { return true }) }, "StartFinishEvent") + if slf.Server.limitLife > 0 { + go func() { + time.Sleep(slf.Server.limitLife) + slf.Shutdown() + }() + } } // RegConnectionClosedEvent 在连接关闭后将立刻执行被注册的事件处理函数 diff --git a/server/options.go b/server/options.go index 209e0cb1..3a104f8c 100644 --- a/server/options.go +++ b/server/options.go @@ -41,6 +41,28 @@ type runtime struct { websocketReadDeadline time.Duration // websocket连接超时时间 websocketCompression int // websocket压缩等级 websocketWriteCompression bool // websocket写入压缩 + limitLife time.Duration // 限制最大生命周期 + connMessageChannelSize int // 连接消息通道大小 +} + +// WithConnMessageChannelSize 通过指定连接消息通道大小的方式创建服务器 +// - 足够大的消息通道可以确保连接在写入消息时不至于阻塞 +// - 默认值为 DefaultConnectionChannelSize +func WithConnMessageChannelSize(size int) Option { + return func(srv *Server) { + if size <= 0 { + size = DefaultConnectionChannelSize + } + srv.connMessageChannelSize = size + } +} + +// WithLimitLife 通过限制最大生命周期的方式创建服务器 +// - 通常用于测试服务器,服务器将在到达最大生命周期时自动关闭 +func WithLimitLife(t time.Duration) Option { + return func(srv *Server) { + srv.limitLife = t + } } // WithWebsocketWriteCompression 通过数据写入压缩的方式创建Websocket服务器 diff --git a/server/server.go b/server/server.go index ebee8bcb..639cbbdd 100644 --- a/server/server.go +++ b/server/server.go @@ -31,7 +31,11 @@ import ( // New 根据特定网络类型创建一个服务器 func New(network Network, options ...Option) *Server { server := &Server{ - runtime: &runtime{messagePoolSize: DefaultMessageBufferSize, messageChannelSize: DefaultMessageChannelSize}, + runtime: &runtime{ + messagePoolSize: DefaultMessageBufferSize, + messageChannelSize: DefaultMessageChannelSize, + connMessageChannelSize: DefaultConnectionChannelSize, + }, option: &option{}, network: network, online: concurrent.NewBalanceMap[string, *Conn](), diff --git a/server/server_example_test.go b/server/server_example_test.go index 8e6c512e..a7c2cb21 100644 --- a/server/server_example_test.go +++ b/server/server_example_test.go @@ -6,16 +6,22 @@ import ( ) func ExampleNew() { - srv := server.New(server.NetworkWebsocket, - server.WithDeadlockDetect(time.Second*5), - server.WithPProf("/debug/pprof"), - ) - + srv := server.New(server.NetworkWebsocket, server.WithLimitLife(time.Millisecond)) srv.RegConnectionReceivePacketEvent(func(srv *server.Server, conn *server.Conn, packet []byte) { conn.Write(packet) }) + if err := srv.Run(":9999"); err != nil { + panic(err) + } + + // Output: +} - go func() { time.Sleep(1 * time.Second); srv.Shutdown() }() +func ExampleServer_Run() { + srv := server.New(server.NetworkWebsocket, server.WithLimitLife(time.Millisecond)) + srv.RegConnectionReceivePacketEvent(func(srv *server.Server, conn *server.Conn, packet []byte) { + conn.Write(packet) + }) if err := srv.Run(":9999"); err != nil { panic(err) } diff --git a/server/server_test.go b/server/server_test.go index a85dfed1..bd6a5e55 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -42,7 +42,7 @@ func TestNewClient(t *testing.T) { for i := 0; i < 1000; i++ { id := i fmt.Println("启动", i+1) - cli := client.NewWebsocket("ws://127.0.0.1:9999") + cli := client.NewWebsocket("ws://127.0.0.1:8888") cli.RegConnectionReceivePacketEvent(func(conn *client.Client, wst int, packet []byte) { fmt.Println("收到", id+1, string(packet)) })