Skip to content

[FR][DAC] Add Support for Custom Schemas #3679

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions detection_rules/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
assert (3, 12) <= sys.version_info < (4, 0), "Only Python 3.12+ supported"

from . import ( # noqa: E402
custom_schemas,
custom_rules,
devtools,
docs,
Expand All @@ -30,6 +31,7 @@

__all__ = (
'custom_rules',
'custom_schemas',
'devtools',
'docs',
'eswrap',
Expand Down
44 changes: 44 additions & 0 deletions detection_rules/custom_schemas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.

"""Custom Schemas management."""
from pathlib import Path

import eql
import eql.types

from .config import parse_rules_config
from .utils import cached

RULES_CONFIG = parse_rules_config()


@cached
def get_custom_schemas(stack_version: str) -> dict:
"""Load custom schemas if present."""
custom_schema_dump = {}
stack_schema_map = RULES_CONFIG.stack_schema_map[stack_version]

for schema, value in stack_schema_map.items():
if schema not in ["beats", "ecs", "endgame"]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happens if it is one of these three?

Copy link
Contributor Author

@eric-forte-elastic eric-forte-elastic May 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should ignore the schema if it is one of those, as those are reserved "schema words" that are by definition not custom. Do you agree?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think that is the smart approach, but won't this return an empty dict in this case? It would be better to raise an error, forbidding the use of reserved words. Thoughts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that is a good idea, we will need to have it function such that it will allow the words once, as we would want them to be able to specify beats, etc. or to not use those if desired. But we would not want them to use multiple as the result would be confusing 👍

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Upon further testing and reflection, I think the test for additional beats, ecs, or endgame schemas should not be in the custom_schemas.py as it might be confusing to have the custom schema loader be validating non-custom schemas. I think this check would be considered increased validation for the schema map in general. As such, I think this could target main, as the check is not DAC specific.

schema_path = Path(value)
if not schema_path.is_absolute():
schema_path = RULES_CONFIG.stack_schema_map_file.parent / value
if schema_path.is_file():
custom_schema_dump.update(eql.utils.load_dump(str(schema_path)))
elif schema_path.is_dir():
custom_schema_dump.update(load_schemas_from_dir(schema_path))

return custom_schema_dump


def load_schemas_from_dir(schema_dir: Path) -> dict:
    """Load and merge every JSON schema file found directly inside a directory.

    Only regular files with a ``.json`` suffix are read; sub-directories and
    files with any other suffix are ignored. Files are merged in sorted path
    order, so when two files define the same top-level key the outcome is
    deterministic (the later path wins) rather than depending on the order in
    which the filesystem happens to list the entries.

    :param schema_dir: directory to scan (not recursive).
    :return: merged contents of all schema files; empty dict if none are found.
    """
    schemas_dump = {}
    # iterdir() yields entries in arbitrary order; sort for a stable merge.
    for file_path in sorted(schema_dir.iterdir()):
        if file_path.is_file() and file_path.suffix == ".json":
            schemas_dump.update(eql.utils.load_dump(str(file_path)))

    return schemas_dump
8 changes: 8 additions & 0 deletions detection_rules/ecs.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@
from semver import Version
import yaml

from .config import parse_rules_config
from .custom_schemas import get_custom_schemas
from .utils import (DateTimeEncoder, cached, get_etc_path, gzip_compress,
load_etc_dump, read_gzip, unzip)

ECS_NAME = "ecs_schemas"
ECS_SCHEMAS_DIR = get_etc_path(ECS_NAME)
ENDPOINT_NAME = "endpoint_schemas"
ENDPOINT_SCHEMAS_DIR = get_etc_path(ENDPOINT_NAME)
RULES_CONFIG = parse_rules_config()


def add_field(schema, name, info):
Expand Down Expand Up @@ -148,6 +151,11 @@ def get_non_ecs_schema():
return load_etc_dump('non-ecs-schema.json')


@cached
def get_custom_index_schema(index_name: str, stack_version: str):
    """Return the custom schema for an index under a stack version, or an empty dict if none is defined."""
    version_schemas = get_custom_schemas(stack_version)
    return version_schemas.get(index_name, {})


@cached
def get_index_schema(index_name):
    """Return the non-ECS schema entry for an index, or an empty dict if the index has none."""
    non_ecs_schemas = get_non_ecs_schema()
    return non_ecs_schemas.get(index_name, {})
Expand Down
13 changes: 12 additions & 1 deletion detection_rules/rule_validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import kql

from . import ecs, endgame
from .config import load_current_package_version
from .config import CUSTOM_RULES_DIR, load_current_package_version
from .integrations import (get_integration_schema_data,
load_integrations_manifests)
from .rule import (EQLRuleData, QueryRuleData, QueryValidator, RuleMeta,
Expand Down Expand Up @@ -192,11 +192,17 @@ def validate_integration(
integration_schema_data["integration"],
)
integration_schema = integration_schema_data["schema"]
stack_version = integration_schema_data["stack_version"]

# Add non-ecs-schema fields
for index_name in data.index:
integration_schema.update(**ecs.flatten(ecs.get_index_schema(index_name)))

# Add custom schema fields for appropriate stack version
if data.index and CUSTOM_RULES_DIR:
for index_name in data.index:
integration_schema.update(**ecs.flatten(ecs.get_custom_index_schema(index_name, stack_version)))
Comment on lines +201 to +204
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: This looks identical to lines L396-L400. Might be worth moving to a small method for cleanliness.

Copy link
Contributor Author

@eric-forte-elastic eric-forte-elastic Jun 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree it would be better, but since both validation methods have a number of nearly identical code blocks, I think this would be better done in a larger refactor.

For instance another identical block would be for non-ecs schema fields.

for index_name in data.index:
    integration_schema.update(**ecs.flatten(ecs.get_index_schema(index_name)))


# Add endpoint schema fields for multi-line fields
integration_schema.update(**ecs.flatten(ecs.get_endpoint_schemas()))
if integration:
Expand Down Expand Up @@ -387,6 +393,11 @@ def validate_integration(self, data: QueryRuleData, meta: RuleMeta,
for index_name in data.index:
integration_schema.update(**ecs.flatten(ecs.get_index_schema(index_name)))

# Add custom schema fields for appropriate stack version
if data.index and CUSTOM_RULES_DIR:
for index_name in data.index:
integration_schema.update(**ecs.flatten(ecs.get_custom_index_schema(index_name, stack_version)))

# add endpoint schema fields for multi-line fields
integration_schema.update(**ecs.flatten(ecs.get_endpoint_schemas()))
package_schemas[package].update(**integration_schema)
Expand Down
5 changes: 4 additions & 1 deletion detection_rules/schemas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,10 @@ def load_stack_schema_map() -> dict:

@cached
def get_stack_schemas(stack_version: Optional[str] = '0.0.0') -> OrderedDictType[str, dict]:
"""Return all ECS + beats to stack versions for every stack version >= specified stack version and <= package."""
"""
Return all ECS, beats, and custom stack versions for every stack version.
Only versions >= specified stack version and <= package are returned.
"""
stack_version = Version.parse(stack_version or '0.0.0', optional_minor_and_patch=True)
current_package = Version.parse(load_current_package_version(), optional_minor_and_patch=True)

Expand Down
34 changes: 34 additions & 0 deletions docs/custom-rules.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,3 +141,37 @@ from detection_rules.generic_loader import GenericLoader
loader = GenericLoader()
loader.load_directory(...)
```

### Using Custom Schemas

You can specify custom defined schemas for custom indexes using the `etc/stack-schema-map.yaml` in your custom rules directory.

To add a custom schema, add a sub key in the `etc/stack-schema-map.yaml` file under the stack version you wish the custom schema to apply to.
Then for its value, reference the json file, or folder of files, where you have your schema defined.

Example:

```yaml
8.14.0:
  beats: 8.12.2
  ecs: 8.11.0
  endgame: 8.4.0
  custom: schemas/custom-schema.json
```

Note: the `custom` key can be any alphanumeric value except `beats`, `ecs`, or `endgame`, as these are reserved terms.

Example schema json:

```json

{
"custom-index*": {
"process.NewCustomValue": "keyword",
"process.AnotherCustomValue": "keyword"
}
}
```

This can then be used in a rule query by adding the index to the applicable rule e.g. `index = ["logs-endpoint.events.*", "custom-index*"]`.
Then one can use the custom fields from that index in the query, e.g. `process where host.os.type == "linux" and process.NewCustomValue == "GoodValue"`.