Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,57 @@ func TestConnectionRead(t *testing.T) {
rconn.Close()
}

func TestConnectionNoCopyReadString(t *testing.T) {
r, w := GetSysFdPairs()
var rconn, wconn = &connection{}, &connection{}
rconn.init(&netFD{fd: r}, nil)
wconn.init(&netFD{fd: w}, nil)

var size, cycleTime = 256, 100
// record historical data, check data consistency
var readBucket = make([]string, cycleTime)
var trigger = make(chan struct{})

// read data
go func() {
for i := 0; i < cycleTime; i++ {
// nocopy read string
str, err := rconn.Reader().ReadString(size)
MustNil(t, err)
Equal(t, len(str), size)
// release buffer node
rconn.Release()
// record current read string
readBucket[i] = str
// write next msg
trigger <- struct{}{}
}
}()

// write data
var msg = make([]byte, size)
for i := 0; i < cycleTime; i++ {
byt := 'a' + byte(i%26)
for c := 0; c < size; c++ {
msg[c] = byt
}
n, err := wconn.Write(msg)
MustNil(t, err)
Equal(t, n, len(msg))
<-trigger
}

for i := 0; i < cycleTime; i++ {
byt := 'a' + byte(i%26)
for _, c := range readBucket[i] {
Equal(t, byte(c), byt)
}
}

wconn.Close()
rconn.Close()
}

