Skip to content

jiangema/link

Repository files navigation

Build Status Coverage Status

介绍

最初开发这个包的目的是提炼一套可以在公司内多个项目间共用的网络层,因为在项目中我发现不同的网络应用一直重复一些相同或相类似的东西,比如最常用到的就是会话管理,不管是做游戏的前端连接层还是做服务器和服务器之间的RPC层或者是游戏的网关,虽然协议不一样但是它们都会需要会话的管理。会话管理看似简单,但是涉及到并发简单的需求就变得复杂起来,所以看似简单的会话管理每次实现起来都得再配套做单元测试甚至线上实际运行几个版本才能放心。所以我决定提取这些公共的部分,避免那些容易引入BUG的重复劳动。

但是在提取这些公共部分的时候并没有期初想象的那么容易,因为不同的应用场景有不同的需求,比如有的场景需要异步,有的场景需要同步,有的协议需要握手过程,有的则需要keepalive。从代码的提交历史里面可以看出这个包前后经过了很多次大的调整,因为要做一个能满足所有需求的通用底层真的很难。

经过不断的提炼,就像在简化公式一样的,link变得十分的简单,同时它的定位也很清楚。link不是一个网络层也不是一个框架,它只是一个脚手架,但它可以帮助你快速的实现出你所需要的网络层或者框架,帮你约束网络层的实现方式,不至于用不合理的方式实现网络层,除此之外它不会管更多的事情。

link是协议无关的,使用link只需要理解CodecType的概念,你可以加入任何你需要的通讯协议实现。

基础

link包核心由ServerSessionCodecType组成。ServerSession很容易理解,分别用于实现网络服务和连接管理。CodecType则提供具体的协议实现和io逻辑。

Server在使用的时候很简单,可以用 link.Serve()创建,也可以用link.NewServer()的方式创建。这样设计的目的是可以支持更多类型的Listener,不受限于net包。

Session在使用上分为两种情况,一种是由Server.Accept()产生的服务端会话,一种是由link.Dial()link.NewSession()产生的客户端会话。

CodecType的设计目的是让每个Session都有各自的EncoderDecoder用于消息的收发,这样才有机会实现有状态的通讯协议,比如有多阶段握手的通讯协议。

ServerSession上都有一个interface{}类型的State字段,可用于存储自定义状态。

Session上提供了关闭事件的监听机制,有一些应用场景需要在会话关闭时对一些资源做回收,就可以利用这个机制。

EncoderDecoder都可以选择性的实现Dispose()方法,Session关闭时将会尝试调用这个方法,这可以可以做到EncoderDecoder的资源回收利用,内置的BufioCodecType就利用这个机制引入了sync.Pool来提高对象的重用性。

一些示例

示例,创建一个使用Json作为消息格式的TCP服务端:

srv, err := link.Serve("tcp", "0.0.0.0:0", link.Json())

示例,使用Bufio优化IO:

srv, err := link.Serve("tcp", "0.0.0.0:0", link.Bufio(link.Json()))

示例,加入线程安全:

srv, err := link.Serve("tcp", "0.0.0.0:0", link.ThreadSafe(link.Json()))

示例,把发送方式改为异步:

srv, err := link.Serve("tcp", "0.0.0.0:0", link.Async(link.Json()))

我是不会告诉你除了以上示例,阅读all_test.goexample目录下的代码也是很有帮助的!

内置类型

link的核心部分代码是极少的,link另外提供了一些常用到的工具类型,下面一一对其进行介绍。

这个文件里实现了Channel类型的模板,Channel类型用于手工管理一组Session通常用于发送广播和维护在线列表。

这个文件是不参与编译的,所以link实际上不存在一个叫Channel的类型,这个文件只是提供类型的模板。

之前版本的通用Channel.go类型,用的是Session.Id()做key,这个设计会导致实际项目种出现类似这样的操作逻辑:

用户ID -> Session ID -> Session

