Skip to content

Commit

Permalink
feat: refactor tcp and udp metrics to allow multiple inputs to incorp…
Browse files Browse the repository at this point in the history
…orate them
  • Loading branch information
pkoutsovasilis committed Feb 19, 2024
1 parent ac74059 commit fa059a3
Show file tree
Hide file tree
Showing 13 changed files with 578 additions and 527 deletions.
254 changes: 6 additions & 248 deletions filebeat/input/tcp/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,11 @@
package tcp

import (
"bytes"
"encoding/hex"
"errors"
"fmt"
"net"
"os"
"runtime"
"strconv"
"strings"
"time"

"github.com/dustin/go-humanize"
"github.com/rcrowley/go-metrics"

"github.com/elastic/beats/v7/filebeat/input/internal/procnet"
input "github.com/elastic/beats/v7/filebeat/input/v2"
stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless"
"github.com/elastic/beats/v7/filebeat/inputsource"
Expand All @@ -42,10 +32,7 @@ import (
"github.com/elastic/beats/v7/libbeat/feature"
"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/elastic-agent-libs/monitoring/adapter"
"github.com/elastic/go-concert/ctxtool"
)

Expand Down Expand Up @@ -110,9 +97,12 @@ func (s *server) Run(ctx input.Context, publisher stateless.Publisher) error {
log.Info("starting tcp socket input")
defer log.Info("tcp input stopped")

reg, unreg := inputmon.NewInputRegistry("tcp", ctx.ID, nil)
defer unreg()

const pollInterval = time.Minute
metrics := newInputMetrics(ctx.ID, s.config.Host, pollInterval, log)
defer metrics.close()
metrics := tcp.NewMetrics(reg, s.config.Host, pollInterval, log)
defer metrics.Close()

split, err := streaming.SplitFunc(s.config.Framing, []byte(s.config.LineDelimiter))
if err != nil {
Expand All @@ -139,7 +129,7 @@ func (s *server) Run(ctx input.Context, publisher stateless.Publisher) error {

// This must be called after publisher.Publish to measure
// the processing time metric.
metrics.log(data, evt.Timestamp)
metrics.Log(data, evt.Timestamp)
},
split,
))
Expand All @@ -156,235 +146,3 @@ func (s *server) Run(ctx input.Context, publisher stateless.Publisher) error {
}
return err
}

// inputMetrics handles the input's metric reporting.
//
// Values are created and registered by newInputMetrics. The log and close
// methods accept a nil receiver and do nothing in that case.
type inputMetrics struct {
unregister func() // unregister function returned by inputmon.NewInputRegistry; invoked by close
done chan struct{} // receives a value from close to stop the poll goroutine; nil when no poller was started

lastPacket time.Time // timestamp passed to the previous log call; zero until the first call

device *monitoring.String // name of the device being monitored
packets *monitoring.Uint // number of packets processed
bytes *monitoring.Uint // number of bytes processed
rxQueue *monitoring.Uint // value of the rx_queue field from /proc/net/tcp{,6} (only on linux systems)
arrivalPeriod metrics.Sample // histogram of the elapsed time between packet arrivals
processingTime metrics.Sample // histogram of the elapsed time between packet receipt and publication
}

// newInputMetrics builds the metrics set for a TCP input and registers it in
// an input registry named after id. When id is empty no metrics are created
// and nil is returned.
//
// device is recorded in the "device" metric. When poll is positive and the
// process runs on linux, the addresses for device are resolved through
// procnet.Addrs and a goroutine is started that refreshes the receive queue
// length every poll interval. If the addresses cannot be resolved the error
// is logged as a warning and the metrics are returned without a poller.
func newInputMetrics(id, device string, poll time.Duration, log *logp.Logger) *inputMetrics {
	if id == "" {
		return nil
	}

	reg, unreg := inputmon.NewInputRegistry("tcp", id, nil)

	m := &inputMetrics{unregister: unreg}
	m.device = monitoring.NewString(reg, "device")
	m.packets = monitoring.NewUint(reg, "received_events_total")
	m.bytes = monitoring.NewUint(reg, "received_bytes_total")
	m.rxQueue = monitoring.NewUint(reg, "receive_queue_length")
	m.arrivalPeriod = metrics.NewUniformSample(1024)
	m.processingTime = metrics.NewUniformSample(1024)

	// Expose both samples as histograms in the registry.
	arrival := adapter.NewGoMetrics(reg, "arrival_period", adapter.Accept)
	_ = arrival.Register("histogram", metrics.NewHistogram(m.arrivalPeriod))
	processing := adapter.NewGoMetrics(reg, "processing_time", adapter.Accept)
	_ = processing.Register("histogram", metrics.NewHistogram(m.processingTime))

	m.device.Set(device)

	// Queue length polling reads /proc and is therefore only attempted on linux.
	if poll <= 0 || runtime.GOOS != "linux" {
		return m
	}
	addr4, addr6, err := procnet.Addrs(device, log)
	if err != nil {
		log.Warn(err)
		return m
	}
	m.done = make(chan struct{})
	go m.poll(addr4, addr6, poll, log)

	return m
}

