Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to start FLP directly from the flow logs producer #538

Merged
merged 5 commits into from
Nov 23, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 3 additions & 23 deletions cmd/flowlogs-pipeline/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package main

import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"net/http"
Expand All @@ -35,7 +34,7 @@ import (
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
"github.com/prometheus/client_golang/prometheus"
"github.com/netobserv/flowlogs-pipeline/pkg/prometheus"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
Expand Down Expand Up @@ -182,27 +181,8 @@ func run() {

// Setup (threads) exit manager
utils.SetupElegantExit()

// set up private prometheus registry
if cfg.MetricsSettings.SuppressGoMetrics {
reg := prometheus.NewRegistry()
prometheus.DefaultRegisterer = reg
prometheus.DefaultGatherer = reg
}

// create prometheus server for operational metrics
// if value of address is empty, then by default it will take 0.0.0.0
addr := fmt.Sprintf("%s:%v", cfg.MetricsSettings.Address, cfg.MetricsSettings.Port)
log.Infof("startServer: addr = %s", addr)
promServer := &http.Server{
Addr: addr,
// TLS clients must use TLS 1.2 or higher
TLSConfig: &tls.Config{
MinVersion: tls.VersionTLS12,
},
}
tlsConfig := cfg.MetricsSettings.TLS
go utils.StartPromServer(tlsConfig, promServer, !cfg.MetricsSettings.NoPanic, prometheus.DefaultGatherer.(*prometheus.Registry))
prometheus.SetGlobalMetricsSettings(&cfg.MetricsSettings)
promServer := prometheus.StartServerAsync(&cfg.MetricsSettings.PromConnectionInfo, nil)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about making this optional?
If we are looking for lightweight solution we may skip prometheus since we have other export capabilities now

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not only for the prometheus pipeline output but also for the "operational metrics", i.e. related to FLP health — which doesn't mean it cannot be made optional, but I would say the default should still be to have these metrics

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can add a field enableOperational in MetricsSettings.
If it's set false, this "global" server would not be started, and it's still possible to start per-promencode stage servers.
However if it's disabled and the user didn't configure per-promencode servers (which is optional), then they wouldn't get any metric ... so it creates more configuration skews....

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe enableGlobalServer would be a better name for the knob as it carries more explicitly that it's needed if you don't configure per-stage servers.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks !


// Create new flows pipeline
mainPipeline, err = pipeline.NewPipeline(&cfg)
Expand Down
8 changes: 3 additions & 5 deletions pkg/confgen/flowlogs2metrics_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,13 @@ func (cg *ConfGen) GenerateFlowlogs2PipelineConfig() *config.ConfigFileStruct {
if cg.config.Write.Loki != nil {
forkedNode.WriteLoki("write_loki", *cg.config.Write.Loki)
}
return &config.ConfigFileStruct{
LogLevel: "error",
Pipeline: pipeline.GetStages(),
Parameters: pipeline.GetStageParams(),
return pipeline.IntoConfigFileStruct(&config.ConfigFileStruct{
LogLevel: "error",
MetricsSettings: config.MetricsSettings{
PromConnectionInfo: api.PromConnectionInfo{Port: 9102},
Prefix: "flp_op_",
},
}
})
}

func (cg *ConfGen) GenerateTruncatedConfig() []config.StageParam {
Expand Down
23 changes: 23 additions & 0 deletions pkg/config/pipeline_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type PipelineBuilderStage struct {
pipeline *pipeline
}

// PresetIngesterStage is the placeholder stage name used as the starting point of a
// pipeline whose ingest stage is supplied programmatically (in-process) rather than
// declared in the configuration.
const PresetIngesterStage = "preset-ingester"

// NewPipeline creates a new pipeline from an existing ingest
func NewPipeline(name string, ingest *Ingest) (PipelineBuilderStage, error) {
if ingest.Collector != nil {
Expand Down Expand Up @@ -89,6 +91,15 @@ func NewKafkaPipeline(name string, ingest api.IngestKafka) PipelineBuilderStage
return PipelineBuilderStage{pipeline: &p, lastStage: name}
}

// NewPresetIngesterPipeline creates a new partial pipeline without ingest stage.
// Subsequent stages will follow the reserved PresetIngesterStage name.
func NewPresetIngesterPipeline() PipelineBuilderStage {
	return PipelineBuilderStage{
		pipeline: &pipeline{
			stages: []Stage{},
			config: []StageParam{},
		},
		lastStage: PresetIngesterStage,
	}
}

func (b *PipelineBuilderStage) next(name string, param StageParam) PipelineBuilderStage {
b.pipeline.stages = append(b.pipeline.stages, Stage{Name: name, Follows: b.lastStage})
b.pipeline.config = append(b.pipeline.config, param)
Expand Down Expand Up @@ -164,3 +175,15 @@ func (b *PipelineBuilderStage) GetStages() []Stage {
func (b *PipelineBuilderStage) GetStageParams() []StageParam {
return b.pipeline.config
}

// IntoConfigFileStruct injects the current pipeline and params in the provided
// ConfigFileStruct object, and returns that same object for chaining.
func (b *PipelineBuilderStage) IntoConfigFileStruct(cfs *ConfigFileStruct) *ConfigFileStruct {
	cfs.Pipeline, cfs.Parameters = b.GetStages(), b.GetStageParams()
	return cfs
}

// ToConfigFileStruct returns the current pipeline and params as a new
// ConfigFileStruct object.
func (b *PipelineBuilderStage) ToConfigFileStruct() *ConfigFileStruct {
	cfs := ConfigFileStruct{}
	return b.IntoConfigFileStruct(&cfs)
}
15 changes: 2 additions & 13 deletions pkg/pipeline/encode/encode_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,15 @@
package encode

import (
"crypto/tls"
"fmt"
"net/http"
"strings"
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
putils "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
promserver "github.com/netobserv/flowlogs-pipeline/pkg/prometheus"
"github.com/netobserv/flowlogs-pipeline/pkg/utils"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -279,17 +278,7 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
if cfg.PromConnectionInfo != nil {
registry := prometheus.NewRegistry()
registerer = registry
addr := fmt.Sprintf("%s:%v", cfg.PromConnectionInfo.Address, cfg.PromConnectionInfo.Port)
log.Infof("startServer: addr = %s", addr)
promServer := &http.Server{
Addr: addr,
// TLS clients must use TLS 1.2 or higher
TLSConfig: &tls.Config{
MinVersion: tls.VersionTLS12,
},
}
tlsConfig := cfg.PromConnectionInfo.TLS
go putils.StartPromServer(tlsConfig, promServer, true, registry)
promserver.StartServerAsync(cfg.PromConnectionInfo, nil)
} else {
registerer = prometheus.DefaultRegisterer
}
Expand Down
43 changes: 43 additions & 0 deletions pkg/pipeline/ingest/ingest_inprocess.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package ingest

import (
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
"github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow"

"github.com/netobserv/netobserv-ebpf-agent/pkg/decode"
"github.com/sirupsen/logrus"
)

// ilog is the package-scoped logger for the in-process ingester component.
var ilog = logrus.WithField("component", "ingest.InProcess")

// InProcess ingester is meant to be imported and used from another program
// via pipeline.StartFLPInProcess. Record batches pushed with Write are
// consumed by the pipeline through the Ingest loop.
type InProcess struct {
	// flowPackets buffers incoming record batches between the producer (Write)
	// and the pipeline consumer (Ingest). Closing it terminates the Ingest loop.
	flowPackets chan *pbflow.Records
}

// NewInProcess creates an in-process ingester backed by the given records channel.
func NewInProcess(flowPackets chan *pbflow.Records) *InProcess {
	ingester := InProcess{flowPackets: flowPackets}
	return &ingester
}

// Ingest drains record batches from the internal channel, decodes each protobuf
// entry into a GenericMap and forwards it to the pipeline's output channel.
// It blocks until the flowPackets channel is closed, either by an explicit
// Close() call or by the global exit signal watched below.
func (d *InProcess) Ingest(out chan<- config.GenericMap) {
	// Close the input channel on global exit so the range loop terminates.
	// NOTE(review): if a producer calls Write after this Close fires, the send
	// panics (send on closed channel) — confirm producers stop writing first.
	go func() {
		<-utils.ExitChannel()
		d.Close()
	}()
	for fp := range d.flowPackets {
		ilog.Debugf("Ingested %v records", len(fp.Entries))
		for _, entry := range fp.Entries {
			out <- decode.PBFlowToMap(entry)
		}
	}
}

// Write pushes a batch of flow records into the pipeline. It blocks when the
// internal buffer is full, and panics if called after Close (send on closed channel).
func (d *InProcess) Write(record *pbflow.Records) {
	d.flowPackets <- record
}

// Close closes the internal flow-records channel, which terminates the Ingest loop.
// NOTE(review): calling Close twice (e.g. explicitly and again via the exit-channel
// watcher started in Ingest) panics on double close — confirm callers avoid this.
func (d *InProcess) Close() {
	close(d.flowPackets)
}
32 changes: 32 additions & 0 deletions pkg/pipeline/inprocess.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package pipeline

import (
"context"
"fmt"

"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/ingest"
"github.com/netobserv/flowlogs-pipeline/pkg/prometheus"
"github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow"
)

// StartFLPInProcess is an entry point to start the whole FLP / pipeline processing
// from imported code. It starts the global metrics server, builds the pipeline from
// the given configuration and runs it in a background goroutine. The returned
// InProcess ingester is used to feed flow records into the pipeline.
func StartFLPInProcess(cfg *config.ConfigFileStruct) (*ingest.InProcess, error) {
	prometheus.SetGlobalMetricsSettings(&cfg.MetricsSettings)
	promServer := prometheus.StartServerAsync(&cfg.MetricsSettings.PromConnectionInfo, nil)

	// Create new flows pipeline; the buffered channel decouples the producer from
	// the pipeline consumer.
	ingester := ingest.NewInProcess(make(chan *pbflow.Records, 100))
	flp, err := newPipelineFromIngester(cfg, ingester)
	if err != nil {
		// Don't leave the metrics server running when pipeline initialization failed.
		_ = promServer.Shutdown(context.Background())
		return nil, fmt.Errorf("failed to initialize pipeline: %w", err)
	}

	// Run the pipeline in the background; Run blocks until the pipeline terminates,
	// after which the metrics server is shut down.
	go func() {
		flp.Run()
		_ = promServer.Shutdown(context.Background())
	}()

	return ingester, nil
}
121 changes: 121 additions & 0 deletions pkg/pipeline/inprocess_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package pipeline

import (
"bufio"
"encoding/json"
"os"
"testing"
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)

// TestInProcessFLP builds a minimal in-process pipeline (preset ingester ->
// stdout writer in JSON), feeds it one synthetic eBPF flow record, and checks
// the decoded/enriched JSON line written to stdout field by field.
func TestInProcessFLP(t *testing.T) {
	pipeline := config.NewPresetIngesterPipeline()
	pipeline = pipeline.WriteStdout("writer", api.WriteStdout{Format: "json"})
	ingester, err := StartFLPInProcess(pipeline.ToConfigFileStruct())
	require.NoError(t, err)
	defer ingester.Close()

	// Redirect stdout to a pipe so the WriteStdout stage's output can be read back.
	// NOTE(review): the pipeline goroutine is started before the redirect; assumes
	// WriteStdout resolves os.Stdout at write time, not at stage creation — confirm.
	capturedOut, w, _ := os.Pipe()
	old := os.Stdout
	os.Stdout = w
	defer func() {
		os.Stdout = old
	}()

	// yield thread to allow pipe services correctly start
	time.Sleep(10 * time.Millisecond)

	startTime := time.Now()
	endTime := startTime.Add(7 * time.Second)
	someDuration := endTime.Sub(startTime)

	// One record exercising network, datalink, transport, drop, DNS and RTT fields.
	ingester.Write(&pbflow.Records{
		Entries: []*pbflow.Record{{
			Interface:   "eth0",
			EthProtocol: 2048,
			Bytes:       456,
			Packets:     123,
			Direction:   pbflow.Direction_EGRESS,
			TimeFlowStart: timestamppb.New(startTime),
			TimeFlowEnd:   timestamppb.New(endTime),
			Network: &pbflow.Network{
				SrcAddr: &pbflow.IP{
					IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x01020304},
				},
				DstAddr: &pbflow.IP{
					IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x05060708},
				},
				Dscp: 1,
			},
			DataLink: &pbflow.DataLink{
				DstMac: 0x112233445566,
				SrcMac: 0x010203040506,
			},
			Transport: &pbflow.Transport{
				Protocol: 17,
				SrcPort:  23000,
				DstPort:  443,
			},
			AgentIp: &pbflow.IP{
				IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x0a0b0c0d},
			},
			PktDropBytes:           100,
			PktDropPackets:         10,
			PktDropLatestFlags:     1,
			PktDropLatestState:     1,
			PktDropLatestDropCause: 8,
			DnsLatency:             durationpb.New(someDuration),
			DnsId:                  1,
			DnsFlags:               0x80,
			DnsErrno:               0,
			TimeFlowRtt:            durationpb.New(someDuration),
		}},
	})

	// Read exactly one JSON line back from the redirected stdout.
	scanner := bufio.NewScanner(capturedOut)
	require.True(t, scanner.Scan())
	capturedRecord := map[string]interface{}{}
	bytes := scanner.Bytes()
	require.NoError(t, json.Unmarshal(bytes, &capturedRecord), string(bytes))

	// TimeReceived is stamped at ingestion time; only check presence, then drop it
	// so the remaining map can be compared exactly.
	assert.NotZero(t, capturedRecord["TimeReceived"])
	delete(capturedRecord, "TimeReceived")
	// json.Unmarshal decodes all numbers into float64, hence the float64 casts.
	assert.EqualValues(t, map[string]interface{}{
		"FlowDirection":          float64(1),
		"Bytes":                  float64(456),
		"SrcAddr":                "1.2.3.4",
		"DstAddr":                "5.6.7.8",
		"Dscp":                   float64(1),
		"DstMac":                 "11:22:33:44:55:66",
		"SrcMac":                 "01:02:03:04:05:06",
		"SrcPort":                float64(23000),
		"DstPort":                float64(443),
		"Duplicate":              false,
		"Etype":                  float64(2048),
		"Packets":                float64(123),
		"Proto":                  float64(17),
		"TimeFlowStartMs":        float64(startTime.UnixMilli()),
		"TimeFlowEndMs":          float64(endTime.UnixMilli()),
		"Interface":              "eth0",
		"AgentIP":                "10.11.12.13",
		"PktDropBytes":           float64(100),
		"PktDropPackets":         float64(10),
		"PktDropLatestFlags":     float64(1),
		"PktDropLatestState":     "TCP_ESTABLISHED",
		"PktDropLatestDropCause": "SKB_DROP_REASON_NETFILTER_DROP",
		"DnsLatencyMs":           float64(someDuration.Milliseconds()),
		"DnsId":                  float64(1),
		"DnsFlags":               float64(0x80),
		"DnsErrno":               float64(0),
		"DnsFlagsResponseCode":   "NoError",
		"TimeFlowRttNs":          float64(someDuration.Nanoseconds()),
	}, capturedRecord)
}
23 changes: 15 additions & 8 deletions pkg/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/ingest"
"github.com/netobserv/gopipes/pkg/node"
log "github.com/sirupsen/logrus"
)
Expand All @@ -48,18 +49,24 @@ type Pipeline struct {

// NewPipeline defines the pipeline elements
func NewPipeline(cfg *config.ConfigFileStruct) (*Pipeline, error) {
log.Debugf("entering NewPipeline")
return newPipelineFromIngester(cfg, nil)
}

// newPipelineFromIngester defines the pipeline elements from a preset ingester (e.g. for in-process receiver)
func newPipelineFromIngester(cfg *config.ConfigFileStruct, ing ingest.Ingester) (*Pipeline, error) {
log.Debugf("entering newPipelineFromIngester")

stages := cfg.Pipeline
log.Debugf("stages = %v ", stages)
configParams := cfg.Parameters
log.Debugf("configParams = %v ", configParams)
log.Debugf("stages = %v ", cfg.Pipeline)
log.Debugf("configParams = %v ", cfg.Parameters)

build := newBuilder(cfg)
if err := build.readStages(); err != nil {
builder := newBuilder(cfg)
if ing != nil {
builder.presetIngester(ing)
}
if err := builder.readStages(); err != nil {
return nil, err
}
return build.build()
return builder.build()
}

func (p *Pipeline) Run() {
Expand Down
Loading