Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[filebeat] add netflow input metrics #38055

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat: introduce input/netmetrics to allow multiple inputs utilise the…
… same tcp and/or udp metrics
  • Loading branch information
pkoutsovasilis committed Feb 20, 2024
commit de72a9cea3084abbef2b1040c06cc6147a421a2b
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,27 @@
// specific language governing permissions and limitations
// under the License.

// Package procnet provides support for obtaining and formatting /proc/net
// network addresses for linux systems.
package procnet
// Package netmetrics provides different metricsets for capturing network-related metrics,
// such as UDP and TCP metrics through NewUDPMetrics and NewTCPMetrics, respectively.
package netmetrics

import (
"bytes"
"encoding/hex"
"fmt"
"net"
"strconv"
"strings"

"github.com/elastic/elastic-agent-libs/logp"
)

// Addrs returns the linux /proc/net/tcp or /proc/net/udp addresses for the
// addrs returns the linux /proc/net/tcp or /proc/net/udp addresses for the
// provided host address, addr. addr is a host:port address and host may be
// an IPv4 or IPv6 address, or an FQDN for the host. The returned slices
// contain the string representations of the address as they would appear in
// the /proc/net tables.
func Addrs(addr string, log *logp.Logger) (addr4, addr6 []string, err error) {
func addrs(addr string, log *logp.Logger) (addr4, addr6 []string, err error) {
host, port, err := net.SplitHostPort(addr)
if err != nil {
return nil, nil, fmt.Errorf("failed to get address for %s: could not split host and port: %w", addr, err)
Expand All @@ -54,23 +57,23 @@ func Addrs(addr string, log *logp.Logger) (addr4, addr6 []string, err error) {
// the len constants all addresses may appear to be IPv6.
switch {
case len(p.To4()) == net.IPv4len:
addr4 = append(addr4, IPv4(p, int(pn)))
addr4 = append(addr4, ipV4(p, int(pn)))
case len(p.To16()) == net.IPv6len:
addr6 = append(addr6, IPv6(p, int(pn)))
addr6 = append(addr6, ipV6(p, int(pn)))
default:
log.Warnf("unexpected addr length %d for %s", len(p), p)
}
}
return addr4, addr6, nil
}

// IPv4 returns the string representation of an IPv4 address in a /proc/net table.
func IPv4(ip net.IP, port int) string {
// ipV4 returns the string representation of an IPv4 address in a /proc/net table.
func ipV4(ip net.IP, port int) string {
return fmt.Sprintf("%08X:%04X", reverse(ip.To4()), port)
}

// IPv6 returns the string representation of an IPv6 address in a /proc/net table.
func IPv6(ip net.IP, port int) string {
// ipV6 returns the string representation of an IPv6 address in a /proc/net table.
func ipV6(ip net.IP, port int) string {
return fmt.Sprintf("%032X:%04X", reverse(ip.To16()), port)
}

Expand All @@ -81,3 +84,37 @@ func reverse(b []byte) []byte {
}
return c
}

func contains(b []byte, addr []string, addrIsUnspecified []bool) bool {
for i, a := range addr {
if addrIsUnspecified[i] {
_, ap, pok := strings.Cut(a, ":")
_, bp, bok := bytes.Cut(b, []byte(":"))
if pok && bok && strings.EqualFold(string(bp), ap) {
return true
}
} else if strings.EqualFold(string(b), a) {
return true
}
}
return false
}

func containsUnspecifiedAddr(addr []string) (yes bool, which []bool, bad []string) {
which = make([]bool, len(addr))
for i, a := range addr {
prefix, _, ok := strings.Cut(a, ":")
if !ok {
continue
}
ip, err := hex.DecodeString(prefix)
if err != nil {
bad = append(bad, a)
}
if net.IP(ip).IsUnspecified() {
yes = true
which[i] = true
}
}
return yes, which, bad
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package procnet
package netmetrics

import (
"testing"
Expand All @@ -25,7 +25,7 @@ import (

func TestAddrs(t *testing.T) {
t.Run("ipv4", func(t *testing.T) {
addr4, addr6, err := Addrs("0.0.0.0:9001", logp.L())
addr4, addr6, err := addrs("0.0.0.0:9001", logp.L())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand All @@ -38,7 +38,7 @@ func TestAddrs(t *testing.T) {
})

t.Run("ipv6", func(t *testing.T) {
addr4, addr6, err := Addrs("[::]:9001", logp.L())
addr4, addr6, err := addrs("[::]:9001", logp.L())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down
224 changes: 224 additions & 0 deletions filebeat/input/netmetrics/tcp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package netmetrics

import (
"bytes"
"errors"
"fmt"
"os"
"runtime"
"strconv"
"time"

"github.com/rcrowley/go-metrics"

"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/elastic-agent-libs/monitoring/adapter"
)

// TCP handles the TCP metric reporting.
type TCP struct {
done chan struct{}

lastPacket time.Time

device *monitoring.String // name of the device being monitored
packets *monitoring.Uint // number of packets processed
bytes *monitoring.Uint // number of bytes processed
rxQueue *monitoring.Uint // value of the rx_queue field from /proc/net/tcp{,6} (only on linux systems)
arrivalPeriod metrics.Sample // histogram of the elapsed time between packet arrivals
processingTime metrics.Sample // histogram of the elapsed time between packet receipt and publication
}

// NewTCPMetrics returns an input metric for the TCP processor.
func NewTCPMetrics(reg *monitoring.Registry, device string, poll time.Duration, log *logp.Logger) *TCP {
out := &TCP{
device: monitoring.NewString(reg, "device"),
packets: monitoring.NewUint(reg, "received_events_total"),
bytes: monitoring.NewUint(reg, "received_bytes_total"),
rxQueue: monitoring.NewUint(reg, "receive_queue_length"),
arrivalPeriod: metrics.NewUniformSample(1024),
processingTime: metrics.NewUniformSample(1024),
}
_ = adapter.NewGoMetrics(reg, "arrival_period", adapter.Accept).
Register("histogram", metrics.NewHistogram(out.arrivalPeriod))
_ = adapter.NewGoMetrics(reg, "processing_time", adapter.Accept).
Register("histogram", metrics.NewHistogram(out.processingTime))

out.device.Set(device)

if poll > 0 && runtime.GOOS == "linux" {
addr4, addr6, err := addrs(device, log)
if err != nil {
log.Warn(err)
return out
}
out.done = make(chan struct{})
go out.poll(addr4, addr6, poll, log)
}

return out
}

// Log logs metric for the given packet.
func (m *TCP) Log(data []byte, timestamp time.Time) {
if m == nil {
return
}
m.processingTime.Update(time.Since(timestamp).Nanoseconds())
m.packets.Add(1)
m.bytes.Add(uint64(len(data)))
if !m.lastPacket.IsZero() {
m.arrivalPeriod.Update(timestamp.Sub(m.lastPacket).Nanoseconds())
}
m.lastPacket = timestamp
}

// poll periodically gets TCP buffer stats from the OS.
func (m *TCP) poll(addr, addr6 []string, each time.Duration, log *logp.Logger) {
hasUnspecified, addrIsUnspecified, badAddr := containsUnspecifiedAddr(addr)
if badAddr != nil {
log.Warnf("failed to parse IPv4 addrs for metric collection %q", badAddr)
}
hasUnspecified6, addrIsUnspecified6, badAddr := containsUnspecifiedAddr(addr6)
if badAddr != nil {
log.Warnf("failed to parse IPv6 addrs for metric collection %q", badAddr)
}

// Do an initial check for access to the filesystem and of the
// value constructed by containsUnspecifiedAddr. This gives a
// base level for the rx_queue values and ensures that if the
// constructed address values are malformed we panic early
// within the period of system testing.
want4 := true
rx, err := procNetTCP("/proc/net/tcp", addr, hasUnspecified, addrIsUnspecified)
if err != nil {
want4 = false
log.Infof("did not get initial tcp stats from /proc: %v", err)
}
want6 := true
rx6, err := procNetTCP("/proc/net/tcp6", addr6, hasUnspecified6, addrIsUnspecified6)
if err != nil {
want6 = false
log.Infof("did not get initial tcp6 stats from /proc: %v", err)
}
if !want4 && !want6 {
log.Warnf("failed to get initial tcp or tcp6 stats from /proc: %v", err)
} else {
m.rxQueue.Set(uint64(rx + rx6))
}

t := time.NewTicker(each)
for {
select {
case <-t.C:
var found bool
rx, err := procNetTCP("/proc/net/tcp", addr, hasUnspecified, addrIsUnspecified)
if err != nil {
if want4 {
log.Warnf("failed to get tcp stats from /proc: %v", err)
}
} else {
found = true
want4 = true
}
rx6, err := procNetTCP("/proc/net/tcp6", addr6, hasUnspecified6, addrIsUnspecified6)
if err != nil {
if want6 {
log.Warnf("failed to get tcp6 stats from /proc: %v", err)
}
} else {
found = true
want6 = true
}
if found {
m.rxQueue.Set(uint64(rx + rx6))
}
case <-m.done:
t.Stop()
return
}
}
}

// procNetTCP returns the rx_queue field of the TCP socket table for the
// socket on the provided address formatted in hex, xxxxxxxx:xxxx or the IPv6
// equivalent.
// This function is only useful on linux due to its dependence on the /proc
// filesystem, but is kept in this file for simplicity. If hasUnspecified
// is true, all addresses listed in the file in path are considered, and the
// sum of rx_queue matching the addr ports is returned where the corresponding
// addrIsUnspecified is true.
func procNetTCP(path string, addr []string, hasUnspecified bool, addrIsUnspecified []bool) (rx int64, err error) {
if len(addr) == 0 {
return 0, nil
}
if len(addr) != len(addrIsUnspecified) {
return 0, errors.New("mismatched address/unspecified lists: please report this")
}
b, err := os.ReadFile(path)
if err != nil {
return 0, err
}
lines := bytes.Split(b, []byte("\n"))
if len(lines) < 2 {
return 0, fmt.Errorf("%s entry not found for %s (no line)", path, addr)
}
var found bool
for _, l := range lines[1:] {
f := bytes.Fields(l)
const queuesField = 4
if len(f) > queuesField && contains(f[1], addr, addrIsUnspecified) {
_, r, ok := bytes.Cut(f[4], []byte(":"))
if !ok {
return 0, errors.New("no rx_queue field " + string(f[queuesField]))
}
found = true

// queue lengths are hex, e.g.:
// - https://elixir.bootlin.com/linux/v6.2.11/source/net/ipv4/tcp_ipv4.c#L2643
// - https://elixir.bootlin.com/linux/v6.2.11/source/net/ipv6/tcp_ipv6.c#L1987
v, err := strconv.ParseInt(string(r), 16, 64)
if err != nil {
return 0, fmt.Errorf("failed to parse rx_queue: %w", err)
}
rx += v

if hasUnspecified {
continue
}
return rx, nil
}
}
if found {
return rx, nil
}
return 0, fmt.Errorf("%s entry not found for %s", path, addr)
}

func (m *TCP) Close() {
if m == nil {
return
}
if m.done != nil {
// Shut down poller and wait until done before unregistering metrics.
m.done <- struct{}{}
}
}
Loading