Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions _typos.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Typo check: https://github.com/crate-ci/typos

[files]
extend-exclude = ["go.sum"]

[default.extend-identifiers]
# *sigh* this just isn't worth the cost of fixing
nd = "nd"
paniced = "paniced"
write_datas = "write_datas"
17 changes: 10 additions & 7 deletions connection_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ const (
ErrEOF = syscall.Errno(0x106)
// Write I/O buffer timeout, calling by Connection.Writer
ErrWriteTimeout = syscall.Errno(0x107)
// Concurrent connection access error
ErrConcurrentAccess = syscall.Errno(0x108)
)

const ErrnoMask = 0xFF
Expand Down Expand Up @@ -110,11 +112,12 @@ func (e *exception) Temporary() bool {

// Errors defined in netpoll
var errnos = [...]string{
ErrnoMask & ErrConnClosed: "connection has been closed",
ErrnoMask & ErrReadTimeout: "connection read timeout",
ErrnoMask & ErrDialTimeout: "dial wait timeout",
ErrnoMask & ErrDialNoDeadline: "dial no deadline",
ErrnoMask & ErrUnsupported: "netpoll dose not support",
ErrnoMask & ErrEOF: "EOF",
ErrnoMask & ErrWriteTimeout: "connection write timeout",
ErrnoMask & ErrConnClosed: "connection has been closed",
ErrnoMask & ErrReadTimeout: "connection read timeout",
ErrnoMask & ErrDialTimeout: "dial wait timeout",
ErrnoMask & ErrDialNoDeadline: "dial no deadline",
ErrnoMask & ErrUnsupported: "netpoll does not support",
ErrnoMask & ErrEOF: "EOF",
ErrnoMask & ErrWriteTimeout: "connection write timeout",
ErrnoMask & ErrConcurrentAccess: "concurrent connection access",
}
13 changes: 11 additions & 2 deletions connection_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,10 +220,15 @@ func (c *connection) MallocLen() (length int) {
// If empty, it will call syscall.Write to send data directly,
// otherwise the buffer will be sent asynchronously by the epoll trigger.
func (c *connection) Flush() error {
if !c.IsActive() || !c.lock(flushing) {
if !c.IsActive() {
return Exception(ErrConnClosed, "when flush")
}

if !c.lock(flushing) {
return Exception(ErrConcurrentAccess, "when flush")
}
defer c.unlock(flushing)

c.outputBuffer.Flush()
return c.flush()
}
Expand Down Expand Up @@ -282,9 +287,13 @@ func (c *connection) Read(p []byte) (n int, err error) {

// Write will Flush soon.
func (c *connection) Write(p []byte) (n int, err error) {
if !c.IsActive() || !c.lock(flushing) {
if !c.IsActive() {
return 0, Exception(ErrConnClosed, "when write")
}

if !c.lock(flushing) {
return 0, Exception(ErrConcurrentAccess, "when write")
}
defer c.unlock(flushing)

dst, _ := c.outputBuffer.Malloc(len(p))
Expand Down
34 changes: 20 additions & 14 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,8 @@ func writeAll(fd int, buf []byte) error {
// Large packet write test. The socket buffer is 2MB by default, here to verify
// whether Connection.Close can be executed normally after socket output buffer is full.
func TestLargeBufferWrite(t *testing.T) {
ln, err := createTestListener("tcp", ":12345")
address := getTestAddress()
ln, err := createTestListener("tcp", address)
MustNil(t, err)

trigger := make(chan int)
Expand All @@ -231,7 +232,7 @@ func TestLargeBufferWrite(t *testing.T) {
}
}()

conn, err := DialConnection("tcp", ":12345", time.Second)
conn, err := DialConnection("tcp", address, time.Second)
MustNil(t, err)
rfd := <-trigger

Expand Down Expand Up @@ -267,7 +268,8 @@ func TestLargeBufferWrite(t *testing.T) {
}

func TestWriteTimeout(t *testing.T) {
ln, err := createTestListener("tcp", ":1234")
address := getTestAddress()
ln, err := createTestListener("tcp", address)
MustNil(t, err)

interval := time.Millisecond * 100
Expand Down Expand Up @@ -296,7 +298,7 @@ func TestWriteTimeout(t *testing.T) {
}
}()

conn, err := DialConnection("tcp", ":1234", time.Second)
conn, err := DialConnection("tcp", address, time.Second)
MustNil(t, err)

_, err = conn.Writer().Malloc(1024)
Expand Down Expand Up @@ -440,7 +442,8 @@ func TestBookSizeLargerThanMaxSize(t *testing.T) {
}

func TestConnDetach(t *testing.T) {
ln, err := createTestListener("tcp", ":1234")
address := getTestAddress()
ln, err := createTestListener("tcp", address)
MustNil(t, err)

go func() {
Expand Down Expand Up @@ -470,7 +473,7 @@ func TestConnDetach(t *testing.T) {
}
}()

c, err := DialConnection("tcp", ":1234", time.Second)
c, err := DialConnection("tcp", address, time.Second)
MustNil(t, err)

conn := c.(*TCPConnection)
Expand All @@ -497,7 +500,8 @@ func TestConnDetach(t *testing.T) {
}

func TestParallelShortConnection(t *testing.T) {
ln, err := createTestListener("tcp", ":12345")
address := getTestAddress()
ln, err := createTestListener("tcp", address)
MustNil(t, err)
defer ln.Close()

Expand All @@ -523,7 +527,7 @@ func TestParallelShortConnection(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
conn, err := DialConnection("tcp", ":12345", time.Second)
conn, err := DialConnection("tcp", address, time.Second)
MustNil(t, err)
n, err := conn.Writer().WriteBinary(make([]byte, sizePerConn))
MustNil(t, err)
Expand All @@ -546,7 +550,8 @@ func TestParallelShortConnection(t *testing.T) {
}

func TestConnectionServerClose(t *testing.T) {
ln, err := createTestListener("tcp", ":12345")
address := getTestAddress()
ln, err := createTestListener("tcp", address)
MustNil(t, err)
defer ln.Close()

Expand Down Expand Up @@ -601,7 +606,7 @@ func TestConnectionServerClose(t *testing.T) {
go func() {
err := el.Serve(ln)
if err != nil {
t.Logf("servce end with error: %v", err)
t.Logf("service end with error: %v", err)
}
}()

Expand All @@ -628,7 +633,7 @@ func TestConnectionServerClose(t *testing.T) {
wg.Add(conns * 6)
for i := 0; i < conns; i++ {
go func() {
conn, err := DialConnection("tcp", ":12345", time.Second)
conn, err := DialConnection("tcp", address, time.Second)
MustNil(t, err)
err = conn.SetOnRequest(clientOnRequest)
MustNil(t, err)
Expand All @@ -644,7 +649,8 @@ func TestConnectionServerClose(t *testing.T) {
}

func TestConnectionDailTimeoutAndClose(t *testing.T) {
ln, err := createTestListener("tcp", ":12345")
address := getTestAddress()
ln, err := createTestListener("tcp", address)
MustNil(t, err)
defer ln.Close()

Expand All @@ -658,7 +664,7 @@ func TestConnectionDailTimeoutAndClose(t *testing.T) {
go func() {
err := el.Serve(ln)
if err != nil {
t.Logf("servce end with error: %v", err)
t.Logf("service end with error: %v", err)
}
}()

Expand All @@ -670,7 +676,7 @@ func TestConnectionDailTimeoutAndClose(t *testing.T) {
for i := 0; i < conns; i++ {
go func() {
defer wg.Done()
conn, err := DialConnection("tcp", ":12345", time.Nanosecond)
conn, err := DialConnection("tcp", address, time.Nanosecond)
Assert(t, err == nil || strings.Contains(err.Error(), "i/o timeout"))
_ = conn
}()
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ module github.com/cloudwego/netpoll
go 1.15

require (
github.com/bytedance/gopkg v0.0.0-20220413063733-65bf48ffb3a7
golang.org/x/sys v0.0.0-20220110181412-a018aaa089fe
github.com/bytedance/gopkg v0.0.0-20240507064146-197ded923ae3
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10
)
13 changes: 9 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
github.com/bytedance/gopkg v0.0.0-20220413063733-65bf48ffb3a7 h1:PtwsQyQJGxf8iaPptPNaduEIu9BnrNms+pcRdHAxZaM=
github.com/bytedance/gopkg v0.0.0-20220413063733-65bf48ffb3a7/go.mod h1:2ZlV9BaUH4+NXIBF0aMdKKAnHTzqH+iMU4KUjAbL23Q=
github.com/bytedance/gopkg v0.0.0-20240507064146-197ded923ae3 h1:ZKUHguI38SRQJkq7hhmwn8lAv3xM6B5qkj1IneS15YY=
github.com/bytedance/gopkg v0.0.0-20240507064146-197ded923ae3/go.mod h1:FtQG3YbQG9L/91pbKSw787yBQPutC+457AvDW77fgUQ=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/net v0.0.0-20221014081412-f15817d10f9b/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20220110181412-a018aaa089fe h1:W8vbETX/n8S6EmY0Pu4Ix7VvpsJUESTwl0oCK8MJOgk=
golang.org/x/sys v0.0.0-20220110181412-a018aaa089fe/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10 h1:WIoqL4EROvwiPdUtaip4VcDdpZ4kha7wBWZrbVKCIZg=
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
Expand Down
4 changes: 2 additions & 2 deletions mux/shard_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ func TestShardQueue(t *testing.T) {
var svrConn net.Conn
accepted := make(chan struct{})

network, address := "tcp", ":18888"
ln, err := net.Listen("tcp", ":18888")
network, address := "tcp", "localhost:12345"
ln, err := net.Listen("tcp", address)
MustNil(t, err)
stop := make(chan int, 1)
defer close(stop)
Expand Down
4 changes: 2 additions & 2 deletions net_dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (d *dialer) DialConnection(network, address string, timeout time.Duration)
switch network {
case "tcp", "tcp4", "tcp6":
return d.dialTCP(ctx, network, address)
// case "udp", "udp4", "udp6": // TODO: unsupport now
// case "udp", "udp4", "udp6": // TODO: unsupported now
case "unix", "unixgram", "unixpacket":
raddr := &UnixAddr{
UnixAddr: net.UnixAddr{Name: address, Net: network},
Expand All @@ -75,7 +75,7 @@ func (d *dialer) dialTCP(ctx context.Context, network, address string) (connecti
return nil, err
}
var ipaddrs []net.IPAddr
// host maybe empty if address is ":1234"
// host maybe empty if address is :12345
if host == "" {
ipaddrs = []net.IPAddr{{}}
} else {
Expand Down
31 changes: 18 additions & 13 deletions net_dialer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@ import (

func TestDialerTCP(t *testing.T) {
dialer := NewDialer()
conn, err := dialer.DialTimeout("tcp", ":1234", time.Second)
address := getTestAddress()
conn, err := dialer.DialTimeout("tcp", address, time.Second)
MustTrue(t, err != nil)
MustTrue(t, conn.(*TCPConnection) == nil)

ln, err := CreateListener("tcp", ":1234")
ln, err := CreateListener("tcp", address)
MustNil(t, err)

stop := make(chan int, 1)
Expand All @@ -57,10 +58,10 @@ func TestDialerTCP(t *testing.T) {
}
}()

conn, err = dialer.DialTimeout("tcp", ":1234", time.Second)
conn, err = dialer.DialTimeout("tcp", address, time.Second)
MustNil(t, err)
MustTrue(t, strings.HasPrefix(conn.LocalAddr().String(), "127.0.0.1:"))
Equal(t, conn.RemoteAddr().String(), "127.0.0.1:1234")
Equal(t, conn.RemoteAddr().String(), address)
}

func TestDialerUnix(t *testing.T) {
Expand Down Expand Up @@ -106,7 +107,8 @@ func TestDialerUnix(t *testing.T) {
}

func TestDialerFdAlloc(t *testing.T) {
ln, err := CreateListener("tcp", ":1234")
address := getTestAddress()
ln, err := CreateListener("tcp", address)
MustNil(t, err)
defer ln.Close()
el1, _ := NewEventLoop(func(ctx context.Context, connection Connection) error {
Expand All @@ -121,7 +123,7 @@ func TestDialerFdAlloc(t *testing.T) {
defer el1.Shutdown(ctx1)

for i := 0; i < 100; i++ {
conn, err := DialConnection("tcp", ":1234", time.Second)
conn, err := DialConnection("tcp", address, time.Second)
MustNil(t, err)
fd := conn.(*TCPConnection).fd
conn.Write([]byte("hello world"))
Expand All @@ -134,7 +136,8 @@ func TestDialerFdAlloc(t *testing.T) {
}

func TestFDClose(t *testing.T) {
ln, err := CreateListener("tcp", ":1234")
address := getTestAddress()
ln, err := CreateListener("tcp", address)
MustNil(t, err)
defer ln.Close()
el1, _ := NewEventLoop(func(ctx context.Context, connection Connection) error {
Expand All @@ -150,13 +153,13 @@ func TestFDClose(t *testing.T) {

var fd int
var conn Connection
conn, err = DialConnection("tcp", ":1234", time.Second)
conn, err = DialConnection("tcp", address, time.Second)
MustNil(t, err)
fd = conn.(*TCPConnection).fd
syscall.SetNonblock(fd, true)
conn.Close()

conn, err = DialConnection("tcp", ":1234", time.Second)
conn, err = DialConnection("tcp", address, time.Second)
MustNil(t, err)
fd = conn.(*TCPConnection).fd
syscall.SetNonblock(fd, true)
Expand All @@ -166,8 +169,10 @@ func TestFDClose(t *testing.T) {

// fd data package race test, use two servers and two dialers.
func TestDialerThenClose(t *testing.T) {
address1 := getTestAddress()
address2 := getTestAddress()
// server 1
ln1, _ := createTestListener("tcp", ":1231")
ln1, _ := createTestListener("tcp", address1)
el1 := mockDialerEventLoop(1)
go func() {
el1.Serve(ln1)
Expand All @@ -177,7 +182,7 @@ func TestDialerThenClose(t *testing.T) {
defer el1.Shutdown(ctx1)

// server 2
ln2, _ := createTestListener("tcp", ":1232")
ln2, _ := createTestListener("tcp", address2)
el2 := mockDialerEventLoop(2)
go func() {
el2.Serve(ln2)
Expand All @@ -194,12 +199,12 @@ func TestDialerThenClose(t *testing.T) {
defer wg.Done()
for i := 0; i < 50; i++ {
// send server 1
conn, err := DialConnection("tcp", ":1231", time.Second)
conn, err := DialConnection("tcp", address1, time.Second)
if err == nil {
mockDialerSend(1, &conn.(*TCPConnection).connection)
}
// send server 2
conn, err = DialConnection("tcp", ":1232", time.Second)
conn, err = DialConnection("tcp", address2, time.Second)
if err == nil {
mockDialerSend(2, &conn.(*TCPConnection).connection)
}
Expand Down
10 changes: 9 additions & 1 deletion net_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,15 @@ func (ln *listener) Accept() (net.Conn, error) {
// tcp
var fd, sa, err = syscall.Accept(ln.fd)
if err != nil {
if err == syscall.EAGAIN {
/* https://man7.org/linux/man-pages/man2/accept.2.html
EAGAIN or EWOULDBLOCK
The socket is marked nonblocking and no connections are
present to be accepted. POSIX.1-2001 and POSIX.1-2008
allow either error to be returned for this case, and do
not require these constants to have the same value, so a
portable application should check for both possibilities.
*/
if err == syscall.EAGAIN || err == syscall.EWOULDBLOCK {
return nil, nil
}
return nil, err
Expand Down
Loading