Skip to content

Commit

Permalink
Refactor nodejs monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
gjulianm committed Sep 12, 2024
1 parent 2f8232b commit 70a7d6c
Showing 1 changed file with 29 additions and 163 deletions.
192 changes: 29 additions & 163 deletions pkg/network/usm/nodejs.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,12 @@
package usm

import (
"errors"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"

manager "github.com/DataDog/ebpf-manager"

"github.com/DataDog/datadog-agent/pkg/ebpf/uprobes"
"github.com/DataDog/datadog-agent/pkg/network/config"
"github.com/DataDog/datadog-agent/pkg/network/usm/utils"
"github.com/DataDog/datadog-agent/pkg/process/monitor"
"github.com/DataDog/datadog-agent/pkg/util/kernel"
"github.com/DataDog/datadog-agent/pkg/util/log"
)
Expand All @@ -32,6 +25,8 @@ const (
nodejsSslReadExRetprobe = "nodejs_uretprobe__SSL_read_ex"
nodejsSslWriteRetprobe = "nodejs_uretprobe__SSL_write"
nodejsSslWriteExRetprobe = "nodejs_uretprobe__SSL_write_ex"

NodeJsAttacherName = "nodejs"
)

var (
Expand Down Expand Up @@ -111,35 +106,33 @@ var (
// nodeJSMonitor essentially scans for Node processes and attaches SSL uprobes
// to them.
type nodeJSMonitor struct {
registry *utils.FileRegistry
procRoot string

// `utils.FileRegistry` callbacks
registerCB func(utils.FilePath) error
unregisterCB func(utils.FilePath) error

// Termination
wg sync.WaitGroup
done chan struct{}
attacher *uprobes.UprobeAttacher
}

// Validate that nodeJSMonitor implements the Attacher interface.
var _ utils.Attacher = &nodeJSMonitor{}

func newNodeJSMonitor(c *config.Config, mgr *manager.Manager) *nodeJSMonitor {
if !c.EnableNodeJSMonitoring {
return nil
}

procRoot := kernel.ProcFSRoot()
return &nodeJSMonitor{
registry: utils.NewFileRegistry("nodejs"),
procRoot: procRoot,
done: make(chan struct{}),
attachCfg := uprobes.AttacherConfig{
ProcRoot: kernel.ProcFSRoot(),
Rules: []*uprobes.AttachRule{{
Targets: uprobes.AttachToExecutable,
ProbesSelector: nodeJSProbes,
ExecutableFilter: isNodeJSBinary,
}},
EbpfConfig: &c.Config,
ExcludeTargets: uprobes.ExcludeSelf | uprobes.ExcludeInternal | uprobes.ExcludeBuildkit | uprobes.ExcludeContainerdTmp,
EnablePeriodicScanNewProcesses: true,
}

// Callbacks
registerCB: addHooks(mgr, procRoot, nodeJSProbes),
unregisterCB: removeHooks(mgr, nodeJSProbes),
attacher, err := uprobes.NewUprobeAttacher(NodeJsAttacherName, attachCfg, mgr, nil, &uprobes.NativeBinaryInspector{})
if err != nil {
log.Errorf("Cannot create uprobe attacher: %v", err)
}

return &nodeJSMonitor{
attacher: attacher,
}
}

Expand All @@ -149,47 +142,7 @@ func (m *nodeJSMonitor) Start() {
return
}

processMonitor := monitor.GetProcessMonitor()

// Subscribe to process events
doneExec := processMonitor.SubscribeExec(m.handleProcessExec)
doneExit := processMonitor.SubscribeExit(m.handleProcessExit)

// Attach to existing processes
m.sync()

m.wg.Add(1)
go func() {
// This ticker is responsible for controlling the rate at which
// we scrape the whole procFS again in order to ensure that we
// terminate any dangling uprobes and register new processes
// missed by the process monitor stream
processSync := time.NewTicker(scanTerminatedProcessesInterval)

defer func() {
processSync.Stop()
// Execute process monitor callback termination functions
doneExec()
doneExit()
// Stopping the process monitor (if we're the last instance)
processMonitor.Stop()
// Cleaning up all active hooks
m.registry.Clear()
// marking we're finished.
m.wg.Done()
}()

for {
select {
case <-m.done:
return
case <-processSync.C:
m.sync()
m.registry.Log()
}
}
}()
utils.AddAttacher("nodejs-tls", m)
m.attacher.Start()
log.Info("Node JS TLS monitoring enabled")
}

Expand All @@ -199,101 +152,14 @@ func (m *nodeJSMonitor) Stop() {
return
}

close(m.done)
m.wg.Wait()
}

// DetachPID detaches a given pid from the eBPF program
func (m *nodeJSMonitor) DetachPID(pid uint32) error {
// We avoid filtering PIDs here because it's cheaper to simply do a registry lookup
// instead of fetching a process name in order to determine whether it is an
// envoy process or not (which at the very minimum involves syscalls)
return m.registry.Unregister(pid)
m.attacher.Stop()
}

var (
// ErrNoNodeJSPath is returned when no nodejs path is found for a given PID
ErrNoNodeJSPath = errors.New("no nodejs path found for PID")
)

// AttachPID attaches a given pid to the eBPF program
func (m *nodeJSMonitor) AttachPID(pid uint32) error {
path := m.getNodeJSPath(pid)
if path == "" {
return ErrNoNodeJSPath
}

return m.registry.Register(
path,
pid,
m.registerCB,
m.unregisterCB,
utils.IgnoreCB,
)
}

// sync state of nodeJSMonitor with the current state of procFS
// the purpose of this method is two-fold:
// 1) register processes for which we missed exec events (targeted mostly at startup)
// 2) unregister processes for which we missed exit events
func (m *nodeJSMonitor) sync() {
deletionCandidates := m.registry.GetRegisteredProcesses()

_ = kernel.WithAllProcs(m.procRoot, func(pid int) error {
if _, ok := deletionCandidates[uint32(pid)]; ok {
// We have previously hooked into this process and it remains active,
// so we remove it from the deletionCandidates list, and move on to the next PID
delete(deletionCandidates, uint32(pid))
return nil
}

// This is a new PID so we attempt to attach SSL probes to it
m.handleProcessExec(uint32(pid))
return nil
})

// At this point all entries from deletionCandidates are no longer alive, so
// we should detach our SSL probes from them
for pid := range deletionCandidates {
m.handleProcessExit(pid)
}
}

func (m *nodeJSMonitor) handleProcessExit(pid uint32) {
_ = m.DetachPID(pid)
}

func (m *nodeJSMonitor) handleProcessExec(pid uint32) {
_ = m.AttachPID(pid)
}

// getNodeJSPath returns the executable path of the nodejs binary for a given PID.
// In case the PID doesn't represent a nodejs process, an empty string is returned.
func (m *nodeJSMonitor) getNodeJSPath(pid uint32) string {
pidAsStr := strconv.FormatUint(uint64(pid), 10)
exePath := filepath.Join(m.procRoot, pidAsStr, "exe")

binPath, err := os.Readlink(exePath)
// getNodeJSPath checks if the given PID is a NodeJS process and returns the path to the binary
func isNodeJSBinary(path string, procInfo *uprobes.ProcInfo) bool {
exe, err := procInfo.Exe()
if err != nil {
// We receive the Exec event, /proc could be slow to update
end := time.Now().Add(10 * time.Millisecond)
for end.After(time.Now()) {
binPath, err = os.Readlink(exePath)
if err == nil {
break
}
time.Sleep(time.Millisecond)
}
return false
}
if err != nil {
// we can't access to the binary path here (pid probably ended already)
// there are not much we can do, and we don't want to flood the logs
return ""
}

if strings.Contains(binPath, nodeJSPath) {
return binPath
}

return ""
return strings.Contains(exe, nodeJSPath)
}

0 comments on commit 70a7d6c

Please sign in to comment.