Skip to content

Commit

Permalink
Add ingest stage to provide synthetic workload for benchmarks (#395)
Browse files Browse the repository at this point in the history
* move GenerateConnectionFlowEntries from test to utils

* added fields to GenerateConnectionEntries

* added infrastructure for ingest synthetic

* added ingest synthetic

* added operational metric to count flow logs produced

* added error message if metricsSettings missing

* addressed reviewer comments

* simplified ingest loop
  • Loading branch information
KalmanMeth authored Mar 28, 2023
1 parent 99a2598 commit a31a0c0
Show file tree
Hide file tree
Showing 11 changed files with 308 additions and 40 deletions.
8 changes: 8 additions & 0 deletions docs/operational-metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,14 @@ Each table below provides documentation for an exported flowlogs-pipeline operat
| **Labels** | stage |


### ingest_synthetic_flows_processed
| **Name** | ingest_synthetic_flows_processed |
|:---|:---|
| **Description** | Number of flow logs processed |
| **Type** | counter |
| **Labels** | stage |


### metrics_dropped
| **Name** | metrics_dropped |
|:---|:---|
Expand Down
1 change: 1 addition & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const (
FileType = "file"
FileLoopType = "file_loop"
FileChunksType = "file_chunks"
SyntheticType = "synthetic"
CollectorType = "collector"
GRPCType = "grpc"
FakeType = "fake"
Expand Down
24 changes: 24 additions & 0 deletions pkg/api/ingest_synthetic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright (C) 2023 IBM, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package api

// IngestSynthetic holds the user-facing configuration of the synthetic
// ingest stage, which fabricates flow logs in-process (for benchmarks)
// instead of reading them from an external source.
type IngestSynthetic struct {
	// Connections is the number of distinct connections to simulate; the generated flow logs cycle over them.
	Connections int `yaml:"connections,omitempty" json:"connections,omitempty" doc:"number of connections to maintain"`
	// BatchMaxLen is the number of accumulated flows before being forwarded for processing.
	BatchMaxLen int `yaml:"batchMaxLen,omitempty" json:"batchMaxLen,omitempty" doc:"the number of accumulated flows before being forwarded for processing"`
	// FlowLogsPerMin is the target number of flow logs to emit per minute.
	FlowLogsPerMin int `yaml:"flowLogsPerMin,omitempty" json:"flowLogsPerMin,omitempty" doc:"the number of flow logs to send per minute"`
}
3 changes: 3 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ type Ingest struct {
Collector *api.IngestCollector `yaml:"collector,omitempty" json:"collector,omitempty"`
Kafka *api.IngestKafka `yaml:"kafka,omitempty" json:"kafka,omitempty"`
GRPC *api.IngestGRPCProto `yaml:"grpc,omitempty" json:"grpc,omitempty"`
Synthetic *api.IngestSynthetic `yaml:"synthetic,omitempty" json:"synthetic,omitempty"`
}

type File struct {
Expand Down Expand Up @@ -153,6 +154,8 @@ func ParseConfig(opts Options) (ConfigFileStruct, error) {
return out, err
}
logrus.Debugf("metrics settings = %v ", out.MetricsSettings)
} else {
logrus.Errorf("metrics settings missing")
}

return out, nil
Expand Down
13 changes: 7 additions & 6 deletions pkg/pipeline/encode/prom_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"testing"

"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
"github.com/netobserv/flowlogs-pipeline/pkg/test"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -109,12 +110,12 @@ func Test_Prom_Cache1(t *testing.T) {
promEncode, err := initProm(cfg.Parameters[0].Encode.Prom)
require.NoError(t, err)

entries = test.GenerateConnectionEntries(10)
entries = utils.GenerateConnectionFlowEntries(10)
require.Equal(t, 10, len(entries))
encodeEntries(promEncode, entries)
require.Equal(t, 10, promEncode.mCache.GetCacheLen())

entries = test.GenerateConnectionEntries(40)
entries = utils.GenerateConnectionFlowEntries(40)
require.Equal(t, 40, len(entries))
encodeEntries(promEncode, entries)
require.Equal(t, 30, promEncode.mCache.GetCacheLen())
Expand All @@ -129,12 +130,12 @@ func Test_Prom_Cache2(t *testing.T) {
promEncode, err := initProm(cfg.Parameters[0].Encode.Prom)
require.NoError(t, err)

entries = test.GenerateConnectionEntries(10)
entries = utils.GenerateConnectionFlowEntries(10)
require.Equal(t, 10, len(entries))
encodeEntries(promEncode, entries)
require.Equal(t, 20, promEncode.mCache.GetCacheLen())

entries = test.GenerateConnectionEntries(40)
entries = utils.GenerateConnectionFlowEntries(40)
require.Equal(t, 40, len(entries))
encodeEntries(promEncode, entries)
require.Equal(t, 30, promEncode.mCache.GetCacheLen())
Expand All @@ -149,12 +150,12 @@ func Test_Prom_Cache3(t *testing.T) {
promEncode, err := initProm(cfg.Parameters[0].Encode.Prom)
require.NoError(t, err)

entries = test.GenerateConnectionEntries(10)
entries = utils.GenerateConnectionFlowEntries(10)
require.Equal(t, 10, len(entries))
encodeEntries(promEncode, entries)
require.Equal(t, 20, promEncode.mCache.GetCacheLen())

entries = test.GenerateConnectionEntries(40)
entries = utils.GenerateConnectionFlowEntries(40)
require.Equal(t, 40, len(entries))
encodeEntries(promEncode, entries)
require.Equal(t, 80, promEncode.mCache.GetCacheLen())
Expand Down
7 changes: 4 additions & 3 deletions pkg/pipeline/extract/conntrack/conntrack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/extract"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
"github.com/netobserv/flowlogs-pipeline/pkg/test"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -901,15 +902,15 @@ func TestMaxConnections(t *testing.T) {
ct := extract.(*conntrackImpl)
require.Equal(t, 0, ct.connStore.len())

flowLogs := test.GenerateConnectionEntries(10)
flowLogs := utils.GenerateConnectionFlowEntries(10)
ct.Extract(flowLogs)
require.Equal(t, 10, ct.connStore.len())

flowLogs = test.GenerateConnectionEntries(20)
flowLogs = utils.GenerateConnectionFlowEntries(20)
ct.Extract(flowLogs)
require.Equal(t, 20, ct.connStore.len())

flowLogs = test.GenerateConnectionEntries(40)
flowLogs = utils.GenerateConnectionFlowEntries(40)
ct.Extract(flowLogs)
require.Equal(t, maxConnections, ct.connStore.len())
}
Expand Down
107 changes: 107 additions & 0 deletions pkg/pipeline/ingest/ingest_synthetic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright (C) 2023 IBM, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package ingest

import (
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
)

// IngestSynthetic is an ingest stage that generates synthetic flow logs
// internally, cycling over a fixed set of connections at a configured rate.
type IngestSynthetic struct {
	params            api.IngestSynthetic // effective configuration (defaults applied in NewIngestSynthetic)
	exitChan          <-chan struct{}     // closed on pipeline shutdown; terminates the Ingest loop
	flowLogsProcessed prometheus.Counter  // operational metric counting flow logs sent downstream
}

// Defaults applied in NewIngestSynthetic when the corresponding
// configuration field is zero (i.e. unset).
const (
	defaultConnections    = 100
	defaultBatchLen       = 10
	defaultFlowLogsPerMin = 2000
)

var (
	// flowLogsProcessed defines the operational counter (labeled by stage name)
	// tracking how many synthetic flow logs this stage has produced.
	flowLogsProcessed = operational.DefineMetric(
		"ingest_synthetic_flows_processed",
		"Number of flow logs processed",
		operational.TypeCounter,
		"stage",
	)
)

// Ingest generates flow logs according to the configured parameters and
// pushes them in batches to the out channel until the pipeline exit
// channel is closed. The same flow log entries are sent repeatedly to
// simulate ongoing traffic on a fixed set of connections.
func (ingestS *IngestSynthetic) Ingest(out chan<- config.GenericMap) {
	log.Debugf("entering IngestSynthetic Ingest, params = %v", ingestS.params)
	// get a list of flow log entries, one per desired connection
	// these flow logs will be sent again and again to simulate ongoing traffic on those connections
	flowLogs := utils.GenerateConnectionFlowEntries(ingestS.params.Connections)
	nLogs := len(flowLogs)
	next := 0

	// compute the interval between batches so that BatchMaxLen logs per tick
	// average out to FlowLogsPerMin logs per minute
	interval := time.Minute * time.Duration(ingestS.params.BatchMaxLen) / time.Duration(ingestS.params.FlowLogsPerMin)
	ticker := time.NewTicker(interval)
	// release the ticker's resources when the loop exits (the original
	// never stopped it, leaking the ticker on shutdown)
	defer ticker.Stop()

	// loop until told to exit
	for {
		select {
		case <-ingestS.exitChan:
			log.Debugf("exiting IngestSynthetic because of signal")
			return
		case <-ticker.C:
			log.Debugf("sending a batch of %d flow logs from index %d", ingestS.params.BatchMaxLen, next)
			for i := 0; i < ingestS.params.BatchMaxLen; i++ {
				out <- flowLogs[next]
				ingestS.flowLogsProcessed.Inc()
				// wrap around so the same connections keep producing traffic
				next++
				if next >= nLogs {
					next = 0
				}
			}
		}
	}
}

// NewIngestSynthetic creates a new synthetic ingester from the stage
// parameters, filling in defaults for any unset configuration field.
func NewIngestSynthetic(opMetrics *operational.Metrics, params config.StageParam) (Ingester, error) {
	log.Debugf("entering NewIngestSynthetic")
	confIngestSynthetic := api.IngestSynthetic{}
	// BUGFIX: the original used "||" here; when params.Ingest is nil the
	// second operand dereferences a nil pointer and panics. Both conditions
	// must hold before dereferencing, so use "&&".
	if params.Ingest != nil && params.Ingest.Synthetic != nil {
		confIngestSynthetic = *params.Ingest.Synthetic
	}
	// apply defaults for unset (zero) fields
	if confIngestSynthetic.Connections == 0 {
		confIngestSynthetic.Connections = defaultConnections
	}
	if confIngestSynthetic.FlowLogsPerMin == 0 {
		confIngestSynthetic.FlowLogsPerMin = defaultFlowLogsPerMin
	}
	if confIngestSynthetic.BatchMaxLen == 0 {
		confIngestSynthetic.BatchMaxLen = defaultBatchLen
	}
	log.Debugf("params = %v", confIngestSynthetic)

	return &IngestSynthetic{
		params:            confIngestSynthetic,
		exitChan:          utils.ExitChannel(),
		flowLogsProcessed: opMetrics.NewCounter(&flowLogsProcessed, params.Name),
	}, nil
}
91 changes: 91 additions & 0 deletions pkg/pipeline/ingest/ingest_synthetic_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright (C) 2023 IBM, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package ingest

import (
"testing"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
"github.com/stretchr/testify/require"
)

// TestIngestSynthetic verifies that default parameter values are applied,
// that explicit parameters are honored, and that the ingester emits the
// configured number of distinct synthetic connections.
func TestIngestSynthetic(t *testing.T) {
	// check default values
	params := config.StageParam{
		Ingest: &config.Ingest{
			Type:      "synthetic",
			Synthetic: &api.IngestSynthetic{},
		},
	}
	ingest, err := NewIngestSynthetic(operational.NewMetrics(&config.MetricsSettings{}), params)
	// BUGFIX: check the error before the type assertion; on a non-nil error
	// the returned Ingester is nil and the assertion would panic before
	// require.NoError could report the failure
	require.NoError(t, err)
	syn := ingest.(*IngestSynthetic)
	require.Equal(t, defaultBatchLen, syn.params.BatchMaxLen)
	require.Equal(t, defaultConnections, syn.params.Connections)
	require.Equal(t, defaultFlowLogsPerMin, syn.params.FlowLogsPerMin)

	// check explicitly provided values
	batchMaxLen := 3
	connections := 20
	flowLogsPerMin := 1000
	synthetic := api.IngestSynthetic{
		BatchMaxLen:    batchMaxLen,
		Connections:    connections,
		FlowLogsPerMin: flowLogsPerMin,
	}
	params = config.StageParam{
		Ingest: &config.Ingest{
			Type:      "synthetic",
			Synthetic: &synthetic,
		},
	}
	ingest, err = NewIngestSynthetic(operational.NewMetrics(&config.MetricsSettings{}), params)
	require.NoError(t, err)
	syn = ingest.(*IngestSynthetic)
	require.Equal(t, batchMaxLen, syn.params.BatchMaxLen)
	require.Equal(t, connections, syn.params.Connections)
	require.Equal(t, flowLogsPerMin, syn.params.FlowLogsPerMin)

	// run the Ingest method in a separate goroutine
	// NOTE(review): this goroutine only stops when the global exit channel
	// closes; consider signaling shutdown to avoid leaking it across tests
	ingestOutput := make(chan config.GenericMap)
	go syn.Ingest(ingestOutput)

	// connection identifies a flow by its 5-tuple
	type connection struct {
		srcAddr  string
		dstAddr  string
		srcPort  int
		dstPort  int
		protocol int
	}

	// Start collecting flows from the ingester and ensure we have the specified number of distinct connections
	connectionMap := make(map[connection]int)
	for i := 0; i < (3 * connections); i++ {
		flowEntry := <-ingestOutput
		conn := connection{
			srcAddr:  flowEntry["SrcAddr"].(string),
			dstAddr:  flowEntry["DstAddr"].(string),
			srcPort:  flowEntry["SrcPort"].(int),
			dstPort:  flowEntry["DstPort"].(int),
			protocol: flowEntry["Proto"].(int),
		}
		connectionMap[conn]++
	}
	require.Equal(t, connections, len(connectionMap))
}
2 changes: 2 additions & 0 deletions pkg/pipeline/pipeline_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,8 @@ func getIngester(opMetrics *operational.Metrics, params config.StageParam) (inge
switch params.Ingest.Type {
case api.FileType, api.FileLoopType, api.FileChunksType:
ingester, err = ingest.NewIngestFile(params)
case api.SyntheticType:
ingester, err = ingest.NewIngestSynthetic(opMetrics, params)
case api.CollectorType:
ingester, err = ingest.NewIngestCollector(opMetrics, params)
case api.KafkaType:
Expand Down
Loading

0 comments on commit a31a0c0

Please sign in to comment.