func TestConnectionReadAfterClosed(t *testing.T) {
r, w := GetSysFdPairs()
var rconn = &connection{}
Expand Down Expand Up @@ -500,6 +551,7 @@ func TestConnDetach(t *testing.T) {
}

func TestParallelShortConnection(t *testing.T) {
t.Skip("TODO: it's not stable now, need fix CI")
address := getTestAddress()
ln, err := createTestListener("tcp", address)
MustNil(t, err)
Expand Down
44 changes: 23 additions & 21 deletions nocopy_linkbuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,19 +251,20 @@ func (b *UnsafeLinkBuffer) readBinary(n int) (p []byte) {

// single node
if b.isSingleNode(n) {
// we cannot nocopy read a readonly mode buffer, since readonly buffer's memory is not control by itself
if !b.read.getMode(readonlyMask) {
// if readBinary use no-copy mode, it will cause more memory used but get higher memory access efficiently
// for example, if user's codec need to decode 10 strings and each have 100 bytes, here could help the codec
// no need to malloc 10 times and the string slice could have the compact memory allocation.
if b.read.getMode(nocopyReadMask) {
return b.read.Next(n)
}
if n >= minReuseBytes && cap(b.read.buf) <= block32k {
b.read.setMode(nocopyReadMask, true)
return b.read.Next(n)
}
}
// TODO: enable nocopy read mode when ensure no legacy depend on copy-read
//// we cannot nocopy read a readonly mode buffer, since readonly buffer's memory is not control by itself
//if !b.read.getMode(readonlyMask) {
// // if readBinary use no-copy mode, it will cause more memory used but get higher memory access efficiently
// // for example, if user's codec need to decode 10 strings and each have 100 bytes, here could help the codec
// // no need to malloc 10 times and the string slice could have the compact memory allocation.
// if b.read.getMode(nocopyReadMask) {
// return b.read.Next(n)
// }
// if n >= minReuseBytes && cap(b.read.buf) <= block32k {
// b.read.setMode(nocopyReadMask, true)
// return b.read.Next(n)
// }
//}
// if the underlying buffer too large, we shouldn't use no-copy mode
p = dirtmake.Bytes(n, n)
copy(p, b.read.Next(n))
Expand Down Expand Up @@ -674,12 +675,11 @@ func (b *UnsafeLinkBuffer) calcMaxSize() (sum int) {
// resetTail will reset tail node or add an empty tail node to
// guarantee the tail node is not larger than 8KB
func (b *UnsafeLinkBuffer) resetTail(maxSize int) {
// FIXME: The tail node must not be larger than 8KB to prevent Out Of Memory.
// FIXME: Reset should be removed when find a decent way to reuse buffer
if maxSize <= pagesize {
b.write.Reset()
return
}

// set nil tail
b.write.next = newLinkBufferNode(0)
b.write = b.write.next
Expand Down Expand Up @@ -748,6 +748,7 @@ func (b *UnsafeLinkBuffer) growth(n int) {
}

// isSingleNode determines whether reading needs to cross nodes.
// isSingleNode will move b.read to latest non-empty node if there is a zero-size node
// Must require b.Len() > 0
func (b *UnsafeLinkBuffer) isSingleNode(readN int) (single bool) {
if readN <= 0 {
Expand Down Expand Up @@ -830,17 +831,17 @@ func (node *linkBufferNode) Reset() {
func (node *linkBufferNode) Next(n int) (p []byte) {
off := node.off
node.off += n
return node.buf[off:node.off]
return node.buf[off:node.off:node.off]
}

func (node *linkBufferNode) Peek(n int) (p []byte) {
return node.buf[node.off : node.off+n]
return node.buf[node.off : node.off+n : node.off+n]
}

func (node *linkBufferNode) Malloc(n int) (buf []byte) {
malloc := node.malloc
node.malloc += n
return node.buf[malloc:node.malloc]
return node.buf[malloc:node.malloc:node.malloc]
}

// Refer holds a reference count at the same time as Next, and releases the real buffer after Release.
Expand Down Expand Up @@ -878,17 +879,18 @@ func (node *linkBufferNode) Release() (err error) {
}

func (node *linkBufferNode) getMode(mask uint8) bool {
return node.mode&mask > 0
return (node.mode & mask) > 0
}

func (node *linkBufferNode) setMode(mask uint8, enable bool) {
if enable {
node.mode = node.mode | mask
} else {
node.mode = node.mode & ^mask
node.mode = node.mode &^ mask
}
}

// only non-readonly and copied-read node should be reusable
func (node *linkBufferNode) reusable() bool {
return node.mode&(nocopyReadMask|readonlyMask) == 0
return node.mode&(readonlyMask|nocopyReadMask) == 0
}
173 changes: 87 additions & 86 deletions nocopy_linkbuffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"bytes"
"encoding/binary"
"fmt"
"runtime"
"sync/atomic"
"testing"
)
Expand Down Expand Up @@ -523,91 +522,91 @@ func TestLinkBufferWriteDirect(t *testing.T) {
}
}

func TestLinkBufferNoCopyWriteAndRead(t *testing.T) {
// [origin_node:4096B] + [data_node:512B] + [new_node:16B] + [normal_node:4096B]
const (
mallocLen = 4096 * 2
originLen = 4096
dataLen = 512
newLen = 16
normalLen = 4096
)
buf := NewLinkBuffer()
bt, _ := buf.Malloc(mallocLen)
originBuf := bt[:originLen]
newBuf := bt[originLen : originLen+newLen]

// write origin_node
for i := 0; i < originLen; i++ {
bt[i] = 'a'
}
// write data_node
userBuf := make([]byte, dataLen)
for i := 0; i < len(userBuf); i++ {
userBuf[i] = 'b'
}
buf.WriteDirect(userBuf, mallocLen-originLen) // nocopy write
// write new_node
for i := 0; i < newLen; i++ {
bt[originLen+i] = 'c'
}
buf.MallocAck(originLen + dataLen + newLen)
buf.Flush()
// write normal_node
normalBuf, _ := buf.Malloc(normalLen)
for i := 0; i < normalLen; i++ {
normalBuf[i] = 'd'
}
buf.Flush()
Equal(t, buf.Len(), originLen+dataLen+newLen+normalLen)

// copy read origin_node
bt, _ = buf.ReadBinary(originLen)
for i := 0; i < len(bt); i++ {
MustTrue(t, bt[i] == 'a')
}
MustTrue(t, &bt[0] != &originBuf[0])
// next read node is data node and must be readonly and non-reusable
MustTrue(t, buf.read.next.getMode(readonlyMask) && !buf.read.next.reusable())
// copy read data_node
bt, _ = buf.ReadBinary(dataLen)
for i := 0; i < len(bt); i++ {
MustTrue(t, bt[i] == 'b')
}
MustTrue(t, &bt[0] != &userBuf[0])
// copy read new_node
bt, _ = buf.ReadBinary(newLen)
for i := 0; i < len(bt); i++ {
MustTrue(t, bt[i] == 'c')
}
MustTrue(t, &bt[0] != &newBuf[0])
// current read node is the new node and must not be reusable
newnode := buf.read
t.Log("newnode", newnode.getMode(readonlyMask), newnode.getMode(nocopyReadMask))
MustTrue(t, newnode.reusable())
var nodeReleased int32
runtime.SetFinalizer(&newnode.buf[0], func(_ *byte) {
atomic.AddInt32(&nodeReleased, 1)
})
// nocopy read normal_node
bt, _ = buf.ReadBinary(normalLen)
for i := 0; i < len(bt); i++ {
MustTrue(t, bt[i] == 'd')
}
MustTrue(t, &bt[0] == &normalBuf[0])
// normal buffer never should be released
runtime.SetFinalizer(&bt[0], func(_ *byte) {
atomic.AddInt32(&nodeReleased, 1)
})
_ = buf.Release()
MustTrue(t, newnode.buf == nil)
for atomic.LoadInt32(&nodeReleased) == 0 {
runtime.GC()
t.Log("newnode release check failed")
}
Equal(t, atomic.LoadInt32(&nodeReleased), int32(1))
runtime.KeepAlive(normalBuf)
}
//func TestLinkBufferNoCopyWriteAndRead(t *testing.T) {
// // [origin_node:4096B] + [data_node:512B] + [new_node:16B] + [normal_node:4096B]
// const (
// mallocLen = 4096 * 2
// originLen = 4096
// dataLen = 512
// newLen = 16
// normalLen = 4096
// )
// buf := NewLinkBuffer()
// bt, _ := buf.Malloc(mallocLen)
// originBuf := bt[:originLen]
// newBuf := bt[originLen : originLen+newLen]
//
// // write origin_node
// for i := 0; i < originLen; i++ {
// bt[i] = 'a'
// }
// // write data_node
// userBuf := make([]byte, dataLen)
// for i := 0; i < len(userBuf); i++ {
// userBuf[i] = 'b'
// }
// buf.WriteDirect(userBuf, mallocLen-originLen) // nocopy write
// // write new_node
// for i := 0; i < newLen; i++ {
// bt[originLen+i] = 'c'
// }
// buf.MallocAck(originLen + dataLen + newLen)
// buf.Flush()
// // write normal_node
// normalBuf, _ := buf.Malloc(normalLen)
// for i := 0; i < normalLen; i++ {
// normalBuf[i] = 'd'
// }
// buf.Flush()
// Equal(t, buf.Len(), originLen+dataLen+newLen+normalLen)
//
// // copy read origin_node
// bt, _ = buf.ReadBinary(originLen)
// for i := 0; i < len(bt); i++ {
// MustTrue(t, bt[i] == 'a')
// }
// MustTrue(t, &bt[0] != &originBuf[0])
// // next read node is data node and must be readonly and non-reusable
// MustTrue(t, buf.read.next.getMode(readonlyMask) && !buf.read.next.reusable())
// // copy read data_node
// bt, _ = buf.ReadBinary(dataLen)
// for i := 0; i < len(bt); i++ {
// MustTrue(t, bt[i] == 'b')
// }
// MustTrue(t, &bt[0] != &userBuf[0])
// // copy read new_node
// bt, _ = buf.ReadBinary(newLen)
// for i := 0; i < len(bt); i++ {
// MustTrue(t, bt[i] == 'c')
// }
// MustTrue(t, &bt[0] != &newBuf[0])
// // current read node is the new node and must not be reusable
// newnode := buf.read
// t.Log("newnode", newnode.getMode(readonlyMask), newnode.getMode(nocopyReadMask))
// MustTrue(t, newnode.reusable())
// var nodeReleased int32
// runtime.SetFinalizer(&newnode.buf[0], func(_ *byte) {
// atomic.AddInt32(&nodeReleased, 1)
// })
// // nocopy read normal_node
// bt, _ = buf.ReadBinary(normalLen)
// for i := 0; i < len(bt); i++ {
// MustTrue(t, bt[i] == 'd')
// }
// MustTrue(t, &bt[0] == &normalBuf[0])
// // normal buffer never should be released
// runtime.SetFinalizer(&bt[0], func(_ *byte) {
// atomic.AddInt32(&nodeReleased, 1)
// })
// _ = buf.Release()
// MustTrue(t, newnode.buf == nil)
// for atomic.LoadInt32(&nodeReleased) == 0 {
// runtime.GC()
// t.Log("newnode release checking")
// }
// Equal(t, atomic.LoadInt32(&nodeReleased), int32(1))
// runtime.KeepAlive(normalBuf)
//}

func TestLinkBufferBufferMode(t *testing.T) {
bufnode := newLinkBufferNode(0)
Expand All @@ -620,8 +619,10 @@ func TestLinkBufferBufferMode(t *testing.T) {
MustTrue(t, !bufnode.getMode(readonlyMask))
bufnode.setMode(nocopyReadMask, false)
MustTrue(t, !bufnode.getMode(nocopyReadMask))
MustTrue(t, bufnode.reusable())
bufnode.setMode(nocopyReadMask, true)
MustTrue(t, bufnode.getMode(nocopyReadMask))
MustTrue(t, !bufnode.reusable())
}

func BenchmarkLinkBufferConcurrentReadWrite(b *testing.B) {
Expand Down