而新的Channel.go类型可以自定义key类型,在上述场景中就可以直接用用户ID做key来索引Session

用户ID -> Session

不同的应用场景会需要用不同的信息来索引Session,但是Go暂不支持泛型语法,所以我们通过channel_gen.go这个工具来生成具体的Channel类型的代码。

除了直观的可以看出少了一次map操作之外,其实额外维护一份Session ID映射关系也不是一件容易的事情,你需要重复Channel.go内部做的所有事情,而又不能重用Channel.go的代码。

所以自动生成代码的方式解决了以上所有问题,唯一需要做的就是手工执行一个命令。

举例,生成一个用uint64类型作为key的Channel

go run channel_gen.go Uint64Channel uint64 channel_uint64.go

channel_gen.go的参数列表如下:

  • 第一个参数为类型名,不一定非得叫Channel,可以根据实际使用场景来命名
  • 第二个参数是key的类型,通常是int之类的简单类型,但如果需要同时根据多个条件索引Session,可以使用结构体做key
  • 第三个参数是输出的代码文件名
  • 第四个参数是可选的包名称,没有指定此参数时生成的代码归属于link包,你可以通过这个参数生成归属于自己包的代码

此外,link借助go generate命令内置了一组常用到的Channel类型的代码生成。因为这些代码是工具自动生成的,所以不纳入版本管理,在刚拿到link包的代码时是找不到这些代码的。

需要在link包的根目录下执行go generate channel.go命令,来生成这些Channel类型,关于go generate的原理请参阅Go官方文档。

提示: 使用Channel.Fetch()进行遍历发送广播的时候,请注意存在io阻塞的可能,如果io阻塞会影响业务处理,可以通过异步发送的方式避免阻塞。

这个文件中实现了一个用于支持异步消息发送的CodecType。之前的版本中Session有一个AsyncSend()方法用于异步消息发送。我一直很不满意AsyncSend()的设计,从link包的历史版本中可以看到AsyncSend()经过了多次修改。

原因是不同的应用场景会有不同的异步消息发送需求,比如我们在游戏里很简单粗暴的把异步发送时出现chan阻塞的Session关闭掉,但是别的应用场景可能会需要等待一段时间后再重试,或者丢弃阻塞的消息,又或者阻塞允许一段时间,等到超时再做进一步处理。

所以AsyncSend()怎么改都不可能满足所有需求,最后我干脆删除它,由CodecType来决定消息是否异步发送,以及怎么进行异步发送。目前内置的asyncCodecType的逻辑是一旦遇到发送用的chan阻塞就立即关闭Writer并返回ErrBlocking错误。如果这个设计不符合你的需求,你可以参考它实现出自己所需的异步发送逻辑。

需要注意,目前的asyncCodecType的设计会将Session.Send()的行为从同步变为异步,这样设计的目的是规避掉同时支持两种模式的复杂性,避免使用者误用。

对于高级用户如果需要同时支持同步和异步发送,可以自己实现一个Encoder在发送消息时通过判断消息类型来决定采用哪种发送方式,但是这样的设计需要周全的考虑各种并行执行的可能性。

这个文件中实现了带缓冲的IO以及缓冲对象重用,这是网络层很常用到的优化。缓冲读和缓冲写可以显著的降低实际的IO调用次数,在Go语言中一次实际的net.Conn.Read()调用开销并不低,它需要给文件句柄加锁然后放入事件循环里等待IO事件,这里面有一系列的系统调用。所以实际项目中,强烈建议使用bufio来降低IO调用次数。

有一个细节需要注意sync.Pool是跟着BufioCodecType实例的,所以在实际使用中,特别是创建客户端Session时,需要重用BufioCodecType而不是每次调用link.Dial()时都创建一个新的BufioCodecType实例。服务端不容易出现这个问题是因为BufioCodecType会被存在Server对象里,反复赋值给新建的服务端Session

