Skip to content

Commit

Permalink
[KATC] Update config schema, including overlays (#1772)
Browse files Browse the repository at this point in the history
  • Loading branch information
RebeccaMahany authored Jul 15, 2024
1 parent fe97939 commit 70b6d0b
Show file tree
Hide file tree
Showing 11 changed files with 350 additions and 174 deletions.
2 changes: 1 addition & 1 deletion cmd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ func runLauncher(ctx context.Context, cancel func(), multiSlogger, systemMultiSl
// agentFlagConsumer handles agent flags pushed from the control server
controlService.RegisterConsumer(agentFlagsSubsystemName, keyvalueconsumer.New(flagController))
// katcConfigConsumer handles updates to Kolide's custom ATC tables
controlService.RegisterConsumer(katcSubsystemName, keyvalueconsumer.New(k.KatcConfigStore()))
controlService.RegisterConsumer(katcSubsystemName, keyvalueconsumer.NewConfigConsumer(k.KatcConfigStore()))
controlService.RegisterSubscriber(katcSubsystemName, osqueryRunner)
controlService.RegisterSubscriber(katcSubsystemName, startupSettingsWriter)

Expand Down
47 changes: 47 additions & 0 deletions ee/control/consumers/keyvalueconsumer/config_consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package keyvalueconsumer

import (
"encoding/json"
"errors"
"fmt"
"io"

"github.com/kolide/launcher/ee/agent/types"
)

// ConfigConsumer is a control-server consumer that accepts a JSON object of
// configuration key/value pairs, re-encodes each top-level value as a JSON
// string, and hands the resulting string map to the wrapped updater
// (e.g. the KATC config store).
type ConfigConsumer struct {
	updater types.Updater // destination store for the string-ified key/value pairs
}

// NewConfigConsumer returns a ConfigConsumer that stores incoming
// configuration data via the given updater.
func NewConfigConsumer(updater types.Updater) *ConfigConsumer {
	return &ConfigConsumer{updater: updater}
}

// Update implements the control-server consumer interface. It decodes the
// incoming JSON object, re-marshals each top-level value back to its JSON
// string encoding (so nested objects/arrays survive as strings), and stores
// the resulting string-to-string map via the updater. Returns an error if the
// consumer is misconfigured, the payload is not a JSON object, or the store
// update fails.
func (c *ConfigConsumer) Update(data io.Reader) error {
	if c == nil {
		return errors.New("key value consumer is nil")
	}
	// Guard against a nil updater as well -- calling Update on it below would panic.
	if c.updater == nil {
		return errors.New("key value consumer has no updater")
	}

	var kvPairs map[string]any
	if err := json.NewDecoder(data).Decode(&kvPairs); err != nil {
		return fmt.Errorf("failed to decode key-value json: %w", err)
	}

	// Pre-size the destination map: we emit exactly one entry per decoded pair.
	kvStringPairs := make(map[string]string, len(kvPairs))
	for k, v := range kvPairs {
		b, err := json.Marshal(v)
		if err != nil {
			return fmt.Errorf("unable to marshal value for `%s`: %w", k, err)
		}
		kvStringPairs[k] = string(b)
	}

	// Hand the string-ified map to the thing storing this data.
	_, err := c.updater.Update(kvStringPairs)

	return err
}
44 changes: 26 additions & 18 deletions ee/katc/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"
"fmt"
"log/slog"
"runtime"

"github.com/kolide/launcher/ee/indexeddb"
"github.com/osquery/osquery-go"
Expand All @@ -16,8 +15,9 @@ import (
// identifier parsed from the JSON KATC config, and the `dataFunc` is the function
// that performs the query against the source.
type katcSourceType struct {
name string
dataFunc func(ctx context.Context, slogger *slog.Logger, sourcePattern string, query string, sourceConstraints *table.ConstraintList) ([]sourceData, error)
name string
// queryContext contains the constraints from the WHERE clause of the query against the KATC table.
dataFunc func(ctx context.Context, slogger *slog.Logger, sourcePaths []string, query string, queryContext table.QueryContext) ([]sourceData, error)
}

// sourceData holds the result of calling `katcSourceType.dataFunc`. It maps the
Expand Down Expand Up @@ -98,36 +98,44 @@ func (r *rowTransformStep) UnmarshalJSON(data []byte) error {
}
}

// katcTableConfig is the configuration for a specific KATC table. The control server
// sends down these configurations.
type katcTableConfig struct {
SourceType katcSourceType `json:"source_type"`
Source string `json:"source"` // Describes how to connect to source (e.g. path to db) -- % and _ wildcards supported
Platform string `json:"platform"`
Columns []string `json:"columns"`
Query string `json:"query"` // Query to run against `path`
RowTransformSteps []rowTransformStep `json:"row_transform_steps"`
}
type (
	// katcTableConfig is the configuration for a specific KATC table. The control server
	// sends down these configurations.
	katcTableConfig struct {
		Columns []string `json:"columns"`
		katcTableDefinition
		// Overlays hold alternative table definitions; each overlay's filters
		// determine whether it applies to this launcher installation.
		Overlays []katcTableConfigOverlay `json:"overlays"`
	}

	// katcTableConfigOverlay pairs a set of applicability filters with a
	// (possibly partial) table definition that overrides the base definition.
	katcTableConfigOverlay struct {
		Filters map[string]string `json:"filters"` // determines if this overlay is applicable to this launcher installation
		katcTableDefinition
	}

	// katcTableDefinition describes how to read data for a KATC table. Fields
	// are pointers, presumably so overlays can distinguish "unset" from
	// "set to the zero value" when merging -- confirm against the merge logic.
	katcTableDefinition struct {
		SourceType        *katcSourceType     `json:"source_type,omitempty"`
		SourcePaths       *[]string           `json:"source_paths,omitempty"` // Describes how to connect to source (e.g. path to db) -- % and _ wildcards supported
		SourceQuery       *string             `json:"source_query,omitempty"` // Query to run against each source path
		RowTransformSteps *[]rowTransformStep `json:"row_transform_steps,omitempty"`
	}
)

// ConstructKATCTables takes stored configuration of KATC tables, parses the configuration,
// and returns the constructed tables.
func ConstructKATCTables(config map[string]string, slogger *slog.Logger) []osquery.OsqueryPlugin {
plugins := make([]osquery.OsqueryPlugin, 0)

for tableName, tableConfigStr := range config {
var cfg katcTableConfig
if err := json.Unmarshal([]byte(tableConfigStr), &cfg); err != nil {
slogger.Log(context.TODO(), slog.LevelWarn,
"unable to unmarshal config for Kolide ATC table, skipping",
"unable to unmarshal config for KATC table, skipping",
"table_name", tableName,
"err", err,
)
continue
}

if cfg.Platform != runtime.GOOS {
continue
}

t, columns := newKatcTable(tableName, cfg, slogger)
plugins = append(plugins, table.NewPlugin(tableName, columns, t.generate))
}
Expand Down
108 changes: 75 additions & 33 deletions ee/katc/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,53 +21,96 @@ func TestConstructKATCTables(t *testing.T) {
{
testCaseName: "snappy_sqlite",
katcConfig: map[string]string{
"kolide_snappy_sqlite_test": fmt.Sprintf(`{
"kolide_snappy_sqlite_test": `{
"source_type": "sqlite",
"platform": "%s",
"columns": ["data"],
"source": "/some/path/to/db.sqlite",
"query": "SELECT data FROM object_data JOIN object_store ON (object_data.object_store_id = object_store.id) WHERE object_store.name=\"testtable\";",
"row_transform_steps": ["snappy"]
}`, runtime.GOOS),
"source_paths": ["/some/path/to/db.sqlite"],
"source_query": "SELECT data FROM object_data JOIN object_store ON (object_data.object_store_id = object_store.id) WHERE object_store.name=\"testtable\";",
"row_transform_steps": ["snappy"],
"overlays": []
}`,
},
expectedPluginCount: 1,
},
{
testCaseName: "indexeddb_leveldb",
katcConfig: map[string]string{
"kolide_indexeddb_leveldb_test": fmt.Sprintf(`{
"kolide_indexeddb_leveldb_test": `{
"source_type": "indexeddb_leveldb",
"platform": "%s",
"columns": ["data"],
"source": "/some/path/to/db.indexeddb.leveldb",
"query": "db.store",
"row_transform_steps": ["deserialize_chrome"]
"source_paths": ["/some/path/to/db.indexeddb.leveldb"],
"source_query": "db.store",
"row_transform_steps": ["deserialize_chrome"],
"overlays": []
}`,
},
expectedPluginCount: 1,
},
{
testCaseName: "overlay",
katcConfig: map[string]string{
"kolide_overlay_test": fmt.Sprintf(`{
"source_type": "indexeddb_leveldb",
"columns": ["data"],
"source_paths": ["/some/path/to/db.indexeddb.leveldb"],
"source_query": "db.store",
"row_transform_steps": ["deserialize_chrome"],
"overlays": [
{
"filters": {
"goos": "%s"
},
"source_paths": ["/some/different/path/to/db.indexeddb.leveldb"]
}
]
}`, runtime.GOOS),
},
expectedPluginCount: 1,
},
{
testCaseName: "multiple plugins",
katcConfig: map[string]string{
"test_1": fmt.Sprintf(`{
"test_1": `{
"source_type": "sqlite",
"platform": "%s",
"columns": ["data"],
"source": "/some/path/to/db.sqlite",
"query": "SELECT data FROM object_data;",
"row_transform_steps": ["snappy"]
}`, runtime.GOOS),
"test_2": fmt.Sprintf(`{
"source_paths": ["/some/path/to/db.sqlite"],
"source_query": "SELECT data FROM object_data;",
"row_transform_steps": ["snappy"],
"overlays": []
}`,
"test_2": `{
"source_type": "sqlite",
"platform": "%s",
"columns": ["col1", "col2"],
"source": "/some/path/to/a/different/db.sqlite",
"query": "SELECT col1, col2 FROM some_table;",
"row_transform_steps": ["camel_to_snake"]
}`, runtime.GOOS),
"source_paths": ["/some/path/to/a/different/db.sqlite"],
"source_query": "SELECT col1, col2 FROM some_table;",
"row_transform_steps": ["camel_to_snake"],
"overlays": []
}`,
},
expectedPluginCount: 2,
},
{
testCaseName: "skips invalid tables and returns valid tables",
katcConfig: map[string]string{
"not_a_valid_table": `{
"source_type": "not a real type",
"columns": ["col1", "col2"],
"source_paths": ["/some/path/to/a/different/db.sqlite"],
"source_query": "SELECT col1, col2 FROM some_table;",
"row_transform_steps": ["not a real row transform step"],
"overlays": []
}`,
"valid_table": `{
"source_type": "sqlite",
"columns": ["data"],
"source_paths": ["/some/path/to/db.sqlite"],
"source_query": "SELECT data FROM object_data;",
"row_transform_steps": ["snappy"],
"overlays": []
}`,
},
expectedPluginCount: 1,
},
{
testCaseName: "malformed config",
katcConfig: map[string]string{
Expand All @@ -78,27 +121,26 @@ func TestConstructKATCTables(t *testing.T) {
{
testCaseName: "invalid table source",
katcConfig: map[string]string{
"kolide_snappy_test": fmt.Sprintf(`{
"kolide_snappy_test": `{
"source_type": "unknown_source",
"platform": "%s",
"columns": ["data"],
"source": "/some/path/to/db.sqlite",
"query": "SELECT data FROM object_data;"
}`, runtime.GOOS),
"source_paths": ["/some/path/to/db.sqlite"],
"source_query": "SELECT data FROM object_data;",
"overlays": []
}`,
},
expectedPluginCount: 0,
},
{
testCaseName: "invalid data processing step type",
katcConfig: map[string]string{
"kolide_snappy_test": fmt.Sprintf(`{
"kolide_snappy_test": `{
"source_type": "sqlite",
"platform": "%s",
"columns": ["data"],
"source": "/some/path/to/db.sqlite",
"query": "SELECT data FROM object_data;",
"source_paths": ["/some/path/to/db.sqlite"],
"source_query": "SELECT data FROM object_data;",
"row_transform_steps": ["unknown_step"]
}`, runtime.GOOS),
}`,
},
expectedPluginCount: 0,
},
Expand Down
58 changes: 32 additions & 26 deletions ee/katc/indexeddb_leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,39 +15,45 @@ import (
// found at the filepath in `sourcePattern`. It retrieves all rows from the database
// and object store specified in `query`, which it expects to be in the format
// `<db name>.<object store name>`.
func indexeddbLeveldbData(ctx context.Context, slogger *slog.Logger, sourcePattern string, query string, sourceConstraints *table.ConstraintList) ([]sourceData, error) {
pathPattern := sourcePatternToGlobbablePattern(sourcePattern)
leveldbs, err := filepath.Glob(pathPattern)
if err != nil {
return nil, fmt.Errorf("globbing for leveldb files: %w", err)
}

// Extract database and table from query
dbName, objectStoreName, err := extractQueryTargets(query)
if err != nil {
return nil, fmt.Errorf("getting db and object store names: %w", err)
}
func indexeddbLeveldbData(ctx context.Context, slogger *slog.Logger, sourcePaths []string, query string, queryContext table.QueryContext) ([]sourceData, error) {
// Pull out path constraints from the query against the KATC table, to avoid querying more leveldb files than we need to.
pathConstraintsFromQuery := getPathConstraint(queryContext)

// Query databases
results := make([]sourceData, 0)
for _, db := range leveldbs {
// Check to make sure `db` adheres to sourceConstraints
valid, err := checkSourceConstraints(db, sourceConstraints)
for _, sourcePath := range sourcePaths {
pathPattern := sourcePatternToGlobbablePattern(sourcePath)
leveldbs, err := filepath.Glob(pathPattern)
if err != nil {
return nil, fmt.Errorf("checking source path constraints: %w", err)
}
if !valid {
continue
return nil, fmt.Errorf("globbing for leveldb files: %w", err)
}

rowsFromDb, err := indexeddb.QueryIndexeddbObjectStore(db, dbName, objectStoreName)
// Extract database and table from query
dbName, objectStoreName, err := extractQueryTargets(query)
if err != nil {
return nil, fmt.Errorf("querying %s: %w", db, err)
return nil, fmt.Errorf("getting db and object store names: %w", err)
}

// Query databases
for _, db := range leveldbs {
// Check to make sure `db` adheres to pathConstraintsFromQuery. This is an
// optimization to avoid work, if osquery sqlite filtering is going to exclude it.
valid, err := checkPathConstraints(db, pathConstraintsFromQuery)
if err != nil {
return nil, fmt.Errorf("checking source path constraints: %w", err)
}
if !valid {
continue
}

rowsFromDb, err := indexeddb.QueryIndexeddbObjectStore(db, dbName, objectStoreName)
if err != nil {
return nil, fmt.Errorf("querying %s: %w", db, err)
}
results = append(results, sourceData{
path: db,
rows: rowsFromDb,
})
}
results = append(results, sourceData{
path: db,
rows: rowsFromDb,
})
}

return results, nil
Expand Down
Loading

0 comments on commit 70b6d0b

Please sign in to comment.