Skip to content

Commit

Permalink
feat: support rocketMQ (#231)
Browse files Browse the repository at this point in the history
feat: support rocketMQ

---------

Signed-off-by: spencercjh <spencercjh@gmail.com>
Co-authored-by: Spencer Cai <spencercjh@gmail.com>
Co-authored-by: 烈香 <hengyoush1@163.com>
  • Loading branch information
3 people authored Jan 7, 2025
1 parent 0fe020c commit 8ff2696
Show file tree
Hide file tree
Showing 17 changed files with 811 additions and 36 deletions.
22 changes: 22 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ jobs:
#install python pip
sudo apt install -y python3 python3-pip pipx
#install docker-compose
# apt-get install -y docker-compose-plugin
sudo curl -L "https://github.com/docker/compose/releases/download/v2.23.0/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
sudo chmod +x /usr/local/bin/docker-compose
docker-compose --version
- name: Test CAP_BPF privilege check
uses: cilium/little-vm-helper@97c89f004bd0ab4caeacfe92ebc956e13e362e6b # v0.0.19
if: ${{ !contains(fromJSON('["4.19-20240912.022020", "5.4-20240912.022020"]'), matrix.kernel) }}
Expand All @@ -181,6 +187,22 @@ jobs:
pushd /host
bash /host/testdata/run_cap_bpf_test.sh "" "CAP_SYS_ADMIN"
popd
- name: Test RocketMQ
uses: cilium/little-vm-helper@97c89f004bd0ab4caeacfe92ebc956e13e362e6b # v0.0.19
with:
provision: 'false'
cmd: |
set -ex
uname -a
cat /etc/issue
pushd /host
if [ -f "/var/lib/kyanos/btf/current.btf" ]; then
bash /host/testdata/test_rocketmq.sh 'sudo /host/kyanos/kyanos $kyanos_log_option --btf /var/lib/kyanos/btf/current.btf'
else
bash /host/testdata/test_rocketmq.sh 'sudo /host/kyanos/kyanos $kyanos_log_option'
fi
popd
- name: Test filter by comm
uses: cilium/little-vm-helper@97c89f004bd0ab4caeacfe92ebc956e13e362e6b # v0.0.19
Expand Down
42 changes: 42 additions & 0 deletions agent/protocol/rocketmq/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package rocketmq

import (
"kyanos/agent/protocol"
"kyanos/bpf"
"kyanos/common"
"slices"
)

type Filter struct {
TargetRequestCodes []int32
TargetLanguageCodes []LanguageCode
}

func (m Filter) Filter(req protocol.ParsedMessage, resp protocol.ParsedMessage) bool {
rocketMQReq, ok := req.(*RocketMQMessage)
if !ok {
common.ProtocolParserLog.Warnf("[RocketMQFilter] cast to RocketMQMessage failed: %v\n", req)
return false
}

pass := true

pass = pass && (len(m.TargetRequestCodes) == 0 || slices.Index(m.TargetRequestCodes, int32(rocketMQReq.RequestCode)) != -1)
pass = pass && (len(m.TargetLanguageCodes) == 0 || slices.Index(m.TargetLanguageCodes, rocketMQReq.LanguageCode) != -1)

return pass
}

func (m Filter) FilterByProtocol(p bpf.AgentTrafficProtocolT) bool {
return p == bpf.AgentTrafficProtocolTKProtocolRocketMQ
}

func (m Filter) FilterByRequest() bool {
return len(m.TargetRequestCodes) > 0 || len(m.TargetLanguageCodes) > 0
}

func (m Filter) FilterByResponse() bool {
return len(m.TargetRequestCodes) > 0 || len(m.TargetLanguageCodes) > 0
}

var _ protocol.ProtocolFilter = Filter{}
105 changes: 105 additions & 0 deletions agent/protocol/rocketmq/language_code.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package rocketmq

import (
"errors"
"fmt"
)

type LanguageCode byte

const (
JAVA LanguageCode = iota // 0
CPP // 1
DOTNET // 2
PYTHON // 3
DELPHI // 4
ERLANG // 5
RUBY // 6
OTHER // 7
HTTP // 8
GO // 9
PHP // 10
OMS // 11
RUST // 12
NODE_JS // 13
UNKNOWN_LANGUAGE
)

// ConvertToLanguageCode converts a string to a LanguageCode.
func ConvertToLanguageCode(language string) (LanguageCode, error) {
switch language {
case "JAVA":
return JAVA, nil
case "CPP":
return CPP, nil
case "DOTNET":
return DOTNET, nil
case "PYTHON":
return PYTHON, nil
case "DELPHI":
return DELPHI, nil
case "ERLANG":
return ERLANG, nil
case "RUBY":
return RUBY, nil
case "OTHER":
return OTHER, nil
case "HTTP":
return HTTP, nil
case "GO":
return GO, nil
case "PHP":
return PHP, nil
case "OMS":
return OMS, nil
case "RUST":
return RUST, nil
case "NODE_JS":
return NODE_JS, nil
default:
return 13, errors.New("unknown language: " + language)
}
}

// convertToLanguageCodeFromByte converts a byte to a LanguageCode.
func convertToLanguageCodeFromByte(flag byte) (LanguageCode, error) {
if flag > 13 {
return 0, errors.New("unknown language flag: " + fmt.Sprint(flag))
}
return LanguageCode(flag), nil
}

func (lc LanguageCode) String() string {
switch lc {
case JAVA:
return "JAVA"
case CPP:
return "CPP"
case DOTNET:
return "DOTNET"
case PYTHON:
return "PYTHON"
case DELPHI:
return "DELPHI"
case ERLANG:
return "ERLANG"
case RUBY:
return "RUBY"
case OTHER:
return "OTHER"
case HTTP:
return "HTTP"
case GO:
return "GO"
case PHP:
return "PHP"
case OMS:
return "OMS"
case RUST:
return "RUST"
case NODE_JS:
return "NODE_JS"
default:
return "UNKNOWN"
}
}
Loading

0 comments on commit 8ff2696

Please sign in to comment.