Skip to content

Commit

Permalink
Add implementation of LengthFieldBasedFrameCodec
Browse files Browse the repository at this point in the history
  • Loading branch information
panjf2000 committed Nov 6, 2019
1 parent df96381 commit dc26122
Show file tree
Hide file tree
Showing 6 changed files with 205 additions and 10 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ The goal of this project is to create a server framework for Go that performs on
- [x] Supporting asynchronous write operation
- [x] Flexible ticker event
- [x] SO_REUSEPORT socket option
- [ ] Codec implementations to encode/decode TCP frames, referencing [netty codec](https://github.com/netty/netty/tree/4.1/codec/src/main/java/io/netty/handler/codec)
- [ ] Codec implementations to encode/decode TCP stream to frame, referencing [netty codec](https://github.com/netty/netty/tree/4.1/codec/src/main/java/io/netty/handler/codec)
- [ ] Additional load-balancing algorithms: Random, Least-Connections, Consistent-hashing and so on
- [ ] New event-notification mechanism: IOCP on Windows platform
- [ ] TLS support
Expand Down
191 changes: 191 additions & 0 deletions codec.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,23 @@
// Copyright 2019 Andy Pan. All rights reserved.
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.

package gnet

import (
"bytes"
"encoding/binary"
"fmt"
)

// CRLFByte represents a byte of CRLF.
var CRLFByte = byte('\n')

// ICodec is the interface of gnet codec.
type ICodec interface {
// Encode encodes frames upon server responses into TCP stream.
Encode(buf []byte) ([]byte, error)
// Encode decodes frames from TCP stream via specific implementation.
Decode(c Conn) ([]byte, error)
}

Expand All @@ -18,26 +28,32 @@ type ICodec interface {
// Encode(buf []byte) ([]byte, error)
//}

// BuiltInFrameCodec is the built-in codec which will be assign to gnet server when customized codec is not set up.
type BuiltInFrameCodec struct {
}

// Encode ...
func (cc *BuiltInFrameCodec) Encode(buf []byte) ([]byte, error) {
return buf, nil
}

// Decode ...
func (cc *BuiltInFrameCodec) Decode(c Conn) ([]byte, error) {
buf := c.Read()
c.ResetBuffer()
return buf, nil
}

// LineBasedFrameCodec encodes/decodes line-separated frames into/from TCP stream.
type LineBasedFrameCodec struct {
}

// Encode ...
func (cc *LineBasedFrameCodec) Encode(buf []byte) ([]byte, error) {
return append(buf, CRLFByte), nil
}

// Decode ...
func (cc *LineBasedFrameCodec) Decode(c Conn) ([]byte, error) {
buf := c.Read()
idx := bytes.IndexByte(buf, CRLFByte)
Expand All @@ -48,14 +64,17 @@ func (cc *LineBasedFrameCodec) Decode(c Conn) ([]byte, error) {
return buf[:idx], nil
}

// DelimiterBasedFrameCodec encodes/decodes specific-delimiter-separated frames into/from TCP stream.
type DelimiterBasedFrameCodec struct {
Delimiter byte
}

// Encode ...
func (cc *DelimiterBasedFrameCodec) Encode(buf []byte) ([]byte, error) {
return append(buf, cc.Delimiter), nil
}

// Decode ...
func (cc *DelimiterBasedFrameCodec) Decode(c Conn) ([]byte, error) {
buf := c.Read()
idx := bytes.IndexByte(buf, cc.Delimiter)
Expand All @@ -66,17 +85,20 @@ func (cc *DelimiterBasedFrameCodec) Decode(c Conn) ([]byte, error) {
return buf[:idx], nil
}

// FixedLengthFrameCodec encodes/decodes fixed-length-separated frames into/from TCP stream.
type FixedLengthFrameCodec struct {
FrameLength int
}

// Encode ...
func (cc *FixedLengthFrameCodec) Encode(buf []byte) ([]byte, error) {
if len(buf)%cc.FrameLength != 0 {
return nil, ErrInvalidFixedLength
}
return buf, nil
}

// Decode ...
func (cc *FixedLengthFrameCodec) Decode(c Conn) ([]byte, error) {
size, buf := c.ReadN(cc.FrameLength)
if size == 0 {
Expand All @@ -85,5 +107,174 @@ func (cc *FixedLengthFrameCodec) Decode(c Conn) ([]byte, error) {
return buf, nil
}

// LengthFieldBasedFrameCodec is the refactoring from https://github.com/smallnest/goframe/blob/master/length_field_based_frameconn.go, licensed by Apache License 2.0.
// It encodes/decodes frames into/from TCP stream with value of the length field in the message.
// Original implementation: https://github.com/netty/netty/blob/4.1/codec/src/main/java/io/netty/handler/codec/LengthFieldBasedFrameDecoder.java
type LengthFieldBasedFrameCodec struct {
encoderConfig EncoderConfig
decoderConfig DecoderConfig
}

// EncoderConfig config for encoder.
type EncoderConfig struct {
// ByteOrder is the ByteOrder of the length field.
ByteOrder binary.ByteOrder
// LengthFieldLength is the length of the length field.
LengthFieldLength int
// LengthAdjustment is the compensation value to add to the value of the length field
LengthAdjustment int
// LengthIncludesLengthFieldLength is true, the length of the prepended length field is added to the value of the prepended length field
LengthIncludesLengthFieldLength bool
}

// DecoderConfig config for decoder.
type DecoderConfig struct {
// ByteOrder is the ByteOrder of the length field.
ByteOrder binary.ByteOrder
// LengthFieldOffset is the offset of the length field
LengthFieldOffset int
// LengthFieldLength is the length of the length field
LengthFieldLength int
// LengthAdjustment is the compensation value to add to the value of the length field
LengthAdjustment int
// InitialBytesToStrip is the number of first bytes to strip out from the decoded frame
InitialBytesToStrip int
}

// Encode ...
func (cc *LengthFieldBasedFrameCodec) Encode(buf []byte) (out []byte, err error) {
length := len(buf) + cc.encoderConfig.LengthAdjustment
if cc.encoderConfig.LengthIncludesLengthFieldLength {
length += cc.encoderConfig.LengthFieldLength
}

if length < 0 {
return nil, ErrTooLessLength
}

switch cc.encoderConfig.LengthFieldLength {
case 1:
if length >= 256 {
return nil, fmt.Errorf("length does not fit into a byte: %d", length)
}
out = []byte{byte(length)}
case 2:
if length >= 65536 {
return nil, fmt.Errorf("length does not fit into a short integer: %d", length)
}
out = make([]byte, 2)
cc.encoderConfig.ByteOrder.PutUint16(out, uint16(length))
case 3:
if length >= 16777216 {
return nil, fmt.Errorf("length does not fit into a medium integer: %d", length)
}
out = writeUint24(cc.encoderConfig.ByteOrder, length)
case 4:
out = make([]byte, 4)
cc.encoderConfig.ByteOrder.PutUint32(out, uint32(length))
case 8:
out = make([]byte, 8)
cc.encoderConfig.ByteOrder.PutUint64(out, uint64(length))
default:
return nil, ErrUnsupportedLength
}

out = append(out, buf...)
return
}

func writeUint24(byteOrder binary.ByteOrder, v int) []byte {
b := make([]byte, 3)
if byteOrder == binary.LittleEndian {
b[0] = byte(v)
b[1] = byte(v >> 8)
b[2] = byte(v >> 16)
} else {
b[2] = byte(v)
b[1] = byte(v >> 8)
b[0] = byte(v >> 16)
}
return b
}

// Decode ...
func (cc *LengthFieldBasedFrameCodec) Decode(c Conn) ([]byte, error) {
var size int
var header []byte
if cc.decoderConfig.LengthFieldOffset > 0 { //discard header(offset)
size, header = c.ReadN(cc.decoderConfig.LengthFieldOffset)
if size == 0 {
return nil, ErrUnexpectedEOF
}
}

lenBuf, frameLength, err := cc.getUnadjustedFrameLength(c)
if err != nil {
return nil, err
}

if cc.decoderConfig.LengthAdjustment > 0 { //discard adjust header
size, _ := c.ReadN(cc.decoderConfig.LengthAdjustment)
if size == 0 {
return nil, ErrUnexpectedEOF
}
}

// real message length
msgLength := int(frameLength) + cc.decoderConfig.LengthAdjustment
size, msg := c.ReadN(msgLength)
if size == 0 {
return nil, ErrUnexpectedEOF
}

fullMessage := make([]byte, len(header)+len(lenBuf)+msgLength)
copy(fullMessage, header)
copy(fullMessage[len(header):], lenBuf)
copy(fullMessage[len(header)+len(lenBuf):], msg)
return fullMessage[cc.decoderConfig.InitialBytesToStrip:], nil
}

func (cc *LengthFieldBasedFrameCodec) getUnadjustedFrameLength(c Conn) (lenBuf []byte, n uint64, err error) {
switch cc.decoderConfig.LengthFieldLength {
case 1:
size, b := c.ReadN(1)
if size == 0 {
err = ErrUnexpectedEOF
}
return b, uint64(b[0]), err
case 2:
size, lenBuf := c.ReadN(2)
if size == 0 {
return nil, 0, ErrUnexpectedEOF
}
return lenBuf, uint64(cc.decoderConfig.ByteOrder.Uint16(lenBuf)), nil
case 3:
size, lenBuf := c.ReadN(3)
if size == 0 {
return nil, 0, ErrUnexpectedEOF
}
return lenBuf, readUint24(cc.decoderConfig.ByteOrder, lenBuf), nil
case 4:
size, lenBuf := c.ReadN(4)
if size == 0 {
return nil, 0, ErrUnexpectedEOF
}
return lenBuf, uint64(cc.decoderConfig.ByteOrder.Uint32(lenBuf)), nil
case 8:
size, lenBuf := c.ReadN(8)
if size == 0 {
return nil, 0, ErrUnexpectedEOF
}
return lenBuf, uint64(cc.decoderConfig.ByteOrder.Uint64(lenBuf)), nil
default:
return nil, 0, ErrUnsupportedLength
}
}

func readUint24(byteOrder binary.ByteOrder, b []byte) uint64 {
_ = b[2]
if byteOrder == binary.LittleEndian {
return uint64(b[0]) | uint64(b[1])<<8 | uint64(b[2])<<16
}
return uint64(b[2]) | uint64(b[1])<<8 | uint64(b[0])<<16
}
14 changes: 9 additions & 5 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,18 @@ package gnet
import "errors"

var (
// errShutdown indicates this server is closing.
// errShutdown server is closing.
errShutdown = errors.New("server is going to be shutdown")
// ErrInvalidFixedLength ...
// ErrInvalidFixedLength invalid fixed length.
ErrInvalidFixedLength = errors.New("invalid fixed length of bytes")
// ErrUnexpectedEOF ...
// ErrUnexpectedEOF no enough data to read.
ErrUnexpectedEOF = errors.New("there is no enough data")
// ErrDelimiterNotFound ...
// ErrDelimiterNotFound no such a delimiter.
ErrDelimiterNotFound = errors.New("there is no such a delimiter")
// ErrDelimiterNotFound ...
// ErrCRLFNotFound CRLF not found.
ErrCRLFNotFound = errors.New("there is no CRLF")
// ErrUnsupportedLength unsupported lengthFieldLength.
ErrUnsupportedLength = errors.New("unsupported lengthFieldLength. (expected: 1, 2, 3, 4, or 8)")
// ErrTooLessLength adjusted frame length is less than zero.
ErrTooLessLength = errors.New("adjusted frame length is less than zero")
)
2 changes: 1 addition & 1 deletion gnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ type Conn interface {
// "read" pointer, which means it will evict the data from buffer and it can't be revoked (put back to buffer),
// it reads data from the inbound ring-buffer and event-loop-buffer when the length of the available data is equal
// to the given "n", otherwise, it will not read any data from the inbound ring-buffer. So you should use this
// function only if you know exactly the length of subsequent TCP streams based on the protocol, like the
// function only if you know exactly the length of subsequent TCP stream based on the protocol, like the
// Content-Length attribute in an HTTP request which indicates you how much data you should read from inbound ring-buffer.
ReadN(n int) (size int, buf []byte)

Expand Down
2 changes: 1 addition & 1 deletion gnet_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type server struct {
opts *Options // options with server
once sync.Once // make sure only signalShutdown once
cond *sync.Cond // shutdown signaler
codec ICodec // codec for TCP streams
codec ICodec // codec for TCP stream
mainLoop *loop // main loop for accepting connections
bytesPool sync.Pool // pool for storing bytes
eventHandler EventHandler // user eventHandler
Expand Down
4 changes: 2 additions & 2 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type Options struct {
// TCPKeepAlive (SO_KEEPALIVE) socket option.
TCPKeepAlive time.Duration

// ICodec encodes and decodes TCP streams.
// ICodec encodes and decodes TCP stream.
Codec ICodec
}

Expand Down Expand Up @@ -75,7 +75,7 @@ func WithTicker(ticker bool) Option {
}
}

// WithCodec sets up a codec to handle TCP streams.
// WithCodec sets up a codec to handle TCP stream.
func WithCodec(codec ICodec) Option {
return func(opts *Options) {
opts.Codec = codec
Expand Down

0 comments on commit dc26122

Please sign in to comment.