Skip to content

Commit

Permalink
Osquerybeat: support differential query results (#33070)
Browse files Browse the repository at this point in the history
* Osquerybeat: support differential query results

* Update code owners as suggested

* Fix utz

* Add support for query 'removed' flag

* Fix linter
  • Loading branch information
aleksmaus authored Sep 14, 2022
1 parent 923e11f commit eaf937b
Show file tree
Hide file tree
Showing 12 changed files with 144 additions and 71 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -137,5 +137,6 @@ CHANGELOG*
/x-pack/metricbeat/docs/ # Listed without an owner to avoid maintaining doc ownership for each input and module.
/x-pack/metricbeat/module/ @elastic/integrations
/x-pack/metricbeat/module/enterprisesearch @elastic/ent-search-application-backend
/x-pack/osquerybeat/ @elastic/security-asset-management
/x-pack/packetbeat/ @elastic/security-external-integrations
/x-pack/winlogbeat/ @elastic/security-external-integrations
4 changes: 2 additions & 2 deletions x-pack/osquerybeat/beater/action_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ var (
)

type publisher interface {
Publish(index, actionID, responseID string, hits []map[string]interface{}, ecsm ecs.Mapping, reqData interface{})
Publish(index, actionID, responseID string, meta map[string]interface{}, hits []map[string]interface{}, ecsm ecs.Mapping, reqData interface{})
}

type queryExecutor interface {
Expand Down Expand Up @@ -104,7 +104,7 @@ func (a *actionHandler) executeQuery(ctx context.Context, index string, ac actio

a.log.Debugf("Completed query in: %v", time.Since(start))

a.publisher.Publish(index, ac.ID, responseID, hits, ac.ECSMapping, req["data"])
a.publisher.Publish(index, ac.ID, responseID, nil, hits, ac.ECSMapping, req["data"])

return len(hits), nil
}
4 changes: 3 additions & 1 deletion x-pack/osquerybeat/beater/action_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,17 @@ type mockPublisher struct {
index string
actionID string
responseID string
meta map[string]interface{}
hits []map[string]interface{}
ecsm ecs.Mapping
reqData interface{}
}

func (p *mockPublisher) Publish(index, actionID, responseID string, hits []map[string]interface{}, ecsm ecs.Mapping, reqData interface{}) {
func (p *mockPublisher) Publish(index, actionID, responseID string, meta map[string]interface{}, hits []map[string]interface{}, ecsm ecs.Mapping, reqData interface{}) {
p.index = index
p.actionID = actionID
p.responseID = responseID
p.meta = meta
p.hits = hits
p.ecsm = ecsm
p.reqData = reqData
Expand Down
6 changes: 5 additions & 1 deletion x-pack/osquerybeat/beater/config_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,11 @@ func (p *ConfigPlugin) set(inputs []config.InputConfig) (err error) {
namespaces[name] = ns
queriesCount++

qi.Snapshot = true
// Force snapshot by default
if qi.Snapshot == nil {
snapshot := true
qi.Snapshot = &snapshot
}
return qi, nil
}

Expand Down
3 changes: 2 additions & 1 deletion x-pack/osquerybeat/beater/config_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,13 @@ func renderFullConfigJSON(inputs []config.InputConfig) (string, error) {
Queries: make(map[string]config.Query),
}
for _, stream := range input.Streams {
snapshot := true
query := config.Query{
Query: stream.Query,
Interval: stream.Interval,
Platform: stream.Platform,
Version: stream.Version,
Snapshot: true, // enforce snapshot for all queries
Snapshot: &snapshot, // enforce snapshot for all queries
}
pack.Queries[stream.ID] = query
}
Expand Down
21 changes: 14 additions & 7 deletions x-pack/osquerybeat/beater/logger_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,19 @@ import (
"github.com/elastic/elastic-agent-libs/logp"
)

type SnapshotResult struct {
type QueryResult struct {
Action string `json:"action"`
Name string `json:"name"`
Numeric string `json:"numeric"`
Numeric bool `json:"numeric"`
CalendarTime string `json:"calendarTime"`
UnixTime int64 `json:"unixTime"`
Epoch int64 `json:"epoch"`
Counter int64 `json:"counter"`
Hits []map[string]string `json:"snapshot"`
DiffResults struct {
Added []map[string]string `json:"added"`
Removed []map[string]string `json:"removed"`
} `json:"diffResults"`
}

type osqueryLogMessage struct {
Expand Down Expand Up @@ -80,27 +86,28 @@ func (m *osqueryLogMessage) Log(typ logger.LogType, log *logp.Logger) {
}
}

type HandleSnapshotResultFunc func(res SnapshotResult)
type HandleQueryResultFunc func(res QueryResult)

type LoggerPlugin struct {
log *logp.Logger
logSnapshotFn HandleSnapshotResultFunc
logSnapshotFn HandleQueryResultFunc
}

func NewLoggerPlugin(log *logp.Logger, logSnapshotFn HandleSnapshotResultFunc) *LoggerPlugin {
func NewLoggerPlugin(log *logp.Logger, logSnapshotFn HandleQueryResultFunc) *LoggerPlugin {
return &LoggerPlugin{
log: log.With("ctx", "logger"),
logSnapshotFn: logSnapshotFn,
}
}

func (p *LoggerPlugin) Log(ctx context.Context, typ logger.LogType, logText string) error {
if typ == logger.LogTypeSnapshot {
var res SnapshotResult
if typ == logger.LogTypeSnapshot || typ == logger.LogTypeString {
var res QueryResult
if err := json.Unmarshal([]byte(logText), &res); err != nil {
p.log.Errorf("failed to unmarshal shapshot result: %v", err)
return err
}

if p.logSnapshotFn != nil {
p.logSnapshotFn(res)
}
Expand Down
88 changes: 44 additions & 44 deletions x-pack/osquerybeat/beater/logger_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,37 +20,37 @@ func TestLoggerPlugin_New(t *testing.T) {
validLogger := logp.NewLogger("logger_test")

tests := []struct {
name string
log *logp.Logger
logSnapshotFn HandleSnapshotResultFunc
shouldPanic bool
name string
log *logp.Logger
logQueryResultFn HandleQueryResultFunc
shouldPanic bool
}{
{
name: "invalid",
log: nil,
logSnapshotFn: nil,
shouldPanic: true,
name: "invalid",
log: nil,
logQueryResultFn: nil,
shouldPanic: true,
},
{
name: "nologfunc",
log: validLogger,
logSnapshotFn: nil,
name: "nologfunc",
log: validLogger,
logQueryResultFn: nil,
},
{
name: "nonempty",
log: validLogger,
logSnapshotFn: func(res SnapshotResult) {},
name: "nonempty",
log: validLogger,
logQueryResultFn: func(res QueryResult) {},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
if tc.shouldPanic {
testutil.AssertPanic(t, func() { NewLoggerPlugin(tc.log, tc.logSnapshotFn) })
testutil.AssertPanic(t, func() { NewLoggerPlugin(tc.log, tc.logQueryResultFn) })
return
}

p := NewLoggerPlugin(tc.log, tc.logSnapshotFn)
p := NewLoggerPlugin(tc.log, tc.logQueryResultFn)
if p == nil {
t.Error("expected nil logger pluggin")
}
Expand All @@ -61,10 +61,10 @@ func TestLoggerPlugin_New(t *testing.T) {
func TestLoggerPlugin_Log(t *testing.T) {
validLogger := logp.NewLogger("logger_test")

snapshotFn := func(res SnapshotResult) {
queryResultFn := func(res QueryResult) {
}

result := SnapshotResult{
result := QueryResult{
Action: "foo",
Name: "bar",
Hits: []map[string]string{
Expand All @@ -83,44 +83,44 @@ func TestLoggerPlugin_Log(t *testing.T) {
}

tests := []struct {
name string
logSnapshotFn HandleSnapshotResultFunc
logType logger.LogType
logMessage string
err string
name string
logQueryResultFn HandleQueryResultFunc
logType logger.LogType
logMessage string
err string
}{
{
name: "nosnapshot",
logSnapshotFn: snapshotFn,
logType: logger.LogTypeString,
logMessage: "",
name: "nosnapshot",
logQueryResultFn: queryResultFn,
logType: logger.LogTypeString,
logMessage: "{}",
},
{
name: "snapshot invalid",
logSnapshotFn: snapshotFn,
logType: logger.LogTypeSnapshot,
logMessage: "",
err: "unexpected end of JSON input",
name: "snapshot invalid",
logQueryResultFn: queryResultFn,
logType: logger.LogTypeSnapshot,
logMessage: "",
err: "unexpected end of JSON input",
},
{
name: "snapshot empty",
logSnapshotFn: snapshotFn,
logType: logger.LogTypeSnapshot,
logMessage: "{}",
name: "snapshot empty",
logQueryResultFn: queryResultFn,
logType: logger.LogTypeSnapshot,
logMessage: "{}",
},
{
name: "snapshot nonempty",
logSnapshotFn: snapshotFn,
logType: logger.LogTypeSnapshot,
logMessage: string(resultbytes),
name: "snapshot nonempty",
logQueryResultFn: queryResultFn,
logType: logger.LogTypeSnapshot,
logMessage: string(resultbytes),
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
var capturedSnapshot *SnapshotResult
p := NewLoggerPlugin(validLogger, func(res SnapshotResult) {
capturedSnapshot = &res
var capturedQueryResult *QueryResult
p := NewLoggerPlugin(validLogger, func(res QueryResult) {
capturedQueryResult = &res
})
err := p.Log(context.Background(), tc.logType, tc.logMessage)
if err != nil {
Expand All @@ -137,7 +137,7 @@ func TestLoggerPlugin_Log(t *testing.T) {
t.Errorf("expected error: %v", tc.err)
}
if tc.logType == logger.LogTypeSnapshot && tc.logMessage == string(resultbytes) {
diff := cmp.Diff(capturedSnapshot, &result)
diff := cmp.Diff(capturedQueryResult, &result)
if diff != "" {
t.Error(diff)
}
Expand Down
63 changes: 54 additions & 9 deletions x-pack/osquerybeat/beater/osquerybeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,8 +280,8 @@ func (bt *osquerybeat) runOsquery(ctx context.Context, b *beat.Beat, osq *osqd.O
cache.Resize(configPlugin.Count())

// Create osquery logger plugin
loggerPlugin := NewLoggerPlugin(bt.log, func(res SnapshotResult) {
bt.handleSnapshotResult(ctx, cli, configPlugin, res)
loggerPlugin := NewLoggerPlugin(bt.log, func(res QueryResult) {
bt.handleQueryResult(ctx, cli, configPlugin, res)
})

// Run main loop
Expand Down Expand Up @@ -360,7 +360,7 @@ func runExtensionServer(ctx context.Context, socketPath string, configPlugin *Co
return g.Wait()
}

func (bt *osquerybeat) handleSnapshotResult(ctx context.Context, cli *osqdcli.Client, configPlugin *ConfigPlugin, res SnapshotResult) {
func (bt *osquerybeat) handleQueryResult(ctx context.Context, cli *osqdcli.Client, configPlugin *ConfigPlugin, res QueryResult) {
ns, ok := configPlugin.LookupNamespace(res.Name)
if !ok {
bt.log.Debugf("failed to lookup query namespace: %s, the query was possibly removed recently from the schedule", res.Name)
Expand All @@ -375,14 +375,59 @@ func (bt *osquerybeat) handleSnapshotResult(ctx context.Context, cli *osqdcli.Cl
return
}

hits, err := cli.ResolveResult(ctx, qi.Query, res.Hits)
if err != nil {
bt.log.Errorf("failed to resolve query result types: %s", res.Name)
return
}
var (
hits []map[string]interface{}
)

responseID := uuid.Must(uuid.NewV4()).String()
bt.pub.Publish(config.Datastream(ns), res.Name, responseID, hits, qi.ECSMapping, nil)

if res.Action == "snapshot" {
snapshot, err := cli.ResolveResult(ctx, qi.Query, res.Hits)
if err != nil {
bt.log.Errorf("failed to resolve snapshot query result types: %s", res.Name)
return
}
hits = append(hits, snapshot...)
meta := queryResultMeta("snapshot", "", res)
bt.pub.Publish(config.Datastream(ns), res.Name, responseID, meta, hits, qi.ECSMapping, nil)
} else {
if len(res.DiffResults.Added) > 0 {
added, err := cli.ResolveResult(ctx, qi.Query, res.DiffResults.Added)
if err != nil {
bt.log.Errorf(`failed to resolve diff query "added" result types: %s`, res.Name)
return
}
hits = append(hits, added...)
meta := queryResultMeta("diff", "added", res)
bt.pub.Publish(config.Datastream(ns), res.Name, responseID, meta, hits, qi.ECSMapping, nil)
}
if len(res.DiffResults.Removed) > 0 {
removed, err := cli.ResolveResult(ctx, qi.Query, res.DiffResults.Added)
if err != nil {
bt.log.Errorf(`failed to resolve diff query "removed" result types: %s`, res.Name)
return
}
hits = append(hits, removed...)
meta := queryResultMeta("diff", "removed", res)
bt.pub.Publish(config.Datastream(ns), res.Name, responseID, meta, hits, qi.ECSMapping, nil)
}
}

}

func queryResultMeta(typ, action string, res QueryResult) map[string]interface{} {
m := map[string]interface{}{
"type": typ,
"calendar_type": res.CalendarTime,
"unix_time": res.UnixTime,
"epoch": res.Epoch,
"counter": res.Counter,
}

if action != "" {
m["action"] = action
}
return m
}

func (bt *osquerybeat) setManagerPayload(b *beat.Beat) {
Expand Down
9 changes: 7 additions & 2 deletions x-pack/osquerybeat/internal/config/osquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,13 @@ type Query struct {
// Optional ECS mapping for the query, not rendered into osqueryd configuration
ECSMapping map[string]interface{} `config:"ecs_mapping" json:"-"`

// Always enforced as snapshot, can't be changed via configuration
Snapshot bool `json:"snapshot"`
// A boolean to set 'snapshot' mode, default true
// This is different from the default osquery behavior where the missing value defaults to false
Snapshot *bool `config:"snapshot,omitempty" json:"snapshot,omitempty"`

// A boolean to determine if "removed" actions should be logged, default true
// This is the same as osquery behavior
Removed *bool `config:"removed,omitempty" json:"removed,omitempty"`
}

type Pack struct {
Expand Down
4 changes: 4 additions & 0 deletions x-pack/osquerybeat/internal/osqd/args.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ var protectedFlags = Flags{
// The delimiter for a full query name that is concatenated as "pack_" + {{pack name}} + "_" + {{query name}} by default
"pack_delimiter": "_",

// This enforces the batch format for differential results
// https://osquery.readthedocs.io/en/stable/deployment/logging
"logger_event_type": false,

// Refresh config every 60 seconds
// The previous setting was 10 seconds which is unnecessary frequent.
// Osquery does not expect that frequent policy/configuration changes
Expand Down
Loading

0 comments on commit eaf937b

Please sign in to comment.