From 8a59a51d248d7de19e8ed92e8084c725ed1bbfba Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Sun, 9 Feb 2020 12:54:38 +0800 Subject: [PATCH] trifle: rename structs, functions and variables with more rational names --- acceptor_unix.go | 12 ++--- acceptor_windows.go | 14 +++--- connection_unix.go | 12 ++--- connection_windows.go | 12 ++--- eventloop_group.go | 22 ++++----- eventloop_unix.go | 112 +++++++++++++++++++++--------------------- eventloop_windows.go | 110 ++++++++++++++++++++--------------------- loop_bsd.go | 12 ++--- loop_linux.go | 10 ++-- reactor_bsd.go | 16 +++--- reactor_linux.go | 16 +++--- server_unix.go | 42 ++++++++-------- server_windows.go | 18 +++---- 13 files changed, 204 insertions(+), 204 deletions(-) diff --git a/acceptor_unix.go b/acceptor_unix.go index 1624e8a73..7a0a51106 100644 --- a/acceptor_unix.go +++ b/acceptor_unix.go @@ -19,14 +19,14 @@ func (svr *server) acceptNewConnection(fd int) error { if err := unix.SetNonblock(nfd, true); err != nil { return err } - lp := svr.subLoopGroup.next() - c := newTCPConn(nfd, lp, sa) - _ = lp.poller.Trigger(func() (err error) { - if err = lp.poller.AddRead(nfd); err != nil { + el := svr.subLoopGroup.next() + c := newTCPConn(nfd, el, sa) + _ = el.poller.Trigger(func() (err error) { + if err = el.poller.AddRead(nfd); err != nil { return } - lp.connections[nfd] = c - err = lp.loopOpen(c) + el.connections[nfd] = c + err = el.loopOpen(c) return }) return nil diff --git a/acceptor_windows.go b/acceptor_windows.go index 5b637aac7..573bae0d3 100644 --- a/acceptor_windows.go +++ b/acceptor_windows.go @@ -28,8 +28,8 @@ func (svr *server) listenerRun() { buf := bytebuffer.Get() _, _ = buf.Write(packet[:n]) - lp := svr.subLoopGroup.next() - lp.ch <- &udpIn{newUDPConn(lp, svr.ln.lnaddr, addr, buf)} + el := svr.subLoopGroup.next() + el.ch <- &udpIn{newUDPConn(el, svr.ln.lnaddr, addr, buf)} } else { // Accept TCP socket. conn, e := svr.ln.ln.Accept() @@ -37,21 +37,21 @@ func (svr *server) listenerRun() { err = e return } - lp := svr.subLoopGroup.next() - c := newTCPConn(conn, lp) - lp.ch <- c + el := svr.subLoopGroup.next() + c := newTCPConn(conn, el) + el.ch <- c go func() { var packet [0xFFFF]byte for { n, err := c.conn.Read(packet[:]) if err != nil { _ = c.conn.SetReadDeadline(time.Time{}) - lp.ch <- &stderr{c, err} + el.ch <- &stderr{c, err} return } buf := bytebuffer.Get() _, _ = buf.Write(packet[:n]) - lp.ch <- &tcpIn{c, buf} + el.ch <- &tcpIn{c, buf} } }() } diff --git a/connection_unix.go b/connection_unix.go index 047c3ecae..461d388e4 100644 --- a/connection_unix.go +++ b/connection_unix.go @@ -21,7 +21,7 @@ type conn struct { fd int // file descriptor sa unix.Sockaddr // remote socket address ctx interface{} // user-defined context - loop *loop // connected loop + loop *eventloop // connected event-loop buffer []byte // reuse memory of inbound data as a temporary buffer codec ICodec // codec for TCP opened bool // connection opened event fired @@ -32,12 +32,12 @@ type conn struct { outboundBuffer *ringbuffer.RingBuffer // buffer for data that is ready to write to client } -func newTCPConn(fd int, lp *loop, sa unix.Sockaddr) *conn { +func newTCPConn(fd int, el *eventloop, sa unix.Sockaddr) *conn { return &conn{ fd: fd, sa: sa, - loop: lp, - codec: lp.codec, + loop: el, + codec: el.codec, inboundBuffer: prb.Get(), outboundBuffer: prb.Get(), } @@ -58,11 +58,11 @@ func (c *conn) releaseTCP() { c.byteBuffer = nil } -func newUDPConn(fd int, lp *loop, sa unix.Sockaddr) *conn { +func newUDPConn(fd int, el *eventloop, sa unix.Sockaddr) *conn { return &conn{ fd: fd, sa: sa, - localAddr: lp.svr.ln.lnaddr, + localAddr: el.svr.ln.lnaddr, remoteAddr: netpoll.SockaddrToUDPAddr(sa), } } diff --git a/connection_windows.go b/connection_windows.go index 02535701a..18d58387a 100644 --- a/connection_windows.go +++ b/connection_windows.go @@ -36,7 +36,7 @@ type udpIn struct { type stdConn struct { ctx interface{} // user-defined context conn net.Conn // original connection - loop *loop // owner loop + loop *eventloop // owner event-loop done int32 // 0: attached, 1: closed buffer *bytebuffer.ByteBuffer // reuse memory of inbound data as a temporary buffer codec ICodec // codec for TCP @@ -46,11 +46,11 @@ type stdConn struct { inboundBuffer *ringbuffer.RingBuffer // buffer for data from client } -func newTCPConn(conn net.Conn, lp *loop) *stdConn { +func newTCPConn(conn net.Conn, el *eventloop) *stdConn { return &stdConn{ conn: conn, - loop: lp, - codec: lp.codec, + loop: el, + codec: el.codec, inboundBuffer: prb.Get(), } } @@ -65,9 +65,9 @@ func (c *stdConn) releaseTCP() { c.buffer = nil } -func newUDPConn(lp *loop, localAddr, remoteAddr net.Addr, buf *bytebuffer.ByteBuffer) *stdConn { +func newUDPConn(el *eventloop, localAddr, remoteAddr net.Addr, buf *bytebuffer.ByteBuffer) *stdConn { return &stdConn{ - loop: lp, + loop: el, localAddr: localAddr, remoteAddr: remoteAddr, buffer: buf, diff --git a/eventloop_group.go b/eventloop_group.go index c4bfcafdf..2f5b863ca 100644 --- a/eventloop_group.go +++ b/eventloop_group.go @@ -21,28 +21,28 @@ package gnet // IEventLoopGroup represents a set of event-loops. type ( IEventLoopGroup interface { - register(*loop) - next() *loop - iterate(func(int, *loop) bool) + register(*eventloop) + next() *eventloop + iterate(func(int, *eventloop) bool) len() int } eventLoopGroup struct { nextLoopIndex int - eventLoops []*loop + eventLoops []*eventloop size int } ) -func (g *eventLoopGroup) register(lp *loop) { - g.eventLoops = append(g.eventLoops, lp) +func (g *eventLoopGroup) register(el *eventloop) { + g.eventLoops = append(g.eventLoops, el) g.size++ } // Built-in load-balance algorithm is Round-Robin. // TODO: support more load-balance algorithms. -func (g *eventLoopGroup) next() (lp *loop) { - lp = g.eventLoops[g.nextLoopIndex] +func (g *eventLoopGroup) next() (el *eventloop) { + el = g.eventLoops[g.nextLoopIndex] g.nextLoopIndex++ if g.nextLoopIndex >= g.size { g.nextLoopIndex = 0 @@ -50,9 +50,9 @@ func (g *eventLoopGroup) next() (lp *loop) { return } -func (g *eventLoopGroup) iterate(f func(int, *loop) bool) { - for i, lp := range g.eventLoops { - if !f(i, lp) { +func (g *eventLoopGroup) iterate(f func(int, *eventloop) bool) { + for i, el := range g.eventLoops { + if !f(i, el) { break } } diff --git a/eventloop_unix.go b/eventloop_unix.go index d3f93980e..9576cb708 100644 --- a/eventloop_unix.go +++ b/eventloop_unix.go @@ -16,7 +16,7 @@ import ( "golang.org/x/sys/unix" ) -type loop struct { +type eventloop struct { idx int // loop index in the server loops list svr *server // server in loop codec ICodec // codec for TCP @@ -26,25 +26,25 @@ type loop struct { eventHandler EventHandler // user eventHandler } -func (lp *loop) loopRun() { +func (el *eventloop) loopRun() { defer func() { - if lp.idx == 0 && lp.svr.opts.Ticker { - close(lp.svr.ticktock) + if el.idx == 0 && el.svr.opts.Ticker { + close(el.svr.ticktock) } - lp.svr.signalShutdown() + el.svr.signalShutdown() }() - if lp.idx == 0 && lp.svr.opts.Ticker { - go lp.loopTicker() + if el.idx == 0 && el.svr.opts.Ticker { + go el.loopTicker() } - sniffError(lp.poller.Polling(lp.handleEvent)) + sniffError(el.poller.Polling(el.handleEvent)) } -func (lp *loop) loopAccept(fd int) error { - if fd == lp.svr.ln.fd { - if lp.svr.ln.pconn != nil { - return lp.loopUDPIn(fd) +func (el *eventloop) loopAccept(fd int) error { + if fd == el.svr.ln.fd { + if el.svr.ln.pconn != nil { + return el.loopReadUDP(fd) } nfd, sa, err := unix.Accept(fd) if err != nil { @@ -56,24 +56,24 @@ func (lp *loop) loopAccept(fd int) error { if err := unix.SetNonblock(nfd, true); err != nil { return err } - c := newTCPConn(nfd, lp, sa) - if err = lp.poller.AddRead(c.fd); err == nil { - lp.connections[c.fd] = c - return lp.loopOpen(c) + c := newTCPConn(nfd, el, sa) + if err = el.poller.AddRead(c.fd); err == nil { + el.connections[c.fd] = c + return el.loopOpen(c) } return err } return nil } -func (lp *loop) loopOpen(c *conn) error { +func (el *eventloop) loopOpen(c *conn) error { c.opened = true - c.localAddr = lp.svr.ln.lnaddr + c.localAddr = el.svr.ln.lnaddr c.remoteAddr = netpoll.SockaddrToTCPOrUnixAddr(c.sa) - out, action := lp.eventHandler.OnOpened(c) - if lp.svr.opts.TCPKeepAlive > 0 { - if _, ok := lp.svr.ln.ln.(*net.TCPListener); ok { - _ = netpoll.SetKeepAlive(c.fd, int(lp.svr.opts.TCPKeepAlive/time.Second)) + out, action := el.eventHandler.OnOpened(c) + if el.svr.opts.TCPKeepAlive > 0 { + if _, ok := el.svr.ln.ln.(*net.TCPListener); ok { + _ = netpoll.SetKeepAlive(c.fd, int(el.svr.opts.TCPKeepAlive/time.Second)) } } if out != nil { @@ -81,27 +81,27 @@ func (lp *loop) loopOpen(c *conn) error { } if !c.outboundBuffer.IsEmpty() { - _ = lp.poller.AddWrite(c.fd) + _ = el.poller.AddWrite(c.fd) } - return lp.handleAction(c, action) + return el.handleAction(c, action) } -func (lp *loop) loopIn(c *conn) error { - n, err := unix.Read(c.fd, lp.packet) +func (el *eventloop) loopRead(c *conn) error { + n, err := unix.Read(c.fd, el.packet) if n == 0 || err != nil { if err == unix.EAGAIN { return nil } - return lp.loopCloseConn(c, err) + return el.loopCloseConn(c, err) } - c.buffer = lp.packet[:n] + c.buffer = el.packet[:n] outBuffer := bytebuffer.Get() for inFrame, _ := c.read(); inFrame != nil; inFrame, _ = c.read() { - out, action := lp.eventHandler.React(inFrame, c) + out, action := el.eventHandler.React(inFrame, c) if out != nil { - outFrame, _ := lp.codec.Encode(c, out) + outFrame, _ := el.codec.Encode(c, out) _, _ = outBuffer.Write(outFrame) } switch action { @@ -109,7 +109,7 @@ func (lp *loop) loopIn(c *conn) error { case Close: c.write(outBuffer.Bytes()) bytebuffer.Put(outBuffer) - return lp.loopCloseConn(c, nil) + return el.loopCloseConn(c, nil) case Shutdown: c.write(outBuffer.Bytes()) bytebuffer.Put(outBuffer) @@ -123,8 +123,8 @@ func (lp *loop) loopIn(c *conn) error { return nil } -func (lp *loop) loopOut(c *conn) error { - lp.eventHandler.PreWrite() +func (el *eventloop) loopWrite(c *conn) error { + el.eventHandler.PreWrite() head, tail := c.outboundBuffer.LazyReadAll() n, err := unix.Write(c.fd, head) @@ -132,7 +132,7 @@ func (lp *loop) loopOut(c *conn) error { if err == unix.EAGAIN { return nil } - return lp.loopCloseConn(c, err) + return el.loopCloseConn(c, err) } c.outboundBuffer.Shift(n) @@ -142,21 +142,21 @@ func (lp *loop) loopOut(c *conn) error { if err == unix.EAGAIN { return nil } - return lp.loopCloseConn(c, err) + return el.loopCloseConn(c, err) } c.outboundBuffer.Shift(n) } if c.outboundBuffer.IsEmpty() { - _ = lp.poller.ModRead(c.fd) + _ = el.poller.ModRead(c.fd) } return nil } -func (lp *loop) loopCloseConn(c *conn, err error) error { - if lp.poller.Delete(c.fd) == nil && unix.Close(c.fd) == nil { - delete(lp.connections, c.fd) - switch lp.eventHandler.OnClosed(c, err) { +func (el *eventloop) loopCloseConn(c *conn, err error) error { + if el.poller.Delete(c.fd) == nil && unix.Close(c.fd) == nil { + delete(el.connections, c.fd) + switch el.eventHandler.OnClosed(c, err) { case Shutdown: return errServerShutdown } @@ -165,26 +165,26 @@ func (lp *loop) loopCloseConn(c *conn, err error) error { return nil } -func (lp *loop) loopWake(c *conn) error { - if co, ok := lp.connections[c.fd]; !ok || co != c { +func (el *eventloop) loopWake(c *conn) error { + if co, ok := el.connections[c.fd]; !ok || co != c { return nil // ignore stale wakes. } - out, action := lp.eventHandler.React(nil, c) + out, action := el.eventHandler.React(nil, c) if out != nil { c.write(out) } - return lp.handleAction(c, action) + return el.handleAction(c, action) } -func (lp *loop) loopTicker() { +func (el *eventloop) loopTicker() { var ( delay time.Duration open bool ) for { - if err := lp.poller.Trigger(func() (err error) { - delay, action := lp.eventHandler.Tick() - lp.svr.ticktock <- delay + if err := el.poller.Trigger(func() (err error) { + delay, action := el.eventHandler.Tick() + el.svr.ticktock <- delay switch action { case None: case Shutdown: @@ -194,7 +194,7 @@ func (lp *loop) loopTicker() { }); err != nil { break } - if delay, open = <-lp.svr.ticktock; open { + if delay, open = <-el.svr.ticktock; open { time.Sleep(delay) } else { break @@ -202,12 +202,12 @@ func (lp *loop) loopTicker() { } } -func (lp *loop) handleAction(c *conn, action Action) error { +func (el *eventloop) handleAction(c *conn, action Action) error { switch action { case None: return nil case Close: - return lp.loopCloseConn(c, nil) + return el.loopCloseConn(c, nil) case Shutdown: return errServerShutdown default: @@ -215,15 +215,15 @@ func (lp *loop) handleAction(c *conn, action Action) error { } } -func (lp *loop) loopUDPIn(fd int) error { - n, sa, err := unix.Recvfrom(fd, lp.packet, 0) +func (el *eventloop) loopReadUDP(fd int) error { + n, sa, err := unix.Recvfrom(fd, el.packet, 0) if err != nil || n == 0 { return nil } - c := newUDPConn(fd, lp, sa) - out, action := lp.eventHandler.React(lp.packet[:n], c) + c := newUDPConn(fd, el, sa) + out, action := el.eventHandler.React(el.packet[:n], c) if out != nil { - lp.eventHandler.PreWrite() + el.eventHandler.PreWrite() c.sendTo(out) } switch action { diff --git a/eventloop_windows.go b/eventloop_windows.go index 7b0227878..b351a4e4e 100644 --- a/eventloop_windows.go +++ b/eventloop_windows.go @@ -17,7 +17,7 @@ import ( "github.com/panjf2000/gnet/pool/bytebuffer" ) -type loop struct { +type eventloop struct { ch chan interface{} // command channel idx int // loop index svr *server // server in loop @@ -26,34 +26,34 @@ type loop struct { eventHandler EventHandler // user eventHandler } -func (lp *loop) loopRun() { +func (el *eventloop) loopRun() { var err error defer func() { - if lp.idx == 0 && lp.svr.opts.Ticker { - close(lp.svr.ticktock) + if el.idx == 0 && el.svr.opts.Ticker { + close(el.svr.ticktock) } - lp.svr.signalShutdown(err) - lp.svr.loopWG.Done() - lp.loopEgress() - lp.svr.loopWG.Done() + el.svr.signalShutdown(err) + el.svr.loopWG.Done() + el.loopEgress() + el.svr.loopWG.Done() }() - if lp.idx == 0 && lp.svr.opts.Ticker { - go lp.loopTicker() + if el.idx == 0 && el.svr.opts.Ticker { + go el.loopTicker() } - for v := range lp.ch { + for v := range el.ch { switch v := v.(type) { case error: err = v case *stdConn: - err = lp.loopAccept(v) + err = el.loopAccept(v) case *tcpIn: - err = lp.loopRead(v) + err = el.loopRead(v) case *udpIn: - err = lp.loopReadUDP(v.c) + err = el.loopReadUDP(v.c) case *stderr: - err = lp.loopError(v.c, v.err) + err = el.loopError(v.c, v.err) case wakeReq: - err = lp.loopWake(v.c) + err = el.loopWake(v.c) case func() error: err = v() } @@ -63,35 +63,35 @@ func (lp *loop) loopRun() { } } -func (lp *loop) loopAccept(c *stdConn) error { - lp.connections[c] = true - c.localAddr = lp.svr.ln.lnaddr +func (el *eventloop) loopAccept(c *stdConn) error { + el.connections[c] = true + c.localAddr = el.svr.ln.lnaddr c.remoteAddr = c.conn.RemoteAddr() - out, action := lp.eventHandler.OnOpened(c) + out, action := el.eventHandler.OnOpened(c) if out != nil { - lp.eventHandler.PreWrite() + el.eventHandler.PreWrite() _, _ = c.conn.Write(out) } - if lp.svr.opts.TCPKeepAlive > 0 { + if el.svr.opts.TCPKeepAlive > 0 { if c, ok := c.conn.(*net.TCPConn); ok { _ = c.SetKeepAlive(true) - _ = c.SetKeepAlivePeriod(lp.svr.opts.TCPKeepAlive) + _ = c.SetKeepAlivePeriod(el.svr.opts.TCPKeepAlive) } } - return lp.handleAction(c, action) + return el.handleAction(c, action) } -func (lp *loop) loopRead(ti *tcpIn) error { +func (el *eventloop) loopRead(ti *tcpIn) error { c := ti.c c.buffer = ti.in outBuffer := bytebuffer.Get() for inFrame, _ := c.read(); inFrame != nil; inFrame, _ = c.read() { - out, action := lp.eventHandler.React(inFrame, c) + out, action := el.eventHandler.React(inFrame, c) if out != nil { - lp.eventHandler.PreWrite() - outFrame, _ := lp.codec.Encode(c, out) + el.eventHandler.PreWrite() + outFrame, _ := el.codec.Encode(c, out) _, _ = outBuffer.Write(outFrame) } switch action { @@ -99,7 +99,7 @@ func (lp *loop) loopRead(ti *tcpIn) error { case Close: _, _ = c.conn.Write(outBuffer.Bytes()) bytebuffer.Put(outBuffer) - return lp.loopClose(c) + return el.loopClose(c) case Shutdown: _, _ = c.conn.Write(outBuffer.Bytes()) bytebuffer.Put(outBuffer) @@ -109,7 +109,7 @@ func (lp *loop) loopRead(ti *tcpIn) error { _, err := c.conn.Write(outBuffer.Bytes()) bytebuffer.Put(outBuffer) if err != nil { - return lp.loopClose(c) + return el.loopClose(c) } _, _ = c.inboundBuffer.Write(c.buffer.Bytes()) bytebuffer.Put(c.buffer) @@ -117,48 +117,48 @@ func (lp *loop) loopRead(ti *tcpIn) error { return nil } -func (lp *loop) loopClose(c *stdConn) error { +func (el *eventloop) loopClose(c *stdConn) error { atomic.StoreInt32(&c.done, 1) _ = c.conn.SetReadDeadline(time.Now()) return nil } -func (lp *loop) loopEgress() { +func (el *eventloop) loopEgress() { var closed bool - for v := range lp.ch { + for v := range el.ch { switch v := v.(type) { case error: if v == errCloseConns { closed = true - for c := range lp.connections { - _ = lp.loopClose(c) + for c := range el.connections { + _ = el.loopClose(c) } } case *stderr: - _ = lp.loopError(v.c, v.err) + _ = el.loopError(v.c, v.err) } - if len(lp.connections) == 0 && closed { + if len(el.connections) == 0 && closed { break } } } -func (lp *loop) loopTicker() { +func (el *eventloop) loopTicker() { var ( delay time.Duration open bool ) for { - lp.ch <- func() (err error) { - delay, action := lp.eventHandler.Tick() - lp.svr.ticktock <- delay + el.ch <- func() (err error) { + delay, action := el.eventHandler.Tick() + el.svr.ticktock <- delay switch action { case Shutdown: err = errClosing } return } - if delay, open = <-lp.svr.ticktock; open { + if delay, open = <-el.svr.ticktock; open { time.Sleep(delay) } else { break @@ -166,9 +166,9 @@ func (lp *loop) loopTicker() { } } -func (lp *loop) loopError(c *stdConn, err error) (e error) { +func (el *eventloop) loopError(c *stdConn, err error) (e error) { if e = c.conn.Close(); e == nil { - delete(lp.connections, c) + delete(el.connections, c) switch atomic.LoadInt32(&c.done) { case 0: // read error if err != io.EOF { @@ -177,7 +177,7 @@ func (lp *loop) loopError(c *stdConn, err error) (e error) { case 1: // closed log.Printf("socket: %s has been closed by client\n", c.remoteAddr.String()) } - switch lp.eventHandler.OnClosed(c, err) { + switch el.eventHandler.OnClosed(c, err) { case Shutdown: return errClosing } @@ -186,23 +186,23 @@ func (lp *loop) loopError(c *stdConn, err error) (e error) { return } -func (lp *loop) loopWake(c *stdConn) error { - if _, ok := lp.connections[c]; !ok { +func (el *eventloop) loopWake(c *stdConn) error { + if _, ok := el.connections[c]; !ok { return nil // ignore stale wakes. } - out, action := lp.eventHandler.React(nil, c) + out, action := el.eventHandler.React(nil, c) if out != nil { _, _ = c.conn.Write(out) } - return lp.handleAction(c, action) + return el.handleAction(c, action) } -func (lp *loop) handleAction(c *stdConn, action Action) error { +func (el *eventloop) handleAction(c *stdConn, action Action) error { switch action { case None: return nil case Close: - return lp.loopClose(c) + return el.loopClose(c) case Shutdown: return errServerShutdown default: @@ -210,11 +210,11 @@ func (lp *loop) handleAction(c *stdConn, action Action) error { } } -func (lp *loop) loopReadUDP(c *stdConn) error { - out, action := lp.eventHandler.React(c.buffer.Bytes(), c) +func (el *eventloop) loopReadUDP(c *stdConn) error { + out, action := el.eventHandler.React(c.buffer.Bytes(), c) if out != nil { - lp.eventHandler.PreWrite() - _, _ = lp.svr.ln.pconn.WriteTo(out, c.remoteAddr) + el.eventHandler.PreWrite() + _, _ = el.svr.ln.pconn.WriteTo(out, c.remoteAddr) } switch action { case Shutdown: diff --git a/loop_bsd.go b/loop_bsd.go index cd78a8947..d11e506dd 100644 --- a/loop_bsd.go +++ b/loop_bsd.go @@ -8,10 +8,10 @@ package gnet import "github.com/panjf2000/gnet/internal/netpoll" -func (lp *loop) handleEvent(fd int, filter int16) error { - if c, ok := lp.connections[fd]; ok { +func (el *eventloop) handleEvent(fd int, filter int16) error { + if c, ok := el.connections[fd]; ok { if filter == netpoll.EVFilterSock { - return lp.loopCloseConn(c, nil) + return el.loopCloseConn(c, nil) } switch c.outboundBuffer.IsEmpty() { // Don't change the ordering of processing EVFILT_WRITE | EVFILT_READ | EV_ERROR/EV_EOF unless you're 100% @@ -19,15 +19,15 @@ func (lp *loop) handleEvent(fd int, filter int16) error { // Re-ordering can easily introduce bugs and bad side-effects, as I found out painfully in the past. case false: if filter == netpoll.EVFilterWrite { - return lp.loopOut(c) + return el.loopWrite(c) } return nil case true: if filter == netpoll.EVFilterRead { - return lp.loopIn(c) + return el.loopRead(c) } return nil } } - return lp.loopAccept(fd) + return el.loopAccept(fd) } diff --git a/loop_linux.go b/loop_linux.go index cf465e6fa..8de0bbe35 100644 --- a/loop_linux.go +++ b/loop_linux.go @@ -8,25 +8,25 @@ package gnet import "github.com/panjf2000/gnet/internal/netpoll" -func (lp *loop) handleEvent(fd int, ev uint32) error { - if c, ok := lp.connections[fd]; ok { +func (el *eventloop) handleEvent(fd int, ev uint32) error { + if c, ok := el.connections[fd]; ok { switch c.outboundBuffer.IsEmpty() { // Don't change the ordering of processing EPOLLOUT | EPOLLRDHUP / EPOLLIN unless you're 100% // sure what you're doing! // Re-ordering can easily introduce bugs and bad side-effects, as I found out painfully in the past. case false: if ev&netpoll.OutEvents != 0 { - return lp.loopOut(c) + return el.loopWrite(c) } return nil case true: if ev&netpoll.InEvents != 0 { - return lp.loopIn(c) + return el.loopRead(c) } return nil default: return nil } } - return lp.loopAccept(fd) + return el.loopAccept(fd) } diff --git a/reactor_bsd.go b/reactor_bsd.go index 7ec13f15e..b3b73585b 100644 --- a/reactor_bsd.go +++ b/reactor_bsd.go @@ -16,17 +16,17 @@ func (svr *server) activateMainReactor() { })) } -func (svr *server) activateSubReactor(lp *loop) { +func (svr *server) activateSubReactor(el *eventloop) { defer svr.signalShutdown() - if lp.idx == 0 && svr.opts.Ticker { - go lp.loopTicker() + if el.idx == 0 && svr.opts.Ticker { + go el.loopTicker() } - sniffError(lp.poller.Polling(func(fd int, filter int16) error { - if c, ack := lp.connections[fd]; ack { + sniffError(el.poller.Polling(func(fd int, filter int16) error { + if c, ack := el.connections[fd]; ack { if filter == netpoll.EVFilterSock { - return lp.loopCloseConn(c, nil) + return el.loopCloseConn(c, nil) } switch c.outboundBuffer.IsEmpty() { // Don't change the ordering of processing EVFILT_WRITE | EVFILT_READ | EV_ERROR/EV_EOF unless you're 100% @@ -34,12 +34,12 @@ func (svr *server) activateSubReactor(lp *loop) { // Re-ordering can easily introduce bugs and bad side-effects, as I found out painfully in the past. case false: if filter == netpoll.EVFilterWrite { - return lp.loopOut(c) + return el.loopWrite(c) } return nil case true: if filter == netpoll.EVFilterRead { - return lp.loopIn(c) + return el.loopRead(c) } return nil } diff --git a/reactor_linux.go b/reactor_linux.go index 893303226..61eef8802 100644 --- a/reactor_linux.go +++ b/reactor_linux.go @@ -16,32 +16,32 @@ func (svr *server) activateMainReactor() { })) } -func (svr *server) activateSubReactor(lp *loop) { +func (svr *server) activateSubReactor(el *eventloop) { defer func() { - if lp.idx == 0 && svr.opts.Ticker { + if el.idx == 0 && svr.opts.Ticker { close(svr.ticktock) } svr.signalShutdown() }() - if lp.idx == 0 && svr.opts.Ticker { - go lp.loopTicker() + if el.idx == 0 && svr.opts.Ticker { + go el.loopTicker() } - sniffError(lp.poller.Polling(func(fd int, ev uint32) error { - if c, ack := lp.connections[fd]; ack { + sniffError(el.poller.Polling(func(fd int, ev uint32) error { + if c, ack := el.connections[fd]; ack { switch c.outboundBuffer.IsEmpty() { // Don't change the ordering of processing EPOLLOUT | EPOLLRDHUP / EPOLLIN unless you're 100% // sure what you're doing! // Re-ordering can easily introduce bugs and bad side-effects, as I found out painfully in the past. case false: if ev&netpoll.OutEvents != 0 { - return lp.loopOut(c) + return el.loopWrite(c) } return nil case true: if ev&netpoll.InEvents != 0 { - return lp.loopIn(c) + return el.loopRead(c) } return nil } diff --git a/server_unix.go b/server_unix.go index 9b27a1dfe..28d2669fc 100644 --- a/server_unix.go +++ b/server_unix.go @@ -18,13 +18,13 @@ import ( type server struct { ln *listener // all the listeners - wg sync.WaitGroup // loop close WaitGroup + wg sync.WaitGroup // event-loop close WaitGroup opts *Options // options with server once sync.Once // make sure only signalShutdown once cond *sync.Cond // shutdown signaler codec ICodec // codec for TCP stream ticktock chan time.Duration // ticker channel - mainLoop *loop // main loop for accepting connections + mainLoop *eventloop // main loop for accepting connections eventHandler EventHandler // user eventHandler subLoopGroup IEventLoopGroup // loops for handling events subLoopGroupSize int // number of loops @@ -47,10 +47,10 @@ func (svr *server) signalShutdown() { } func (svr *server) startLoops() { - svr.subLoopGroup.iterate(func(i int, lp *loop) bool { + svr.subLoopGroup.iterate(func(i int, el *eventloop) bool { svr.wg.Add(1) go func() { - lp.loopRun() + el.loopRun() svr.wg.Done() }() return true @@ -58,17 +58,17 @@ func (svr *server) startLoops() { } func (svr *server) closeLoops() { - svr.subLoopGroup.iterate(func(i int, lp *loop) bool { - _ = lp.poller.Close() + svr.subLoopGroup.iterate(func(i int, el *eventloop) bool { + _ = el.poller.Close() return true }) } func (svr *server) startReactors() { - svr.subLoopGroup.iterate(func(i int, lp *loop) bool { + svr.subLoopGroup.iterate(func(i int, el *eventloop) bool { svr.wg.Add(1) go func() { - svr.activateSubReactor(lp) + svr.activateSubReactor(el) svr.wg.Done() }() return true @@ -79,7 +79,7 @@ func (svr *server) activateLoops(numLoops int) error { // Create loops locally and bind the listeners. for i := 0; i < numLoops; i++ { if p, err := netpoll.OpenPoller(); err == nil { - lp := &loop{ + el := &eventloop{ idx: i, svr: svr, codec: svr.codec, @@ -88,8 +88,8 @@ func (svr *server) activateLoops(numLoops int) error { connections: make(map[int]*conn), eventHandler: svr.eventHandler, } - _ = lp.poller.AddRead(svr.ln.fd) - svr.subLoopGroup.register(lp) + _ = el.poller.AddRead(svr.ln.fd) + svr.subLoopGroup.register(el) } else { return err } @@ -103,7 +103,7 @@ func (svr *server) activateLoops(numLoops int) error { func (svr *server) activateReactors(numLoops int) error { for i := 0; i < numLoops; i++ { if p, err := netpoll.OpenPoller(); err == nil { - lp := &loop{ + el := &eventloop{ idx: i, svr: svr, codec: svr.codec, @@ -112,7 +112,7 @@ func (svr *server) activateReactors(numLoops int) error { connections: make(map[int]*conn), eventHandler: svr.eventHandler, } - svr.subLoopGroup.register(lp) + svr.subLoopGroup.register(el) } else { return err } @@ -122,13 +122,13 @@ func (svr *server) activateReactors(numLoops int) error { svr.startReactors() if p, err := netpoll.OpenPoller(); err == nil { - lp := &loop{ + el := &eventloop{ idx: -1, poller: p, svr: svr, } - _ = lp.poller.AddRead(svr.ln.fd) - svr.mainLoop = lp + _ = el.poller.AddRead(svr.ln.fd) + svr.mainLoop = el // Start main reactor. svr.wg.Add(1) go func() { @@ -153,8 +153,8 @@ func (svr *server) stop() { svr.waitForShutdown() // Notify all loops to close by closing all listeners - svr.subLoopGroup.iterate(func(i int, lp *loop) bool { - sniffError(lp.poller.Trigger(func() error { + svr.subLoopGroup.iterate(func(i int, el *eventloop) bool { + sniffError(el.poller.Trigger(func() error { return errServerShutdown })) return true @@ -171,9 +171,9 @@ func (svr *server) stop() { svr.wg.Wait() // Close loops and all outstanding connections - svr.subLoopGroup.iterate(func(i int, lp *loop) bool { - for _, c := range lp.connections { - sniffError(lp.loopCloseConn(c, nil)) + svr.subLoopGroup.iterate(func(i int, el *eventloop) bool { + for _, c := range el.connections { + sniffError(el.loopCloseConn(c, nil)) } return true }) diff --git a/server_windows.go b/server_windows.go index 2f995b4ce..ea26e7c08 100644 --- a/server_windows.go +++ b/server_windows.go @@ -32,7 +32,7 @@ type server struct { serr error // signal error once sync.Once // make sure only signalShutdown once codec ICodec // codec for TCP stream - loops []*loop // all the loops + loops []*eventloop // all the loops loopWG sync.WaitGroup // loop close WaitGroup ticktock chan time.Duration // ticker channel listenerWG sync.WaitGroup // listener close WaitGroup @@ -70,7 +70,7 @@ func (svr *server) startListener() { func (svr *server) startLoops(numLoops int) { for i := 0; i < numLoops; i++ { - lp := &loop{ + el := &eventloop{ ch: make(chan interface{}, commandBufferSize), idx: i, svr: svr, @@ -78,12 +78,12 @@ func (svr *server) startLoops(numLoops int) { connections: make(map[*stdConn]bool), eventHandler: svr.eventHandler, } - svr.subLoopGroup.register(lp) + svr.subLoopGroup.register(el) } svr.subLoopGroupSize = svr.subLoopGroup.len() svr.loopWG.Add(svr.subLoopGroupSize) - svr.subLoopGroup.iterate(func(i int, lp *loop) bool { - go lp.loopRun() + svr.subLoopGroup.iterate(func(i int, el *eventloop) bool { + go el.loopRun() return true }) } @@ -97,8 +97,8 @@ func (svr *server) stop() { svr.listenerWG.Wait() // Notify all loops to close. - svr.subLoopGroup.iterate(func(i int, lp *loop) bool { - lp.ch <- errClosing + svr.subLoopGroup.iterate(func(i int, el *eventloop) bool { + el.ch <- errClosing return true }) @@ -107,8 +107,8 @@ func (svr *server) stop() { // Close all connections. svr.loopWG.Add(svr.subLoopGroupSize) - svr.subLoopGroup.iterate(func(i int, lp *loop) bool { - lp.ch <- errCloseConns + svr.subLoopGroup.iterate(func(i int, el *eventloop) bool { + el.ch <- errCloseConns return true }) svr.loopWG.Wait()