Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve: provide opts to set the truncate size in text mode to reduce memory cost #731

Merged
merged 1 commit into from
Mar 2, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -228,3 +228,7 @@ format:
@clang-format -i -style=$(STYLE) kern/openssl_masterkey_3.2.h
@clang-format -i -style=$(STYLE) kern/boringssl_masterkey.h
@clang-format -i -style=$(STYLE) utils/*.c

.PHONY: test
test:
go test -v -race ./...
7 changes: 5 additions & 2 deletions cli/cmd/root.go
Original file line number Diff line number Diff line change
@@ -50,8 +50,9 @@ var (
)

const (
defaultPid uint64 = 0
defaultUid uint64 = 0
defaultPid uint64 = 0
defaultUid uint64 = 0
defaultTruncateSize uint64 = 0
)

const (
@@ -134,6 +135,7 @@ func init() {
rootCmd.PersistentFlags().StringVarP(&globalConf.LoggerAddr, "logaddr", "l", "", "send logs to this server. -l /tmp/ecapture.log or -l tcp://127.0.0.1:8080")
rootCmd.PersistentFlags().StringVar(&globalConf.EventCollectorAddr, "eventaddr", "", "the server address that receives the captured event. --eventaddr tcp://127.0.0.1:8090, default: same as logaddr")
rootCmd.PersistentFlags().StringVar(&globalConf.Listen, "listen", eCaptureListenAddr, "listen on this address for http server, default: 127.0.0.1:28256")
rootCmd.PersistentFlags().Uint64VarP(&globalConf.TruncateSize, "tsize", "t", defaultTruncateSize, "the truncate size in text mode, default: 0 (B), no truncate")

rootCmd.SilenceUsage = true
}
@@ -156,6 +158,7 @@ func setModConfig(globalConf config.BaseConfig, modConf config.IConfig) {
modConf.SetBTF(globalConf.BtfMode)
modConf.SetPerCpuMapSize(globalConf.PerCpuMapSize)
modConf.SetAddrType(loggerTypeStdout)
modConf.SetTruncateSize(globalConf.TruncateSize)

switch ByteCodeFiles {
case "core":
5 changes: 5 additions & 0 deletions pkg/event_processor/iworker.go
Original file line number Diff line number Diff line change
@@ -141,6 +141,11 @@ func (ew *eventWorker) writeEvent(e event.IEventStruct) {
// 解析类型,输出
func (ew *eventWorker) parserEvents() []byte {
ew.status = ProcessStateProcessing
tsize := int(ew.processor.truncateSize)
if tsize > 0 && ew.payload.Len() > tsize {
ew.payload.Truncate(tsize)
_ = ew.writeToChan(fmt.Sprintf("Events truncated, size: %d bytes\n", tsize))
}
parser := NewParser(ew.payload.Bytes())
ew.parser = parser
n, e := ew.parser.Write(ew.payload.Bytes())
7 changes: 5 additions & 2 deletions pkg/event_processor/processor.go
Original file line number Diff line number Diff line change
@@ -42,7 +42,8 @@ type EventProcessor struct {
errChan chan error

// output model
isHex bool
isHex bool
truncateSize uint64
}

func (ep *EventProcessor) GetLogger() io.Writer {
@@ -161,11 +162,13 @@ func (ep *EventProcessor) Close() error {
func (ep *EventProcessor) ErrorChan() chan error {
return ep.errChan
}
func NewEventProcessor(logger io.Writer, isHex bool) *EventProcessor {

func NewEventProcessor(logger io.Writer, isHex bool, truncateSize uint64) *EventProcessor {
var ep *EventProcessor
ep = &EventProcessor{}
ep.logger = logger
ep.isHex = isHex
ep.truncateSize = truncateSize
ep.isClosed = false
ep.init()
return ep
83 changes: 82 additions & 1 deletion pkg/event_processor/processor_test.go
Original file line number Diff line number Diff line change
@@ -39,7 +39,8 @@ func TestEventProcessor_Serve(t *testing.T) {
t.Fatal(e)
}
logger.SetOutput(f)
ep := NewEventProcessor(f, true)
// no truncate
ep := NewEventProcessor(f, true, 0)
go func() {
var err error
err = ep.Serve()
@@ -108,3 +109,83 @@ func TestEventProcessor_Serve(t *testing.T) {
//t.Log(string(bufString))
t.Log("done")
}

func Test_Truncated_EventProcessor_Serve(t *testing.T) {

logger := log.Default()
//var buf bytes.Buffer
//logger.SetOutput(&buf)
var output = "./output_truncated.log"
f, e := os.Create(output)
if e != nil {
t.Fatal(e)
}
logger.SetOutput(f)

// truncate 1000 bytes
ep := NewEventProcessor(f, true, 1000)
go func() {
var err error
err = ep.Serve()
if err != nil {
//log.Fatalf(err.Error())
t.Error(err)
return
}
}()

lines := []string{
// short, no truncated
`{"DataType":0,"Timestamp":952253597324253,"Pid":469929,"Tid":469929,"DataLen":308,"Comm":[119,103,101,116,0,0,0,0,0,0,0,0,0,0,0,0],"Fd":3,"Version":771}`,
// long, should truncated
`{"DataType":0,"Timestamp":952282712204824,"Pid":469953,"Tid":469953,"DataLen":4096,"Comm":[99,117,114,108,0,0,0,0,0,0,0,0,0,0,0,0],"Fd":5,"Version":771}`,
}

for _, line := range lines {
if line == "" {
continue
}
var eventSSL SSLDataEventTmp
err := json.Unmarshal([]byte(line), &eventSSL)
if err != nil {
t.Fatalf("json unmarshal error: %s, body:%v", err.Error(), line)
}
payloadFile := fmt.Sprintf("testdata/%d.bin", eventSSL.Timestamp)
b, e := os.ReadFile(payloadFile)
if e != nil {
t.Fatalf("read payload file error: %s, file:%s", e.Error(), payloadFile)
}
copy(eventSSL.Data[:], b)
ep.Write(&BaseEvent{DataLen: eventSSL.DataLen, Data: eventSSL.Data, DataType: eventSSL.DataType, Timestamp: eventSSL.Timestamp, Pid: eventSSL.Pid, Tid: eventSSL.Tid, Comm: eventSSL.Comm, Fd: eventSSL.Fd, Version: eventSSL.Version})
}

tick := time.NewTicker(time.Second * 10)
<-tick.C

err := ep.Close()
logger.SetOutput(io.Discard)
bufString, e := os.ReadFile(output)
if e != nil {
t.Fatal(e)
}

lines = strings.Split(string(bufString), "\n")
ok := true
for _, line := range lines {
// truncated once
if strings.Contains(line, "Events truncated, size:") {
t.Log(line)
}
}

if err != nil {
t.Fatalf("close error: %s", err.Error())
}

if !ok {
t.Fatalf("some errors occurred")
}
//t.Log(string(bufString))
t.Log("done")
}

18 changes: 15 additions & 3 deletions user/config/iconfig.go
Original file line number Diff line number Diff line change
@@ -63,6 +63,9 @@ type IConfig interface {
EnableGlobalVar() bool
// Bytes serializes the configuration to JSON bytes
Bytes() []byte
// Set/Get TruncateSize
SetTruncateSize(uint64)
GetTruncateSize() uint64
}

// TLS capture mode constants defining different output formats
@@ -90,9 +93,10 @@ const (

// BaseConfig implements the IConfig interface and holds the basic configuration settings
type BaseConfig struct {
Pid uint64 `json:"pid"` // Process ID to monitor
Uid uint64 `json:"uid"` // User ID to monitor
Listen string `json:"listen"` // Listen address for the server (default: 127.0.0.1:28256)
Pid uint64 `json:"pid"` // Process ID to monitor
Uid uint64 `json:"uid"` // User ID to monitor
Listen string `json:"listen"` // Listen address for the server (default: 127.0.0.1:28256)
TruncateSize uint64 `json:"truncate_size"` // truncate size in text mode

// eBPF map configuration
PerCpuMapSize int `json:"per_cpu_map_size"` // Size of eBPF map per CPU core
@@ -173,6 +177,14 @@ func (c *BaseConfig) SetPerCpuMapSize(size int) {
c.PerCpuMapSize = size * os.Getpagesize()
}

func (c *BaseConfig) SetTruncateSize(TruncateSize uint64) {
c.TruncateSize = TruncateSize
}

func (c *BaseConfig) GetTruncateSize() uint64 {
return c.TruncateSize
}

func (c *BaseConfig) EnableGlobalVar() bool {
kv, err := kernel.HostVersion()
if err != nil {
4 changes: 3 additions & 1 deletion user/module/imodule.go
Original file line number Diff line number Diff line change
@@ -104,7 +104,8 @@ func (m *Module) Init(ctx context.Context, logger *zerolog.Logger, conf config.I
m.isKernelLess5_2 = false //set false default
m.eventCollector = eventCollector
//var epl = epLogger{logger: logger}
m.processor = event_processor.NewEventProcessor(eventCollector, conf.GetHex())
tsize := conf.GetTruncateSize()
m.processor = event_processor.NewEventProcessor(eventCollector, conf.GetHex(), tsize)

go func() {
// 读取错误信息
@@ -129,6 +130,7 @@ func (m *Module) Init(ctx context.Context, logger *zerolog.Logger, conf config.I
}

logger.Info().Int("Pid", os.Getpid()).Str("Kernel Info", kv.String()).Send()
logger.Info().Int("TruncateSize", int(tsize)).Str("Unit", "bytes").Send()

if conf.GetBTF() == config.BTFModeAutoDetect {
// 如果是自动检测模式