Skip to content

Commit

Permalink
Net O11y K8s cluster name retrieval (#648)
Browse files Browse the repository at this point in the history
* Net O11y K8s cluster name retrieval

* replace cluster.name by k8s.cluster.name

* Add flow printer node

* make linter happy

* Fixed EC2

* updated NOTICE

* Push vendor files

* Fix azure endpoint
  • Loading branch information
mariomac authored Feb 29, 2024
1 parent 490e84f commit ad647b1
Show file tree
Hide file tree
Showing 877 changed files with 434,934 additions and 22 deletions.
4 changes: 4 additions & 0 deletions NOTICE
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ The Initial Developer of some parts of the product, which are copied from, deriv
inspired by NetObserv Flowlogs-Pipeline (https://github.com/netobserv/flowlogs-pipeline).
Copyright Red Hat/IBM.

The Initial Developer of some parts of the product, which are copied from, derived from, or
inspired by the DataDog Agent (https://github.com/DataDog/datadog-agent).
Copyright DataDog.

Grafana Beyla uses third-party libraries or other resources that may be
distributed under licenses different than the Grafana Beyla software. The licenses for
these third-party libraries are listed in the attached third_party_licenses.csv file
Expand Down
11 changes: 10 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ go 1.21

require (
github.com/AlessandroPomponio/go-gibberish v0.0.0-20191004143433-a2d4156f0396
github.com/aws/aws-sdk-go-v2 v1.25.2
github.com/aws/aws-sdk-go-v2/credentials v1.17.4
github.com/aws/aws-sdk-go-v2/service/ec2 v1.149.1
github.com/caarlos0/env/v9 v9.0.0
github.com/cilium/ebpf v0.12.3
github.com/gavv/monotime v0.0.0-20190418164738-30dba4353424
Expand All @@ -25,6 +28,7 @@ require (
github.com/vishvananda/netlink v1.1.0
github.com/vladimirvivien/gexe v0.2.0
github.com/xwb1989/sqlparser v0.0.0-20180606152119-120387863bf2
go.opentelemetry.io/collector/consumer v0.94.1
go.opentelemetry.io/collector/pdata v1.1.0
go.opentelemetry.io/otel v1.23.1
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.23.1
Expand All @@ -50,6 +54,11 @@ require (
)

require (
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.2 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.2 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.2 // indirect
github.com/aws/smithy-go v1.20.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bytedance/sonic v1.9.1 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
Expand Down Expand Up @@ -78,6 +87,7 @@ require (
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect
github.com/hashicorp/go-version v1.6.0 // indirect
github.com/imdario/mergo v0.3.15 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/cpuid/v2 v2.2.4 // indirect
Expand All @@ -99,7 +109,6 @@ require (
github.com/ugorji/go/codec v1.2.11 // indirect
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.opentelemetry.io/collector/consumer v0.94.1 // indirect
go.opentelemetry.io/proto/otlp v1.1.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.19.0 // indirect
Expand Down
22 changes: 20 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,22 @@ github.com/AlessandroPomponio/go-gibberish v0.0.0-20191004143433-a2d4156f0396 h1
github.com/AlessandroPomponio/go-gibberish v0.0.0-20191004143433-a2d4156f0396/go.mod h1:2VCDG9kHYQ5vfYUqeoB7foVlcvIvB7rp9LxTELLD1qU=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/aws/aws-sdk-go-v2 v1.25.2 h1:/uiG1avJRgLGiQM9X3qJM8+Qa6KRGK5rRPuXE0HUM+w=
github.com/aws/aws-sdk-go-v2 v1.25.2/go.mod h1:Evoc5AsmtveRt1komDwIsjHFyrP5tDuF1D1U+6z6pNo=
github.com/aws/aws-sdk-go-v2/credentials v1.17.4 h1:h5Vztbd8qLppiPwX+y0Q6WiwMZgpd9keKe2EAENgAuI=
github.com/aws/aws-sdk-go-v2/credentials v1.17.4/go.mod h1:+30tpwrkOgvkJL1rUZuRLoxcJwtI/OkeBLYnHxJtVe0=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.2 h1:bNo4LagzUKbjdxE0tIcR9pMzLR2U/Tgie1Hq1HQ3iH8=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.2/go.mod h1:wRQv0nN6v9wDXuWThpovGQjqF1HFdcgWjporw14lS8k=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.2 h1:EtOU5jsPdIQNP+6Q2C5e3d65NKT1PeCiQk+9OdzO12Q=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.2/go.mod h1:tyF5sKccmDz0Bv4NrstEr+/9YkSPJHrcO7UsUKf7pWM=
github.com/aws/aws-sdk-go-v2/service/ec2 v1.149.1 h1:OGZUMBYZnz+R5nkW6FS1J8UlfLeM/pKojck+74+ZQGY=
github.com/aws/aws-sdk-go-v2/service/ec2 v1.149.1/go.mod h1:XxJNg7fIkR8cbm89i0zVZSxKpcPYsC8BWRwMIJOWbnk=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.1 h1:EyBZibRTVAs6ECHZOw5/wlylS9OcTzwyjeQMudmREjE=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.1/go.mod h1:JKpmtYhhPs7D97NL/ltqz7yCkERFW5dOlHyVl66ZYF8=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.2 h1:5ffmXjPtwRExp1zc7gENLgCPyHFbhEPwVTkTiH9niSk=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.2/go.mod h1:Ru7vg1iQ7cR4i7SZ/JTLYN9kaXtbL69UdgG0OQWQxW0=
github.com/aws/smithy-go v1.20.1 h1:4SZlSlMr36UEqC7XOyRVb27XMeZubNcBNN+9IgEPIQw=
github.com/aws/smithy-go v1.20.1/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM=
Expand Down Expand Up @@ -103,6 +119,10 @@ github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyf
github.com/imdario/mergo v0.3.15 h1:M8XP7IuFNsqUx6VPK2P9OSmsYsI/YFaGil0uD21V3dM=
github.com/imdario/mergo v0.3.15/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
Expand Down Expand Up @@ -198,8 +218,6 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0=
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
go.opentelemetry.io/collector v0.94.1 h1:bGHW5NKmh34oMflMEyNCHpes6vtiQNXpgea4GiscAOs=
go.opentelemetry.io/collector v0.94.1/go.mod h1:5ACZXRo6O23gBkRrHSxYs1sLaP4pZ8w+flZNE7pvoNg=
go.opentelemetry.io/collector/consumer v0.94.1 h1:l/9h5L71xr/d93snQ9fdxgz64C4UuB8mEDxpp456X8o=
go.opentelemetry.io/collector/consumer v0.94.1/go.mod h1:BIPWmw8wES6jlPTPC+acJxLvUzIdOm6uh/p/X85ALsY=
go.opentelemetry.io/collector/pdata v1.1.0 h1:cE6Al1rQieUjMHro6p6cKwcu3sjHXGG59BZ3kRVUvsM=
Expand Down
3 changes: 3 additions & 0 deletions pkg/beyla/network_cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ type NetworkConfig struct {
// This is an experimental feature and it is not guaranteed to work on most virtualized environments
// for external traffic.
ReverseDNS flow.ReverseDNS `yaml:"reverse_dns"`

// Print the network flows in the Standard Output, if true
Print bool `yaml:"print_flows" env:"BEYLA_NETWORK_PRINT_FLOWS"`
}

var defaultNetworkConfig = NetworkConfig{
Expand Down
16 changes: 13 additions & 3 deletions pkg/internal/netolly/agent/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ type FlowsPipeline struct {

ReverseDNS flow.ReverseDNS `forwardTo:"Decorator"`

Decorator `sendTo:"Exporter"`
Decorator `sendTo:"Exporter,Printer"`

Exporter export.MetricsConfig
Printer export.FlowPrinterEnabled
}

type MapTracer struct{}
Expand All @@ -48,12 +49,15 @@ func (f *Flows) buildAndStartPipeline(ctx context.Context) (graph.Graph, error)
alog.Debug("creating flows' processing graph")
gb := graph.NewBuilder(node.ChannelBufferLen(f.cfg.ChannelBufferLen))

// Start nodes: those generating flow records (reading them from eBPF)
graph.RegisterStart(gb, func(_ MapTracer) (node.StartFunc[[]*ebpf.Record], error) {
return f.mapTracer.TraceLoop(ctx), nil
})
graph.RegisterStart(gb, func(_ RingBufTracer) (node.StartFunc[*ebpf.NetFlowRecordT], error) {
return f.rbTracer.TraceLoop(ctx), nil
})

// Middle nodes: apply transformations to the flow records, decorating and even removing them.
graph.RegisterMiddle(gb, func(_ Accounter) (node.MiddleFunc[*ebpf.NetFlowRecordT, []*ebpf.Record], error) {
return f.accounter.Account, nil
})
Expand All @@ -69,9 +73,14 @@ func (f *Flows) buildAndStartPipeline(ctx context.Context) (graph.Graph, error)
}
return flow.Decorate(f.agentIP, ifaceNamer), nil
})
graph.RegisterMiddle(gb, k8s.MetadataDecoratorProvider)
graph.RegisterMiddle(gb, func(cfg k8s.MetadataDecorator) (node.MiddleFunc[[]*ebpf.Record, []*ebpf.Record], error) {
return k8s.MetadataDecoratorProvider(ctx, cfg)
})
graph.RegisterMiddle(gb, flow.ReverseDNSProvider)

// Terminal nodes export the flow record information out of the pipeline: OTEL and printer
graph.RegisterTerminal(gb, export.MetricsExporterProvider)
graph.RegisterTerminal(gb, export.FlowPrinterProvider)

var deduperExpireTime = f.cfg.NetworkFlows.DeduperFCExpiry
if deduperExpireTime <= 0 {
Expand All @@ -84,7 +93,8 @@ func (f *Flows) buildAndStartPipeline(ctx context.Context) (graph.Graph, error)
},
Kubernetes: k8s.MetadataDecorator{Kubernetes: &f.cfg.Attributes.Kubernetes},
// TODO: allow prometheus exporting
Exporter: export.MetricsConfig{Metrics: &f.cfg.Metrics},
ReverseDNS: f.cfg.NetworkFlows.ReverseDNS,
Exporter: export.MetricsConfig{Metrics: &f.cfg.Metrics},
Printer: export.FlowPrinterEnabled(f.cfg.NetworkFlows.Print),
})
}
2 changes: 1 addition & 1 deletion pkg/internal/netolly/export/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func newMeterProvider(res *resource.Resource, exporter *metric.Exporter) (*metri
}

func attributes(m *ebpf.Record) []attribute.KeyValue {
res := make([]attribute.KeyValue, 0, 11+len(m.Attrs.Metadata))
res := make([]attribute.KeyValue, 0, 10+len(m.Attrs.Metadata))

res = append(res,
attribute.String("beyla.ip", m.Attrs.BeylaIP),
Expand Down
57 changes: 57 additions & 0 deletions pkg/internal/netolly/export/printer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package export

import (
"fmt"
"strings"

"github.com/mariomac/pipes/pkg/node"

"github.com/grafana/beyla/pkg/internal/netolly/ebpf"
)

type FlowPrinterEnabled bool

func (fpe FlowPrinterEnabled) Enabled() bool {
return bool(fpe)
}

func FlowPrinterProvider(_ FlowPrinterEnabled) (node.TerminalFunc[[]*ebpf.Record], error) {
return func(in <-chan []*ebpf.Record) {
for flows := range in {
for _, flow := range flows {
printFlow(flow)
}
}
}, nil
}

func printFlow(f *ebpf.Record) {
sb := strings.Builder{}
sb.WriteString("beyla.ip==")
sb.WriteString(f.Attrs.BeylaIP)
sb.WriteString(" iface=")
sb.WriteString(f.Attrs.Interface)
sb.WriteString(" direction=")
sb.WriteString(fmt.Sprint(f.Id.Direction))
sb.WriteString(" src.address=")
sb.WriteString(f.Id.SrcIP().IP().String())
sb.WriteString(" dst.address=")
sb.WriteString(f.Id.DstIP().IP().String())
sb.WriteString(" src.name=")
sb.WriteString(f.Attrs.SrcName)
sb.WriteString(" src.namespace=")
sb.WriteString(f.Attrs.SrcNamespace)
sb.WriteString(" dst.name=")
sb.WriteString(f.Attrs.DstName)
sb.WriteString(" dst.namespace=")
sb.WriteString(f.Attrs.DstNamespace)

for k, v := range f.Attrs.Metadata {
sb.WriteString(" ")
sb.WriteString(k)
sb.WriteString("=")
sb.WriteString(v)
}

fmt.Println("network_flow:", sb.String())
}
Loading

0 comments on commit ad647b1

Please sign in to comment.