Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

usm: Refactor nodejs monitor to use new uprobe attacher #29305

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/ebpf/uprobes/attacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,8 @@ type FileRegistry interface {
// AttachCallback is a callback that is called whenever a probe is attached successfully
type AttachCallback func(*manager.Probe, *utils.FilePath)

var NopOnAttachCallback AttachCallback = nil

// UprobeAttacher is a struct that handles the attachment of uprobes to processes and libraries
type UprobeAttacher struct {
// name contains the name of this attacher for identification
Expand Down
7 changes: 6 additions & 1 deletion pkg/network/usm/ebpf_ssl.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,11 +461,16 @@ func newSSLProgramProtocolFactory(m *manager.Manager) protocols.ProtocolFactory
}
}

nodejs, err := newNodeJSMonitor(c, m)
if err != nil {
return nil, fmt.Errorf("error initializing nodejs monitor: %s", err)
}

return &sslProgram{
cfg: c,
watcher: watcher,
istioMonitor: newIstioMonitor(c, m),
nodeJSMonitor: newNodeJSMonitor(c, m),
nodeJSMonitor: nodejs,
}, nil
}
}
Expand Down
197 changes: 32 additions & 165 deletions pkg/network/usm/nodejs.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,13 @@
package usm

import (
"errors"
"os"
"path/filepath"
"strconv"
"fmt"
"strings"
"sync"
"time"

manager "github.com/DataDog/ebpf-manager"

"github.com/DataDog/datadog-agent/pkg/ebpf/uprobes"
"github.com/DataDog/datadog-agent/pkg/network/config"
"github.com/DataDog/datadog-agent/pkg/network/usm/utils"
"github.com/DataDog/datadog-agent/pkg/process/monitor"
"github.com/DataDog/datadog-agent/pkg/util/kernel"
"github.com/DataDog/datadog-agent/pkg/util/log"
)
Expand All @@ -32,6 +26,8 @@ const (
nodejsSslReadExRetprobe = "nodejs_uretprobe__SSL_read_ex"
nodejsSslWriteRetprobe = "nodejs_uretprobe__SSL_write"
nodejsSslWriteExRetprobe = "nodejs_uretprobe__SSL_write_ex"

nodeJsAttacherName = "nodejs"
)

