Skip to content

Commit 7e2883a

Browse files
committed
fix: add new posts
1 parent b363e5e commit 7e2883a

File tree

3 files changed

+370
-1
lines changed

3 files changed

+370
-1
lines changed

config/_default/config.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ rel = "sitemap"
6262
contributor = "contributors"
6363

6464
[permalinks]
65-
blog = "/blog/:title/"
65+
blog = "/blog/:slug/"
6666
# docs = "/docs/1.0/:sections[1:]/:title/"
6767

6868
[minify.tdewolff.html]
Lines changed: 368 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,368 @@
1+
---
2+
slug: "push-server-summarize"
3+
title: "推送服务开发过程的问题总结"
4+
description: "在开发开发 push service 的过程中,有一大部分时间花在 websocket 的使用和连接管理上,期间遇到了一些问题和挑战,这里做一个总结"
5+
lead: "在开发开发 push service 的过程中,有一大部分时间花在 websocket 的使用和连接管理上,期间遇到了一些问题和挑战,这里做一个总结"
6+
date: 2022-06-10T09:19:42+08:00
7+
lastmod: 2022-06-10T09:19:42+08:00
8+
draft: false
9+
weight: 11
10+
contributors: ["Yusank"]
11+
---
12+
13+
- [前言](#前言)
14+
- [事件管理](#事件管理)
15+
- [连接管理](#连接管理)
16+
- [消息推送](#消息推送)
17+
- [总结](#总结)
18+
19+
## 前言
20+
21+
push server 的开发是在做 goim 的前期的重头戏,其核心就是长链接(本次用 websocket)的管理和与业务相互结合.
22+
23+
websocket 算是一个比较久远成熟的长链接方案况且有成熟的框架(本项目使用的 [gorilla/websocket](https://github.com/gorilla/websocket))支持,按理来说,它的使用应该是比较简单的,但是它的使用过程中,有一些问题和挑战,这里做一个总结.
24+
25+
而遇到的问题大部分是在使用和与业务的整合上,大致分为一下几点:
26+
27+
- 事件管理(如 ping pong,心跳,消息等)
28+
- 连接管理
29+
- 消息推送
30+
31+
## 事件管理
32+
33+
在建立起连接之后,需要对一些 ws 消息类型做不同的处理,如 ping/pong 时,连接管理层面刷新一些连接最后活跃时间已确保连接可用,而业务层面需要对该连接对应的用户
34+
的 last_online_time 信息进行刷新动作,又比如 close 事件也是连接管理层面和业务层面各自需要做处理.
35+
36+
虽说框架提供了注册事件处理器的机制,但是不足以满足需求.连接层是纯粹管理长链接不依赖业务逻辑(且连接层与业务是分开的项目代码),因此没办法在连接层直接引入逻辑层的逻辑去注册相关处理函数。而业务层虽说可以引入连接层的逻辑,但是不同业务处理的方式不一样很难统一管理。
37+
38+
以下为原始注册方法,只能注册单个方法,如果需要有多个处理函数则需要手动嵌套:
39+
40+
```go
41+
// 连接层注册连接层的事件处理器
42+
conn.SetCloseHandler(func(code int, text string) error {
43+
wc.cancelWithError(nil)
44+
message := websocket.FormatCloseMessage(code, "")
45+
_ = wc.conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(time.Second))
46+
return nil
47+
})
48+
49+
conn.SetPingHandler(func(message string) error {
50+
err := c.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Second))
51+
if err == nil || err == websocket.ErrCloseSent {
52+
return nil
53+
}
54+
55+
return err
56+
})
57+
```
58+
59+
{{< alert icon="👉" context="info" text="同样的事件需要在不同的逻辑层做不同的处理且不能侵入到连接层。" />}}
60+
61+
为了解决这个问题,很明显框架提供的原始的结构体的能力是不够,需要进行封装扩展。
62+
63+
```go
64+
type WebsocketConn struct {
65+
conn *websocket.Conn // 原始连接
66+
67+
// ctx control
68+
ctx context.Context
69+
cancel context.CancelFunc
70+
71+
uid string
72+
}
73+
74+
75+
func WrapWs(ctx context.Context, c *websocket.Conn, uid string) *WebsocketConn {
76+
if ctx == nil {
77+
ctx = context.Background()
78+
}
79+
ctx2, cancel := context.WithCancel(ctx)
80+
wc := &WebsocketConn{
81+
conn: c,
82+
ctx: ctx2,
83+
cancel: cancel,
84+
uid: uid,
85+
}
86+
87+
wc.conn.SetCloseHandler(func(code int, text string) error {
88+
wc.cancelWithError(nil)
89+
message := websocket.FormatCloseMessage(code, "")
90+
_ = wc.conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(time.Second))
91+
return nil
92+
})
93+
94+
wc.conn.SetPingHandler(func(message string) error {
95+
err := c.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Second))
96+
if err == nil || err == websocket.ErrCloseSent {
97+
return nil
98+
}
99+
100+
return err
101+
})
102+
103+
return wc
104+
}
105+
106+
// 通过该方法注册业务的处理方法
107+
func (w *WebsocketConn) AddCloseAction(f func() error) {
108+
cf := w.conn.CloseHandler()
109+
w.conn.SetCloseHandler(func(code int, text string) error {
110+
err := cf(code, text)
111+
if err == nil {
112+
return f()
113+
}
114+
115+
return err
116+
})
117+
}
118+
119+
// 通过该方法注册业务的处理方法
120+
func (w *WebsocketConn) AddPingAction(f func() error) {
121+
pf := w.conn.PingHandler()
122+
w.conn.SetPingHandler(func(appData string) error {
123+
err := pf(appData)
124+
if err == nil {
125+
return f()
126+
}
127+
128+
return err
129+
})
130+
}
131+
```
132+
133+
在连接层将原始 conn 进行一层封装,逻辑层能取到封装后的连接,可根据业务需求注册任意数量的事件处理器。而连接层在封装连接时就将连接层需要的处理方法注册进去了。
134+
135+
现在看一下业务层的使用方式:
136+
137+
```go
138+
// 建立连接并封装后注册业务处理函数
139+
wc.AddPingAction(func() error {
140+
return app.GetApplication().Redis.SetEX(context.Background(),
141+
consts.GetUserOnlineAgentKey(uid), app.GetAgentIP(), consts.UserOnlineAgentKeyExpire).Err()
142+
})
143+
// 建立连接并封装后注册业务处理函数
144+
wc.AddCloseAction(func() error {
145+
ctx2, cancel := context.WithTimeout(ctx, time.Second)
146+
defer cancel()
147+
return app.GetApplication().Redis.Del(ctx2, consts.GetUserOnlineAgentKey(uid)).Err()
148+
149+
})
150+
```
151+
152+
这样就把业务和连接管理拆分开互不影响,连接层不关心上层业务怎么处理,只关心连接的正常可用即可。
153+
154+
## 连接管理
155+
156+
连接管理也是花心思比较多的模块,由于 websocket 的连接不同于普通的连接,因此没办法用普通的连接池来管理(其实我已经有一套可用的连接池的方案了[yusank/conn-pool](https://github.com/yusank/conn-pool),可惜了)。
157+
158+
websocket 的每个连接都是唯一的且需要根据一些唯一的值(UID,sessionID 等)来拿到对应的连接,这些连接也不是连接池初始化出来的,是外部传进去让连接池去维护这些连接。
159+
160+
同时 websocket 连接随时可能断开,这种断开可能主动的也可能是连续几个心跳周期没有收到客户端的心跳包从而服务端主动断开的,不管是那种情况,在连接层需要有个 goroutine 去维护这个连接的可用性。在连接需要断开时,停止维护并从连接池删除该连接。
161+
162+
为了确保 `WebsocketConn` 的纯洁性,在维护连接模块里定义一个简单的`连接`的概念,把 `WebsocketConn` 封装起来。
163+
164+
```go
165+
type idleConn struct {
166+
*WebsocketConn
167+
p *namedPool // 连接池
168+
stopChan chan struct{}
169+
}
170+
171+
// close is different form stop
172+
// close is closes the connection and delete it from pool
173+
// stop is a trigger to stop the daemon then call the close
174+
func (i *idleConn) close() {
175+
_ = i.Close() // i.WebsocketConn.Close()
176+
i.p.delete(i.Key()) // i.WebsocketConn.Key() => uid
177+
}
178+
179+
func (i *idleConn) stop() {
180+
i.stopChan <- struct{}{}
181+
}
182+
183+
func (i *idleConn) daemon() {
184+
ticker := time.NewTicker(config.Interval * time.Second)
185+
loop:
186+
for {
187+
select {
188+
case <- ticker.C:
189+
// check last ping time and decide whether stop daemon.
190+
case <-i.ctx.Done():
191+
// WebsocketConn ctx cancelled
192+
log.Error("conn done", "key", i.Key())
193+
break loop
194+
// stop() called
195+
case <-i.stopChan:
196+
log.Info("conn stop", "key", i.Key())
197+
break loop
198+
case data := <-i.writeChan:
199+
i.writeToClient(data) // write msg to client
200+
}
201+
}
202+
203+
log.Info("conn daemon exit", "key", i.Key())
204+
i.close()
205+
}
206+
```
207+
208+
`idleConn` 能力相对纯粹,维护连接可用性和特殊情况下从连接池剔除当前连接。
209+
210+
下面看一下连接池的定义和实现:
211+
212+
```go
213+
type namedPool struct {
214+
*sync.RWMutex
215+
m map[string]*idleConn
216+
}
217+
218+
func newNamedPool() *namedPool {
219+
p := &namedPool{
220+
RWMutex: new(sync.RWMutex),
221+
m: make(map[string]*idleConn),
222+
}
223+
224+
return p
225+
}
226+
227+
// 添加连接
228+
func (p *namedPool) add(c *WebsocketConn) {
229+
select {
230+
case <-c.ctx.Done():
231+
return
232+
default:
233+
if c.Err() != nil {
234+
return
235+
}
236+
}
237+
238+
p.Lock()
239+
defer p.Unlock()
240+
// 若已存在 则先停止上一个 daemon
241+
i, loaded := p.m[c.Key()]
242+
if loaded {
243+
i.stop()
244+
}
245+
246+
i = &idleConn{
247+
WebsocketConn: c,
248+
stopChan: make(chan struct{}),
249+
p: p,
250+
}
251+
252+
// TODO: 使用可控大小的 goroutine 池,防止goroutine 泄露
253+
go i.daemon()
254+
p.m[c.Key()] = i
255+
}
256+
257+
func (p *namedPool) get(key string) *WebsocketConn {
258+
p.RLock()
259+
i, ok := p.m[key]
260+
p.RUnlock()
261+
262+
if ok {
263+
// 拿连接时 先判断是否已失效
264+
select {
265+
case <-i.ctx.Done():
266+
i.stop()
267+
default:
268+
if i.Err() != nil {
269+
i.stop()
270+
return nil
271+
}
272+
return i.WebsocketConn
273+
}
274+
}
275+
276+
return nil
277+
}
278+
279+
// 需要读取所有连接做全量推送的需求
280+
func (p *namedPool) loadAllConns() chan *WebsocketConn {
281+
p.Lock()
282+
defer p.Unlock()
283+
284+
ch := make(chan *WebsocketConn, len(p.m))
285+
for _, i := range p.m {
286+
ch <- i.WebsocketConn
287+
}
288+
289+
close(ch)
290+
return ch
291+
}
292+
293+
func (p *namedPool) delete(key string) {
294+
p.Lock()
295+
defer p.Unlock()
296+
delete(p.m, key)
297+
}
298+
299+
```
300+
301+
经过 `idleConn``namedPool` 配合,连接管理算是达到预期的要求了且通过了测试,后期可以做一些优化项。
302+
303+
## 消息推送
304+
305+
在解决了非业务层的各种问题后,现在只剩下消息推送的问题了。消息推送最开始其实直接调用的框架提供的原生方法 `WriteMessage`,即需要推送时,从连接池拿到对应的连接然后直接调用写消息的方法,但是有大量消息需要写入时可能会出现消息顺序异常的问题,因此在 `WebsocketConn` 层屏蔽了原生的方法,并添加一个消息 `channel` 来控制并发带来的消息顺序异常问题。
306+
307+
新增消息 `channel` :
308+
309+
```gotype WebsocketConn struct {
310+
conn *websocket.Conn
311+
312+
ctx context.Context
313+
cancel context.CancelFunc
314+
315+
uid string
316+
// 消息 channel
317+
writeChan chan []byte
318+
// 无法写到客户端时回调该方法 上层一般会进行重试或写到 mq 待会儿进行处理
319+
onWriteError func()
320+
err error
321+
}
322+
```
323+
324+
新的写消息方法:
325+
326+
```go
327+
func (w *WebsocketConn) Write(data []byte) error {
328+
select {
329+
case w.writeChan <- data:
330+
return nil
331+
default:
332+
}
333+
334+
// 尝试等待一小段时间,如果这段时间内写入失败则直接报错,这种情况下大概率是服务端与客户端的连接异常,消息阻塞了
335+
timer := time.NewTimer(time.Millisecond * 10)
336+
select {
337+
case w.writeChan <- data:
338+
return nil
339+
case <-timer.C:
340+
return ErrWriteChanFull
341+
}
342+
}
343+
```
344+
345+
写入到 `writeChan` 后,由上述 `idleConn.daemon` 方法读取 `channel` 内消息后,调用以下方法来正式写入的ws 上:
346+
347+
```go
348+
func (w *WebsocketConn) writeToClient(data []byte) {
349+
_ = w.conn.SetWriteDeadline(time.Now().Add(time.Second))
350+
err := w.conn.WriteMessage(websocket.TextMessage, data)
351+
if err != nil {
352+
w.onWriteError()
353+
return
354+
}
355+
}
356+
```
357+
358+
至此,消息的推送也解决了已遇到的问题。
359+
360+
## 总结
361+
362+
本文记录了在开发 goim 的推送服务(长连接服务)时遇到的一些问题和解决方案,方便理解在看相关代码时其背后的思考。
363+
364+
内容分为三个部分:
365+
366+
- websocket 连接中的一些事件处理和如何与业务关联起来
367+
- websocket 连接池的维护
368+
- 在聊天系统中确保消息推送的顺序

content/blog/worker-pool/index.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
---
2+
slug: "worker-pool"
23
title: "实现异步并发 worker 队列"
34
description: "在开发 broadcast 功能的时候,碰到一个比较棘手的问题,需要并发执行多个 worker 来将 broadcast 消息推送到所有在线用户,同时我希望能控制并发数量。本文记录实现一个异步并发 worker 队列的过程。"
45
lead: "在开发 broadcast 功能的时候,碰到一个比较棘手的问题,需要并发执行多个 worker 来讲 broadcast 消息推送到所有在线用户,同时我希望能控制并发数量。本文记录实现一个异步并发 worker 队列的过程。"

0 commit comments

Comments
 (0)