// log records the metrics for a single packet: the time elapsed since
// timestamp, one more packet, len(data) more bytes and, from the second
// packet on, the interval between this timestamp and the previous one.
// It is a no-op on a nil receiver.
func (m *inputMetrics) log(data []byte, timestamp time.Time) {
	if m == nil {
		return
	}

	elapsed := time.Since(timestamp)
	m.processingTime.Update(elapsed.Nanoseconds())
	m.packets.Add(1)
	m.bytes.Add(uint64(len(data)))

	// The first packet has no predecessor, so there is no arrival period yet.
	if prev := m.lastPacket; !prev.IsZero() {
		m.arrivalPeriod.Update(timestamp.Sub(prev).Nanoseconds())
	}
	m.lastPacket = timestamp
}

// poll periodically gets TCP buffer stats from the OS.
//
// addr and addr6 are the hex-formatted IPv4 and IPv6 socket addresses to look
// up in /proc/net/tcp and /proc/net/tcp6. Every each interval the summed
// rx_queue value of the matching sockets is stored in m.rxQueue. poll returns
// once a value is received on m.done.
func (m *inputMetrics) poll(addr, addr6 []string, each time.Duration, log *logp.Logger) {
	hasUnspecified, addrIsUnspecified, badAddr := containsUnspecifiedAddr(addr)
	if badAddr != nil {
		log.Warnf("failed to parse IPv4 addrs for metric collection %q", badAddr)
	}
	hasUnspecified6, addrIsUnspecified6, badAddr := containsUnspecifiedAddr(addr6)
	if badAddr != nil {
		log.Warnf("failed to parse IPv6 addrs for metric collection %q", badAddr)
	}

	// Do an initial check for access to the filesystem and of the
	// value constructed by containsUnspecifiedAddr. This gives a
	// base level for the rx_queue values and ensures that if the
	// constructed address values are malformed we panic early
	// within the period of system testing.
	//
	// The two errors are kept in separate variables so that the
	// combined warning below can report both of them.
	want4 := true
	rx, err4 := procNetTCP("/proc/net/tcp", addr, hasUnspecified, addrIsUnspecified)
	if err4 != nil {
		want4 = false
		log.Infof("did not get initial tcp stats from /proc: %v", err4)
	}
	want6 := true
	rx6, err6 := procNetTCP("/proc/net/tcp6", addr6, hasUnspecified6, addrIsUnspecified6)
	if err6 != nil {
		want6 = false
		log.Infof("did not get initial tcp6 stats from /proc: %v", err6)
	}
	if !want4 && !want6 {
		log.Warnf("failed to get initial tcp or tcp6 stats from /proc: tcp: %v, tcp6: %v", err4, err6)
	} else {
		m.rxQueue.Set(uint64(rx + rx6))
	}

	t := time.NewTicker(each)
	// Release the ticker on every exit path.
	defer t.Stop()
	for {
		select {
		case <-t.C:
			var found bool
			rx, err := procNetTCP("/proc/net/tcp", addr, hasUnspecified, addrIsUnspecified)
			if err != nil {
				// Only warn for a table that has been readable before
				// (or was readable at start-up).
				if want4 {
					log.Warnf("failed to get tcp stats from /proc: %v", err)
				}
			} else {
				found = true
				want4 = true
			}
			rx6, err := procNetTCP("/proc/net/tcp6", addr6, hasUnspecified6, addrIsUnspecified6)
			if err != nil {
				if want6 {
					log.Warnf("failed to get tcp6 stats from /proc: %v", err)
				}
			} else {
				found = true
				want6 = true
			}
			// Keep the previous value when neither table could be read.
			if found {
				m.rxQueue.Set(uint64(rx + rx6))
			}
		case <-m.done:
			return
		}
	}
}

