From 1d003a4be2bae92ff75aa1060b2163679d046bd8 Mon Sep 17 00:00:00 2001 From: idada Date: Sun, 22 Nov 2015 00:19:25 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8A=8A=E5=8F=AF=E9=87=8D=E7=94=A8=E7=9A=84Co?= =?UTF-8?q?deType=E7=A7=BB=E5=9B=9E=E6=A0=B9=E7=9B=AE=E5=BD=95=EF=BC=9BBuf?= =?UTF-8?q?ioCodeType=E5=BC=95=E5=85=A5sync.Pool=E4=BC=98=E5=8C=96?= =?UTF-8?q?=EF=BC=9B=E5=8A=A0=E5=85=A5ThreadSafe=E6=9B=BF=E4=BB=A3Session?= =?UTF-8?q?=E7=9A=84=E6=94=B6=E5=8F=91=E9=94=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- session_test.go => all_test.go | 75 +++++++++++++++++++++++--- api.go | 95 +++++++++++++++++++++++++++++--- bufio.go | 98 ++++++++++++++++++++++++++++++++++ example/codec/all_test.go | 65 +++++----------------- example/codec/bufio.go | 61 --------------------- example/codec/general.go | 51 ------------------ session.go | 6 +++ 7 files changed, 270 insertions(+), 181 deletions(-) rename session_test.go => all_test.go (62%) create mode 100644 bufio.go delete mode 100644 example/codec/bufio.go delete mode 100644 example/codec/general.go diff --git a/session_test.go b/all_test.go similarity index 62% rename from session_test.go rename to all_test.go index 7c61630..36b2a37 100644 --- a/session_test.go +++ b/all_test.go @@ -17,6 +17,14 @@ func init() { type TestCodec struct{} +type TestEncoder struct { + w io.Writer +} + +type TestDecoder struct { + r io.Reader +} + func (_ TestCodec) NewEncoder(w io.Writer) Encoder { return TestEncoder{w} } @@ -25,19 +33,11 @@ func (_ TestCodec) NewDecoder(r io.Reader) Decoder { return TestDecoder{r} } -type TestEncoder struct { - w io.Writer -} - func (encoder TestEncoder) Encode(msg interface{}) error { _, err := encoder.w.Write(msg.([]byte)) return err } -type TestDecoder struct { - r io.Reader -} - func (decoder TestDecoder) Decode(msg interface{}) error { _, err := io.ReadFull(decoder.r, msg.([]byte)) return err @@ -105,3 +105,62 @@ func BytesTest(t *testing.T, session *Session) { func Test_Bytes(t *testing.T) { SessionTest(t, TestCodec{}, BytesTest) } + +func Test_Bufio_Bytes(t *testing.T) { + SessionTest(t, Bufio(TestCodec{}), BytesTest) +} + +func Test_ThreadSafe_Bytes(t *testing.T) { + SessionTest(t, ThreadSafe(TestCodec{}), BytesTest) +} + +func Test_ThreadSafe_Bufio_Bytes(t *testing.T) { + SessionTest(t, ThreadSafe(Bufio(TestCodec{})), BytesTest) +} + +type TestObject struct { + X, Y, Z int +} + +func RandObject() TestObject { + return TestObject{ + X: rand.Int(), Y: rand.Int(), Z: rand.Int(), + } +} + +func ObjectTest(t *testing.T, session *Session) { + for i := 0; i < 2000; i++ { + msg1 := RandObject() + err := session.Send(&msg1) + unitest.NotError(t, err) + + var msg2 TestObject + err = session.Receive(&msg2) + unitest.NotError(t, err) + unitest.Pass(t, msg1 == msg2) + } +} + +func Test_Gob(t *testing.T) { + SessionTest(t, Gob(), ObjectTest) +} + +func Test_Bufio_Gob(t *testing.T) { + SessionTest(t, Bufio(Gob()), ObjectTest) +} + +func Test_Json(t *testing.T) { + SessionTest(t, Json(), ObjectTest) +} + +func Test_Bufio_Json(t *testing.T) { + SessionTest(t, Bufio(Json()), ObjectTest) +} + +func Test_Xml(t *testing.T) { + SessionTest(t, Xml(), ObjectTest) +} + +func Test_Bufio_Xml(t *testing.T) { + SessionTest(t, Bufio(Xml()), ObjectTest) +} diff --git a/api.go b/api.go index 2b588f9..440029c 100644 --- a/api.go +++ b/api.go @@ -3,19 +3,16 @@ package link import ( "io" "net" + "sync" "time" + + "encoding/gob" + "encoding/json" + "encoding/xml" ) type CodecType interface { - EncodeType - DecodeType -} - -type EncodeType interface { NewEncoder(w io.Writer) Encoder -} - -type DecodeType interface { NewDecoder(r io.Reader) Decoder } @@ -27,6 +24,10 @@ type Decoder interface { Decode(msg interface{}) error } +type Disposeable interface { + Dispose() +} + func Serve(network, address string, codecType CodecType) (*Server, error) { listener, err := net.Listen(network, address) if err != nil { @@ -50,3 +51,81 @@ func ConnectTimeout(network, address string, timeout time.Duration, codecType Co } return NewSession(conn, codecType), nil } + +func ThreadSafe(base CodecType) CodecType { + return safeCodecType{ + base: base, + } +} + +type safeCodecType struct { + base CodecType +} + +type safeDecoder struct { + sync.Mutex + base Decoder +} + +type safeEncoder struct { + sync.Mutex + base Encoder +} + +func (codecType safeCodecType) NewEncoder(w io.Writer) Encoder { + return &safeEncoder{ + base: codecType.base.NewEncoder(w), + } +} + +func (codecType safeCodecType) NewDecoder(r io.Reader) Decoder { + return &safeDecoder{ + base: codecType.base.NewDecoder(r), + } +} + +func (encoder *safeEncoder) Encode(msg interface{}) error { + encoder.Lock() + defer encoder.Unlock() + return encoder.base.Encode(msg) +} + +func (decoder *safeDecoder) Decode(msg interface{}) error { + decoder.Lock() + defer decoder.Unlock() + return decoder.base.Decode(msg) +} + +func Gob() CodecType { + return &genCodecType{ + func(w io.Writer) Encoder { return gob.NewEncoder(w) }, + func(r io.Reader) Decoder { return gob.NewDecoder(r) }, + } +} + +func Json() CodecType { + return &genCodecType{ + func(w io.Writer) Encoder { return json.NewEncoder(w) }, + func(r io.Reader) Decoder { return json.NewDecoder(r) }, + } +} + +func Xml() CodecType { + return &genCodecType{ + func(w io.Writer) Encoder { return xml.NewEncoder(w) }, + func(r io.Reader) Decoder { return xml.NewDecoder(r) }, + } +} + +type genCodecType struct { + newEncoder func(io.Writer) Encoder + newDecoder func(io.Reader) Decoder +} + +func (codecType *genCodecType) NewEncoder(w io.Writer) Encoder { + return codecType.newEncoder(w) +} + +func (codecType *genCodecType) NewDecoder(r io.Reader) Decoder { + return codecType.newDecoder(r) +} diff --git a/bufio.go b/bufio.go new file mode 100644 index 0000000..1dc19c8 --- /dev/null +++ b/bufio.go @@ -0,0 +1,98 @@ +package link + +import ( + "bufio" + "io" + "sync" +) + +type BufioCodecType struct { + base CodecType + readBufferSize int + writeBufferSize int + readerPool sync.Pool + writerPool sync.Pool +} + +func Bufio(base CodecType) *BufioCodecType { + return BufioSize(base, 4096, 4096) +} + +func BufioSize(base CodecType, readBufferSize, writeBufferSize int) *BufioCodecType { + return &BufioCodecType{ + base: base, + readBufferSize: readBufferSize, + writeBufferSize: writeBufferSize, + } +} + +func (codecType *BufioCodecType) NewEncoder(w io.Writer) Encoder { + if codecType.writeBufferSize == 0 { + return codecType.base.NewEncoder(w) + } + bw, ok := codecType.writerPool.Get().(*bufio.Writer) + if ok { + bw.Reset(w) + } else { + bw = bufio.NewWriterSize(w, codecType.writeBufferSize) + } + return &bufioEncoder{ + writer: bw, + pool: &codecType.writerPool, + base: codecType.base.NewEncoder(bw), + } +} + +func (codecType *BufioCodecType) NewDecoder(r io.Reader) Decoder { + if codecType.readBufferSize == 0 { + return codecType.base.NewDecoder(r) + } + br, ok := codecType.readerPool.Get().(*bufio.Reader) + if ok { + br.Reset(r) + } else { + br = bufio.NewReaderSize(r, codecType.readBufferSize) + } + return &bufioDecoder{ + reader: br, + pool: &codecType.readerPool, + base: codecType.base.NewDecoder(br), + } +} + +type bufioEncoder struct { + writer *bufio.Writer + pool *sync.Pool + base Encoder +} + +func (encoder *bufioEncoder) Encode(msg interface{}) error { + if err := encoder.base.Encode(msg); err != nil { + return err + } + return encoder.writer.Flush() +} + +func (encoder *bufioEncoder) Dispose() { + if d, ok := encoder.base.(Disposeable); ok { + d.Dispose() + } + encoder.pool.Put(encoder.writer) +} + +type bufioDecoder struct { + reader *bufio.Reader + pool *sync.Pool + base Decoder +} + +func (decoder *bufioDecoder) Decode(msg interface{}) error { + return decoder.base.Decode(msg) +} + +func (decoder *bufioDecoder) Dispose() { + if d, ok := decoder.base.(Disposeable); ok { + d.Dispose() + } + decoder.pool.Put(decoder.reader) +} diff --git a/example/codec/all_test.go b/example/codec/all_test.go index 74b3b10..e419138 100644 --- a/example/codec/all_test.go +++ b/example/codec/all_test.go @@ -4,7 +4,6 @@ import ( "bytes" "io" "math/rand" - "runtime/pprof" "sync" "testing" "time" @@ -86,8 +85,6 @@ func SessionTest(t *testing.T, codecType link.CodecType, test func(*testing.T, * server.Stop() serverWait.Wait() - - MakeSureSessionGoroutineExit(t) } func BytesTest(t *testing.T, session *link.Session) { @@ -108,7 +105,7 @@ func Test_Bytes(t *testing.T) { } func Test_Bufio_Bytes(t *testing.T) { - SessionTest(t, Bufio(Bytes(Uint16BE)), BytesTest) + SessionTest(t, link.Bufio(Bytes(Uint16BE)), BytesTest) } func Test_Packet_Bytes(t *testing.T) { @@ -116,7 +113,7 @@ func Test_Packet_Bytes(t *testing.T) { } func Test_Bufio_Packet_Bytes(t *testing.T) { - SessionTest(t, Bufio(Packet(Uint16BE, Bytes(Uint16BE))), BytesTest) + SessionTest(t, link.Bufio(Packet(Uint16BE, Bytes(Uint16BE))), BytesTest) } func StringTest(t *testing.T, session *link.Session) { @@ -137,7 +134,7 @@ func Test_String(t *testing.T) { } func Test_Bufio_String(t *testing.T) { - SessionTest(t, Bufio(String(Uint16BE)), StringTest) + SessionTest(t, link.Bufio(String(Uint16BE)), StringTest) } func Test_Packet_String(t *testing.T) { @@ -145,7 +142,7 @@ func Test_Packet_String(t *testing.T) { } func Test_Bufio_Packet_String(t *testing.T) { - SessionTest(t, Bufio(Packet(Uint16BE, String(Uint16BE))), StringTest) + SessionTest(t, link.Bufio(Packet(Uint16BE, String(Uint16BE))), StringTest) } func ObjectTest(t *testing.T, session *link.Session) { @@ -161,52 +158,28 @@ func ObjectTest(t *testing.T, session *link.Session) { } } -func Test_Gob(t *testing.T) { - SessionTest(t, Gob(), ObjectTest) -} - -func Test_Bufio_Gob(t *testing.T) { - SessionTest(t, Bufio(Gob()), ObjectTest) -} - func Test_Packet_Gob(t *testing.T) { - SessionTest(t, Packet(Uint16BE, Gob()), ObjectTest) + SessionTest(t, Packet(Uint16BE, link.Gob()), ObjectTest) } func Test_Bufio_Packet_Gob(t *testing.T) { - SessionTest(t, Bufio(Packet(Uint16BE, Gob())), ObjectTest) -} - -func Test_Json(t *testing.T) { - SessionTest(t, Json(), ObjectTest) -} - -func Test_Bufio_Json(t *testing.T) { - SessionTest(t, Bufio(Json()), ObjectTest) + SessionTest(t, link.Bufio(Packet(Uint16BE, link.Gob())), ObjectTest) } func Test_Packet_Json(t *testing.T) { - SessionTest(t, Packet(Uint16BE, Json()), ObjectTest) + SessionTest(t, Packet(Uint16BE, link.Json()), ObjectTest) } func Test_Bufio_Packet_Json(t *testing.T) { - SessionTest(t, Bufio(Packet(Uint16BE, Json())), ObjectTest) -} - -func Test_Xml(t *testing.T) { - SessionTest(t, Xml(), ObjectTest) -} - -func Test_Bufio_Xml(t *testing.T) { - SessionTest(t, Bufio(Xml()), ObjectTest) + SessionTest(t, link.Bufio(Packet(Uint16BE, link.Json())), ObjectTest) } func Test_Packet_Xml(t *testing.T) { - SessionTest(t, Packet(Uint16BE, Xml()), ObjectTest) + SessionTest(t, Packet(Uint16BE, link.Xml()), ObjectTest) } func Test_Bufio_Packet_Xml(t *testing.T) { - SessionTest(t, Bufio(Packet(Uint16BE, Xml())), ObjectTest) + SessionTest(t, link.Bufio(Packet(Uint16BE, link.Xml())), ObjectTest) } func Test_SelfCodec(t *testing.T) { @@ -214,7 +187,7 @@ func Test_SelfCodec(t *testing.T) { } func Test_Bufio_SelfCodec(t *testing.T) { - SessionTest(t, Bufio(SelfCodec()), ObjectTest) + SessionTest(t, link.Bufio(SelfCodec()), ObjectTest) } func Test_Packet_SelfCodec(t *testing.T) { @@ -222,19 +195,5 @@ func Test_Packet_SelfCodec(t *testing.T) { } func Test_Bufio_Packet_SelfCodec(t *testing.T) { - SessionTest(t, Bufio(Packet(Uint16BE, SelfCodec())), ObjectTest) -} - -func MakeSureSessionGoroutineExit(t *testing.T) { - buff := new(bytes.Buffer) - goroutines := pprof.Lookup("goroutine") - - if err := goroutines.WriteTo(buff, 2); err != nil { - t.Fatalf("Dump goroutine failed: %v", err) - } - - if n := bytes.Index(buff.Bytes(), []byte("link.HandlerFunc.Handle")); n >= 0 { - t.Log(buff.String()) - t.Fatalf("Some handler goroutine running") - } + SessionTest(t, link.Bufio(Packet(Uint16BE, SelfCodec())), ObjectTest) } diff --git a/example/codec/bufio.go b/example/codec/bufio.go deleted file mode 100644 index 23f0d50..0000000 --- a/example/codec/bufio.go +++ /dev/null @@ -1,61 +0,0 @@ -package codec - -import ( - "bufio" - "io" - - "github.com/funny/link" -) - -const DEFAULT_BUFFER_SIZE = 4096 - -type BufioCodecType struct { - Base link.CodecType - ReadBufferSize int - WriteBufferSize int -} - -func Bufio(base link.CodecType) *BufioCodecType { - return &BufioCodecType{ - base, - DEFAULT_BUFFER_SIZE, - DEFAULT_BUFFER_SIZE, - } -} - -func (codecType *BufioCodecType) NewEncoder(w io.Writer) link.Encoder { - bw := bufio.NewWriterSize(w, codecType.WriteBufferSize) - codec := &bufioEncoder{ - Writer: bw, - Base: codecType.Base.NewEncoder(bw), - } - return codec -} - -func (codecType *BufioCodecType) NewDecoder(r io.Reader) link.Decoder { - return &bufioDecoder{ - Base: codecType.Base.NewDecoder( - bufio.NewReaderSize(r, codecType.ReadBufferSize), - ), - } -} - -type bufioEncoder struct { - Base link.Encoder - Writer *bufio.Writer -} - -func (encoder *bufioEncoder) Encode(msg interface{}) error { - if err := encoder.Base.Encode(msg); err != nil { - return err - } - return encoder.Writer.Flush() -} - -type bufioDecoder struct { - Base link.Decoder -} - -func (decoder *bufioDecoder) Decode(msg interface{}) error { - return decoder.Base.Decode(msg) -} diff --git a/example/codec/general.go b/example/codec/general.go deleted file mode 100644 index e5f08f1..0000000 --- a/example/codec/general.go +++ /dev/null @@ -1,51 +0,0 @@ -package codec - -import ( - "encoding/gob" - "encoding/json" - "encoding/xml" - "io" - - "github.com/funny/link" -) - -func Gob() link.CodecType { - return &genCodecType{ - func(w io.Writer) link.Encoder { return gob.NewEncoder(w) }, - func(r io.Reader) link.Decoder { return gob.NewDecoder(r) }, - } -} - -func Json() link.CodecType { - return &genCodecType{ - func(w io.Writer) link.Encoder { return json.NewEncoder(w) }, - func(r io.Reader) link.Decoder { return json.NewDecoder(r) }, - } -} - -func Xml() link.CodecType { - return &genCodecType{ - func(w io.Writer) link.Encoder { return xml.NewEncoder(w) }, - func(r io.Reader) link.Decoder { return xml.NewDecoder(r) }, - } -} - -func Mix(encodeType link.EncodeType, decodeType link.DecodeType) link.CodecType { - return &genCodecType{ - encodeType.NewEncoder, - decodeType.NewDecoder, - } -} - -type genCodecType struct { - newEncoder func(io.Writer) link.Encoder - newDecoder func(io.Reader) link.Decoder -} - -func (codecType *genCodecType) NewEncoder(w io.Writer) link.Encoder { - return codecType.newEncoder(w) -} - -func (codecType *genCodecType) NewDecoder(r io.Reader) link.Decoder { - return codecType.newDecoder(r) -} diff --git a/session.go b/session.go index 47cdb7c..47c1c4c 100644 --- a/session.go +++ b/session.go @@ -57,6 +57,12 @@ func (session *Session) Close() { close(session.closeChan) } session.conn.Close() + if d, ok := session.encoder.(Disposeable); ok { + d.Dispose() + } + if d, ok := session.decoder.(Disposeable); ok { + d.Dispose() + } } }