diff --git a/README.md b/README.md index 4d109483e..bc8733488 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ # [[中文](README_ZH.md)] -`gnet` is an Event-Loop networking framework that is fast and small. It makes direct [epoll](https://en.wikipedia.org/wiki/Epoll) and [kqueue](https://en.wikipedia.org/wiki/Kqueue) syscalls rather than using the standard Go [net](https://golang.org/pkg/net/) package, and works in a similar manner as [libuv](https://github.com/libuv/libuv) and [libevent](https://github.com/libevent/libevent). +`gnet` is an Event-Loop networking framework that is fast and small. It makes direct [epoll](https://en.wikipedia.org/wiki/Epoll) and [kqueue](https://en.wikipedia.org/wiki/Kqueue) syscalls rather than using the standard Go [net](https://golang.org/pkg/net/) package, and works in a similar manner as [netty](https://github.com/netty/netty) and [libuv](https://github.com/libuv/libuv). The goal of this project is to create a server framework for Go that performs on par with [Redis](http://redis.io) and [Haproxy](http://www.haproxy.org) for packet handling. @@ -219,7 +219,7 @@ Servers can utilize the [SO_REUSEPORT](https://lwn.net/Articles/542629/) option Just use functional options to set up `SO_REUSEPORT` and you can enjoy this feature: ```go -gnet.Serve(events, "tcp://:9000", gnet.WithMulticore(true))) +gnet.Serve(events, "tcp://:9000", gnet.WithMulticore(true), gnet.WithReusePort(true))) ``` # Performance diff --git a/README_ZH.md b/README_ZH.md index 81c33b84b..65eb5e3a2 100644 --- a/README_ZH.md +++ b/README_ZH.md @@ -13,7 +13,7 @@ # [[英文](README.md)] -`gnet` 是一个基于 Event-Loop 事件驱动的高性能和轻量级网络库。这个库直接使用 [epoll](https://en.wikipedia.org/wiki/Epoll) 和 [kqueue](https://en.wikipedia.org/wiki/Kqueue) 系统调用而非标准 Golang 网络包:[net](https://golang.org/pkg/net/) 来构建网络应用,它的工作原理类似两个开源的网络库:[libuv](https://github.com/libuv/libuv) 和 [libevent](https://github.com/libevent/libevent)。 +`gnet` 是一个基于 Event-Loop 事件驱动的高性能和轻量级网络库。这个库直接使用 [epoll](https://en.wikipedia.org/wiki/Epoll) 和 [kqueue](https://en.wikipedia.org/wiki/Kqueue) 系统调用而非标准 Golang 网络包:[net](https://golang.org/pkg/net/) 来构建网络应用,它的工作原理类似两个开源的网络库:[netty](https://github.com/netty/netty) 和 [libuv](https://github.com/libuv/libuv)。 这个项目存在的价值是提供一个在网络包处理方面能和 [Redis](http://redis.io)、[Haproxy](http://www.haproxy.org) 这两个项目具有相近性能的 Go 语言网络服务器框架。 @@ -216,7 +216,7 @@ events.Tick = func() (delay time.Duration, action Action){ 开启这个功能也很简单,使用 functional options 设置一下即可: ```go -gnet.Serve(events, "tcp://:9000", gnet.WithMulticore(true))) +gnet.Serve(events, "tcp://:9000", gnet.WithMulticore(true), gnet.WithReusePort(true))) ``` # 性能测试 diff --git a/acceptor.go b/acceptor.go new file mode 100644 index 000000000..d2037b2e6 --- /dev/null +++ b/acceptor.go @@ -0,0 +1,83 @@ +// Copyright 2019 Andy Pan. All rights reserved. +// Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file. + +// +build darwin netbsd freebsd openbsd dragonfly linux + +package gnet + +import ( + "net" + "os" + + "github.com/panjf2000/gnet/ringbuffer" + "golang.org/x/sys/unix" +) + +func (svr *server) acceptNewConnection(fd int) error { + nfd, sa, err := unix.Accept(fd) + if err != nil { + if err == unix.EAGAIN { + return nil + } + return err + } + if err := unix.SetNonblock(nfd, true); err != nil { + return err + } + lp := svr.subLoopGroup.next() + c := &conn{ + fd: nfd, + sa: sa, + loop: lp, + inboundBuffer: ringbuffer.New(socketRingBufferSize), + outboundBuffer: ringbuffer.New(socketRingBufferSize), + } + _ = lp.loopOpen(c) + _ = lp.poller.Trigger(func() (err error) { + if err = lp.poller.AddRead(nfd); err == nil { + lp.connections[nfd] = c + return + } + return + }) + return nil +} + +func (ln *listener) close() { + if ln.f != nil { + sniffError(ln.f.Close()) + } + if ln.ln != nil { + sniffError(ln.ln.Close()) + } + if ln.pconn != nil { + sniffError(ln.pconn.Close()) + } + if ln.network == "unix" { + sniffError(os.RemoveAll(ln.addr)) + } +} + +// system takes the net listener and detaches it from it's parent +// event loop, grabs the file descriptor, and makes it non-blocking. +func (ln *listener) system() error { + var err error + switch netln := ln.ln.(type) { + case nil: + switch pconn := ln.pconn.(type) { + case *net.UDPConn: + ln.f, err = pconn.File() + } + case *net.TCPListener: + ln.f, err = netln.File() + case *net.UnixListener: + ln.f, err = netln.File() + } + if err != nil { + ln.close() + return err + } + ln.fd = int(ln.f.Fd()) + return unix.SetNonblock(ln.fd, true) +} diff --git a/connection.go b/connection.go index c83cdf74c..bf0dc685e 100644 --- a/connection.go +++ b/connection.go @@ -15,35 +15,35 @@ import ( ) type conn struct { - fd int // file descriptor - inBuf *ringbuffer.RingBuffer - outBuf *ringbuffer.RingBuffer - sa unix.Sockaddr // remote socket address - opened bool // connection opened event fired - action Action // next user action - ctx interface{} // user-defined context - localAddr net.Addr // local addre - remoteAddr net.Addr // remote addr - loop *loop // connected loop - extra []byte + fd int // file descriptor + sa unix.Sockaddr // remote socket address + ctx interface{} // user-defined context + loop *loop // connected loop + extra []byte // reuse memory of inbound data + opened bool // connection opened event fired + action Action // next user action + localAddr net.Addr // local addre + remoteAddr net.Addr // remote addr + inboundBuffer *ringbuffer.RingBuffer // buffer for data from client + outboundBuffer *ringbuffer.RingBuffer // buffer for data that is ready to write to client } func (c *conn) ReadPair() (top, tail []byte) { - if c.inBuf.IsEmpty() { + if c.inboundBuffer.IsEmpty() { top = c.extra return } - top, _ = c.inBuf.PreReadAll() + top, _ = c.inboundBuffer.PreReadAll() tail = c.extra return } func (c *conn) ReadBytes() []byte { - return c.inBuf.WithBytes(c.extra) + return c.inboundBuffer.WithBytes(c.extra) } func (c *conn) ResetBuffer() { - c.inBuf.Reset() + c.inboundBuffer.Reset() } func (c *conn) AsyncWrite(buf []byte) { @@ -57,24 +57,24 @@ func (c *conn) AsyncWrite(buf []byte) { func (c *conn) open(buf []byte) { n, err := unix.Write(c.fd, buf) if err != nil { - _, _ = c.outBuf.Write(buf) + _, _ = c.outboundBuffer.Write(buf) return } if n < len(buf) { - _, _ = c.outBuf.Write(buf[n:]) + _, _ = c.outboundBuffer.Write(buf[n:]) } } func (c *conn) write(buf []byte) { - if !c.outBuf.IsEmpty() { - _, _ = c.outBuf.Write(buf) + if !c.outboundBuffer.IsEmpty() { + _, _ = c.outboundBuffer.Write(buf) return } n, err := unix.Write(c.fd, buf) if err != nil { if err == unix.EAGAIN { - _, _ = c.outBuf.Write(buf) + _, _ = c.outboundBuffer.Write(buf) _ = c.loop.poller.ModReadWrite(c.fd) return } @@ -82,7 +82,7 @@ func (c *conn) write(buf []byte) { return } if n < len(buf) { - _, _ = c.outBuf.Write(buf[n:]) + _, _ = c.outboundBuffer.Write(buf[n:]) _ = c.loop.poller.ModReadWrite(c.fd) } } @@ -91,6 +91,9 @@ func (c *conn) Context() interface{} { return c.ctx } func (c *conn) SetContext(ctx interface{}) { c.ctx = ctx } func (c *conn) LocalAddr() net.Addr { return c.localAddr } func (c *conn) RemoteAddr() net.Addr { return c.remoteAddr } +func (c *conn) SendTo(buf []byte, sa unix.Sockaddr) { + _ = unix.Sendto(c.fd, buf, 0, sa) +} //func (c *conn) Wake() { // if c.loop != nil { diff --git a/eventloop.go b/eventloop.go index 86ea39b03..6e55eabe2 100644 --- a/eventloop.go +++ b/eventloop.go @@ -11,7 +11,6 @@ import ( "net" "time" - "github.com/panjf2000/gnet/internal" "github.com/panjf2000/gnet/netpoll" "github.com/panjf2000/gnet/ringbuffer" "golang.org/x/sys/unix" @@ -19,10 +18,10 @@ import ( type loop struct { idx int // loop index in the server loops list - poller *netpoll.Poller // epoll or kqueue + svr *server // server in loop packet []byte // read packet buffer + poller *netpoll.Poller // epoll or kqueue connections map[int]*conn // loop connections fd -> conn - svr *server } func (lp *loop) loopRun() { @@ -32,29 +31,13 @@ func (lp *loop) loopRun() { go lp.loopTicker() } - _ = lp.poller.Polling(func(fd int, job internal.Job) error { - if fd == 0 { - return job() - } - if c, ok := lp.connections[fd]; ok { - switch { - case !c.opened: - return lp.loopOpened(c) - case c.outBuf.Length() > 0: - return lp.loopWrite(c) - default: - return lp.loopRead(c) - } - } else { - return lp.loopAccept(fd) - } - }) + _ = lp.poller.Polling(lp.handleEvent) } func (lp *loop) loopAccept(fd int) error { if fd == lp.svr.ln.fd { if lp.svr.ln.pconn != nil { - return lp.loopUDPRead(fd) + return lp.loopUDPIn(fd) } nfd, sa, err := unix.Accept(fd) if err != nil { @@ -67,10 +50,10 @@ func (lp *loop) loopAccept(fd int) error { return err } c := &conn{fd: nfd, - sa: sa, - inBuf: ringbuffer.New(connRingBufferSize), - outBuf: ringbuffer.New(connRingBufferSize), - loop: lp, + sa: sa, + inboundBuffer: ringbuffer.New(socketRingBufferSize), + outboundBuffer: ringbuffer.New(socketRingBufferSize), + loop: lp, } if err = lp.poller.AddReadWrite(c.fd); err == nil { lp.connections[c.fd] = c @@ -81,7 +64,7 @@ func (lp *loop) loopAccept(fd int) error { return nil } -func (lp *loop) loopOpened(c *conn) error { +func (lp *loop) loopOpen(c *conn) error { c.opened = true c.localAddr = lp.svr.ln.lnaddr c.remoteAddr = netpoll.SockaddrToTCPOrUnixAddr(c.sa) @@ -96,13 +79,13 @@ func (lp *loop) loopOpened(c *conn) error { if len(out) > 0 { c.open(out) } - if c.outBuf.Length() != 0 { + if !c.outboundBuffer.IsEmpty() { _ = lp.poller.AddWrite(c.fd) } return lp.handleAction(c) } -func (lp *loop) loopRead(c *conn) error { +func (lp *loop) loopIn(c *conn) error { n, err := unix.Read(c.fd, lp.packet) if n == 0 || err != nil { if err == unix.EAGAIN { @@ -116,15 +99,15 @@ func (lp *loop) loopRead(c *conn) error { if len(out) > 0 { c.write(out) } else if action != DataRead { - _, _ = c.inBuf.Write(c.extra) + _, _ = c.inboundBuffer.Write(c.extra) } return lp.handleAction(c) } -func (lp *loop) loopWrite(c *conn) error { +func (lp *loop) loopOut(c *conn) error { lp.svr.eventHandler.PreWrite() - top, tail := c.outBuf.PreReadAll() + top, tail := c.outboundBuffer.PreReadAll() n, err := unix.Write(c.fd, top) if err != nil { if err == unix.EAGAIN { @@ -132,7 +115,7 @@ func (lp *loop) loopWrite(c *conn) error { } return lp.loopCloseConn(c, err) } - c.outBuf.Advance(n) + c.outboundBuffer.Advance(n) if len(top) == n && tail != nil { n, err = unix.Write(c.fd, tail) if err != nil { @@ -141,10 +124,10 @@ func (lp *loop) loopWrite(c *conn) error { } return lp.loopCloseConn(c, err) } - c.outBuf.Advance(n) + c.outboundBuffer.Advance(n) } - if c.outBuf.Length() == 0 { + if c.outboundBuffer.IsEmpty() { _ = lp.poller.ModRead(c.fd) } return nil @@ -234,7 +217,7 @@ func (lp *loop) handleAction(c *conn) error { } } -func (lp *loop) loopUDPRead(fd int) error { +func (lp *loop) loopUDPIn(fd int) error { n, sa, err := unix.Recvfrom(fd, lp.packet, 0) if err != nil || n == 0 { return nil @@ -255,15 +238,16 @@ func (lp *loop) loopUDPRead(fd int) error { sa6 = *sa } c := &conn{ - localAddr: lp.svr.ln.lnaddr, - remoteAddr: netpoll.SockaddrToUDPAddr(&sa6), - inBuf: ringbuffer.New(connRingBufferSize), + fd: fd, + localAddr: lp.svr.ln.lnaddr, + remoteAddr: netpoll.SockaddrToUDPAddr(&sa6), + inboundBuffer: ringbuffer.New(socketRingBufferSize), } - _, _ = c.inBuf.Write(lp.packet[:n]) + _, _ = c.inboundBuffer.Write(lp.packet[:n]) out, action := lp.svr.eventHandler.React(c) if len(out) > 0 { lp.svr.eventHandler.PreWrite() - sniffError(unix.Sendto(fd, out, 0, sa)) + c.SendTo(out, sa) } switch action { case Shutdown: diff --git a/eventloop_group.go b/eventloop_group.go index 960ab0e9e..97c0168ef 100644 --- a/eventloop_group.go +++ b/eventloop_group.go @@ -27,7 +27,6 @@ type IEventLoopGroup interface { } type eventLoopGroup struct { - //loadBalance LoadBalance nextLoopIndex int eventLoops []*loop size int @@ -41,7 +40,6 @@ func (g *eventLoopGroup) register(lp *loop) { // Built-in load-balance algorithm is Round-Robin. // TODO: support more load-balance algorithms. func (g *eventLoopGroup) next() (lp *loop) { - //return g.nextByRoundRobin() lp = g.eventLoops[g.nextLoopIndex] g.nextLoopIndex++ if g.nextLoopIndex >= g.size { @@ -50,15 +48,6 @@ func (g *eventLoopGroup) next() (lp *loop) { return } -//func (g *eventLoopGroup) nextByRoundRobin() (lp *loop) { -// lp = g.eventLoops[g.nextLoopIndex] -// g.nextLoopIndex++ -// if g.nextLoopIndex >= g.size { -// g.nextLoopIndex = 0 -// } -// return -//} - func (g *eventLoopGroup) iterate(f func(int, *loop) bool) { for i, lp := range g.eventLoops { if !f(i, lp) { diff --git a/gnet.go b/gnet.go index c57cf4954..7d4102b82 100644 --- a/gnet.go +++ b/gnet.go @@ -7,6 +7,7 @@ package gnet import ( "errors" + "log" "net" "os" "strings" @@ -20,7 +21,7 @@ var ( ErrClosing = errors.New("closing") ) -const connRingBufferSize = 1024 +const socketRingBufferSize = 1024 // Action is an action that occurs after the completion of an event. type Action int @@ -28,10 +29,13 @@ type Action int const ( // None indicates that no action should occur following an event. None Action = iota + // DataRead indicates data in buffer has been read. DataRead + // Close closes the connection. Close + // Shutdown shutdowns the server. Shutdown ) @@ -44,9 +48,11 @@ type Server struct { // it will run the server with single thread. The number of threads in the server will be automatically // assigned to the value of runtime.NumCPU(). Multicore bool + // The addrs parameter is an array of listening addresses that align // with the addr strings passed to the Serve function. Addr net.Addr + // NumLoops is the number of loops that the server is using. NumLoops int } @@ -55,20 +61,28 @@ type Server struct { type Conn interface { // Context returns a user-defined context. Context() interface{} + // SetContext sets a user-defined context. SetContext(interface{}) + // LocalAddr is the connection's local socket address. LocalAddr() net.Addr + // RemoteAddr is the connection's remote peer address. RemoteAddr() net.Addr + // Wake triggers a React event for this connection. //Wake() + // ReadPair reads all data from ring buffer. ReadPair() ([]byte, []byte) + // ReadBytes reads all data and return a new slice. ReadBytes() []byte + // ResetBuffer resets the ring buffer. ResetBuffer() + // AyncWrite writes data asynchronously. AsyncWrite(buf []byte) } @@ -80,21 +94,26 @@ type EventHandler interface { // OnInitComplete fires when the server can accept connections. The server // parameter has information and various utilities. OnInitComplete(server Server) (action Action) + // OnOpened fires when a new connection has opened. // The info parameter has information about the connection such as // it's local and remote address. // Use the out return value to write data to the connection. // The opts return value is used to set connection options. OnOpened(c Conn) (out []byte, action Action) + // OnClosed fires when a connection has closed. // The err parameter is the last known connection error. OnClosed(c Conn, err error) (action Action) + // PreWrite fires just before any data is written to any client socket. PreWrite() + // React fires when a connection sends the server data. // The in parameter is the incoming data. // Use the out return value to write data to the connection. React(c Conn) (out []byte, action Action) + // Tick fires immediately after the server starts and will fire again // following the duration specified by the delay return value. Tick() (delay time.Duration, action Action) @@ -214,3 +233,9 @@ type listener struct { network string addr string } + +func sniffError(err error) { + if err != nil { + log.Println(err) + } +} diff --git a/gnet_unix.go b/gnet_server.go similarity index 68% rename from gnet_unix.go rename to gnet_server.go index f7e6aaad4..9e89ef99f 100644 --- a/gnet_unix.go +++ b/gnet_server.go @@ -9,27 +9,24 @@ package gnet import ( "log" - "net" - "os" "runtime" "sync" "time" "github.com/panjf2000/gnet/netpoll" - "golang.org/x/sys/unix" ) type server struct { - eventHandler EventHandler // user eventHandler - mainLoop *loop - loopGroup IEventLoopGroup - loopGroupSize int // number of loops - opts *Options - ln *listener // all the listeners - wg sync.WaitGroup // loop close waitgroup - cond *sync.Cond // shutdown signaler - tch chan time.Duration // ticker channel - once sync.Once // make sure only signalShutdown once + ln *listener // all the listeners + wg sync.WaitGroup // loop close waitgroup + tch chan time.Duration // ticker channel + opts *Options // options with server + once sync.Once // make sure only signalShutdown once + cond *sync.Cond // shutdown signaler + mainLoop *loop // main loop for accepting connections + eventHandler EventHandler // user eventHandler + subLoopGroup IEventLoopGroup // loops for handling events + subLoopGroupSize int // number of loops } // waitForShutdown waits for a signal to shutdown @@ -49,7 +46,7 @@ func (svr *server) signalShutdown() { } func (svr *server) startLoops() { - svr.loopGroup.iterate(func(i int, lp *loop) bool { + svr.subLoopGroup.iterate(func(i int, lp *loop) bool { svr.wg.Add(1) go func() { lp.loopRun() @@ -60,14 +57,14 @@ func (svr *server) startLoops() { } func (svr *server) closeLoops() { - svr.loopGroup.iterate(func(i int, lp *loop) bool { + svr.subLoopGroup.iterate(func(i int, lp *loop) bool { _ = lp.poller.Close() return true }) } func (svr *server) startReactors() { - svr.loopGroup.iterate(func(i int, lp *loop) bool { + svr.subLoopGroup.iterate(func(i int, lp *loop) bool { svr.wg.Add(1) go func() { svr.activateSubReactor(lp) @@ -89,12 +86,12 @@ func (svr *server) activateLoops(numLoops int) error { svr: svr, } _ = lp.poller.AddRead(svr.ln.fd) - svr.loopGroup.register(lp) + svr.subLoopGroup.register(lp) } else { return err } } - svr.loopGroupSize = svr.loopGroup.len() + svr.subLoopGroupSize = svr.subLoopGroup.len() // Start loops in background svr.startLoops() return nil @@ -110,12 +107,12 @@ func (svr *server) activateReactors(numLoops int) error { connections: make(map[int]*conn), svr: svr, } - svr.loopGroup.register(lp) + svr.subLoopGroup.register(lp) } else { return err } } - svr.loopGroupSize = svr.loopGroup.len() + svr.subLoopGroupSize = svr.subLoopGroup.len() // Start sub reactors... svr.startReactors() @@ -151,7 +148,7 @@ func (svr *server) stop() { svr.waitForShutdown() // Notify all loops to close by closing all listeners - svr.loopGroup.iterate(func(i int, lp *loop) bool { + svr.subLoopGroup.iterate(func(i int, lp *loop) bool { sniffError(lp.poller.Trigger(func() error { return ErrClosing })) @@ -168,7 +165,7 @@ func (svr *server) stop() { svr.wg.Wait() // Close loops and all outstanding connections - svr.loopGroup.iterate(func(i int, lp *loop) bool { + svr.subLoopGroup.iterate(func(i int, lp *loop) bool { for _, c := range lp.connections { sniffError(lp.loopCloseConn(c, nil)) } @@ -193,7 +190,7 @@ func serve(eventHandler EventHandler, listener *listener, options *Options) erro svr := new(server) svr.eventHandler = eventHandler svr.ln = listener - svr.loopGroup = new(eventLoopGroup) + svr.subLoopGroup = new(eventLoopGroup) svr.cond = sync.NewCond(&sync.Mutex{}) svr.tch = make(chan time.Duration) svr.opts = options @@ -215,47 +212,3 @@ func serve(eventHandler EventHandler, listener *listener, options *Options) erro return nil } - -func (ln *listener) close() { - if ln.f != nil { - sniffError(ln.f.Close()) - } - if ln.ln != nil { - sniffError(ln.ln.Close()) - } - if ln.pconn != nil { - sniffError(ln.pconn.Close()) - } - if ln.network == "unix" { - sniffError(os.RemoveAll(ln.addr)) - } -} - -// system takes the net listener and detaches it from it's parent -// event loop, grabs the file descriptor, and makes it non-blocking. -func (ln *listener) system() error { - var err error - switch netln := ln.ln.(type) { - case nil: - switch pconn := ln.pconn.(type) { - case *net.UDPConn: - ln.f, err = pconn.File() - } - case *net.TCPListener: - ln.f, err = netln.File() - case *net.UnixListener: - ln.f, err = netln.File() - } - if err != nil { - ln.close() - return err - } - ln.fd = int(ln.f.Fd()) - return unix.SetNonblock(ln.fd, true) -} - -func sniffError(err error) { - if err != nil { - log.Println(err) - } -} diff --git a/gnet_test.go b/gnet_test.go index a50309c42..c16cea1a3 100644 --- a/gnet_test.go +++ b/gnet_test.go @@ -12,7 +12,6 @@ import ( "math/rand" "net" "os" - "strings" "sync" "sync/atomic" "testing" @@ -53,10 +52,10 @@ func TestServe(t *testing.T) { }) //t.Run("unix", func(t *testing.T) { // t.Run("1-loop", func(t *testing.T) { - // testServe("unix", ":9991", false, false, false, 10) + // testServe("unix", "socket9991", false, false, false, 10) // }) // t.Run("N-loop", func(t *testing.T) { - // testServe("unix", ":9992", false, true, false, 10) + // testServe("unix", "socket9992", false, true, false, 10) // }) //}) }) @@ -80,10 +79,10 @@ func TestServe(t *testing.T) { }) //t.Run("unix", func(t *testing.T) { // t.Run("1-loop", func(t *testing.T) { - // testServe("unix", ":9991", true, false, false, 10) + // testServe("unix", "socket9991", true, false, false, 10) // }) // t.Run("N-loop", func(t *testing.T) { - // testServe("unix", ":9992", true, true, false, 10) + // testServe("unix", "socket9992", true, true, false, 10) // }) //}) }) @@ -170,10 +169,9 @@ func testServe(network, addr string, reuseport, multicore, async bool, nclients var err error ts := &testServer{network: network, addr: addr, multicore: multicore, async: async, nclients: nclients} if network == "unix" { - socket := strings.Replace(addr, ":", "socket", 1) - _ = os.RemoveAll(socket) - defer os.RemoveAll(socket) - err = Serve(ts, network+"://"+socket, WithMulticore(multicore), WithTicker(true), WithTCPKeepAlive(time.Minute*5)) + _ = os.RemoveAll(addr) + defer os.RemoveAll(addr) + err = Serve(ts, network+"://"+addr, WithMulticore(multicore), WithTicker(true), WithTCPKeepAlive(time.Minute*5)) } else { if reuseport { err = Serve(ts, network+"://"+addr, WithMulticore(multicore), WithReusePort(true), WithTicker(true), WithTCPKeepAlive(time.Minute*5)) diff --git a/loop_bsd.go b/loop_bsd.go new file mode 100644 index 000000000..3a4dd2226 --- /dev/null +++ b/loop_bsd.go @@ -0,0 +1,32 @@ +// Copyright 2019 Andy Pan. All rights reserved. +// Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file. + +// +build darwin netbsd freebsd openbsd dragonfly + +package gnet + +import ( + "github.com/panjf2000/gnet/internal" + "github.com/panjf2000/gnet/netpoll" +) + +func (lp *loop) handleEvent(fd int, filter int16, job internal.Job) error { + if fd == 0 { + return job() + } + if c, ok := lp.connections[fd]; ok { + if !c.opened { + return lp.loopOpen(c) + } else if filter == netpoll.EVFilterWrite && !c.outboundBuffer.IsEmpty() { + return lp.loopOut(c) + } else if filter == netpoll.EVFilterRead { + return lp.loopIn(c) + } else if filter == netpoll.EVFilterSock { + return lp.loopCloseConn(c, nil) + } + } else { + return lp.loopAccept(fd) + } + return nil +} diff --git a/loop_linux.go b/loop_linux.go new file mode 100644 index 000000000..3feef8ed5 --- /dev/null +++ b/loop_linux.go @@ -0,0 +1,36 @@ +// Copyright 2019 Andy Pan. All rights reserved. +// Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file. + +// +build linux + +package gnet + +import ( + "github.com/panjf2000/gnet/internal" + "github.com/panjf2000/gnet/netpoll" +) + +func (lp *loop) handleEvent(fd int, ev uint32, job internal.Job) error { + if fd == 0 { + return job() + } + if c, ok := lp.connections[fd]; ok { + if !c.opened { + return lp.loopOpen(c) + } + if ev&netpoll.InEvents != 0 { + if err := lp.loopIn(c); err != nil { + return err + } + } + if ev&netpoll.OutEvents != 0 && !c.outboundBuffer.IsEmpty() { + if err := lp.loopOut(c); err != nil { + return err + } + } + } else { + return lp.loopAccept(fd) + } + return nil +} diff --git a/netpoll/epoll.go b/netpoll/epoll.go index 50fa4b310..8947243b6 100644 --- a/netpoll/epoll.go +++ b/netpoll/epoll.go @@ -8,6 +8,8 @@ package netpoll import ( + "log" + "github.com/panjf2000/gnet/internal" "golang.org/x/sys/unix" ) @@ -58,30 +60,32 @@ func (p *Poller) Trigger(job internal.Job) error { } // Polling ... -func (p *Poller) Polling(iter func(fd int, job internal.Job) error) error { +func (p *Poller) Polling(callback func(fd int, ev uint32, job internal.Job) error) error { el := newEventList(initEvents) - var note bool + var wakenUp bool for { n, err := unix.EpollWait(p.fd, el.events, -1) if err != nil && err != unix.EINTR { - return err + log.Println(err) + continue } for i := 0; i < n; i++ { if fd := int(el.events[i].Fd); fd != p.wfd { - if err := iter(fd, nil); err != nil { + if err := callback(fd, el.events[i].Events, nil); err != nil { return err } } else { + wakenUp = true if _, err := unix.Read(p.wfd, p.wfdBuf); err != nil { - return err + log.Println(err) + continue } - note = true } } - if note { - note = false + if wakenUp { + wakenUp = false if err := p.asyncJobQueue.ForEach(func(job internal.Job) error { - return iter(0, job) + return callback(0, 0, job) }); err != nil { return err } diff --git a/netpoll/epoll_events.go b/netpoll/epoll_events.go index 67dd8886f..16fdcc1fe 100644 --- a/netpoll/epoll_events.go +++ b/netpoll/epoll_events.go @@ -8,6 +8,15 @@ package netpoll import "golang.org/x/sys/unix" +const ( + // ErrEvents ... + ErrEvents = unix.EPOLLERR | unix.EPOLLHUP + // OutEvents ... + OutEvents = unix.EPOLLOUT + // InEvents ... + InEvents = ErrEvents | unix.EPOLLRDHUP | unix.EPOLLIN +) + type eventList struct { size int events []unix.EpollEvent diff --git a/netpoll/events.go b/netpoll/events.go index 9d84e8349..44cb23c60 100644 --- a/netpoll/events.go +++ b/netpoll/events.go @@ -1,3 +1,3 @@ package netpoll -const initEvents = 512 +const initEvents = 1024 diff --git a/netpoll/kqueue.go b/netpoll/kqueue.go index c0221342c..f988cfa52 100644 --- a/netpoll/kqueue.go +++ b/netpoll/kqueue.go @@ -8,6 +8,8 @@ package netpoll import ( + "log" + "github.com/panjf2000/gnet/internal" "golang.org/x/sys/unix" ) @@ -55,27 +57,33 @@ func (p *Poller) Trigger(job internal.Job) error { } // Polling ... -func (p *Poller) Polling(iter func(fd int, job internal.Job) error) error { +func (p *Poller) Polling(callback func(fd int, filter int16, job internal.Job) error) error { el := newEventList(initEvents) - var note bool + var wakenUp bool for { n, err := unix.Kevent(p.fd, nil, el.events, nil) if err != nil && err != unix.EINTR { - return err + log.Println(err) + continue } + var evFilter int16 for i := 0; i < n; i++ { if fd := int(el.events[i].Ident); fd != 0 { - if err := iter(fd, nil); err != nil { + evFilter = el.events[i].Filter + if (el.events[i].Flags&unix.EV_EOF != 0) || (el.events[i].Flags&unix.EV_ERROR != 0) { + evFilter = EVFilterSock + } + if err := callback(fd, evFilter, nil); err != nil { return err } } else { - note = true + wakenUp = true } } - if note { - note = false + if wakenUp { + wakenUp = false if err := p.asyncJobQueue.ForEach(func(job internal.Job) error { - return iter(0, job) + return callback(0, 0, job) }); err != nil { return err } diff --git a/netpoll/kqueue_events.go b/netpoll/kqueue_events.go index b5b119f45..0ffff5f14 100644 --- a/netpoll/kqueue_events.go +++ b/netpoll/kqueue_events.go @@ -8,6 +8,15 @@ package netpoll import "golang.org/x/sys/unix" +const ( + // EVFilterWrite ... + EVFilterWrite = unix.EVFILT_WRITE + // EVFilterRead ... + EVFilterRead = unix.EVFILT_READ + // EVFilterSock ... + EVFilterSock = -0xd +) + type eventList struct { size int events []unix.Kevent_t diff --git a/netpoll/netpoll_unix.go b/netpoll/netpoll_unix.go index 2e899e3b9..556d8a08a 100644 --- a/netpoll/netpoll_unix.go +++ b/netpoll/netpoll_unix.go @@ -9,7 +9,6 @@ package netpoll import "golang.org/x/sys/unix" - // SetKeepAlive ... func SetKeepAlive(fd, secs int) error { if err := unix.SetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_KEEPALIVE, 1); err != nil { diff --git a/reactor.go b/reactor.go deleted file mode 100644 index 1e57393ef..000000000 --- a/reactor.go +++ /dev/null @@ -1,69 +0,0 @@ -// Copyright 2019 Andy Pan. All rights reserved. -// Use of this source code is governed by an MIT-style -// license that can be found in the LICENSE file. - -// +build darwin netbsd freebsd openbsd dragonfly linux - -package gnet - -import ( - "github.com/panjf2000/gnet/internal" - "github.com/panjf2000/gnet/ringbuffer" - "golang.org/x/sys/unix" -) - -func (svr *server) activateMainReactor() { - defer svr.signalShutdown() - - _ = svr.mainLoop.poller.Polling(func(fd int, job internal.Job) error { - if fd == 0 { - return job() - } - nfd, sa, err := unix.Accept(fd) - if err != nil { - if err == unix.EAGAIN { - return nil - } - return err - } - if err := unix.SetNonblock(nfd, true); err != nil { - return err - } - lp := svr.loopGroup.next() - c := &conn{ - fd: nfd, - sa: sa, - loop: lp, - inBuf: ringbuffer.New(connRingBufferSize), - outBuf: ringbuffer.New(connRingBufferSize), - } - _ = lp.loopOpened(c) - _ = lp.poller.Trigger(func() (err error) { - if err = lp.poller.AddRead(nfd); err == nil { - lp.connections[nfd] = c - return - } - return - }) - return nil - }) -} - -func (svr *server) activateSubReactor(lp *loop) { - defer svr.signalShutdown() - - if lp.idx == 0 && svr.opts.Ticker { - go lp.loopTicker() - } - - _ = lp.poller.Polling(func(fd int, job internal.Job) error { - if fd == 0 { - return job() - } - c := lp.connections[fd] - if c.outBuf.IsEmpty() { - return lp.loopRead(c) - } - return lp.loopWrite(c) - }) -} diff --git a/reactor_bsd.go b/reactor_bsd.go new file mode 100644 index 000000000..f41665c5a --- /dev/null +++ b/reactor_bsd.go @@ -0,0 +1,48 @@ +// Copyright 2019 Andy Pan. All rights reserved. +// Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file. + +// +build darwin netbsd freebsd openbsd dragonfly + +package gnet + +import ( + "github.com/panjf2000/gnet/internal" + "github.com/panjf2000/gnet/netpoll" +) + +func (svr *server) activateMainReactor() { + defer svr.signalShutdown() + + _ = svr.mainLoop.poller.Polling(func(fd int, filter int16, job internal.Job) error { + if fd == 0 { + return job() + } + return svr.acceptNewConnection(fd) + }) +} + +func (svr *server) activateSubReactor(lp *loop) { + defer svr.signalShutdown() + + if lp.idx == 0 && svr.opts.Ticker { + go lp.loopTicker() + } + + _ = lp.poller.Polling(func(fd int, filter int16, job internal.Job) error { + if fd == 0 { + return job() + } + + c := lp.connections[fd] + if filter == netpoll.EVFilterWrite && !c.outboundBuffer.IsEmpty() { + return lp.loopOut(c) + } else if filter == netpoll.EVFilterRead { + return lp.loopIn(c) + } else if filter == netpoll.EVFilterSock { + return lp.loopCloseConn(c, nil) + } + + return nil + }) +} diff --git a/reactor_linux.go b/reactor_linux.go new file mode 100644 index 000000000..1e3937d88 --- /dev/null +++ b/reactor_linux.go @@ -0,0 +1,48 @@ +// Copyright 2019 Andy Pan. All rights reserved. +// Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file. + +// +build linux + +package gnet + +import ( + "github.com/panjf2000/gnet/internal" + "github.com/panjf2000/gnet/netpoll" +) + +func (svr *server) activateMainReactor() { + defer svr.signalShutdown() + + _ = svr.mainLoop.poller.Polling(func(fd int, ev uint32, job internal.Job) error { + if fd == 0 { + return job() + } + return svr.acceptNewConnection(fd) + }) +} + +func (svr *server) activateSubReactor(lp *loop) { + defer svr.signalShutdown() + + if lp.idx == 0 && svr.opts.Ticker { + go lp.loopTicker() + } + + _ = lp.poller.Polling(func(fd int, ev uint32, job internal.Job) error { + if fd == 0 { + return job() + } + + c := lp.connections[fd] + if !c.outboundBuffer.IsEmpty() { + if ev&netpoll.OutEvents != 0 { + return lp.loopOut(c) + } + } else if ev&netpoll.InEvents != 0 { + return lp.loopIn(c) + } + + return nil + }) +}