Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ Before you submit your Pull Request (PR) consider the following guidelines:
3. [Fork](https://docs.github.com/en/github/getting-started-with-github/fork-a-repo) the cloudwego/netpoll repo.
4. In your forked repository, make your changes in a new git branch:
```
git checkout -b my-fix-branch develop
git checkout -b my-fix-branch main
```
5. Create your patch, including appropriate test cases.
6. Follow our [Style Guides](#code-style-guides).
Expand All @@ -41,7 +41,7 @@ Before you submit your Pull Request (PR) consider the following guidelines:
```
git push origin my-fix-branch
```
9. In GitHub, send a pull request to `netpoll:develop`
9. In GitHub, send a pull request to `netpoll:main`

## Contribution Prerequisites
- Our development environment keeps up with [Go Official](https://golang.org/project/).
Expand All @@ -50,9 +50,5 @@ Before you submit your Pull Request (PR) consider the following guidelines:
- Maybe you need familiar with [Actions](https://github.com/features/actions)(our default workflow tool).

## Code Style Guides
Also see [Pingcap General advice](https://pingcap.github.io/style-guide/general.html).

Good resources:
- [Effective Go](https://golang.org/doc/effective_go)
- [Go Code Review Comments](https://github.com/golang/go/wiki/CodeReviewComments)
- [Uber Go Style Guide](https://github.com/uber-go/guide/blob/master/style.md)
38 changes: 15 additions & 23 deletions connection_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ import (
type connState = int32

const (
defaultZeroCopyTimeoutSec = 60

connStateNone = 0
connStateConnected = 1
connStateDisconnected = 2
Expand All @@ -39,21 +37,20 @@ type connection struct {
netFD
onEvent
locker
operator *FDOperator
readTimeout time.Duration
readTimer *time.Timer
readTrigger chan error
waitReadSize int64
writeTimeout time.Duration
writeTimer *time.Timer
writeTrigger chan error
inputBuffer *LinkBuffer
outputBuffer *LinkBuffer
outputBarrier *barrier
supportZeroCopy bool
maxSize int // The maximum size of data between two Release().
bookSize int // The size of data that can be read at once.
state connState // Connection state should be changed sequentially.
operator *FDOperator
readTimeout time.Duration
readTimer *time.Timer
readTrigger chan error
waitReadSize int64
writeTimeout time.Duration
writeTimer *time.Timer
writeTrigger chan error
inputBuffer *LinkBuffer
outputBuffer *LinkBuffer
outputBarrier *barrier
maxSize int // The maximum size of data between two Release().
bookSize int // The size of data that can be read at once.
state connState // Connection state should be changed sequentially.
}

var (
Expand Down Expand Up @@ -351,10 +348,6 @@ func (c *connection) init(conn Conn, opts *options) (err error) {
case "tcp", "tcp4", "tcp6":
setTCPNoDelay(c.fd, true)
}
// check zero-copy
if setZeroCopy(c.fd) == nil && setBlockZeroCopySend(c.fd, defaultZeroCopyTimeoutSec, 0) == nil {
c.supportZeroCopy = true
}

// connection initialized and prepare options
return c.onPrepare(opts)
Expand Down Expand Up @@ -483,9 +476,8 @@ func (c *connection) flush() error {
if c.outputBuffer.IsEmpty() {
return nil
}
// TODO: Let the upper layer pass in whether to use ZeroCopy.
bs := c.outputBuffer.GetBytes(c.outputBarrier.bs)
n, err := sendmsg(c.fd, bs, c.outputBarrier.ivs, false && c.supportZeroCopy)
n, err := sendmsg(c.fd, bs, c.outputBarrier.ivs, false)
if err != nil && err != syscall.EAGAIN {
return Exception(err, "when flush")
}
Expand Down
4 changes: 2 additions & 2 deletions connection_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ type locker struct {
}

func (l *locker) closeBy(w who) (success bool) {
return atomic.CompareAndSwapInt32(&l.keychain[closing], 0, int32(w))
return atomic.CompareAndSwapInt32(&l.keychain[closing], 0, w)
}

func (l *locker) isCloseBy(w who) (yes bool) {
return atomic.LoadInt32(&l.keychain[closing]) == int32(w)
return atomic.LoadInt32(&l.keychain[closing]) == w
}

func (l *locker) status(k key) int32 {
Expand Down
6 changes: 3 additions & 3 deletions connection_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,13 @@ func (c *connection) inputAck(n int) (err error) {
}

// outputs implements FDOperator.
func (c *connection) outputs(vs [][]byte) (rs [][]byte, supportZeroCopy bool) {
func (c *connection) outputs(vs [][]byte) (rs [][]byte, _ bool) {
if c.outputBuffer.IsEmpty() {
c.rw2r()
return rs, c.supportZeroCopy
return rs, false
}
rs = c.outputBuffer.GetBytes(vs)
return rs, c.supportZeroCopy
return rs, false
}

// outputAck implements FDOperator.
Expand Down
1 change: 1 addition & 0 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,7 @@ func TestConnectionServerClose(t *testing.T) {
WithOnPrepare(func(connection Connection) context.Context {
// t.Logf("server.OnPrepare: addr=%s", connection.RemoteAddr())
defer wg.Done()
//nolint:staticcheck // SA1029 no built-in type string as key
return context.WithValue(context.Background(), "prepare", "true")
}),
)
Expand Down
1 change: 1 addition & 0 deletions fd_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type FDOperator struct {
InputAck func(n int) (err error)

// Outputs will locked if len(rs) > 0, which need unlocked by OutputAck.
// supportZeroCopy is not implemented, and it will be ignored
Outputs func(vs [][]byte) (rs [][]byte, supportZeroCopy bool)
OutputAck func(n int) (err error)

Expand Down
2 changes: 1 addition & 1 deletion net_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestListenerDialer(t *testing.T) {
conn.SetOnRequest(onRequest)

MustNil(t, err)
n, err := conn.Write([]byte(msg))
n, err := conn.Write(msg)
MustNil(t, err)
Equal(t, n, len(msg))
time.Sleep(10 * time.Millisecond)
Expand Down
2 changes: 1 addition & 1 deletion net_sock.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type sockaddr interface {
}

func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string) (conn *netFD, err error) {
if (runtime.GOOS == "aix" || runtime.GOOS == "windows" || runtime.GOOS == "openbsd" || runtime.GOOS == "nacl") && raddr.isWildcard() {
if (runtime.GOOS == "aix" || runtime.GOOS == "openbsd" || runtime.GOOS == "nacl") && raddr.isWildcard() {
raddr = raddr.toLocal(net)
}
family, ipv6only := favoriteAddrFamily(net, laddr, raddr)
Expand Down
2 changes: 1 addition & 1 deletion nocopy_linkbuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ func (b *UnsafeLinkBuffer) MallocAck(n int) (err error) {
}
// discard the rest
for node := b.write.next; node != nil; node = node.next {
node.off, node.malloc, node.refer, node.buf = 0, 0, 1, node.buf[:0]
node.malloc, node.refer, node.buf = node.off, 1, node.buf[:node.off]
}
return nil
}
Expand Down
29 changes: 29 additions & 0 deletions nocopy_linkbuffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"bytes"
"encoding/binary"
"fmt"
"reflect"
"runtime"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -778,6 +779,34 @@ func TestLinkBufferPeekOutOfMemory(t *testing.T) {
}
}

func TestMallocAck(t *testing.T) {
sLen := 1024 * 7
buf1 := []byte{1, 2, 3, 4}
buf2 := []byte{5, 6, 7, 8}
lb := NewLinkBuffer(0)

buf, err := lb.Malloc(4 + sLen)
MustNil(t, err)
copy(buf[:4], buf1)
s := make([]byte, sLen)
err = lb.WriteDirect(s, sLen)
MustNil(t, err)

err = lb.MallocAck(4 + sLen)
MustNil(t, err)
lb.Flush()

buf, err = lb.Malloc(4)
MustNil(t, err)
copy(buf[:4], buf2)
lb.Flush()

buf, err = lb.Next(8 + sLen)
MustNil(t, err)

MustTrue(t, reflect.DeepEqual(buf, append(append(buf1, s...), buf2...)))
}

func BenchmarkStringToSliceByte(b *testing.B) {
b.StopTimer()
s := "hello world"
Expand Down
2 changes: 1 addition & 1 deletion poll_default_bsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func openDefaultPoll() (*defaultPoll, error) {
type defaultPoll struct {
fd int
trigger uint32
m sync.Map // only used in go:race
m sync.Map //nolint:unused // only used in go:race
opcache *operatorCache // operator cache
hups []func(p Poll) error
}
Expand Down
5 changes: 2 additions & 3 deletions poll_default_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,9 @@ func (p *defaultPoll) handler(events []epollevent) (closed bool) {
operator.OnWrite(p)
} else if operator.Outputs != nil {
// for connection
bs, supportZeroCopy := operator.Outputs(p.barriers[i].bs)
bs, _ := operator.Outputs(p.barriers[i].bs)
if len(bs) > 0 {
// TODO: Let the upper layer pass in whether to use ZeroCopy.
n, err := iosend(operator.FD, bs, p.barriers[i].ivs, false && supportZeroCopy)
n, err := iosend(operator.FD, bs, p.barriers[i].ivs, false)
operator.OutputAck(n)
if err != nil {
p.appendHup(operator)
Expand Down
1 change: 1 addition & 0 deletions poll_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ START:
// m.Run() will finish very quickly, so will not many goroutines block on Pick.
_ = m.Run()

//nolint:staticcheck // SA9003: empty branch
if !atomic.CompareAndSwapInt32(&m.status, managerInitializing, managerInitialized) {
// SetNumLoops called during m.Run() which cause CAS failed
// The polls will be adjusted next Pick
Expand Down
4 changes: 2 additions & 2 deletions sys_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func writev(fd int, bs [][]byte, ivs []syscall.Iovec) (n int, err error) {
r, _, e := syscall.RawSyscall(syscall.SYS_WRITEV, uintptr(fd), uintptr(unsafe.Pointer(&ivs[0])), uintptr(iovLen))
resetIovecs(bs, ivs[:iovLen])
if e != 0 {
return int(r), syscall.Errno(e)
return int(r), e
}
return int(r), nil
}
Expand All @@ -88,7 +88,7 @@ func readv(fd int, bs [][]byte, ivs []syscall.Iovec) (n int, err error) {
r, _, e := syscall.RawSyscall(syscall.SYS_READV, uintptr(fd), uintptr(unsafe.Pointer(&ivs[0])), uintptr(iovLen))
resetIovecs(bs, ivs[:iovLen])
if e != 0 {
return int(r), syscall.Errno(e)
return int(r), e
}
return int(r), nil
}
Expand Down
4 changes: 1 addition & 3 deletions sys_sendmsg_bsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import (
"unsafe"
)

var supportZeroCopySend bool

// sendmsg wraps the sendmsg system call.
// Must len(iovs) >= len(vs)
func sendmsg(fd int, bs [][]byte, ivs []syscall.Iovec, zerocopy bool) (n int, err error) {
Expand All @@ -39,7 +37,7 @@ func sendmsg(fd int, bs [][]byte, ivs []syscall.Iovec, zerocopy bool) (n int, er
r, _, e := syscall.RawSyscall(syscall.SYS_SENDMSG, uintptr(fd), uintptr(unsafe.Pointer(&msghdr)), uintptr(0))
resetIovecs(bs, ivs[:iovLen])
if e != 0 {
return int(r), syscall.Errno(e)
return int(r), e
}
return int(r), nil
}
6 changes: 1 addition & 5 deletions sys_sendmsg_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,7 @@ func sendmsg(fd int, bs [][]byte, ivs []syscall.Iovec, zerocopy bool) (n int, er
Iov: &ivs[0],
Iovlen: uint64(iovLen),
}
var flags uintptr
if zerocopy {
flags = MSG_ZEROCOPY
}
r, _, e := syscall.RawSyscall(syscall.SYS_SENDMSG, uintptr(fd), uintptr(unsafe.Pointer(&msghdr)), flags)
r, _, e := syscall.RawSyscall(syscall.SYS_SENDMSG, uintptr(fd), uintptr(unsafe.Pointer(&msghdr)), 0)
resetIovecs(bs, ivs[:iovLen])
if e != 0 {
return int(r), syscall.Errno(e)
Expand Down
28 changes: 0 additions & 28 deletions sys_zerocopy_bsd.go

This file was deleted.

36 changes: 0 additions & 36 deletions sys_zerocopy_linux.go

This file was deleted.