Skip to content

Commit

Permalink
[filebeat] Elasticsearch state storage for httpjson and cel inputs (#…
Browse files Browse the repository at this point in the history
…41446)

This enables Elasticsearch as State Store Backend for Security Integrations for
the Agentless solution. 

The scope of this change was narrowed down to supporting only `httpjson` inputs
in order to support the Okta integration for the initial release. The inputs of
all other integrations still use the file storage as before.
This is a short-term solution for the state storage on k8s.

The feature currently can only be enabled with the
`AGENTLESS_ELASTICSEARCH_STATE_STORE_INPUT_TYPES` env var.

The existing code relied on the inputs' state storage being fully configurable
before the main beat manager runs. This change delays the configuration of the
`httpjson` input until the actual configuration is received from the
Agent.

Example of the state storage index content for Okta integration:
```
{
  "took": 6,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 1,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": "agentless-state-httpjson-okta.system-028ecf4b-babe-44c6-939e-9e3096af6959",
        "_id": "httpjson::httpjson-okta.system-028ecf4b-babe-44c6-939e-9e3096af6959::https://dev-36006609.okta.com/api/v1/logs",
        "_seq_no": 39,
        "_primary_term": 1,
        "_score": 1,
        "_source": {
          "v": {
            "ttl": 1800000000000,
            "updated": "2024-10-24T20:21:22.032Z",
            "cursor": {
              "published": "2024-10-24T20:19:53.542Z"
            }
          }
        }
      }
    ]
  }
}
```

The naming convention for every state store index is `agentless-state-<input id>`,
since the expectation for agentless is that there is only one agent per policy and
the agents are ephemeral.

Closes https://github.com/elastic/security-team/issues/11101

Co-authored-by: Orestis Floros <orestis.floros@elastic.co>
  • Loading branch information
aleksmaus and orestisfl authored Jan 28, 2025
1 parent 12c36bd commit 8180f23
Show file tree
Hide file tree
Showing 29 changed files with 1,313 additions and 124 deletions.
34 changes: 31 additions & 3 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/management"
"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
Expand Down Expand Up @@ -81,7 +82,9 @@ type Filebeat struct {
type PluginFactory func(beat.Info, *logp.Logger, StateStore) []v2.Plugin

type StateStore interface {
Access() (*statestore.Store, error)
// Access returns the storage registry depending on the type. This is needed for the Elasticsearch state store which
// is guarded by the feature.IsElasticsearchStateStoreEnabledForInput(typ) check.
Access(typ string) (*statestore.Store, error)
CleanupInterval() time.Duration
}

Expand Down Expand Up @@ -300,13 +303,36 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
return err
}

stateStore, err := openStateStore(b.Info, logp.NewLogger("filebeat"), config.Registry)
// Create a context that is cancelled as soon as fb.done is signalled
ctx, cn := context.WithCancel(context.Background())
go func() {
<-fb.done
cn()
}()

stateStore, err := openStateStore(ctx, b.Info, logp.NewLogger("filebeat"), config.Registry)
if err != nil {
logp.Err("Failed to open state store: %+v", err)
return err
}
defer stateStore.Close()

// If a notifier is set, register a listener for output configuration changes.
// The notifier passes the Elasticsearch output configuration down to the Elasticsearch-backed state storage
// so that it can be fully configured.
if stateStore.notifier != nil {
b.OutputConfigReloader = reload.ReloadableFunc(func(r *reload.ConfigWithMeta) error {
outCfg := conf.Namespace{}
if err := r.Config.Unpack(&outCfg); err != nil || outCfg.Name() != "elasticsearch" {
logp.Err("Failed to unpack the output config: %v", err)
return nil
}

stateStore.notifier.Notify(outCfg.Config())
return nil
})
}

