From dc2612249c254b4347d638badd54728b862266bb Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Wed, 6 Nov 2019 17:28:11 +0800 Subject: [PATCH] Add implementation of LengthFieldBasedFrameCodec --- README.md | 2 +- codec.go | 191 +++++++++++++++++++++++++++++++++++++++++++++++++ errors.go | 14 ++-- gnet.go | 2 +- gnet_server.go | 2 +- options.go | 4 +- 6 files changed, 205 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index d9f445ca4..ea503361c 100644 --- a/README.md +++ b/README.md @@ -37,7 +37,7 @@ The goal of this project is to create a server framework for Go that performs on - [x] Supporting asynchronous write operation - [x] Flexible ticker event - [x] SO_REUSEPORT socket option -- [ ] Codec implementations to encode/decode TCP frames, referencing [netty codec](https://github.com/netty/netty/tree/4.1/codec/src/main/java/io/netty/handler/codec) +- [ ] Codec implementations to encode/decode TCP stream to frame, referencing [netty codec](https://github.com/netty/netty/tree/4.1/codec/src/main/java/io/netty/handler/codec) - [ ] Additional load-balancing algorithms: Random, Least-Connections, Consistent-hashing and so on - [ ] New event-notification mechanism: IOCP on Windows platform - [ ] TLS support diff --git a/codec.go b/codec.go index eefaba862..54d2cd970 100644 --- a/codec.go +++ b/codec.go @@ -1,13 +1,23 @@ +// Copyright 2019 Andy Pan. All rights reserved. +// Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file. + package gnet import ( "bytes" + "encoding/binary" + "fmt" ) +// CRLFByte represents a byte of CRLF. var CRLFByte = byte('\n') +// ICodec is the interface of gnet codec. type ICodec interface { + // Encode encodes frames upon server responses into TCP stream. Encode(buf []byte) ([]byte, error) + // Encode decodes frames from TCP stream via specific implementation. Decode(c Conn) ([]byte, error) } @@ -18,26 +28,32 @@ type ICodec interface { // Encode(buf []byte) ([]byte, error) //} +// BuiltInFrameCodec is the built-in codec which will be assign to gnet server when customized codec is not set up. type BuiltInFrameCodec struct { } +// Encode ... func (cc *BuiltInFrameCodec) Encode(buf []byte) ([]byte, error) { return buf, nil } +// Decode ... func (cc *BuiltInFrameCodec) Decode(c Conn) ([]byte, error) { buf := c.Read() c.ResetBuffer() return buf, nil } +// LineBasedFrameCodec encodes/decodes line-separated frames into/from TCP stream. type LineBasedFrameCodec struct { } +// Encode ... func (cc *LineBasedFrameCodec) Encode(buf []byte) ([]byte, error) { return append(buf, CRLFByte), nil } +// Decode ... func (cc *LineBasedFrameCodec) Decode(c Conn) ([]byte, error) { buf := c.Read() idx := bytes.IndexByte(buf, CRLFByte) @@ -48,14 +64,17 @@ func (cc *LineBasedFrameCodec) Decode(c Conn) ([]byte, error) { return buf[:idx], nil } +// DelimiterBasedFrameCodec encodes/decodes specific-delimiter-separated frames into/from TCP stream. type DelimiterBasedFrameCodec struct { Delimiter byte } +// Encode ... func (cc *DelimiterBasedFrameCodec) Encode(buf []byte) ([]byte, error) { return append(buf, cc.Delimiter), nil } +// Decode ... func (cc *DelimiterBasedFrameCodec) Decode(c Conn) ([]byte, error) { buf := c.Read() idx := bytes.IndexByte(buf, cc.Delimiter) @@ -66,10 +85,12 @@ func (cc *DelimiterBasedFrameCodec) Decode(c Conn) ([]byte, error) { return buf[:idx], nil } +// FixedLengthFrameCodec encodes/decodes fixed-length-separated frames into/from TCP stream. type FixedLengthFrameCodec struct { FrameLength int } +// Encode ... func (cc *FixedLengthFrameCodec) Encode(buf []byte) ([]byte, error) { if len(buf)%cc.FrameLength != 0 { return nil, ErrInvalidFixedLength @@ -77,6 +98,7 @@ func (cc *FixedLengthFrameCodec) Encode(buf []byte) ([]byte, error) { return buf, nil } +// Decode ... func (cc *FixedLengthFrameCodec) Decode(c Conn) ([]byte, error) { size, buf := c.ReadN(cc.FrameLength) if size == 0 { @@ -85,5 +107,174 @@ func (cc *FixedLengthFrameCodec) Decode(c Conn) ([]byte, error) { return buf, nil } +// LengthFieldBasedFrameCodec is the refactoring from https://github.com/smallnest/goframe/blob/master/length_field_based_frameconn.go, licensed by Apache License 2.0. +// It encodes/decodes frames into/from TCP stream with value of the length field in the message. +// Original implementation: https://github.com/netty/netty/blob/4.1/codec/src/main/java/io/netty/handler/codec/LengthFieldBasedFrameDecoder.java type LengthFieldBasedFrameCodec struct { + encoderConfig EncoderConfig + decoderConfig DecoderConfig +} + +// EncoderConfig config for encoder. +type EncoderConfig struct { + // ByteOrder is the ByteOrder of the length field. + ByteOrder binary.ByteOrder + // LengthFieldLength is the length of the length field. + LengthFieldLength int + // LengthAdjustment is the compensation value to add to the value of the length field + LengthAdjustment int + // LengthIncludesLengthFieldLength is true, the length of the prepended length field is added to the value of the prepended length field + LengthIncludesLengthFieldLength bool +} + +// DecoderConfig config for decoder. +type DecoderConfig struct { + // ByteOrder is the ByteOrder of the length field. + ByteOrder binary.ByteOrder + // LengthFieldOffset is the offset of the length field + LengthFieldOffset int + // LengthFieldLength is the length of the length field + LengthFieldLength int + // LengthAdjustment is the compensation value to add to the value of the length field + LengthAdjustment int + // InitialBytesToStrip is the number of first bytes to strip out from the decoded frame + InitialBytesToStrip int +} + +// Encode ... +func (cc *LengthFieldBasedFrameCodec) Encode(buf []byte) (out []byte, err error) { + length := len(buf) + cc.encoderConfig.LengthAdjustment + if cc.encoderConfig.LengthIncludesLengthFieldLength { + length += cc.encoderConfig.LengthFieldLength + } + + if length < 0 { + return nil, ErrTooLessLength + } + + switch cc.encoderConfig.LengthFieldLength { + case 1: + if length >= 256 { + return nil, fmt.Errorf("length does not fit into a byte: %d", length) + } + out = []byte{byte(length)} + case 2: + if length >= 65536 { + return nil, fmt.Errorf("length does not fit into a short integer: %d", length) + } + out = make([]byte, 2) + cc.encoderConfig.ByteOrder.PutUint16(out, uint16(length)) + case 3: + if length >= 16777216 { + return nil, fmt.Errorf("length does not fit into a medium integer: %d", length) + } + out = writeUint24(cc.encoderConfig.ByteOrder, length) + case 4: + out = make([]byte, 4) + cc.encoderConfig.ByteOrder.PutUint32(out, uint32(length)) + case 8: + out = make([]byte, 8) + cc.encoderConfig.ByteOrder.PutUint64(out, uint64(length)) + default: + return nil, ErrUnsupportedLength + } + + out = append(out, buf...) + return +} + +func writeUint24(byteOrder binary.ByteOrder, v int) []byte { + b := make([]byte, 3) + if byteOrder == binary.LittleEndian { + b[0] = byte(v) + b[1] = byte(v >> 8) + b[2] = byte(v >> 16) + } else { + b[2] = byte(v) + b[1] = byte(v >> 8) + b[0] = byte(v >> 16) + } + return b +} + +// Decode ... +func (cc *LengthFieldBasedFrameCodec) Decode(c Conn) ([]byte, error) { + var size int + var header []byte + if cc.decoderConfig.LengthFieldOffset > 0 { //discard header(offset) + size, header = c.ReadN(cc.decoderConfig.LengthFieldOffset) + if size == 0 { + return nil, ErrUnexpectedEOF + } + } + + lenBuf, frameLength, err := cc.getUnadjustedFrameLength(c) + if err != nil { + return nil, err + } + + if cc.decoderConfig.LengthAdjustment > 0 { //discard adjust header + size, _ := c.ReadN(cc.decoderConfig.LengthAdjustment) + if size == 0 { + return nil, ErrUnexpectedEOF + } + } + + // real message length + msgLength := int(frameLength) + cc.decoderConfig.LengthAdjustment + size, msg := c.ReadN(msgLength) + if size == 0 { + return nil, ErrUnexpectedEOF + } + + fullMessage := make([]byte, len(header)+len(lenBuf)+msgLength) + copy(fullMessage, header) + copy(fullMessage[len(header):], lenBuf) + copy(fullMessage[len(header)+len(lenBuf):], msg) + return fullMessage[cc.decoderConfig.InitialBytesToStrip:], nil +} + +func (cc *LengthFieldBasedFrameCodec) getUnadjustedFrameLength(c Conn) (lenBuf []byte, n uint64, err error) { + switch cc.decoderConfig.LengthFieldLength { + case 1: + size, b := c.ReadN(1) + if size == 0 { + err = ErrUnexpectedEOF + } + return b, uint64(b[0]), err + case 2: + size, lenBuf := c.ReadN(2) + if size == 0 { + return nil, 0, ErrUnexpectedEOF + } + return lenBuf, uint64(cc.decoderConfig.ByteOrder.Uint16(lenBuf)), nil + case 3: + size, lenBuf := c.ReadN(3) + if size == 0 { + return nil, 0, ErrUnexpectedEOF + } + return lenBuf, readUint24(cc.decoderConfig.ByteOrder, lenBuf), nil + case 4: + size, lenBuf := c.ReadN(4) + if size == 0 { + return nil, 0, ErrUnexpectedEOF + } + return lenBuf, uint64(cc.decoderConfig.ByteOrder.Uint32(lenBuf)), nil + case 8: + size, lenBuf := c.ReadN(8) + if size == 0 { + return nil, 0, ErrUnexpectedEOF + } + return lenBuf, uint64(cc.decoderConfig.ByteOrder.Uint64(lenBuf)), nil + default: + return nil, 0, ErrUnsupportedLength + } +} + +func readUint24(byteOrder binary.ByteOrder, b []byte) uint64 { + _ = b[2] + if byteOrder == binary.LittleEndian { + return uint64(b[0]) | uint64(b[1])<<8 | uint64(b[2])<<16 + } + return uint64(b[2]) | uint64(b[1])<<8 | uint64(b[0])<<16 } diff --git a/errors.go b/errors.go index 86647212e..e1f8d6ef5 100644 --- a/errors.go +++ b/errors.go @@ -3,14 +3,18 @@ package gnet import "errors" var ( - // errShutdown indicates this server is closing. + // errShutdown server is closing. errShutdown = errors.New("server is going to be shutdown") - // ErrInvalidFixedLength ... + // ErrInvalidFixedLength invalid fixed length. ErrInvalidFixedLength = errors.New("invalid fixed length of bytes") - // ErrUnexpectedEOF ... + // ErrUnexpectedEOF no enough data to read. ErrUnexpectedEOF = errors.New("there is no enough data") - // ErrDelimiterNotFound ... + // ErrDelimiterNotFound no such a delimiter. ErrDelimiterNotFound = errors.New("there is no such a delimiter") - // ErrDelimiterNotFound ... + // ErrCRLFNotFound CRLF not found. ErrCRLFNotFound = errors.New("there is no CRLF") + // ErrUnsupportedLength unsupported lengthFieldLength. + ErrUnsupportedLength = errors.New("unsupported lengthFieldLength. (expected: 1, 2, 3, 4, or 8)") + // ErrTooLessLength adjusted frame length is less than zero. + ErrTooLessLength = errors.New("adjusted frame length is less than zero") ) diff --git a/gnet.go b/gnet.go index 7ea11f260..b5718fea6 100644 --- a/gnet.go +++ b/gnet.go @@ -88,7 +88,7 @@ type Conn interface { // "read" pointer, which means it will evict the data from buffer and it can't be revoked (put back to buffer), // it reads data from the inbound ring-buffer and event-loop-buffer when the length of the available data is equal // to the given "n", otherwise, it will not read any data from the inbound ring-buffer. So you should use this - // function only if you know exactly the length of subsequent TCP streams based on the protocol, like the + // function only if you know exactly the length of subsequent TCP stream based on the protocol, like the // Content-Length attribute in an HTTP request which indicates you how much data you should read from inbound ring-buffer. ReadN(n int) (size int, buf []byte) diff --git a/gnet_server.go b/gnet_server.go index c8e258197..202fb96b4 100644 --- a/gnet_server.go +++ b/gnet_server.go @@ -24,7 +24,7 @@ type server struct { opts *Options // options with server once sync.Once // make sure only signalShutdown once cond *sync.Cond // shutdown signaler - codec ICodec // codec for TCP streams + codec ICodec // codec for TCP stream mainLoop *loop // main loop for accepting connections bytesPool sync.Pool // pool for storing bytes eventHandler EventHandler // user eventHandler diff --git a/options.go b/options.go index d27ee3036..f52bc83d0 100644 --- a/options.go +++ b/options.go @@ -36,7 +36,7 @@ type Options struct { // TCPKeepAlive (SO_KEEPALIVE) socket option. TCPKeepAlive time.Duration - // ICodec encodes and decodes TCP streams. + // ICodec encodes and decodes TCP stream. Codec ICodec } @@ -75,7 +75,7 @@ func WithTicker(ticker bool) Option { } } -// WithCodec sets up a codec to handle TCP streams. +// WithCodec sets up a codec to handle TCP stream. func WithCodec(codec ICodec) Option { return func(opts *Options) { opts.Codec = codec