Skip to content

Commit

Permalink
把可重用的CodeType移回根目录;BufioCodeType引入sync.Pool优化;加入ThreadSafe替代Session的收发锁
Browse files Browse the repository at this point in the history
  • Loading branch information
bg5sbk committed Nov 21, 2015
1 parent c040e4b commit 1d003a4
Show file tree
Hide file tree
Showing 7 changed files with 270 additions and 181 deletions.
75 changes: 67 additions & 8 deletions session_test.go → all_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ func init() {

type TestCodec struct{}

type TestEncoder struct {
w io.Writer
}

type TestDecoder struct {
r io.Reader
}

func (_ TestCodec) NewEncoder(w io.Writer) Encoder {
return TestEncoder{w}
}
Expand All @@ -25,19 +33,11 @@ func (_ TestCodec) NewDecoder(r io.Reader) Decoder {
return TestDecoder{r}
}

type TestEncoder struct {
w io.Writer
}

func (encoder TestEncoder) Encode(msg interface{}) error {
_, err := encoder.w.Write(msg.([]byte))
return err
}

type TestDecoder struct {
r io.Reader
}

func (decoder TestDecoder) Decode(msg interface{}) error {
_, err := io.ReadFull(decoder.r, msg.([]byte))
return err
Expand Down Expand Up @@ -105,3 +105,62 @@ func BytesTest(t *testing.T, session *Session) {
func Test_Bytes(t *testing.T) {
SessionTest(t, TestCodec{}, BytesTest)
}

func Test_Bufio_Bytes(t *testing.T) {
SessionTest(t, Bufio(TestCodec{}), BytesTest)
}

func Test_ThreadSafe_Bytes(t *testing.T) {
SessionTest(t, ThreadSafe(TestCodec{}), BytesTest)
}

func Test_ThreadSafe_Bufio_Bytes(t *testing.T) {
SessionTest(t, ThreadSafe(Bufio(TestCodec{})), BytesTest)
}

type TestObject struct {
X, Y, Z int
}

func RandObject() TestObject {
return TestObject{
X: rand.Int(), Y: rand.Int(), Z: rand.Int(),
}
}

func ObjectTest(t *testing.T, session *Session) {
for i := 0; i < 2000; i++ {
msg1 := RandObject()
err := session.Send(&msg1)
unitest.NotError(t, err)

var msg2 TestObject
err = session.Receive(&msg2)
unitest.NotError(t, err)
unitest.Pass(t, msg1 == msg2)
}
}

func Test_Gob(t *testing.T) {
SessionTest(t, Gob(), ObjectTest)
}

func Test_Bufio_Gob(t *testing.T) {
SessionTest(t, Bufio(Gob()), ObjectTest)
}

func Test_Json(t *testing.T) {
SessionTest(t, Json(), ObjectTest)
}

func Test_Bufio_Json(t *testing.T) {
SessionTest(t, Bufio(Json()), ObjectTest)
}

func Test_Xml(t *testing.T) {
SessionTest(t, Xml(), ObjectTest)
}

func Test_Bufio_Xml(t *testing.T) {
SessionTest(t, Bufio(Xml()), ObjectTest)
}
95 changes: 87 additions & 8 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,16 @@ package link
import (
"io"
"net"
"sync"
"time"

"encoding/gob"
"encoding/json"
"encoding/xml"
)

type CodecType interface {
EncodeType
DecodeType
}

type EncodeType interface {
NewEncoder(w io.Writer) Encoder
}

type DecodeType interface {
NewDecoder(r io.Reader) Decoder
}

Expand All @@ -27,6 +24,10 @@ type Decoder interface {
Decode(msg interface{}) error
}

type Disposeable interface {
Dispose()
}

func Serve(network, address string, codecType CodecType) (*Server, error) {
listener, err := net.Listen(network, address)
if err != nil {
Expand All @@ -50,3 +51,81 @@ func ConnectTimeout(network, address string, timeout time.Duration, codecType Co
}
return NewSession(conn, codecType), nil
}

func ThreadSafe(base CodecType) CodecType {
return safeCodecType{
base: base,
}
}

type safeCodecType struct {
base CodecType
}

type safeDecoder struct {
sync.Mutex
base Decoder
}

type safeEncoder struct {
sync.Mutex
base Encoder
}

func (codecType safeCodecType) NewEncoder(w io.Writer) Encoder {
return &safeEncoder{
base: codecType.base.NewEncoder(w),
}
}

func (codecType safeCodecType) NewDecoder(r io.Reader) Decoder {
return &safeDecoder{
base: codecType.base.NewDecoder(r),
}
}

func (encoder *safeEncoder) Encode(msg interface{}) error {
encoder.Lock()
defer encoder.Unlock()
return encoder.base.Encode(msg)
}

func (decoder *safeDecoder) Decode(msg interface{}) error {
decoder.Lock()
defer decoder.Unlock()
return decoder.base.Decode(msg)
}

func Gob() CodecType {
return &genCodecType{
func(w io.Writer) Encoder { return gob.NewEncoder(w) },
func(r io.Reader) Decoder { return gob.NewDecoder(r) },
}
}

func Json() CodecType {
return &genCodecType{
func(w io.Writer) Encoder { return json.NewEncoder(w) },
func(r io.Reader) Decoder { return json.NewDecoder(r) },
}
}

func Xml() CodecType {
return &genCodecType{
func(w io.Writer) Encoder { return xml.NewEncoder(w) },
func(r io.Reader) Decoder { return xml.NewDecoder(r) },
}
}

type genCodecType struct {
newEncoder func(io.Writer) Encoder
newDecoder func(io.Reader) Decoder
}

func (codecType *genCodecType) NewEncoder(w io.Writer) Encoder {
return codecType.newEncoder(w)
}

func (codecType *genCodecType) NewDecoder(r io.Reader) Decoder {
return codecType.newDecoder(r)
}
98 changes: 98 additions & 0 deletions bufio.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package link

import (
"bufio"
"io"
"sync"
)

type BufioCodecType struct {
base CodecType
readBufferSize int
writeBufferSize int
readerPool sync.Pool
writerPool sync.Pool
}

func Bufio(base CodecType) *BufioCodecType {
return BufioSize(base, 4096, 4096)
}

func BufioSize(base CodecType, readBufferSize, writeBufferSize int) *BufioCodecType {
return &BufioCodecType{
base: base,
readBufferSize: readBufferSize,
writeBufferSize: writeBufferSize,
}
}

func (codecType *BufioCodecType) NewEncoder(w io.Writer) Encoder {
if codecType.writeBufferSize == 0 {
return codecType.base.NewEncoder(w)
}
bw, ok := codecType.writerPool.Get().(*bufio.Writer)
if ok {
bw.Reset(w)
} else {
bw = bufio.NewWriterSize(w, codecType.writeBufferSize)
}
return &bufioEncoder{
writer: bw,
pool: &codecType.writerPool,
base: codecType.base.NewEncoder(bw),
}
}

func (codecType *BufioCodecType) NewDecoder(r io.Reader) Decoder {
if codecType.readBufferSize == 0 {
return codecType.base.NewDecoder(r)
}
br, ok := codecType.readerPool.Get().(*bufio.Reader)
if ok {
br.Reset(r)
} else {
br = bufio.NewReaderSize(r, codecType.readBufferSize)
}
return &bufioDecoder{
reader: br,
pool: &codecType.readerPool,
base: codecType.base.NewDecoder(br),
}
}

type bufioEncoder struct {
writer *bufio.Writer
pool *sync.Pool
base Encoder
}

func (encoder *bufioEncoder) Encode(msg interface{}) error {
if err := encoder.base.Encode(msg); err != nil {
return err
}
return encoder.writer.Flush()
}

func (encoder *bufioEncoder) Dispose() {
if d, ok := encoder.base.(Disposeable); ok {
d.Dispose()
}
encoder.pool.Put(encoder.writer)
}

type bufioDecoder struct {
reader *bufio.Reader
pool *sync.Pool
base Decoder
}

func (decoder *bufioDecoder) Decode(msg interface{}) error {
return decoder.base.Decode(msg)
}

func (decoder *bufioDecoder) Dispose() {
if d, ok := decoder.base.(Disposeable); ok {
d.Dispose()
}
decoder.pool.Put(decoder.reader)
}
Loading

0 comments on commit 1d003a4

Please sign in to comment.