Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor of metricbeat process-gathering metrics and system/process #30076

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
b4d6724
start work on process refactoring
fearful-symmetry Nov 15, 2021
3c3e6b0
finish new pid fetcher
fearful-symmetry Nov 16, 2021
11acfe7
continue work on linux process metrics refactoring
fearful-symmetry Nov 30, 2021
2d846f5
continue work on linux implementation
fearful-symmetry Dec 1, 2021
057da2f
complete first draft of linux support
fearful-symmetry Jan 14, 2022
46ee831
Merge remote-tracking branch 'upstream/master' into system-process-ne…
fearful-symmetry Jan 14, 2022
ddecc2d
finish linux, start MacOS
fearful-symmetry Jan 20, 2022
49816b6
fix tests
fearful-symmetry Jan 20, 2022
765f5f4
fix darwin calls
fearful-symmetry Jan 20, 2022
f9e5dd6
finish first pass at darwin
fearful-symmetry Jan 24, 2022
784a7a9
add aix, start cleanup
fearful-symmetry Jan 26, 2022
d9f8771
add new benchmark
fearful-symmetry Jan 26, 2022
578babf
start cleanup, fix misc. config values
fearful-symmetry Jan 27, 2022
88b3858
fix darwin caching
fearful-symmetry Jan 27, 2022
1bb007e
format
fearful-symmetry Jan 27, 2022
7ca6436
clean up code, fix some tests
fearful-symmetry Jan 28, 2022
ebe1a9c
Merge remote-tracking branch 'upstream/master' into system-process-ne…
fearful-symmetry Jan 28, 2022
efcdc66
move over root event logic, fix a ton of little bugs
fearful-symmetry Jan 28, 2022
3a96cf7
fix tests, add default hostfs to init
fearful-symmetry Jan 28, 2022
c53f86e
fix cgroup field
fearful-symmetry Jan 31, 2022
8e8b1ac
try to fix process tests
fearful-symmetry Feb 1, 2022
aeb77f2
no idea, just trying to fix windows
fearful-symmetry Feb 1, 2022
ee56545
remove mb import from libbeat
fearful-symmetry Feb 1, 2022
789570c
blindly attempt to fix windows unit tests
fearful-symmetry Feb 1, 2022
ab10589
fix log typo
fearful-symmetry Feb 1, 2022
c51f113
skip broken windows tests, at least for now
fearful-symmetry Feb 2, 2022
41be095
code cleanup
fearful-symmetry Feb 2, 2022
1a63b58
fix windows process states
fearful-symmetry Feb 2, 2022
1f07cae
yet another attempt at fixing windows tests
fearful-symmetry Feb 3, 2022
4bd598a
continued attempts at making windows tests less flaky
fearful-symmetry Feb 3, 2022
de6e3ea
Merge remote-tracking branch 'upstream/main' into system-process-new-…
fearful-symmetry Feb 3, 2022
9d968eb
try to fix formatting
fearful-symmetry Feb 3, 2022
712acb2
give up, disable flaky windows tests
fearful-symmetry Feb 4, 2022
0dcf56a
Merge remote-tracking branch 'upstream/main' into system-process-new-…
fearful-symmetry Feb 7, 2022
f98d3c2
remove old code
fearful-symmetry Feb 7, 2022
ffac2a9
another shot at playing with windows tests
fearful-symmetry Feb 8, 2022
aa69465
clean up now-working tests
fearful-symmetry Feb 8, 2022
b542e35
Merge remote-tracking branch 'upstream/main' into system-process-new-…
fearful-symmetry Feb 9, 2022
8e73a09
Merge remote-tracking branch 'upstream/main' into system-process-new-…
fearful-symmetry Feb 9, 2022
2f1b1ed
Merge remote-tracking branch 'upstream/main' into system-process-new-…
fearful-symmetry Feb 9, 2022
85c99f1
Merge remote-tracking branch 'upstream/main' into system-process-new-…
fearful-symmetry Feb 10, 2022
09051eb
try to give python unit tests a little more room
fearful-symmetry Feb 10, 2022
2857e08
fix conditionals in tests
fearful-symmetry Feb 10, 2022
45859c4
try to tune python tests a little
fearful-symmetry Feb 10, 2022
96b8917
just rewrite test
fearful-symmetry Feb 11, 2022
4cbce53
Merge remote-tracking branch 'upstream/main' into system-process-new-…
fearful-symmetry Feb 11, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
start work on process refactoring
  • Loading branch information
fearful-symmetry committed Nov 15, 2021
commit b4d6724216442f69399eb41bebac2eb1c0d236e6
204 changes: 102 additions & 102 deletions libbeat/metric/system/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,108 @@ type Ticks struct {
Total uint64
}

