-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathclient_hub.go
226 lines (206 loc) · 6.18 KB
/
client_hub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
package go_websocket
import (
"errors"
"sync"
)
type BroadcastChan struct {
Name string `json:"name"`
Msg []byte `json:"msg"`
}
type Hub struct {
Clients map[*Client]bool // 全部客户端列表 {*Client1: bool, *Client2: bool...}
SystemClients map[string][]*Client // 全部系统列表 {"systemId1": []*Clients{*Client1, *Client2...}, "systemId2": []*Clients{*Client1, *Client2...}}
GroupClients map[string][]*Client // 全部群组列表 {"groupId": []*Clients{*Client1, *Client2...}}
ClientRegister chan *Client // 客户端连接处理
ClientUnregister chan *Client // 客户端断开连接处理
ClientLock sync.RWMutex // 客户端列表读写锁
Broadcast chan []byte // 来自广播的入站消息
SystemBroadcast chan *BroadcastChan // 来自系统的入站消息 {Name:"systemId", Msg:"msg"}
GroupBroadcast chan *BroadcastChan // 来自群组的入站消息 {Name:"groupId", Msg:"msg"}
ClientBroadcast chan *BroadcastChan // 来自客户端的入站消息 {Name:"clientId", Msg:"msg"}
}
// NewHub 实例化
func NewHub() *Hub {
return &Hub{
Clients: make(map[*Client]bool),
GroupClients: make(map[string][]*Client, 1000),
SystemClients: make(map[string][]*Client, 1000),
ClientRegister: make(chan *Client),
ClientUnregister: make(chan *Client),
Broadcast: make(chan []byte),
SystemBroadcast: make(chan *BroadcastChan, 1000),
GroupBroadcast: make(chan *BroadcastChan, 1000),
ClientBroadcast: make(chan *BroadcastChan, 1000),
}
}
// Run run chan listener
func (m *Hub) Run() {
for {
select {
case client := <-m.ClientRegister:
m.handleClientRegister(client)
case client := <-m.ClientUnregister:
m.handleClientUnregister(client)
close(client.send)
// 全局广播
case message := <-m.Broadcast:
m.AllBroadcastHandle(message)
// 系统广播
case systems := <-m.SystemBroadcast:
m.SystemBroadcastHandle(systems.Name, systems.Msg)
// 群组广播
case groups := <-m.GroupBroadcast:
m.GroupBroadcastHandle(groups.Name, groups.Msg)
// 客户端推送
case clients := <-m.ClientBroadcast:
m.ClientBroadcastHandle(clients.Name, clients.Msg)
}
}
}
// handleClientRegister 客户端连接处理
func (m *Hub) handleClientRegister(client *Client) {
m.ClientLock.Lock()
m.SystemClients[client.SystemId] = append(m.SystemClients[client.SystemId], client)
if client.GroupId != "" {
m.GroupClients[client.GroupId] = append(m.GroupClients[client.GroupId], client)
}
m.Clients[client] = true
m.ClientLock.Unlock()
}
// handleClientUnregister 客户端断开连接处理
func (m *Hub) handleClientUnregister(client *Client) {
m.ClientLock.Lock()
if _, ok := m.Clients[client]; ok {
delete(m.Clients, client)
}
for index, _client := range m.SystemClients[client.SystemId] {
if _client.ClientId == client.ClientId {
m.SystemClients[client.SystemId] = append(m.SystemClients[client.SystemId][:index], m.SystemClients[client.SystemId][index+1:]...)
break
}
}
clients, ok := m.GroupClients[client.GroupId]
if ok {
for index, _client := range clients {
if _client.ClientId == client.ClientId {
m.GroupClients[client.GroupId] = append(m.GroupClients[client.GroupId][:index], m.GroupClients[client.GroupId][index+1:]...)
}
}
}
m.ClientLock.Unlock()
}
// AllBroadcastHandle 全局广播
func (m *Hub) AllBroadcastHandle(msg []byte) {
for client := range m.Clients {
select {
case client.send <- msg:
default:
close(client.send)
m.handleClientUnregister(client)
}
}
}
// SystemBroadcastHandle 系统广播处理
func (m *Hub) SystemBroadcastHandle(systemId string, msg []byte) {
clients, err := m.GetSystemClients(systemId)
if err != nil {
m.RemoveSystem(systemId)
}
for _, client := range clients {
select {
case client.send <- msg:
default:
close(client.send)
m.handleClientUnregister(client)
}
}
}
// GroupBroadcastHandle 群组消息通道处理
func (m *Hub) GroupBroadcastHandle(groupId string, msg []byte) {
clients, err := m.GetGroupClients(groupId)
if err != nil {
m.RemoveGroup(groupId)
}
for _, client := range clients {
select {
case client.send <- msg:
default:
close(client.send)
m.handleClientUnregister(client)
}
}
}
// ClientBroadcastHandle 单客户端通道处理
func (m *Hub) ClientBroadcastHandle(clientId string, msg []byte) {
var _client *Client
for client := range m.Clients {
if client.ClientId == clientId {
_client = client
break
}
}
if _client != nil {
select {
case _client.send <- msg:
break
default:
close(_client.send)
m.handleClientUnregister(_client)
}
}
}
// SetClientToGroups 添加客户端到分组
func (m *Hub) SetClientToGroups(groupId string, client *Client) bool {
clients, ok := m.GroupClients[groupId]
if !ok {
return false
}
for _, _client := range clients {
if _client.ClientId == client.ClientId {
return false
}
}
m.ClientLock.Lock()
m.GroupClients[groupId] = append(m.GroupClients[groupId], client)
m.ClientLock.Unlock()
return true
}
// GetSystemClients 获取系统的客户端列表
func (m *Hub) GetSystemClients(name string) ([]*Client, error) {
clients, ok := m.SystemClients[name]
if !ok {
return []*Client{}, errors.New("group does not exist")
}
return clients, nil
}
// GetGroupClients 获取群组的客户端列表
func (m *Hub) GetGroupClients(name string) ([]*Client, error) {
clients, ok := m.GroupClients[name]
if !ok {
return []*Client{}, errors.New("group does not exist")
}
return clients, nil
}
// RemoveSystem 删除system和系统中的client
func (m *Hub) RemoveSystem(name string) {
delete(m.SystemClients, name)
}
// RemoveGroup 删除group和群组中的client
func (m *Hub) RemoveGroup(name string) {
delete(m.GroupClients, name)
}
// RemoveClientByGroup 从群组删除客户端
func (m *Hub) RemoveClientByGroup(client *Client) error {
m.ClientLock.Lock()
clients, ok := m.GroupClients[client.GroupId]
if !ok {
return errors.New("group does not exist")
}
for index, _client := range clients {
if _client.ClientId == client.ClientId {
m.GroupClients[client.GroupId] = append(m.GroupClients[client.GroupId][:index], m.GroupClients[client.GroupId][index+1:]...)
}
}
m.ClientLock.Unlock()
return nil
}