// containsUnspecifiedAddr inspects hex-formatted socket addresses of the form
// "<hex ip>:<hex port>" and reports which of them use the unspecified IP
// address (all zeros, IPv4 or IPv6).
//
// yes is true when at least one address is unspecified, which has one entry
// per element of addr that is true for the unspecified ones, and bad lists
// the addresses whose IP part is not valid hex. Addresses without a ':' are
// skipped silently. An address listed in bad is never marked as unspecified.
func containsUnspecifiedAddr(addr []string) (yes bool, which []bool, bad []string) {
	which = make([]bool, len(addr))
	for i, a := range addr {
		prefix, _, ok := strings.Cut(a, ":")
		if !ok {
			continue
		}
		ip, err := hex.DecodeString(prefix)
		if err != nil {
			// hex.DecodeString returns the bytes decoded before the
			// error, so a malformed address with a leading run of
			// zeros must not fall through to the check below.
			bad = append(bad, a)
			continue
		}
		if net.IP(ip).IsUnspecified() {
			yes = true
			which[i] = true
		}
	}
	return yes, which, bad
}

// procNetTCP reads the TCP socket table in path and returns the rx_queue
// value for the sockets bound to the given addresses, each formatted in hex
// as xxxxxxxx:xxxx or the IPv6 equivalent.
//
// The function depends on the /proc filesystem and so is only useful on
// linux; it lives in this file for simplicity. When hasUnspecified is false
// the rx_queue of the first matching row is returned. When it is true every
// row of the table is examined and the rx_queue values of all matching rows
// are summed; rows are matched on port only for the addresses whose
// addrIsUnspecified entry is true.
//
// An empty addr yields 0 and no error. An error is returned when the two
// lists differ in length, the file cannot be read, a matching row has a
// malformed queue field, or no row matches.
func procNetTCP(path string, addr []string, hasUnspecified bool, addrIsUnspecified []bool) (rx int64, err error) {
	switch {
	case len(addr) == 0:
		return 0, nil
	case len(addr) != len(addrIsUnspecified):
		return 0, errors.New("mismatched address/unspecified lists: please report this")
	}

	content, err := os.ReadFile(path)
	if err != nil {
		return 0, err
	}
	rows := bytes.Split(content, []byte("\n"))
	if len(rows) < 2 {
		return 0, fmt.Errorf("%s entry not found for %s (no line)", path, addr)
	}

	const (
		localAddrField = 1 // local_address column
		queuesField    = 4 // tx_queue:rx_queue column
	)
	matched := false
	// The first row is the column header.
	for _, row := range rows[1:] {
		cols := bytes.Fields(row)
		if len(cols) <= queuesField || !contains(cols[localAddrField], addr, addrIsUnspecified) {
			continue
		}
		_, rxHex, ok := bytes.Cut(cols[queuesField], []byte(":"))
		if !ok {
			return 0, errors.New("no rx_queue field " + string(cols[queuesField]))
		}
		matched = true

		// queue lengths are hex, e.g.:
		// - https://elixir.bootlin.com/linux/v6.2.11/source/net/ipv4/tcp_ipv4.c#L2643
		// - https://elixir.bootlin.com/linux/v6.2.11/source/net/ipv6/tcp_ipv6.c#L1987
		n, err := strconv.ParseInt(string(rxHex), 16, 64)
		if err != nil {
			return 0, fmt.Errorf("failed to parse rx_queue: %w", err)
		}
		rx += n

		// Without an unspecified address the first match is the answer.
		if !hasUnspecified {
			return rx, nil
		}
	}
	if !matched {
		return 0, fmt.Errorf("%s entry not found for %s", path, addr)
	}
	return rx, nil
}

// contains reports whether the hex-formatted socket address b matches any
// entry of addr. An entry whose addrIsUnspecified flag is set matches on the
// port part (the text after ':') alone; any other entry must match b as a
// whole. All comparisons ignore case. addrIsUnspecified must have at least
// as many elements as addr.
func contains(b []byte, addr []string, addrIsUnspecified []bool) bool {
	whole := string(b)
	_, portBytes, hasPort := bytes.Cut(b, []byte(":"))
	port := string(portBytes)

	for i := range addr {
		candidate := addr[i]
		if !addrIsUnspecified[i] {
			if strings.EqualFold(whole, candidate) {
				return true
			}
			continue
		}
		_, wantPort, ok := strings.Cut(candidate, ":")
		if ok && hasPort && strings.EqualFold(port, wantPort) {
			return true
		}
	}
	return false
}

// close stops the poll goroutine, if one was started, and then calls the
// unregister function of the metrics. It is a no-op on a nil receiver.
func (m *inputMetrics) close() {
	if m == nil {
		return
	}
	if stop := m.done; stop != nil {
		// The send only completes once the poller has received it, so
		// the metrics are not unregistered before the poller has been
		// told to stop.
		stop <- struct{}{}
	}
	m.unregister()
}
Loading

0 comments on commit fa059a3

Please sign in to comment.