Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support rocketMQ #231

Merged
merged 33 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
1e0025c
update
XmchxUp Dec 30, 2024
fc38964
update
XmchxUp Dec 31, 2024
28798f3
docs: introduce `prettier` and `md-padding` to format all docs (#221)
spencercjh Dec 30, 2024
f677d60
fix: add fallback logic to calculate totaltime when nicin event misse…
hengyoush Dec 30, 2024
537a08a
test: introduce a script to test flag `--comm` (#222)
spencercjh Dec 31, 2024
fc80653
update
XmchxUp Dec 31, 2024
5b99af1
Merge branch 'hengyoush:main' into xmchx/feat_support_rocketmq
XmchxUp Dec 31, 2024
d8f722c
update
XmchxUp Jan 2, 2025
810f58e
Merge branch 'hengyoush:main' into xmchx/feat_support_rocketmq
XmchxUp Jan 2, 2025
6f32563
update
XmchxUp Jan 2, 2025
541008b
update
XmchxUp Jan 2, 2025
63345cf
update
XmchxUp Jan 2, 2025
2e22617
update
XmchxUp Jan 2, 2025
47d46ca
update
XmchxUp Jan 3, 2025
94a107e
Merge branch 'hengyoush:main' into xmchx/feat_support_rocketmq
XmchxUp Jan 3, 2025
1b7d62b
update
XmchxUp Jan 3, 2025
1b874e7
update
XmchxUp Jan 3, 2025
c4d4adc
Merge branch 'hengyoush:main' into xmchx/feat_support_rocketmq
XmchxUp Jan 6, 2025
f0bf0d4
update
XmchxUp Jan 6, 2025
fdc5e9f
Merge branch 'hengyoush:main' into xmchx/feat_support_rocketmq
XmchxUp Jan 7, 2025
70388cd
update
XmchxUp Jan 7, 2025
09283a1
update
XmchxUp Jan 7, 2025
5087042
update
XmchxUp Jan 7, 2025
5dbe6d0
update
XmchxUp Jan 7, 2025
966a768
update
XmchxUp Jan 7, 2025
85ffaa4
update
XmchxUp Jan 7, 2025
7c4c9fa
update
XmchxUp Jan 7, 2025
e4d718f
update
XmchxUp Jan 7, 2025
f7502fa
update
XmchxUp Jan 7, 2025
cb6cc3e
update
XmchxUp Jan 7, 2025
26fd35f
update
XmchxUp Jan 7, 2025
e88100a
update
XmchxUp Jan 7, 2025
917eed8
update
XmchxUp Jan 7, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions agent/protocol/rocketmq/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package rocketmq

import (
"kyanos/agent/protocol"
"kyanos/bpf"
)

type Filter struct {
}

func (m Filter) Filter(req protocol.ParsedMessage, resp protocol.ParsedMessage) bool {
XmchxUp marked this conversation as resolved.
Show resolved Hide resolved
return true
}

func (m Filter) FilterByProtocol(p bpf.AgentTrafficProtocolT) bool {
return p == bpf.AgentTrafficProtocolTKProtocolRocketMQ
}

func (m Filter) FilterByRequest() bool {
return false
}

func (m Filter) FilterByResponse() bool {
return false
}

var _ protocol.ProtocolFilter = Filter{}
140 changes: 140 additions & 0 deletions agent/protocol/rocketmq/rocketmq.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package rocketmq

import (
"encoding/binary"
"errors"
"fmt"
"kyanos/agent/buffer"
"kyanos/agent/protocol"
"kyanos/bpf"
)

func init() {
protocol.ParsersMap[bpf.AgentTrafficProtocolTKProtocolRocketMQ] = func() protocol.ProtocolStreamParser {
return &RocketMQStreamParser{}
}
}

func (r *RocketMQMessage) FormatToString() string {
return fmt.Sprintf("base=[%s] command=[%s] payload=[%s]", r.FrameBase.String(), "todo", r.Body)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the TODO here expected?

I mean because it's a draft PR

}

func (r *RocketMQMessage) IsReq() bool {
return r.isReq
}

func (r *RocketMQStreamParser) ParseStream(streamBuffer *buffer.StreamBuffer, messageType protocol.MessageType) protocol.ParseResult {
buffer := streamBuffer.Head().Buffer()
if len(buffer) < 8 {
return protocol.ParseResult{
ParseState: protocol.NeedsMoreData,
}
}

frameSize := int(binary.BigEndian.Uint32(buffer[:4]))
if frameSize > len(buffer) {
return protocol.ParseResult{ParseState: protocol.NeedsMoreData}
}

headerLength := binary.BigEndian.Uint32(buffer[4:8])
headerDataLen := headerLength & 0xFFFFFF
serializedType := byte((headerLength >> 24) & 0xFF)

if len(buffer) < 8+int(headerDataLen) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thr 4 and 8 are magical numbers here. I would like to suggest you to add constants

return protocol.ParseResult{ParseState: protocol.NeedsMoreData}
}

headerBody := buffer[8 : 8+headerDataLen]
body := buffer[8+headerDataLen : frameSize]
message, err := r.parseHeader(headerBody, serializedType)
if err != nil {
return protocol.ParseResult{ParseState: protocol.Invalid, ReadBytes: int(frameSize)}
}

message.Body = body
message.isReq = messageType == protocol.Request
fb, ok := protocol.CreateFrameBase(streamBuffer, frameSize)

if !ok {
return protocol.ParseResult{
ParseState: protocol.Ignore,
ReadBytes: frameSize,
}
} else {
message.FrameBase = fb
return protocol.ParseResult{
ParseState: protocol.Success,
ReadBytes: frameSize,
ParsedMessages: []protocol.ParsedMessage{message},
}
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can be simplified

Suggested change
if !ok {
return protocol.ParseResult{
ParseState: protocol.Ignore,
ReadBytes: frameSize,
}
} else {
message.FrameBase = fb
return protocol.ParseResult{
ParseState: protocol.Success,
ReadBytes: frameSize,
ParsedMessages: []protocol.ParsedMessage{message},
}
}
if !ok {
return protocol.ParseResult{
ParseState: protocol.Ignore,
ReadBytes: frameSize,
}
}
message.FrameBase = fb
return protocol.ParseResult{
ParseState: protocol.Success,
ReadBytes: frameSize,
ParsedMessages: []protocol.ParsedMessage{message},
}

}

func (parser *RocketMQStreamParser) parseHeader(headerBody []byte, serializedType byte) (*RocketMQMessage, error) {
fmt.Println(serializedType)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one looks alike a left behind debug

message := &RocketMQMessage{}
if serializedType == 0 {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Return the error currently in the else here if serializedType != 0

And simplify the dozen line if statement

if len(headerBody) < 18 {
return nil, errors.New("invalid header size")
}

message.RequestCode = int16(binary.BigEndian.Uint16(headerBody[:2]))
XmchxUp marked this conversation as resolved.
Show resolved Hide resolved
message.LanguageFlag = headerBody[2]
message.VersionFlag = int16(binary.BigEndian.Uint16(headerBody[3:5]))
message.Opaque = int32(binary.BigEndian.Uint32(headerBody[5:9]))
message.RequestFlag = int32(binary.BigEndian.Uint32(headerBody[9:13]))
message.RemarkLength = int32(binary.BigEndian.Uint32(headerBody[13:17]))

if int(message.RemarkLength) > len(headerBody[17:]) {
return nil, errors.New("invalid remark length")
}
message.Remark = headerBody[17 : 17+message.RemarkLength]
propertiesStart := 17 + message.RemarkLength

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here about magical number 3,5,13,17

if len(headerBody[propertiesStart:]) < 4 {
return nil, errors.New("invalid properties length")
}
message.PropertiesLen = int32(binary.BigEndian.Uint32(headerBody[propertiesStart:]))
message.Properties = headerBody[propertiesStart+4 : propertiesStart+4+message.PropertiesLen]
} else {
return nil, errors.New("unsupported serialization type")
}
return message, nil
}

func (r *RocketMQStreamParser) FindBoundary(streamBuffer *buffer.StreamBuffer, messageType protocol.MessageType, startPos int) int {
XmchxUp marked this conversation as resolved.
Show resolved Hide resolved
buffer := streamBuffer.Head().Buffer()[startPos:]
for i := range buffer {
if len(buffer[i:]) < 8 {
return -1
}
frameSize := binary.BigEndian.Uint32(buffer[i : i+4])
if int(frameSize) <= len(buffer[i:]) {
return startPos + i
}
}
return -1
}

func (r *RocketMQStreamParser) Match(reqStream *[]protocol.ParsedMessage, respStream *[]protocol.ParsedMessage) []protocol.Record {
records := []protocol.Record{}

reqMap := make(map[int32]*RocketMQMessage)
for _, msg := range *reqStream {
req := msg.(*RocketMQMessage)
reqMap[req.Opaque] = req
}

for _, msg := range *respStream {
resp := msg.(*RocketMQMessage)
if req, ok := reqMap[resp.Opaque]; ok {
records = append(records, protocol.Record{
Req: req,
Resp: resp,
})

delete(reqMap, resp.Opaque)
}
}

return records
XmchxUp marked this conversation as resolved.
Show resolved Hide resolved
}
27 changes: 27 additions & 0 deletions agent/protocol/rocketmq/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package rocketmq

import (
"kyanos/agent/protocol"
)

var _ protocol.ParsedMessage = &RocketMQMessage{}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here you are validating the struct respect the interface.

This one and the previous one you do in the other files could be in tests file.

I'm unsure the compiler clean them in the final binary

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @Zxilly

Do you know if we could check that?

Thanks


type RocketMQMessage struct {
protocol.FrameBase
RequestCode int16
LanguageFlag byte
VersionFlag int16
Opaque int32
RequestFlag int32
RemarkLength int32
Remark []byte
PropertiesLen int32
Properties []byte
Body []byte
isReq bool
}

var _ protocol.ProtocolStreamParser = &RocketMQStreamParser{}

type RocketMQStreamParser struct {
}
31 changes: 16 additions & 15 deletions bpf/agent_arm64_bpfel.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 16 additions & 15 deletions bpf/agent_x86_bpfel.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bpf/pktlatency.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ enum traffic_protocol_t {
kProtocolKafka,
kProtocolMux,
kProtocolAMQP,
kProtocolRocketMQ,
kNumProtocols
};

Expand Down
24 changes: 24 additions & 0 deletions bpf/protocol_inference.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,28 @@ static __always_inline enum message_type_t is_http_protocol(const char *old_buf,
return kUnknown;
}

static __always_inline enum message_type_t is_rocketmq_protocol(const char *old_buf, size_t count) {
XmchxUp marked this conversation as resolved.
Show resolved Hide resolved
if (count < 8) {
return 0;
}

int32_t frame_size = 0;
bpf_probe_read_user(&frame_size, sizeof(int32_t), old_buf);

if (frame_size <= 0 || frame_size > 64 * 1024 * 1024) {
return kUnknown;
}

char serialized_type = 0;
bpf_probe_read_user(&serialized_type, 1, old_buf + 4);

if (serialized_type != 0x0 && serialized_type != 0x1) {
return kUnknown;
}

return kRequest;
}

static __always_inline struct protocol_message_t infer_protocol(const char *buf, size_t count, struct conn_info_t *conn_info) {
struct protocol_message_t protocol_message;
protocol_message.protocol = kProtocolUnknown;
Expand All @@ -137,6 +159,8 @@ static __always_inline struct protocol_message_t infer_protocol(const char *buf,
protocol_message.protocol = kProtocolMySQL;
} else if (is_redis_protocol(buf, count)) {
protocol_message.protocol = kProtocolRedis;
} else if (is_rocketmq_protocol(buf,count)) {
protocol_message.protocol = kProtocolRocketMQ;
}
conn_info->prev_count = count;
if (count == 4) {
Expand Down
27 changes: 27 additions & 0 deletions cmd/rocketmq.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package cmd

import (
"kyanos/agent/protocol/rocketmq"

"github.com/spf13/cobra"
)

var rocketmqCmd *cobra.Command = &cobra.Command{
Use: "rocketmq",
Short: "watch RocketMQ message",
Run: func(cmd *cobra.Command, args []string) {
options.MessageFilter = rocketmq.Filter{}
options.LatencyFilter = initLatencyFilter(cmd)
options.SizeFilter = initSizeFilter(cmd)
startAgent()

},
}

func init() {
rocketmqCmd.PersistentFlags().SortFlags = false
copy := *rocketmqCmd
watchCmd.AddCommand(&copy)
copy2 := *rocketmqCmd
statCmd.AddCommand(&copy2)
}
3 changes: 2 additions & 1 deletion cmd/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ import (
var maxRecords int
var supportedProtocols = []string{"http", "redis", "mysql"}
var watchCmd = &cobra.Command{
Use: "watch [http|redis|mysql] [flags]",
Use: "watch [http|redis|mysql|rocketmq] [flags]",
Example: `
sudo kyanos watch
sudo kyanos watch http --side server --pid 1234 --path /foo/bar --host ubuntu.com
sudo kyanos watch redis --comands GET,SET --keys foo,bar --key-prefix app1:
sudo kyanos watch mysql --latency 100 --req-size 1024 --resp-size 2048
sudo kyanos watch rocketmq
`,
Short: "Capture the request/response recrods",
PersistentPreRun: func(cmd *cobra.Command, args []string) { Mode = WatchMode },
Expand Down
Loading
Loading