var (
Expand Down Expand Up @@ -111,36 +107,34 @@ var (
// nodeJSMonitor essentially scans for Node processes and attaches SSL uprobes
// to them.
type nodeJSMonitor struct {
registry *utils.FileRegistry
procRoot string

// `utils.FileRegistry` callbacks
registerCB func(utils.FilePath) error
unregisterCB func(utils.FilePath) error

// Termination
wg sync.WaitGroup
done chan struct{}
attacher *uprobes.UprobeAttacher
}

// Validate that nodeJSMonitor implements the Attacher interface.
var _ utils.Attacher = &nodeJSMonitor{}

func newNodeJSMonitor(c *config.Config, mgr *manager.Manager) *nodeJSMonitor {
func newNodeJSMonitor(c *config.Config, mgr *manager.Manager) (*nodeJSMonitor, error) {
if !c.EnableNodeJSMonitoring {
return nil
return nil, nil
}

procRoot := kernel.ProcFSRoot()
return &nodeJSMonitor{
registry: utils.NewFileRegistry("nodejs"),
procRoot: procRoot,
done: make(chan struct{}),
attachCfg := uprobes.AttacherConfig{
ProcRoot: kernel.ProcFSRoot(),
Rules: []*uprobes.AttachRule{{
Targets: uprobes.AttachToExecutable,
ProbesSelector: nodeJSProbes,
ExecutableFilter: isNodeJSBinary,
}},
EbpfConfig: &c.Config,
ExcludeTargets: uprobes.ExcludeSelf | uprobes.ExcludeInternal | uprobes.ExcludeBuildkit | uprobes.ExcludeContainerdTmp,
EnablePeriodicScanNewProcesses: true,
}

// Callbacks
registerCB: addHooks(mgr, procRoot, nodeJSProbes),
unregisterCB: removeHooks(mgr, nodeJSProbes),
attacher, err := uprobes.NewUprobeAttacher(nodeJsAttacherName, attachCfg, mgr, uprobes.NopOnAttachCallback, &uprobes.NativeBinaryInspector{})
if err != nil {
return nil, fmt.Errorf("cannot create uprobe attacher: %w", err)
}

return &nodeJSMonitor{
attacher: attacher,
}, nil
}

// Start the nodeJSMonitor
Expand All @@ -149,47 +143,7 @@ func (m *nodeJSMonitor) Start() {
return
}

processMonitor := monitor.GetProcessMonitor()

// Subscribe to process events
doneExec := processMonitor.SubscribeExec(m.handleProcessExec)
doneExit := processMonitor.SubscribeExit(m.handleProcessExit)

// Attach to existing processes
m.sync()

m.wg.Add(1)
go func() {
// This ticker is responsible for controlling the rate at which
// we scrape the whole procFS again in order to ensure that we
// terminate any dangling uprobes and register new processes
// missed by the process monitor stream
processSync := time.NewTicker(scanTerminatedProcessesInterval)

defer func() {
processSync.Stop()
// Execute process monitor callback termination functions
doneExec()
doneExit()
// Stopping the process monitor (if we're the last instance)
processMonitor.Stop()
// Cleaning up all active hooks
m.registry.Clear()
// marking we're finished.
m.wg.Done()
}()

for {
select {
case <-m.done:
return
case <-processSync.C:
m.sync()
m.registry.Log()
}
}
}()
utils.AddAttacher("nodejs-tls", m)
m.attacher.Start()
log.Info("Node JS TLS monitoring enabled")
}

Expand All @@ -199,101 +153,14 @@ func (m *nodeJSMonitor) Stop() {
return
}

close(m.done)
m.wg.Wait()
}

// DetachPID detaches a given pid from the eBPF program
func (m *nodeJSMonitor) DetachPID(pid uint32) error {
// We avoid filtering PIDs here because it's cheaper to simply do a registry lookup
// instead of fetching a process name in order to determine whether it is an
// envoy process or not (which at the very minimum involves syscalls)
return m.registry.Unregister(pid)
m.attacher.Stop()
}

var (
// ErrNoNodeJSPath is returned when no nodejs path is found for a given PID
ErrNoNodeJSPath = errors.New("no nodejs path found for PID")
)

// AttachPID attaches a given pid to the eBPF program
func (m *nodeJSMonitor) AttachPID(pid uint32) error {
path := m.getNodeJSPath(pid)
if path == "" {
return ErrNoNodeJSPath
}

return m.registry.Register(
path,
pid,
m.registerCB,
m.unregisterCB,
utils.IgnoreCB,
)
}

// sync state of nodeJSMonitor with the current state of procFS
// the purpose of this method is two-fold:
// 1) register processes for which we missed exec events (targeted mostly at startup)
// 2) unregister processes for which we missed exit events
func (m *nodeJSMonitor) sync() {
deletionCandidates := m.registry.GetRegisteredProcesses()

_ = kernel.WithAllProcs(m.procRoot, func(pid int) error {
if _, ok := deletionCandidates[uint32(pid)]; ok {
// We have previously hooked into this process and it remains active,
// so we remove it from the deletionCandidates list, and move on to the next PID
delete(deletionCandidates, uint32(pid))
return nil
}

// This is a new PID so we attempt to attach SSL probes to it
m.handleProcessExec(uint32(pid))
return nil
})

// At this point all entries from deletionCandidates are no longer alive, so
// we should detach our SSL probes from them
for pid := range deletionCandidates {
m.handleProcessExit(pid)
}
}

func (m *nodeJSMonitor) handleProcessExit(pid uint32) {
_ = m.DetachPID(pid)
}

func (m *nodeJSMonitor) handleProcessExec(pid uint32) {
_ = m.AttachPID(pid)
}

// getNodeJSPath returns the executable path of the nodejs binary for a given PID.
// In case the PID doesn't represent a nodejs process, an empty string is returned.
func (m *nodeJSMonitor) getNodeJSPath(pid uint32) string {
pidAsStr := strconv.FormatUint(uint64(pid), 10)
exePath := filepath.Join(m.procRoot, pidAsStr, "exe")

binPath, err := os.Readlink(exePath)
// getNodeJSPath checks if the given PID is a NodeJS process and returns the path to the binary
func isNodeJSBinary(path string, procInfo *uprobes.ProcInfo) bool {
exe, err := procInfo.Exe()
if err != nil {
// We receive the Exec event, /proc could be slow to update
end := time.Now().Add(10 * time.Millisecond)
for end.After(time.Now()) {
binPath, err = os.Readlink(exePath)
if err == nil {
break
}
time.Sleep(time.Millisecond)
}
return false
}
if err != nil {
// we can't access to the binary path here (pid probably ended already)
// there are not much we can do, and we don't want to flood the logs
return ""
}

if strings.Contains(binPath, nodeJSPath) {
return binPath
}

return ""
return strings.Contains(exe, nodeJSPath)
}
Loading