// Init initializes a Stats instance. It returns errors if the provided process regexes
// cannot be compiled.
func (procStats *Stats) Init() error {
procStats.logger = logp.NewLogger("processes")

var err error
procStats.host, err = sysinfo.Host()
if err != nil {
procStats.host = nil
procStats.logger.Warnf("Getting host details: %v", err)
}

procStats.ProcsMap = make(ProcsMap)

if len(procStats.Procs) == 0 {
return nil
}

procStats.procRegexps = []match.Matcher{}
for _, pattern := range procStats.Procs {
reg, err := match.Compile(pattern)
if err != nil {
return fmt.Errorf("Failed to compile regexp [%s]: %v", pattern, err)
}
procStats.procRegexps = append(procStats.procRegexps, reg)
}

procStats.envRegexps = make([]match.Matcher, 0, len(procStats.EnvWhitelist))
for _, pattern := range procStats.EnvWhitelist {
reg, err := match.Compile(pattern)
if err != nil {
return fmt.Errorf("failed to compile env whitelist regexp [%v]: %v", pattern, err)
}
procStats.envRegexps = append(procStats.envRegexps, reg)
}

if procStats.EnableCgroups {
cgReader, err := cgroup.NewReaderOptions(procStats.CgroupOpts)
if err == cgroup.ErrCgroupsMissing {
logp.Warn("cgroup data collection will be disabled: %v", err)
} else if err != nil {
return errors.Wrap(err, "error initializing cgroup reader")
}
procStats.cgroups = cgReader
}

return nil
}

// Get fetches process data which matches the provided regexes from the host.
func (procStats *Stats) Get() ([]common.MapStr, error) {
if len(procStats.Procs) == 0 {
return nil, nil
}

pids, err := Pids()
if err != nil {
return nil, errors.Wrap(err, "failed to fetch the list of PIDs")
}

var processes []Process
newProcs := make(ProcsMap, len(pids))

for _, pid := range pids {
process := procStats.getSingleProcess(pid, newProcs)
if process == nil {
continue
}
processes = append(processes, *process)
}
procStats.ProcsMap = newProcs

processes = procStats.includeTopProcesses(processes)
procStats.logger.Debugf("Filtered top processes down to %d processes", len(processes))

procs := make([]common.MapStr, 0, len(processes))
for _, process := range processes {
proc := procStats.getProcessEvent(&process)
procs = append(procs, proc)
}

return procs, nil
}

// GetOne fetches process data for a given PID if its name matches the regexes provided from the host.
func (procStats *Stats) GetOne(pid int) (common.MapStr, error) {
if len(procStats.Procs) == 0 {
return nil, nil
}

newProcs := make(ProcsMap, 1)
p := procStats.getSingleProcess(pid, newProcs)
if p == nil {
return common.MapStr{}, nil
}

e := procStats.getProcessEvent(p)
procStats.ProcsMap = newProcs

return e, nil
}

// newProcess creates a new Process object and initializes it with process
// state information. If the process's command line and environment variables
// are known they should be passed in to avoid re-fetching the information.
Expand Down Expand Up @@ -422,108 +524,6 @@ func (procStats *Stats) matchProcess(name string) bool {
return false
}

// Init initializes a Stats instance. It returns errors if the provided process regexes
// cannot be compiled.
func (procStats *Stats) Init() error {
procStats.logger = logp.NewLogger("processes")

var err error
procStats.host, err = sysinfo.Host()
if err != nil {
procStats.host = nil
procStats.logger.Warnf("Getting host details: %v", err)
}

procStats.ProcsMap = make(ProcsMap)

if len(procStats.Procs) == 0 {
return nil
}

procStats.procRegexps = []match.Matcher{}
for _, pattern := range procStats.Procs {
reg, err := match.Compile(pattern)
if err != nil {
return fmt.Errorf("Failed to compile regexp [%s]: %v", pattern, err)
}
procStats.procRegexps = append(procStats.procRegexps, reg)
}

procStats.envRegexps = make([]match.Matcher, 0, len(procStats.EnvWhitelist))
for _, pattern := range procStats.EnvWhitelist {
reg, err := match.Compile(pattern)
if err != nil {
return fmt.Errorf("failed to compile env whitelist regexp [%v]: %v", pattern, err)
}
procStats.envRegexps = append(procStats.envRegexps, reg)
}

if procStats.EnableCgroups {
cgReader, err := cgroup.NewReaderOptions(procStats.CgroupOpts)
if err == cgroup.ErrCgroupsMissing {
logp.Warn("cgroup data collection will be disabled: %v", err)
} else if err != nil {
return errors.Wrap(err, "error initializing cgroup reader")
}
procStats.cgroups = cgReader
}

return nil
}

