Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KATC] Update config schema, including overlays #1772

Merged
merged 21 commits into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
88097df
Rename source column to path
RebeccaMahany Jul 3, 2024
93aa764
Rename Source to SourcePaths and support multiple paths
RebeccaMahany Jul 3, 2024
77a31a6
Rename query to source_query
RebeccaMahany Jul 3, 2024
b3f251c
Fix typos in test
RebeccaMahany Jul 3, 2024
5fa2de9
Rename Platform to Filter
RebeccaMahany Jul 3, 2024
03777f0
Move name inside config and make config a list of tables instead of a…
RebeccaMahany Jul 3, 2024
4d68485
Merge remote-tracking branch 'upstream/main' into becca/katc-cfg-update
RebeccaMahany Jul 3, 2024
2367637
Rename sourceConstraints to pathConstraints in function signature
RebeccaMahany Jul 3, 2024
479571c
Add comment
RebeccaMahany Jul 8, 2024
e2c93da
Implement overlays
RebeccaMahany Jul 8, 2024
b698ac7
Cleanup
RebeccaMahany Jul 8, 2024
cef7a88
Merge remote-tracking branch 'upstream/main' into becca/katc-overlay
RebeccaMahany Jul 8, 2024
2d276fd
Merge remote-tracking branch 'upstream/main' into becca/katc-overlay
RebeccaMahany Jul 10, 2024
2efd17e
dataFunc should take queryContext instead of constraint list for grea…
RebeccaMahany Jul 11, 2024
50b478e
Standardize log messages a little
RebeccaMahany Jul 12, 2024
8894bc2
Add config consumer that can handle non-string values
RebeccaMahany Jul 12, 2024
70ec1c9
Merge remote-tracking branch 'upstream/main' into becca/katc-overlay
RebeccaMahany Jul 15, 2024
4b9b56b
Update comments for clarity
RebeccaMahany Jul 15, 2024
428a872
Remove source paths from logs to avoid overly verbose logs
RebeccaMahany Jul 15, 2024
8c46927
Fix comments
RebeccaMahany Jul 15, 2024
606ade6
Combine table definition structs
RebeccaMahany Jul 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 27 additions & 18 deletions ee/katc/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"
"fmt"
"log/slog"
"runtime"

"github.com/kolide/launcher/ee/indexeddb"
"github.com/osquery/osquery-go"
Expand All @@ -16,8 +15,11 @@ import (
// identifier parsed from the JSON KATC config, and the `dataFunc` is the function
// that performs the query against the source.
type katcSourceType struct {
name string
dataFunc func(ctx context.Context, slogger *slog.Logger, sourcePattern string, query string, sourceConstraints *table.ConstraintList) ([]sourceData, error)
name string
// `pathConstraints` comes from the live query or scheduled query, and constrains `sourcePaths` to a particular value.
RebeccaMahany marked this conversation as resolved.
Show resolved Hide resolved
// For example, `sourcePaths` may allow for querying files belonging to any user ("/Users/%/path/to/file"), and
// `pathConstraints` may narrow the query to a path for a particular user ("/Users/example/path/to/file").
dataFunc func(ctx context.Context, slogger *slog.Logger, sourcePaths []string, query string, pathConstraints *table.ConstraintList) ([]sourceData, error)
}

// sourceData holds the result of calling `katcSourceType.dataFunc`. It maps the
Expand Down Expand Up @@ -98,36 +100,43 @@ func (r *rowTransformStep) UnmarshalJSON(data []byte) error {
}
}

// katcTableConfig is the configuration for a specific KATC table. The control server
// sends down these configurations.
type katcTableConfig struct {
SourceType katcSourceType `json:"source_type"`
Source string `json:"source"` // Describes how to connect to source (e.g. path to db) -- % and _ wildcards supported
Platform string `json:"platform"`
Columns []string `json:"columns"`
Query string `json:"query"` // Query to run against `path`
RowTransformSteps []rowTransformStep `json:"row_transform_steps"`
}
type (
// katcTableConfig is the configuration for a specific KATC table. The control server
// sends down these configurations.
katcTableConfig struct {
Columns []string `json:"columns"`
SourceType katcSourceType `json:"source_type"`
SourcePaths []string `json:"source_paths"` // Describes how to connect to source (e.g. path to db) -- % and _ wildcards supported
SourceQuery string `json:"source_query"` // Query to run against each source path
RowTransformSteps []rowTransformStep `json:"row_transform_steps"`
Overlays []katcTableConfigOverlay `json:"overlays"`
RebeccaMahany marked this conversation as resolved.
Show resolved Hide resolved
}

katcTableConfigOverlay struct {
Filters map[string]string `json:"filters"` // determines if this overlay is applicable to this launcher installation
RebeccaMahany marked this conversation as resolved.
Show resolved Hide resolved
SourceType *katcSourceType `json:"source_type,omitempty"`
SourcePaths *[]string `json:"source_paths,omitempty"` // Describes how to connect to source (e.g. path to db) -- % and _ wildcards supported
SourceQuery *string `json:"source_query,omitempty"` // Query to run against each source path
RowTransformSteps *[]rowTransformStep `json:"row_transform_steps,omitempty"`
}
)