里面实现了常见的Json、Gob、Xml格式的消息编解码,这三种消息格式都不需要分包协议就可以直接使用,但也可以跟分包协议配合使用。

这是最实用的一个内置类型,里面实现了对应Erlang的{packet, 2}格式的分包协议。

这种分包协议的包结构很简单,每个消息包由2个字节包头和不定长的包体组成,包头的数据是小端格式编码的包体长度值。分包的时候先读取包头,解码后获得包体长度,接着读取对应长度的数据即为包体。

由于包头固定是2个字节,所以最大的消息长度是64K,实际应用场景中如果有可能出现超过此大小的消息,需要自己再封装一层消息分帧。

在使用次类型时,Session接收的消息必须实现PacketUnmarshaler接口,发送的消息必须实现PacketMarshaler接口。

可以配合fastbin来自动生成这两种接口的代码。

里面实现了线程安全的CodecType,旧版本的link里Session内置了收发锁让Session.Receive()和Session.Send()可以被并发调用。但是实际项目中并发接收或者并发发送的场景很少,如果一开始就内置到Session`里,这部分调用开销就多余了。

所以后来我删除了Session里面加锁的逻辑,引入了ThreadSafe()。在需要对收发过程进行加锁保护的时候可以用它。

消息分发

实际项目中通常都会需要识别消息类型然后执行不同消息类型的解包(反序列化)接着调用不同的消息处理过程(业务逻辑),link包的测试代码和示例代码都没有直接体现出如何做消息分发,所以新手经常会卡在这一步。

下面我就简单的演示怎样实现一个可以消息类型识别和消息分发。

首先我先假定我们需要实现这样一个协议格式:

2字节的包头 + 2字节的消息类型ID + 消息内容

我们通过消息类型ID来识别消息类型,消息内容的格式这里不举例,我们只需要假设它们内容格式都不一样。

因为link已经内置了分包协议支持,所以我们只需要实现一个可以识别消息类型的PacketUnmarshaler

实现起来大概像这样子:

package xxoo

import (
	"io"
	"github.com/funny/binary"
)

// 所有的请求都必须实现这个接口
type Request interface {
	Process(*link.Session) error
}

// 用于消息识别
type Recognizer struct {
	Request Request // 真正要处理的请求
}

// 实现PacketUnmarshaler接口
func (r *Recognizer) UnmarshalPacket(p []byte) error {
	switch binary.LittleEndian.Uint16(p) {
	case 1:
		msg := new(MessageType1)
		msg.UnmarshalPacket(p[2:])
		r.Request = msg
	case 2:
		msg := new(MessageType2)
		msg.UnmarshalPacket(p[2:])
		r.Request = msg
	default:
		return errors.New("unknow message type")
	}
	return nil
}

这样我们就可以在接收消息时传入Recognizer,接收到消息后调用Recognizer里的Request.Process()处理请求:

var msg Recognizer

session.Receive(&Recognizer)

msg.Request.Process(session)

具体的Request.Process()内是通过怎样的机制把消息分发给对应的业务接口的,这就八仙过海各显神通了,我在项目里用的是注册回调函数的方式,大家可以根据实际的项目情况设计。

包括上面示例中的MessageType1和MessageType2,实际项目中不一定是这样做的,接口比较多的项目里通常会需要把不同业务模块的消息类型分到不同的包里,所以会需要做多级的消息识别。

示例只是提供思路,希望大家要灵活变通不要死记硬背。

总结

link的核心其实很简单,IO调用方式和协议实现都靠CodecType解耦,理解了CodecType就能熟练的用link搭建针对各种场景的网络层。

建议在实际项目中根据项目需求,参考内置类型的设计实现针对项目的CodecType,这样可以得到最好的执行效率和使用体验。

如果有问题或者改进建议,欢迎加技术交(xian)流(liao)群一起讨论:188680931

附录

About

Go语言网络层脚手架

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages

  • Go 100.0%