err = filestream.ValidateInputIDs(config.Inputs, logp.NewLogger("input.filestream"))
if err != nil {
logp.Err("invalid filestream configuration: %+v", err)
Expand Down Expand Up @@ -357,6 +383,8 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
defer func() {
_ = inputTaskGroup.Stop()
}()

// Store needs to be fully configured at this point
if err := v2InputLoader.Init(&inputTaskGroup); err != nil {
logp.Err("Failed to initialize the input managers: %v", err)
return err
Expand Down Expand Up @@ -541,7 +569,7 @@ func processLogInputTakeOver(stateStore StateStore, config *cfg.Config) error {
return nil
}

store, err := stateStore.Access()
store, err := stateStore.Access("")
if err != nil {
return fmt.Errorf("Failed to access state when attempting take over: %w", err)
}
Expand Down
46 changes: 40 additions & 6 deletions filebeat/beater/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,43 +18,77 @@
package beater

import (
"context"
"time"

"github.com/elastic/beats/v7/filebeat/config"
"github.com/elastic/beats/v7/filebeat/features"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/statestore"
"github.com/elastic/beats/v7/libbeat/statestore/backend"
"github.com/elastic/beats/v7/libbeat/statestore/backend/es"
"github.com/elastic/beats/v7/libbeat/statestore/backend/memlog"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/paths"
)

// filebeatStore bundles the state store registries used by Filebeat together
// with the store name and cleanup interval that openStateStore copies from the
// beat info and the registry configuration.
type filebeatStore struct {
// registry is the default registry; openStateStore builds it on the memlog backend.
registry *statestore.Registry
// esRegistry is the Elasticsearch-backed registry. openStateStore sets it only
// when features.IsElasticsearchStateStoreEnabled() reports true, so it may be nil.
esRegistry *statestore.Registry
// storeName is the name passed to Registry.Get by Access (set from beat.Info.Beat).
storeName string
// cleanInterval is copied from config.Registry.CleanInterval.
cleanInterval time.Duration

// Notifies the Elasticsearch store about configuration change
// which is available only after the beat runtime manager connects to the Agent
// and receives the output configuration
notifier *es.Notifier
}

func openStateStore(info beat.Info, logger *logp.Logger, cfg config.Registry) (*filebeatStore, error) {
memlog, err := memlog.New(logger, memlog.Settings{
func openStateStore(ctx context.Context, info beat.Info, logger *logp.Logger, cfg config.Registry) (*filebeatStore, error) {
var (
reg backend.Registry
err error

esreg *es.Registry
notifier *es.Notifier
)

if features.IsElasticsearchStateStoreEnabled() {
notifier = es.NewNotifier()
esreg = es.New(ctx, logger, notifier)
}

reg, err = memlog.New(logger, memlog.Settings{
Root: paths.Resolve(paths.Data, cfg.Path),
FileMode: cfg.Permissions,
})
if err != nil {
return nil, err
}

return &filebeatStore{
registry: statestore.NewRegistry(memlog),
store := &filebeatStore{
registry: statestore.NewRegistry(reg),
storeName: info.Beat,
cleanInterval: cfg.CleanInterval,
}, nil
notifier: notifier,
}

if esreg != nil {
store.esRegistry = statestore.NewRegistry(esreg)
}

return store, nil
}

// Close closes every registry owned by the store: the default registry and,
// when it was created, the Elasticsearch-backed registry. Errors returned by
// the registries are not reported, matching the previous behaviour.
func (s *filebeatStore) Close() {
	s.registry.Close()

	// esRegistry only exists when the Elasticsearch state store feature is
	// enabled, so guard against nil before closing it.
	if s.esRegistry != nil {
		s.esRegistry.Close()
	}
}

func (s *filebeatStore) Access() (*statestore.Store, error) {
// Access opens the store named s.storeName from the registry that serves the
// given input type. The Elasticsearch-backed registry is picked only when it
// exists and the feature is enabled for typ; every other case falls back to
// the default registry.
func (s *filebeatStore) Access(typ string) (*statestore.Store, error) {
	selected := s.registry
	if s.esRegistry != nil && features.IsElasticsearchStateStoreEnabledForInput(typ) {
		selected = s.esRegistry
	}
	return selected.Get(s.storeName)
}

Expand Down
59 changes: 59 additions & 0 deletions filebeat/features/features.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package features

import (
"os"
"strings"
)

// esTypesEnabled is the set of input types whose state is kept in
// Elasticsearch. It is replaced on every initFromEnv call.
var esTypesEnabled map[string]struct{}

// isESEnabled records whether esTypesEnabled holds at least one input type.
var isESEnabled bool

func init() {
	initFromEnv("AGENTLESS_ELASTICSEARCH_STATE_STORE_INPUT_TYPES")
}

// initFromEnv rebuilds the feature state from the environment variable named
// envName. The value is read as a comma-separated list of input types; each
// entry is trimmed of surrounding whitespace and blank entries are dropped.
func initFromEnv(envName string) {
	enabled := map[string]struct{}{}
	for _, raw := range strings.Split(os.Getenv(envName), ",") {
		if inputType := strings.TrimSpace(raw); inputType != "" {
			enabled[inputType] = struct{}{}
		}
	}

	esTypesEnabled = enabled
	isESEnabled = len(enabled) != 0
}

// IsElasticsearchStateStoreEnabled reports whether the Elasticsearch state
// store is enabled for at least one input type.
func IsElasticsearchStateStoreEnabled() bool {
	return isESEnabled
}

// IsElasticsearchStateStoreEnabledForInput reports whether inputType is one of
// the input types configured to use Elasticsearch for state storage. It is
// always false while the feature as a whole is disabled.
func IsElasticsearchStateStoreEnabledForInput(inputType string) bool {
	if !IsElasticsearchStateStoreEnabled() {
		return false
	}
	_, found := esTypesEnabled[inputType]
	return found
}
86 changes: 86 additions & 0 deletions filebeat/features/features_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package features

import (
"testing"

"github.com/stretchr/testify/assert"
)

// Test_initFromEnv checks that the feature state derived from the environment
// is disabled for an unset or empty variable and lists exactly the trimmed,
// non-blank, comma-separated entries otherwise.
func Test_initFromEnv(t *testing.T) {
	const envName = "TEST_AGENTLESS_ENV"

	// assertDisabled verifies the "feature off" state.
	assertDisabled := func(t *testing.T) {
		assert.False(t, IsElasticsearchStateStoreEnabled())
		assert.Empty(t, esTypesEnabled)
		assert.False(t, IsElasticsearchStateStoreEnabledForInput("xxx"))
	}

	t.Run("Without setting env", func(t *testing.T) {
		// State produced by the package-level init().
		assertDisabled(t)

		// Re-initialising from a variable that is not set keeps it disabled.
		initFromEnv(envName)
		assertDisabled(t)
	})

	type testCase struct {
		name        string
		value       string
		wantEnabled bool
		wantTypes   []string
	}

	cases := []testCase{
		{name: "Empty", value: "", wantEnabled: false, wantTypes: nil},
		{name: "Single value", value: "xxx", wantEnabled: true, wantTypes: []string{"xxx"}},
		{name: "Multiple values", value: "xxx,yyy", wantEnabled: true, wantTypes: []string{"xxx", "yyy"}},
		{
			name:        "Multiple values with spaces",
			value:       ",,, , xxx , yyy, ,,,,",
			wantEnabled: true,
			wantTypes:   []string{"xxx", "yyy"},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			t.Setenv(envName, tc.value)
			initFromEnv(envName)

			assert.Equal(t, tc.wantEnabled, IsElasticsearchStateStoreEnabled())
			for _, inputType := range tc.wantTypes {
				assert.Contains(t, esTypesEnabled, inputType)
				assert.True(t, IsElasticsearchStateStoreEnabledForInput(inputType))
			}
			// No entries beyond the expected ones may be present.
			assert.Len(t, esTypesEnabled, len(tc.wantTypes))
		})
	}
}
8 changes: 4 additions & 4 deletions filebeat/input/filestream/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func (e *inputTestingEnvironment) abspath(filename string) string {
}

func (e *inputTestingEnvironment) requireRegistryEntryCount(expectedCount int) {
inputStore, _ := e.stateStore.Access()
inputStore, _ := e.stateStore.Access("")

actual := 0
err := inputStore.Each(func(_ string, _ statestore.ValueDecoder) (bool, error) {
Expand Down Expand Up @@ -331,7 +331,7 @@ func (e *inputTestingEnvironment) requireNoEntryInRegistry(filename, inputID str
e.t.Fatalf("cannot stat file when cheking for offset: %+v", err)
}

inputStore, _ := e.stateStore.Access()
inputStore, _ := e.stateStore.Access("")
id := getIDFromPath(filepath, inputID, fi)

var entry registryEntry
Expand All @@ -352,7 +352,7 @@ func (e *inputTestingEnvironment) requireOffsetInRegistryByID(key string, expect
}

func (e *inputTestingEnvironment) getRegistryState(key string) (registryEntry, error) {
inputStore, _ := e.stateStore.Access()
inputStore, _ := e.stateStore.Access("")

var entry registryEntry
err := inputStore.Get(key, &entry)
Expand Down Expand Up @@ -553,7 +553,7 @@ func (s *testInputStore) Close() {
s.registry.Close()
}

func (s *testInputStore) Access() (*statestore.Store, error) {
func (s *testInputStore) Access(_ string) (*statestore.Store, error) {
return s.registry.Get("filebeat")
}

Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/filestream/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func (s *testStore) Close() {
s.registry.Close()
}

func (s *testStore) Access() (*statestore.Store, error) {
func (s *testStore) Access(_ string) (*statestore.Store, error) {
return s.registry.Get("filestream-benchmark")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ const globalInputID = ".global"

// StateStore interface and configurations used to give the Manager access to the persistent store.
type StateStore interface {
Access() (*statestore.Store, error)
Access(typ string) (*statestore.Store, error)
CleanupInterval() time.Duration
}

Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/filestream/internal/input-logfile/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ var closeStore = (*store).close
func openStore(log *logp.Logger, statestore StateStore, prefix string) (*store, error) {
ok := false

persistentStore, err := statestore.Access()
persistentStore, err := statestore.Access("")
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ func createSampleStore(t *testing.T, data map[string]state) testStateStore {

// WithGCPeriod returns a copy of ts with GCPeriod set to d. The value receiver
// means the caller's testStateStore is left unchanged.
func (ts testStateStore) WithGCPeriod(d time.Duration) testStateStore {
	updated := ts
	updated.GCPeriod = d
	return updated
}
// CleanupInterval returns the GCPeriod configured on the test store.
func (ts testStateStore) CleanupInterval() time.Duration {
	interval := ts.GCPeriod
	return interval
}
func (ts testStateStore) Access() (*statestore.Store, error) {
func (ts testStateStore) Access(string) (*statestore.Store, error) {
if ts.Store == nil {
return nil, errors.New("no store configured")
}
Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/journald/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (s *testInputStore) Close() {
s.registry.Close()
}

func (s *testInputStore) Access() (*statestore.Store, error) {
func (s *testInputStore) Access(_ string) (*statestore.Store, error) {
return s.registry.Get("filebeat")
}

Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/journald/input_filtering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func TestInputSeek(t *testing.T) {
env := newInputTestingEnvironment(t)

if testCase.cursor != "" {
store, _ := env.stateStore.Access()
store, _ := env.stateStore.Access("")
tmp := map[string]any{}
if err := json.Unmarshal([]byte(testCase.cursor), &tmp); err != nil {
t.Fatal(err)
Expand Down
Loading

0 comments on commit 8180f23

Please sign in to comment.