Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

usm: Refactor nodejs monitor to use new uprobe attacher #29305

Merged
merged 7 commits into from
Oct 7, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 29 additions & 163 deletions pkg/network/usm/nodejs.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,12 @@
package usm

import (
"errors"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"

manager "github.com/DataDog/ebpf-manager"

"github.com/DataDog/datadog-agent/pkg/ebpf/uprobes"
"github.com/DataDog/datadog-agent/pkg/network/config"
"github.com/DataDog/datadog-agent/pkg/network/usm/utils"
"github.com/DataDog/datadog-agent/pkg/process/monitor"
"github.com/DataDog/datadog-agent/pkg/util/kernel"
"github.com/DataDog/datadog-agent/pkg/util/log"
)
Expand All @@ -32,6 +25,8 @@ const (
nodejsSslReadExRetprobe = "nodejs_uretprobe__SSL_read_ex"
nodejsSslWriteRetprobe = "nodejs_uretprobe__SSL_write"
nodejsSslWriteExRetprobe = "nodejs_uretprobe__SSL_write_ex"

NodeJsAttacherName = "nodejs"
gjulianm marked this conversation as resolved.
Show resolved Hide resolved
)

var (
Expand Down Expand Up @@ -111,35 +106,33 @@ var (
// nodeJSMonitor essentially scans for Node processes and attaches SSL uprobes
// to them.
type nodeJSMonitor struct {
registry *utils.FileRegistry
procRoot string

// `utils.FileRegistry` callbacks
registerCB func(utils.FilePath) error
unregisterCB func(utils.FilePath) error

// Termination
wg sync.WaitGroup
done chan struct{}
attacher *uprobes.UprobeAttacher
}

// Validate that nodeJSMonitor implements the Attacher interface.
var _ utils.Attacher = &nodeJSMonitor{}

func newNodeJSMonitor(c *config.Config, mgr *manager.Manager) *nodeJSMonitor {
if !c.EnableNodeJSMonitoring {
return nil
}

procRoot := kernel.ProcFSRoot()
return &nodeJSMonitor{
registry: utils.NewFileRegistry("nodejs"),
procRoot: procRoot,
done: make(chan struct{}),
attachCfg := uprobes.AttacherConfig{
ProcRoot: kernel.ProcFSRoot(),
Rules: []*uprobes.AttachRule{{
Targets: uprobes.AttachToExecutable,
ProbesSelector: nodeJSProbes,
ExecutableFilter: isNodeJSBinary,
}},
EbpfConfig: &c.Config,
ExcludeTargets: uprobes.ExcludeSelf | uprobes.ExcludeInternal | uprobes.ExcludeBuildkit | uprobes.ExcludeContainerdTmp,
EnablePeriodicScanNewProcesses: true,
}

// Callbacks
registerCB: addHooks(mgr, procRoot, nodeJSProbes),
unregisterCB: removeHooks(mgr, nodeJSProbes),
attacher, err := uprobes.NewUprobeAttacher(NodeJsAttacherName, attachCfg, mgr, nil, &uprobes.NativeBinaryInspector{})
gjulianm marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
log.Errorf("Cannot create uprobe attacher: %v", err)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we failed creating the attacher, why do we continue?

Copy link
Contributor Author

@gjulianm gjulianm Sep 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An oversight. I changed the code to check and return the error if it happens


return &nodeJSMonitor{
attacher: attacher,
}
}

Expand All @@ -149,47 +142,7 @@ func (m *nodeJSMonitor) Start() {
return
}

processMonitor := monitor.GetProcessMonitor()

// Subscribe to process events
doneExec := processMonitor.SubscribeExec(m.handleProcessExec)
doneExit := processMonitor.SubscribeExit(m.handleProcessExit)

// Attach to existing processes
m.sync()

m.wg.Add(1)
go func() {
// This ticker is responsible for controlling the rate at which
// we scrape the whole procFS again in order to ensure that we
// terminate any dangling uprobes and register new processes
// missed by the process monitor stream
processSync := time.NewTicker(scanTerminatedProcessesInterval)

defer func() {
processSync.Stop()
// Execute process monitor callback termination functions
doneExec()
doneExit()
// Stopping the process monitor (if we're the last instance)
processMonitor.Stop()
// Cleaning up all active hooks
m.registry.Clear()
// marking we're finished.
m.wg.Done()
}()

for {
select {
case <-m.done:
return
case <-processSync.C:
m.sync()
m.registry.Log()
}
}
}()
utils.AddAttacher("nodejs-tls", m)
m.attacher.Start()
log.Info("Node JS TLS monitoring enabled")
}

Expand All @@ -199,101 +152,14 @@ func (m *nodeJSMonitor) Stop() {
return
}

close(m.done)
m.wg.Wait()
}

// DetachPID detaches a given pid from the eBPF program
func (m *nodeJSMonitor) DetachPID(pid uint32) error {
// We avoid filtering PIDs here because it's cheaper to simply do a registry lookup
// instead of fetching a process name in order to determine whether it is an
// envoy process or not (which at the very minimum involves syscalls)
return m.registry.Unregister(pid)
m.attacher.Stop()
}

var (
// ErrNoNodeJSPath is returned when no nodejs path is found for a given PID
ErrNoNodeJSPath = errors.New("no nodejs path found for PID")
)

// AttachPID attaches a given pid to the eBPF program
func (m *nodeJSMonitor) AttachPID(pid uint32) error {
path := m.getNodeJSPath(pid)
if path == "" {
return ErrNoNodeJSPath
}

return m.registry.Register(
path,
pid,
m.registerCB,
m.unregisterCB,
utils.IgnoreCB,
)
}

// sync state of nodeJSMonitor with the current state of procFS
// the purpose of this method is two-fold:
// 1) register processes for which we missed exec events (targeted mostly at startup)
// 2) unregister processes for which we missed exit events
func (m *nodeJSMonitor) sync() {
deletionCandidates := m.registry.GetRegisteredProcesses()

_ = kernel.WithAllProcs(m.procRoot, func(pid int) error {
if _, ok := deletionCandidates[uint32(pid)]; ok {
// We have previously hooked into this process and it remains active,
// so we remove it from the deletionCandidates list, and move on to the next PID
delete(deletionCandidates, uint32(pid))
return nil
}

// This is a new PID so we attempt to attach SSL probes to it
m.handleProcessExec(uint32(pid))
return nil
})

// At this point all entries from deletionCandidates are no longer alive, so
// we should detach our SSL probes from them
for pid := range deletionCandidates {
m.handleProcessExit(pid)
}
}

func (m *nodeJSMonitor) handleProcessExit(pid uint32) {
_ = m.DetachPID(pid)
}

func (m *nodeJSMonitor) handleProcessExec(pid uint32) {
_ = m.AttachPID(pid)
}

// getNodeJSPath returns the executable path of the nodejs binary for a given PID.
// In case the PID doesn't represent a nodejs process, an empty string is returned.
func (m *nodeJSMonitor) getNodeJSPath(pid uint32) string {
pidAsStr := strconv.FormatUint(uint64(pid), 10)
exePath := filepath.Join(m.procRoot, pidAsStr, "exe")

binPath, err := os.Readlink(exePath)
// getNodeJSPath checks if the given PID is a NodeJS process and returns the path to the binary
func isNodeJSBinary(path string, procInfo *uprobes.ProcInfo) bool {
exe, err := procInfo.Exe()
if err != nil {
// We receive the Exec event, /proc could be slow to update
end := time.Now().Add(10 * time.Millisecond)
for end.After(time.Now()) {
binPath, err = os.Readlink(exePath)
if err == nil {
break
}
time.Sleep(time.Millisecond)
}
return false
}
if err != nil {
// we can't access to the binary path here (pid probably ended already)
// there are not much we can do, and we don't want to flood the logs
return ""
}

if strings.Contains(binPath, nodeJSPath) {
return binPath
}

return ""
return strings.Contains(exe, nodeJSPath)
}
Loading