forked from cloudwego/kitex
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmessage.go
334 lines (288 loc) · 7.84 KB
/
message.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
/*
* Copyright 2021 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package remote
import (
"sync"
"github.com/cloudwego/kitex/pkg/rpcinfo"
"github.com/cloudwego/kitex/pkg/serviceinfo"
"github.com/cloudwego/kitex/transport"
)
var (
messagePool sync.Pool
transInfoPool sync.Pool
emptyServiceInfo serviceinfo.ServiceInfo
)
func init() {
messagePool.New = newMessage
transInfoPool.New = newTransInfo
}
// MessageType indicates the type of message.
type MessageType int32
// MessageTypes.
const (
// 0-4 corresponding to thrift.TMessageType
InvalidMessageType MessageType = 0
Call MessageType = 1
Reply MessageType = 2
Exception MessageType = 3
// Oneway means there's no need to wait for the response.
// When the actual message is transmitted, Oneway writes a Call to avoid compatibility issues
// and to maintain consistency with the original logic.
Oneway MessageType = 4
Stream MessageType = 5
)
const (
// ReadFailed .
ReadFailed string = "RFailed"
// MeshHeader use in message.Tag to check MeshHeader
MeshHeader string = "mHeader"
)
var emptyProtocolInfo ProtocolInfo
// ProtocolInfo is used to indicate the transport protocol and payload codec information.
type ProtocolInfo struct {
TransProto transport.Protocol
CodecType serviceinfo.PayloadCodec
}
// NewProtocolInfo creates a new ProtocolInfo using the given tp and ct.
func NewProtocolInfo(tp transport.Protocol, ct serviceinfo.PayloadCodec) ProtocolInfo {
return ProtocolInfo{
TransProto: tp,
CodecType: ct,
}
}
// Message is the core abstraction for Kitex message.
type Message interface {
RPCInfo() rpcinfo.RPCInfo
ServiceInfo() *serviceinfo.ServiceInfo
Data() interface{}
NewData(method string) (ok bool)
MessageType() MessageType
SetMessageType(MessageType)
RPCRole() RPCRole
PayloadLen() int
SetPayloadLen(size int)
TransInfo() TransInfo
Tags() map[string]interface{}
ProtocolInfo() ProtocolInfo
SetProtocolInfo(ProtocolInfo)
PayloadCodec() PayloadCodec
SetPayloadCodec(pc PayloadCodec)
Recycle()
}
// NewMessage creates a new Message using the given info.
func NewMessage(data interface{}, svcInfo *serviceinfo.ServiceInfo, ri rpcinfo.RPCInfo, msgType MessageType, rpcRole RPCRole) Message {
msg := messagePool.Get().(*message)
msg.data = data
msg.rpcInfo = ri
msg.svcInfo = svcInfo
msg.msgType = msgType
msg.rpcRole = rpcRole
msg.transInfo = transInfoPool.Get().(*transInfo)
return msg
}
// NewMessageWithNewer creates a new Message and set data later.
func NewMessageWithNewer(svcInfo *serviceinfo.ServiceInfo, ri rpcinfo.RPCInfo, msgType MessageType, rpcRole RPCRole) Message {
msg := messagePool.Get().(*message)
msg.rpcInfo = ri
msg.svcInfo = svcInfo
msg.msgType = msgType
msg.rpcRole = rpcRole
msg.transInfo = transInfoPool.Get().(*transInfo)
return msg
}
// RecycleMessage is used to recycle message.
func RecycleMessage(msg Message) {
if msg != nil {
msg.Recycle()
}
}
func newMessage() interface{} {
return &message{tags: make(map[string]interface{})}
}
type message struct {
msgType MessageType
data interface{}
rpcInfo rpcinfo.RPCInfo
svcInfo *serviceinfo.ServiceInfo
rpcRole RPCRole
compressType CompressType
payloadSize int
transInfo TransInfo
tags map[string]interface{}
protocol ProtocolInfo
payloadCodec PayloadCodec
}
func (m *message) zero() {
m.msgType = InvalidMessageType
m.data = nil
m.rpcInfo = nil
m.svcInfo = &emptyServiceInfo
m.rpcRole = -1
m.compressType = NoCompress
m.payloadSize = 0
if m.transInfo != nil {
m.transInfo.Recycle()
m.transInfo = nil
}
for k := range m.tags {
delete(m.tags, k)
}
m.protocol = emptyProtocolInfo
}
// RPCInfo implements the Message interface.
func (m *message) RPCInfo() rpcinfo.RPCInfo {
return m.rpcInfo
}
// ServiceInfo implements the Message interface.
func (m *message) ServiceInfo() *serviceinfo.ServiceInfo {
return m.svcInfo
}
// Data implements the Message interface.
func (m *message) Data() interface{} {
return m.data
}
// NewData implements the Message interface.
func (m *message) NewData(method string) (ok bool) {
if m.data != nil {
return false
}
if mt := m.svcInfo.MethodInfo(method); mt != nil {
m.data = mt.NewArgs()
}
if m.data == nil {
return false
}
return true
}
// MessageType implements the Message interface.
func (m *message) MessageType() MessageType {
return m.msgType
}
// SetMessageType implements the Message interface.
func (m *message) SetMessageType(mt MessageType) {
m.msgType = mt
}
// RPCRole implements the Message interface.
func (m *message) RPCRole() RPCRole {
return m.rpcRole
}
// TransInfo implements the Message interface.
func (m *message) TransInfo() TransInfo {
return m.transInfo
}
// Tags implements the Message interface.
func (m *message) Tags() map[string]interface{} {
return m.tags
}
// PayloadLen implements the Message interface.
func (m *message) PayloadLen() int {
return m.payloadSize
}
// SetPayloadLen implements the Message interface.
func (m *message) SetPayloadLen(size int) {
m.payloadSize = size
}
// ProtocolInfo implements the Message interface.
func (m *message) ProtocolInfo() ProtocolInfo {
return m.protocol
}
// SetProtocolInfo implements the Message interface.
func (m *message) SetProtocolInfo(pt ProtocolInfo) {
m.protocol = pt
}
// PayloadCodec implements the Message interface.
func (m *message) PayloadCodec() PayloadCodec {
return m.payloadCodec
}
// SetPayloadCodec implements the Message interface.
func (m *message) SetPayloadCodec(pc PayloadCodec) {
m.payloadCodec = pc
}
// Recycle is used to recycle the message.
func (m *message) Recycle() {
m.zero()
messagePool.Put(m)
}
// TransInfo contains transport information.
type TransInfo interface {
TransStrInfo() map[string]string
TransIntInfo() map[uint16]string
PutTransIntInfo(map[uint16]string)
PutTransStrInfo(kvInfo map[string]string)
Recycle()
}
func newTransInfo() interface{} {
return &transInfo{
intInfo: make(map[uint16]string),
strInfo: make(map[string]string),
}
}
type transInfo struct {
strInfo map[string]string
intInfo map[uint16]string
}
func (ti *transInfo) zero() {
for k := range ti.intInfo {
delete(ti.intInfo, k)
}
for k := range ti.strInfo {
delete(ti.strInfo, k)
}
}
// TransIntInfo implements the TransInfo interface.
func (ti *transInfo) TransIntInfo() map[uint16]string {
return ti.intInfo
}
// PutTransIntInfo implements the TransInfo interface.
func (ti *transInfo) PutTransIntInfo(kvInfo map[uint16]string) {
if kvInfo == nil {
return
}
if len(ti.intInfo) == 0 {
ti.intInfo = kvInfo
} else {
for k, v := range kvInfo {
ti.intInfo[k] = v
}
}
}
// TransStrInfo implements the TransInfo interface.
func (ti *transInfo) TransStrInfo() map[string]string {
return ti.strInfo
}
// PutTransStrInfo implements the TransInfo interface.
func (ti *transInfo) PutTransStrInfo(kvInfo map[string]string) {
if kvInfo == nil {
return
}
if len(ti.strInfo) == 0 {
ti.strInfo = kvInfo
} else {
for k, v := range kvInfo {
ti.strInfo[k] = v
}
}
}
// Recycle is used to recycle the transInfo.
func (ti *transInfo) Recycle() {
ti.zero()
transInfoPool.Put(ti)
}
// FillSendMsgFromRecvMsg is used to fill the transport information to the message to be sent.
func FillSendMsgFromRecvMsg(recvMsg, sendMsg Message) {
sendMsg.SetProtocolInfo(recvMsg.ProtocolInfo())
sendMsg.SetPayloadCodec(recvMsg.PayloadCodec())
}