// ConstructKATCTables takes stored configuration of KATC tables, parses the configuration,
// and returns the constructed tables.
func ConstructKATCTables(config map[string]string, slogger *slog.Logger) []osquery.OsqueryPlugin {
plugins := make([]osquery.OsqueryPlugin, 0)

for tableName, tableConfigStr := range config {
var cfg katcTableConfig
if err := json.Unmarshal([]byte(tableConfigStr), &cfg); err != nil {
slogger.Log(context.TODO(), slog.LevelWarn,
"unable to unmarshal config for Kolide ATC table, skipping",
"unable to unmarshal config for KATC table, skipping",
"table_name", tableName,
"err", err,
)
continue
}

if cfg.Platform != runtime.GOOS {
continue
}

t, columns := newKatcTable(tableName, cfg, slogger)
plugins = append(plugins, table.NewPlugin(tableName, columns, t.generate))
}
Expand Down
108 changes: 75 additions & 33 deletions ee/katc/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,53 +21,96 @@ func TestConstructKATCTables(t *testing.T) {
{
testCaseName: "snappy_sqlite",
katcConfig: map[string]string{
"kolide_snappy_sqlite_test": fmt.Sprintf(`{
"kolide_snappy_sqlite_test": `{
"source_type": "sqlite",
"platform": "%s",
"columns": ["data"],
"source": "/some/path/to/db.sqlite",
"query": "SELECT data FROM object_data JOIN object_store ON (object_data.object_store_id = object_store.id) WHERE object_store.name=\"testtable\";",
"row_transform_steps": ["snappy"]
}`, runtime.GOOS),
"source_paths": ["/some/path/to/db.sqlite"],
"source_query": "SELECT data FROM object_data JOIN object_store ON (object_data.object_store_id = object_store.id) WHERE object_store.name=\"testtable\";",
"row_transform_steps": ["snappy"],
"overlays": []
}`,
},
expectedPluginCount: 1,
},
{
testCaseName: "indexeddb_leveldb",
katcConfig: map[string]string{
"kolide_indexeddb_leveldb_test": fmt.Sprintf(`{
"kolide_indexeddb_leveldb_test": `{
"source_type": "indexeddb_leveldb",
"platform": "%s",
"columns": ["data"],
"source": "/some/path/to/db.indexeddb.leveldb",
"query": "db.store",
"row_transform_steps": ["deserialize_chrome"]
"source_paths": ["/some/path/to/db.indexeddb.leveldb"],
"source_query": "db.store",
"row_transform_steps": ["deserialize_chrome"],
"overlays": []
}`,
},
expectedPluginCount: 1,
},
{
testCaseName: "overlay",
katcConfig: map[string]string{
"kolide_overlay_test": fmt.Sprintf(`{
"source_type": "indexeddb_leveldb",
"columns": ["data"],
"source_paths": ["/some/path/to/db.indexeddb.leveldb"],
"source_query": "db.store",
"row_transform_steps": ["deserialize_chrome"],
"overlays": [
{
"filters": {
"goos": "%s"
},
"source_paths": ["/some/different/path/to/db.indexeddb.leveldb"]
}
]
}`, runtime.GOOS),
},
expectedPluginCount: 1,
},
{
testCaseName: "multiple plugins",
katcConfig: map[string]string{
"test_1": fmt.Sprintf(`{
"test_1": `{
"source_type": "sqlite",
"platform": "%s",
"columns": ["data"],
"source": "/some/path/to/db.sqlite",
"query": "SELECT data FROM object_data;",
"row_transform_steps": ["snappy"]
}`, runtime.GOOS),
"test_2": fmt.Sprintf(`{
"source_paths": ["/some/path/to/db.sqlite"],
"source_query": "SELECT data FROM object_data;",
"row_transform_steps": ["snappy"],
"overlays": []
}`,
"test_2": `{
"source_type": "sqlite",
"platform": "%s",
"columns": ["col1", "col2"],
"source": "/some/path/to/a/different/db.sqlite",
"query": "SELECT col1, col2 FROM some_table;",
"row_transform_steps": ["camel_to_snake"]
}`, runtime.GOOS),
"source_paths": ["/some/path/to/a/different/db.sqlite"],
"source_query": "SELECT col1, col2 FROM some_table;",
"row_transform_steps": ["camel_to_snake"],
"overlays": []
}`,
},
expectedPluginCount: 2,
},
{
testCaseName: "skips invalid tables and returns valid tables",
katcConfig: map[string]string{
"not_a_valid_table": `{
"source_type": "not a real type",
"columns": ["col1", "col2"],
"source_paths": ["/some/path/to/a/different/db.sqlite"],
"source_query": "SELECT col1, col2 FROM some_table;",
"row_transform_steps": ["not a real row transform step"],
"overlays": []
}`,
"valid_table": `{
"source_type": "sqlite",
"columns": ["data"],
"source_paths": ["/some/path/to/db.sqlite"],
"source_query": "SELECT data FROM object_data;",
"row_transform_steps": ["snappy"],
"overlays": []
}`,
},
expectedPluginCount: 1,
},
{
testCaseName: "malformed config",
katcConfig: map[string]string{
Expand All @@ -78,27 +121,26 @@ func TestConstructKATCTables(t *testing.T) {
{
testCaseName: "invalid table source",
katcConfig: map[string]string{
"kolide_snappy_test": fmt.Sprintf(`{
"kolide_snappy_test": `{
"source_type": "unknown_source",
"platform": "%s",
"columns": ["data"],
"source": "/some/path/to/db.sqlite",
"query": "SELECT data FROM object_data;"
}`, runtime.GOOS),
"source_paths": ["/some/path/to/db.sqlite"],
"source_query": "SELECT data FROM object_data;",
"overlays": []
}`,
},
expectedPluginCount: 0,
},
{
testCaseName: "invalid data processing step type",
katcConfig: map[string]string{
"kolide_snappy_test": fmt.Sprintf(`{
"kolide_snappy_test": `{
"source_type": "sqlite",
"platform": "%s",
"columns": ["data"],
"source": "/some/path/to/db.sqlite",
"query": "SELECT data FROM object_data;",
"source_paths": ["/some/path/to/db.sqlite"],
"source_query": "SELECT data FROM object_data;",
"row_transform_steps": ["unknown_step"]
}`, runtime.GOOS),
}`,
},
expectedPluginCount: 0,
},
Expand Down
56 changes: 29 additions & 27 deletions ee/katc/indexeddb_leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,39 +15,41 @@ import (
// found at the filepaths matching the patterns in `sourcePaths`. It retrieves all rows
// from the database and object store specified in `query`, which it expects to be in
// the format `<db name>.<object store name>`.
func indexeddbLeveldbData(ctx context.Context, slogger *slog.Logger, sourcePattern string, query string, sourceConstraints *table.ConstraintList) ([]sourceData, error) {
pathPattern := sourcePatternToGlobbablePattern(sourcePattern)
leveldbs, err := filepath.Glob(pathPattern)
if err != nil {
return nil, fmt.Errorf("globbing for leveldb files: %w", err)
}

// Extract database and table from query
dbName, objectStoreName, err := extractQueryTargets(query)
if err != nil {
return nil, fmt.Errorf("getting db and object store names: %w", err)
}

// Query databases
func indexeddbLeveldbData(ctx context.Context, slogger *slog.Logger, sourcePaths []string, query string, pathConstraints *table.ConstraintList) ([]sourceData, error) {
results := make([]sourceData, 0)
for _, db := range leveldbs {
// Check to make sure `db` adheres to sourceConstraints
valid, err := checkSourceConstraints(db, sourceConstraints)
for _, sourcePath := range sourcePaths {
pathPattern := sourcePatternToGlobbablePattern(sourcePath)
leveldbs, err := filepath.Glob(pathPattern)
if err != nil {
return nil, fmt.Errorf("checking source path constraints: %w", err)
}
if !valid {
continue
return nil, fmt.Errorf("globbing for leveldb files: %w", err)
}

rowsFromDb, err := indexeddb.QueryIndexeddbObjectStore(db, dbName, objectStoreName)
// Extract database and table from query
dbName, objectStoreName, err := extractQueryTargets(query)
if err != nil {
return nil, fmt.Errorf("querying %s: %w", db, err)
return nil, fmt.Errorf("getting db and object store names: %w", err)
}

// Query databases
for _, db := range leveldbs {
// Check to make sure `db` adheres to pathConstraints
valid, err := checkPathConstraints(db, pathConstraints)
if err != nil {
return nil, fmt.Errorf("checking source path constraints: %w", err)
}
if !valid {
continue
}

rowsFromDb, err := indexeddb.QueryIndexeddbObjectStore(db, dbName, objectStoreName)
if err != nil {
return nil, fmt.Errorf("querying %s: %w", db, err)
}
results = append(results, sourceData{
path: db,
rows: rowsFromDb,
})
}
results = append(results, sourceData{
path: db,
rows: rowsFromDb,
})
}

return results, nil
Expand Down
44 changes: 23 additions & 21 deletions ee/katc/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,32 +13,34 @@ import (
)

// sqliteData is the dataFunc for sqlite KATC tables
func sqliteData(ctx context.Context, slogger *slog.Logger, sourcePattern string, query string, sourceConstraints *table.ConstraintList) ([]sourceData, error) {
pathPattern := sourcePatternToGlobbablePattern(sourcePattern)
sqliteDbs, err := filepath.Glob(pathPattern)
if err != nil {
return nil, fmt.Errorf("globbing for files with pattern %s: %w", pathPattern, err)
}

func sqliteData(ctx context.Context, slogger *slog.Logger, sourcePaths []string, query string, pathConstraints *table.ConstraintList) ([]sourceData, error) {
results := make([]sourceData, 0)
for _, sqliteDb := range sqliteDbs {
// Check to make sure `sqliteDb` adheres to sourceConstraints
valid, err := checkSourceConstraints(sqliteDb, sourceConstraints)
for _, sourcePath := range sourcePaths {
pathPattern := sourcePatternToGlobbablePattern(sourcePath)
sqliteDbs, err := filepath.Glob(pathPattern)
if err != nil {
return nil, fmt.Errorf("checking source path constraints: %w", err)
}
if !valid {
continue
return nil, fmt.Errorf("globbing for files with pattern %s: %w", pathPattern, err)
}

rowsFromDb, err := querySqliteDb(ctx, slogger, sqliteDb, query)
if err != nil {
return nil, fmt.Errorf("querying %s: %w", sqliteDb, err)
for _, sqliteDb := range sqliteDbs {
// Check to make sure `sqliteDb` adheres to pathConstraints
valid, err := checkPathConstraints(sqliteDb, pathConstraints)
if err != nil {
return nil, fmt.Errorf("checking source path constraints: %w", err)
}
if !valid {
continue
}

rowsFromDb, err := querySqliteDb(ctx, slogger, sqliteDb, query)
if err != nil {
return nil, fmt.Errorf("querying %s: %w", sqliteDb, err)
}
results = append(results, sourceData{
path: sqliteDb,
rows: rowsFromDb,
})
}
results = append(results, sourceData{
path: sqliteDb,
rows: rowsFromDb,
})
}

return results, nil
Expand Down
4 changes: 2 additions & 2 deletions ee/katc/sqlite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func Test_sqliteData(t *testing.T) {
}

// Query data
results, err := sqliteData(context.TODO(), multislogger.NewNopLogger(), filepath.Join(sqliteDir, "*.sqlite"), "SELECT uuid, value FROM test_data;", nil)
results, err := sqliteData(context.TODO(), multislogger.NewNopLogger(), []string{filepath.Join(sqliteDir, "*.sqlite")}, "SELECT uuid, value FROM test_data;", nil)
require.NoError(t, err)

// Confirm we have the correct number of `sourceData` returned (one per db)
Expand Down Expand Up @@ -89,7 +89,7 @@ func Test_sqliteData_noSourcesFound(t *testing.T) {
t.Parallel()

tmpDir := t.TempDir()
results, err := sqliteData(context.TODO(), multislogger.NewNopLogger(), filepath.Join(tmpDir, "db.sqlite"), "SELECT * FROM data;", nil)
results, err := sqliteData(context.TODO(), multislogger.NewNopLogger(), []string{filepath.Join(tmpDir, "db.sqlite")}, "SELECT * FROM data;", nil)
require.NoError(t, err)
require.Equal(t, 0, len(results))
}
Expand Down
Loading
Loading