Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/crowdsec-cli/clisetup/setup/detect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ detect:
source: journalctl
filename: /path/to/file.log`,
want: nil,
wantErr: `invalid acquisition spec for foobar: cannot parse JournalCtlSource configuration: [1:1] unknown field "filename"`,
wantErr: `invalid acquisition spec for foobar: cannot parse journalctl acquisition config: [1:1] unknown field "filename"`,
}, {
name: "source file: required fields",
config: `
Expand Down
20 changes: 10 additions & 10 deletions cmd/crowdsec/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,21 +111,21 @@ func LoadBuckets(cConfig *csconfig.Config, hub *cwhub.Hub) error {
}

func LoadAcquisition(ctx context.Context, cConfig *csconfig.Config) ([]acquisition.DataSource, error) {
var err error

if flags.SingleFileType != "" && flags.OneShotDSN != "" {
flags.Labels = labels
flags.Labels = additionalLabels
flags.Labels["type"] = flags.SingleFileType

dataSources, err = acquisition.LoadAcquisitionFromDSN(ctx, flags.OneShotDSN, flags.Labels, flags.Transform)
ds, err := acquisition.LoadAcquisitionFromDSN(ctx, flags.OneShotDSN, flags.Labels, flags.Transform)
if err != nil {
return nil, fmt.Errorf("failed to configure datasource for %s: %w", flags.OneShotDSN, err)
return nil, err
}
dataSources = append(dataSources, ds)
} else {
dataSources, err = acquisition.LoadAcquisitionFromFiles(ctx, cConfig.Crowdsec, cConfig.Prometheus)
dss, err := acquisition.LoadAcquisitionFromFiles(ctx, cConfig.Crowdsec, cConfig.Prometheus)
if err != nil {
return nil, err
}
dataSources = dss
}

if len(dataSources) == 0 {
Expand All @@ -136,9 +136,9 @@ func LoadAcquisition(ctx context.Context, cConfig *csconfig.Config) ([]acquisiti
}

var (
dumpFolder string
dumpStates bool
labels = make(labelsMap)
dumpFolder string
dumpStates bool
additionalLabels = make(labelsMap)
)

func (*labelsMap) String() string {
Expand Down Expand Up @@ -172,7 +172,7 @@ func (f *Flags) Parse() {
flag.StringVar(&f.OneShotDSN, "dsn", "", "Process a single data source in time-machine")
flag.StringVar(&f.Transform, "transform", "", "expr to apply on the event after acquisition")
flag.StringVar(&f.SingleFileType, "type", "", "Labels.type for file in time-machine")
flag.Var(&labels, "label", "Additional Labels for file in time-machine")
flag.Var(&additionalLabels, "label", "Additional Labels for file in time-machine")
flag.BoolVar(&f.TestMode, "t", false, "only test configs")
flag.BoolVar(&f.DisableAgent, "no-cs", false, "disable crowdsec agent")
flag.BoolVar(&f.DisableAPI, "no-api", false, "disable local API")
Expand Down
111 changes: 85 additions & 26 deletions pkg/acquisition/acquisition.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ import (
"io"
"maps"
"os"
"time"
"slices"
"strings"

"github.com/cenkalti/backoff/v5"
"github.com/expr-lang/expr"
"github.com/expr-lang/expr/vm"
"github.com/goccy/go-yaml"
Expand Down Expand Up @@ -69,6 +71,11 @@ type Tailer interface {
StreamingAcquisition(ctx context.Context, out chan pipeline.Event, acquisTomb *tomb.Tomb) error
}

// RestartableStreamer works Like Tailer but should return any error and leave the retry logic to the caller
type RestartableStreamer interface {
Stream(ctx context.Context, out chan pipeline.Event) error
}

type MetricsProvider interface {
// Returns pointers to metrics that are managed by the module
GetMetrics() []prometheus.Collector
Expand Down Expand Up @@ -121,14 +128,14 @@ func registerDataSource(dataSourceType string, dsGetter func() DataSource) {
}

// setupLogger creates a logger for the datasource to use at runtime.
func setupLogger(source, name string, level log.Level) (*log.Entry, error) {
func setupLogger(typ, name string, level log.Level) (*log.Entry, error) {
clog := log.New()
if err := logging.ConfigureLogger(clog, level); err != nil {
return nil, fmt.Errorf("while configuring datasource logger: %w", err)
return nil, fmt.Errorf("configuring datasource logger: %w", err)
}

fields := log.Fields{
"type": source,
"type": typ,
}

if name != "" {
Expand Down Expand Up @@ -167,18 +174,20 @@ func DataSourceConfigure(ctx context.Context, commonConfig configuration.DataSou
return dataSrc, nil
}

func LoadAcquisitionFromDSN(ctx context.Context, dsn string, labels map[string]string, transformExpr string) ([]DataSource, error) {
func LoadAcquisitionFromDSN(ctx context.Context, dsn string, labels map[string]string, transformExpr string) (DataSource, error) {
frags := strings.Split(dsn, ":")
if len(frags) == 1 {
return nil, fmt.Errorf("%s isn't valid dsn (no protocol)", dsn)
return nil, fmt.Errorf("%s is not a valid dsn (no protocol)", dsn)
}

dataSrc, err := GetDataSourceIface(frags[0])
if err != nil {
return nil, fmt.Errorf("no acquisition for protocol %s:// - %w", frags[0], err)
}

subLogger, err := setupLogger(dsn, "", 0)
typ := labels["type"]

subLogger, err := setupLogger(typ, "", 0)
if err != nil {
return nil, err
}
Expand All @@ -200,10 +209,10 @@ func LoadAcquisitionFromDSN(ctx context.Context, dsn string, labels map[string]s
}

if err = dsnConf.ConfigureByDSN(ctx, dsn, labels, subLogger, uniqueID); err != nil {
return nil, fmt.Errorf("while configuration datasource for %s: %w", dsn, err)
return nil, fmt.Errorf("configuring datasource for %q: %w", dsn, err)
}

return []DataSource{dataSrc}, nil
return dataSrc, nil
}

func GetMetricsLevelFromPromCfg(prom *csconfig.PrometheusCfg) metrics.AcquisitionMetricsLevel {
Expand Down Expand Up @@ -349,7 +358,7 @@ func sourcesFromFile(ctx context.Context, acquisFile string, metricsLevel metric
continue
}

return nil, fmt.Errorf("while configuring datasource of type %s from %s (position %d): %w", sub.Source, acquisFile, idx, err)
return nil, fmt.Errorf("configuring datasource of type %s from %s (position %d): %w", sub.Source, acquisFile, idx, err)
}

if sub.TransformExpr != "" {
Expand Down Expand Up @@ -484,6 +493,72 @@ func transform(transformChan chan pipeline.Event, output chan pipeline.Event, ac
}
}


func runRestartableStream(ctx context.Context, rs RestartableStreamer, name string, output chan pipeline.Event, acquisTomb *tomb.Tomb) error {
// wrap tomb logic with context
ctx, cancel := context.WithCancel(ctx)
go func() {
<-acquisTomb.Dying()
cancel()
}()

acquisTomb.Go(func() error {
// TODO: check timing and exponential?
bo := backoff.NewConstantBackOff(10 * time.Second)
bo.Reset() // TODO: reset according to run time

for {
select {
case <-ctx.Done():
return nil
default:
}

if err := rs.Stream(ctx, output); err != nil {
log.Errorf("datasource %q: stream error: %v (retrying)", name, err)
}

select {
case <-ctx.Done():
return nil
default:
}

d := bo.NextBackOff()
log.Infof("datasource %q: restarting stream in %s", name, d)

select {
case <-ctx.Done():
return nil
case <-time.After(d):
}
}
})

return nil
}


func acquireSource(ctx context.Context, source DataSource, name string, output chan pipeline.Event, acquisTomb *tomb.Tomb) error {
if source.GetMode() == configuration.CAT_MODE {
if s, ok := source.(Fetcher); ok {
return s.OneShotAcquisition(ctx, output, acquisTomb)
}

return fmt.Errorf("%s: cat mode is set but OneShotAcquisition is not supported", source.GetName())
}

if s, ok := source.(Tailer); ok {
return s.StreamingAcquisition(ctx, output, acquisTomb)
}

if s, ok := source.(RestartableStreamer); ok {
return runRestartableStream(ctx, s, name, output, acquisTomb)
}

return fmt.Errorf("%s: tail mode is set but the datasource does not support streaming acquisition", source.GetName())
}

func StartAcquisition(ctx context.Context, sources []DataSource, output chan pipeline.Event, acquisTomb *tomb.Tomb) error {
// Don't wait if we have no sources, as it will hang forever
if len(sources) == 0 {
Expand All @@ -497,8 +572,6 @@ func StartAcquisition(ctx context.Context, sources []DataSource, output chan pip
acquisTomb.Go(func() error {
defer trace.CatchPanic("crowdsec/acquis")

var err error

outChan := output

log.Debugf("datasource %s UUID: %s", subsrc.GetName(), subsrc.GetUuid())
Expand All @@ -519,21 +592,7 @@ func StartAcquisition(ctx context.Context, sources []DataSource, output chan pip
})
}

if subsrc.GetMode() == configuration.TAIL_MODE {
if s, ok := subsrc.(Tailer); ok {
err = s.StreamingAcquisition(ctx, outChan, acquisTomb)
} else {
err = fmt.Errorf("%s: tail mode is set but StreamingAcquisition is not supported", subsrc.GetName())
}
} else {
if s, ok := subsrc.(Fetcher); ok {
err = s.OneShotAcquisition(ctx, outChan, acquisTomb)
} else {
err = fmt.Errorf("%s: cat mode is set but OneShotAcquisition is not supported", subsrc.GetName())
}
}

if err != nil {
if err := acquireSource(ctx, subsrc, subsrc.GetName(), output, acquisTomb); err != nil {
// if one of the acqusition returns an error, we kill the others to properly shutdown
acquisTomb.Kill(err)
}
Expand Down
21 changes: 12 additions & 9 deletions pkg/acquisition/acquisition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func TestLoadAcquisitionFromFiles(t *testing.T) {
Config: csconfig.CrowdsecServiceCfg{
AcquisitionFiles: []string{"testdata/bad_filetype.yaml"},
},
ExpectedError: "while configuring datasource of type file from testdata/bad_filetype.yaml",
ExpectedError: "configuring datasource of type file from testdata/bad_filetype.yaml",
},
{
TestName: "from_env",
Expand Down Expand Up @@ -399,9 +399,8 @@ func TestStartAcquisitionCat(t *testing.T) {
acquisTomb := tomb.Tomb{}

go func() {
if err := StartAcquisition(ctx, sources, out, &acquisTomb); err != nil {
t.Error("unexpected error")
}
err := StartAcquisition(ctx, sources, out, &acquisTomb)
assert.NoError(t, err)
}()

count := 0
Expand Down Expand Up @@ -534,15 +533,14 @@ func TestConfigureByDSN(t *testing.T) {
}{
{
dsn: "baddsn",
ExpectedError: "baddsn isn't valid dsn (no protocol)",
ExpectedError: "baddsn is not a valid dsn (no protocol)",
},
{
dsn: "foobar://toto",
ExpectedError: "no acquisition for protocol foobar://",
},
{
dsn: "mockdsn://test_expect",
ExpectedResLen: 1,
},
{
dsn: "mockdsn://bad",
Expand All @@ -554,10 +552,15 @@ func TestConfigureByDSN(t *testing.T) {

for _, tc := range tests {
t.Run(tc.dsn, func(t *testing.T) {
srcs, err := LoadAcquisitionFromDSN(ctx, tc.dsn, map[string]string{"type": "test_label"}, "")
source, err := LoadAcquisitionFromDSN(ctx, tc.dsn, map[string]string{"type": "test_label"}, "")
cstest.RequireErrorContains(t, err, tc.ExpectedError)

assert.Len(t, srcs, tc.ExpectedResLen)
if tc.ExpectedError != "" {
return
}

assert.NotNil(t, source)
assert.Equal(t, "mockdsn", source.GetName())
})
}
}
Expand All @@ -582,7 +585,7 @@ func TestStartAcquisition_MissingTailer(t *testing.T) {

go func() { errCh <- StartAcquisition(ctx, []DataSource{&TailModeNoTailer{}}, out, &tb) }()

require.ErrorContains(t, <-errCh, "tail_no_tailer: tail mode is set but StreamingAcquisition is not supported")
require.ErrorContains(t, <-errCh, "tail_no_tailer: tail mode is set but the datasource does not support streaming acquisition")
}


Expand Down
12 changes: 6 additions & 6 deletions pkg/acquisition/journalctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ import (

var (
// verify interface compliance
_ DataSource = (*journalctlacquisition.JournalCtlSource)(nil)
_ DSNConfigurer = (*journalctlacquisition.JournalCtlSource)(nil)
_ Fetcher = (*journalctlacquisition.JournalCtlSource)(nil)
_ Tailer = (*journalctlacquisition.JournalCtlSource)(nil)
_ MetricsProvider = (*journalctlacquisition.JournalCtlSource)(nil)
_ DataSource = (*journalctlacquisition.Source)(nil)
_ DSNConfigurer = (*journalctlacquisition.Source)(nil)
_ Fetcher = (*journalctlacquisition.Source)(nil)
_ RestartableStreamer = (*journalctlacquisition.Source)(nil)
_ MetricsProvider = (*journalctlacquisition.Source)(nil)
)

//nolint:gochecknoinits
func init() {
registerDataSource("journalctl", func() DataSource { return &journalctlacquisition.JournalCtlSource{} })
registerDataSource("journalctl", func() DataSource { return &journalctlacquisition.Source{} })
}
Loading