// Get fetches process data which matches the provided regexes from the host.
func (procStats *Stats) Get() ([]common.MapStr, error) {
if len(procStats.Procs) == 0 {
return nil, nil
}

pids, err := Pids()
if err != nil {
return nil, errors.Wrap(err, "failed to fetch the list of PIDs")
}

var processes []Process
newProcs := make(ProcsMap, len(pids))

for _, pid := range pids {
process := procStats.getSingleProcess(pid, newProcs)
if process == nil {
continue
}
processes = append(processes, *process)
}
procStats.ProcsMap = newProcs

processes = procStats.includeTopProcesses(processes)
procStats.logger.Debugf("Filtered top processes down to %d processes", len(processes))

procs := make([]common.MapStr, 0, len(processes))
for _, process := range processes {
proc := procStats.getProcessEvent(&process)
procs = append(procs, proc)
}

return procs, nil
}

// GetOne fetches process data for a given PID if its name matches the regexes provided from the host.
func (procStats *Stats) GetOne(pid int) (common.MapStr, error) {
if len(procStats.Procs) == 0 {
return nil, nil
}

newProcs := make(ProcsMap, 1)
p := procStats.getSingleProcess(pid, newProcs)
if p == nil {
return common.MapStr{}, nil
}

e := procStats.getProcessEvent(p)
procStats.ProcsMap = newProcs

return e, nil
}

func (procStats *Stats) getSingleProcess(pid int, newProcs ProcsMap) *Process {
var cmdline string
var env common.MapStr
Expand Down
115 changes: 115 additions & 0 deletions libbeat/metric/system/process/process_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,19 @@
package process

import (
"bytes"
"fmt"
"io/ioutil"
"os"
"path"
"path/filepath"
"strconv"
"syscall"

"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/opt"
"github.com/elastic/gosigar"
"github.com/pkg/errors"
)

// GetSelfPid returns the PID for this process
Expand All @@ -35,3 +43,110 @@ func GetSelfPid() (int, error) {

return strconv.Atoi(pid)
}

// FetchPids is the linux implementation of FetchPids
func FetchPids(hostfs string, filter func(name string) bool) ([]ProcState, error) {
dir, err := os.Open(hostfs)
if err != nil {
return nil, errors.Wrapf(err, "error reading from procfs %s", hostfs)
}
defer dir.Close()

const readAllDirnames = -1 // see os.File.Readdirnames doc

names, err := dir.Readdirnames(readAllDirnames)
if err != nil {
return nil, errors.Wrap(err, "error reading directory names")
}

capacity := len(names)
list := make([]ProcState, 0, capacity)

// Iterate over the directory, fetch just enough info so we can filter based on user input.
logger := logp.L()
for _, name := range names {

if !dirIsPid(name) {
continue
}
// Will this actually fail?
pid, err := strconv.Atoi(name)
if err != nil {
continue
}
// Fetch proc state so we can get the name for filtering based on user's filter.
status, err := GetInfoForPid(hostfs, pid)
if err != nil {
logger.Debugf("Skipping over PID=%d, due to: %d", pid, err)
continue
}
// Filter based on user-supplied func
if !filter(status.Name) {
logger.Debugf("Process name does not matches the provided regex; PID=%d; name=%s", pid, status.Name)
continue
}

}

return list, nil
}

// GetInfoForPid fetches the basic hostinfo from /proc/[PID]/stat
func GetInfoForPid(hostfs string, pid int) (ProcState, error) {
path := filepath.Join(hostfs, strconv.Itoa(pid), "stat")
data, err := ioutil.ReadFile(path)
// Transform the error into a more sensible error in cases where the directory doesn't exist, i.e the process is gone
if err != nil {
if os.IsNotExist(err) {
return ProcState{}, syscall.ESRCH
} else {
return ProcState{}, errors.Wrapf(err, "error reading procdir %s", path)
}
}

state := ProcState{}

// Extract the comm value with is surrounded by parentheses.
lIdx := bytes.Index(data, []byte("("))
rIdx := bytes.LastIndex(data, []byte(")"))
if lIdx < 0 || rIdx < 0 || lIdx >= rIdx || rIdx+2 >= len(data) {
return state, fmt.Errorf("failed to extract comm for pid %d from '%v'", pid, string(data))
}
state.Name = string(data[lIdx+1 : rIdx])

// Extract the rest of the fields that we are interested in.
fields := bytes.Fields(data[rIdx+2:])
if len(fields) <= 36 {
return state, fmt.Errorf("expected more stat fields for pid %d from '%v'", pid, string(data))
}

interests := bytes.Join([][]byte{
fields[0], // state
fields[1], // ppid
fields[2], // pgrp
}, []byte(" "))

var procState string
var ppid, pgid int

_, err = fmt.Fscan(bytes.NewBuffer(interests),
&procState,
&ppid,
&pgid,
)
if err != nil {
return state, fmt.Errorf("failed to parse stat fields for pid %d from '%v': %v", pid, string(data), err)
}
state.State = getProcState(procState[0])
state.Ppid = opt.IntWith(ppid)
state.Pgid = opt.IntWith(pgid)

return state, nil
}

func dirIsPid(name string) bool {
if name[0] < '0' || name[0] > '9' {
return false
}
return true
}
Loading