Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion net_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,15 @@ func (ln *listener) Accept() (net.Conn, error) {
// tcp
var fd, sa, err = syscall.Accept(ln.fd)
if err != nil {
if err == syscall.EAGAIN {
/* https://man7.org/linux/man-pages/man2/accept.2.html
EAGAIN or EWOULDBLOCK
The socket is marked nonblocking and no connections are
present to be accepted. POSIX.1-2001 and POSIX.1-2008
allow either error to be returned for this case, and do
not require these constants to have the same value, so a
portable application should check for both possibilities.
*/
if err == syscall.EAGAIN || err == syscall.EWOULDBLOCK {
return nil, nil
}
return nil, err
Expand Down
92 changes: 70 additions & 22 deletions netpoll_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"errors"
"strings"
"sync"
"syscall"
"time"
)

Expand Down Expand Up @@ -92,39 +93,86 @@ func (s *server) Close(ctx context.Context) error {
func (s *server) OnRead(p Poll) error {
// accept socket
conn, err := s.ln.Accept()
if err != nil {
// shut down
if strings.Contains(err.Error(), "closed") {
s.operator.Control(PollDetach)
s.onQuit(err)
if err == nil {
if conn != nil {
s.onAccept(conn.(Conn))
}
// EAGAIN | EWOULDBLOCK if conn and err both nil
return nil
}
logger.Printf("NETPOLL: accept conn failed: %v", err)

// delay accept when too many open files
if isOutOfFdErr(err) {
// since we use Epoll LT, we have to detach listener fd from epoll first
// and re-register it when accept successfully or there is no available connection
cerr := s.operator.Control(PollDetach)
if cerr != nil {
logger.Printf("NETPOLL: detach listener fd failed: %v", cerr)
return err
}
logger.Println("NETPOLL: accept conn failed:", err.Error())
return err
go func() {
retryTimes := []time.Duration{0, 10, 50, 100, 200, 500, 1000} // ms
retryTimeIndex := 0
for {
if retryTimeIndex > 0 {
time.Sleep(retryTimes[retryTimeIndex] * time.Millisecond)
}
conn, err := s.ln.Accept()
if err == nil {
if conn == nil {
// recovery accept poll loop
s.operator.Control(PollReadable)
return
}
s.onAccept(conn.(Conn))
logger.Println("NETPOLL: re-accept conn success:", conn.RemoteAddr())
retryTimeIndex = 0
continue
}
if retryTimeIndex+1 < len(retryTimes) {
retryTimeIndex++
}
logger.Printf("NETPOLL: re-accept conn failed, err=[%s] and next retrytime=%dms", err.Error(), retryTimes[retryTimeIndex])
}
}()
}
if conn == nil {
return nil

// shut down
if strings.Contains(err.Error(), "closed") {
s.operator.Control(PollDetach)
s.onQuit(err)
return err
}

return err
}

// OnHup implements FDOperator.
func (s *server) OnHup(p Poll) error {
s.onQuit(errors.New("listener close"))
return nil
}

func (s *server) onAccept(conn Conn) {
// store & register connection
var connection = &connection{}
connection.init(conn.(Conn), s.opts)
if !connection.IsActive() {
return nil
var nconn = new(connection)
nconn.init(conn, s.opts)
if !nconn.IsActive() {
return
}
var fd = conn.(Conn).Fd()
connection.AddCloseCallback(func(connection Connection) error {
var fd = conn.Fd()
nconn.AddCloseCallback(func(connection Connection) error {
s.connections.Delete(fd)
return nil
})
s.connections.Store(fd, connection)
s.connections.Store(fd, nconn)

// trigger onConnect asynchronously
connection.onConnect()
return nil
nconn.onConnect()
}

// OnHup implements FDOperator.
func (s *server) OnHup(p Poll) error {
s.onQuit(errors.New("listener close"))
return nil
func isOutOfFdErr(err error) bool {
se, ok := err.(syscall.Errno)
return ok && (se == syscall.EMFILE || se == syscall.ENFILE)
}
74 changes: 74 additions & 0 deletions netpoll_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import (
"context"
"errors"
"math/rand"
"os"
"runtime"
"sync"
"sync/atomic"
"syscall"
"testing"
"time"
)
Expand Down Expand Up @@ -507,6 +509,78 @@ func TestClientWriteAndClose(t *testing.T) {
MustNil(t, err)
}

func TestServerAcceptWhenTooManyOpenFiles(t *testing.T) {
if os.Getenv("N_LOCAL") == "" {
t.Skip("Only test for debug purpose")
return
}

var originalRlimit syscall.Rlimit
err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &originalRlimit)
MustNil(t, err)
t.Logf("Original RLimit: %v", originalRlimit)

rlimit := syscall.Rlimit{Cur: 32, Max: originalRlimit.Max}
err = syscall.Setrlimit(syscall.RLIMIT_NOFILE, &rlimit)
MustNil(t, err)
err = syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rlimit)
MustNil(t, err)
t.Logf("New RLimit: %v", rlimit)
defer func() { // reset
err = syscall.Setrlimit(syscall.RLIMIT_NOFILE, &originalRlimit)
MustNil(t, err)
}()

var network, address = "tcp", ":18888"
var connected int32
var loop = newTestEventLoop(network, address,
func(ctx context.Context, connection Connection) error {
buf, err := connection.Reader().Next(connection.Reader().Len())
connection.Writer().WriteBinary(buf)
connection.Writer().Flush()
return err
},
WithOnConnect(func(ctx context.Context, connection Connection) context.Context {
atomic.AddInt32(&connected, 1)
t.Logf("Conn[%s] accpeted", connection.RemoteAddr())
return ctx
}),
WithOnDisconnect(func(ctx context.Context, connection Connection) {
t.Logf("Conn[%s] disconnected", connection.RemoteAddr())
}),
)
time.Sleep(time.Millisecond * 10)

// out of fds
files := make([]*os.File, 0)
for {
f, err := os.Open("/dev/null")
if err != nil {
Assert(t, isOutOfFdErr(errors.Unwrap(err)), err)
break
}
files = append(files, f)
}
go func() {
time.Sleep(time.Second * 10)
t.Logf("close all files")
for _, f := range files {
f.Close()
}
}()

// we should use telnet manually
var connections = 1
for atomic.LoadInt32(&connected) < int32(connections) {
t.Logf("connected=%d", atomic.LoadInt32(&connected))
time.Sleep(time.Second)
}
time.Sleep(time.Second * 10)

err = loop.Shutdown(context.Background())
MustNil(t, err)
}

func createTestListener(network, address string) (Listener, error) {
for {
ln, err := CreateListener(network, address)
Expand Down
13 changes: 13 additions & 0 deletions test_conns.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/usr/bin/env bash

ip="$1"
port="$2"
conns="$3"
timeout="$4"

for i in $(seq 1 $conns);
do
nc -v -w $timeout $ip $port < /dev/null &
done

wait