Skip to content

Migrate integration testing to RBA paradigm - Step 2 #345

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1094,6 +1094,7 @@ def retry_search_until_timeout(
job = self.get_conn().search(query=search, **kwargs)
results = JSONResultsReader(job.results(output_mode="json"))

# TODO (cmcginley): @ljstella you're removing this ultimately, right?
# Consolidate a set of the distinct observable field names
observable_fields_set = set([o.name for o in detection.tags.observable]) # keeping this around for later
risk_object_fields_set = set([o.name for o in detection.tags.observable if "Victim" in o.role ]) # just the "Risk Objects"
Expand Down Expand Up @@ -1121,7 +1122,10 @@ def retry_search_until_timeout(
missing_risk_objects = risk_object_fields_set - results_fields_set
if len(missing_risk_objects) > 0:
# Report a failure in such cases
e = Exception(f"The observable field(s) {missing_risk_objects} are missing in the detection results")
e = Exception(
f"The risk object field(s) {missing_risk_objects} are missing in the "
"detection results"
)
test.result.set_job_content(
job.content,
self.infrastructure,
Expand All @@ -1137,6 +1141,8 @@ def retry_search_until_timeout(
# on a field. In this case, the field will appear but will not contain any values
current_empty_fields: set[str] = set()

# TODO (cmcginley): @ljstella is this something we're keeping for testing as
# well?
for field in observable_fields_set:
if result.get(field, 'null') == 'null':
if field in risk_object_fields_set:
Expand Down
3 changes: 3 additions & 0 deletions contentctl/objects/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
"Actions on Objectives": 7
}

# TODO (cmcginley): @ljstella should this be removed? also referenced in new_content.py
SES_OBSERVABLE_ROLE_MAPPING = {
"Other": -1,
"Unknown": 0,
Expand All @@ -93,6 +94,7 @@
"Observer": 9
}

# TODO (cmcginley): @ljstella should this be removed? also referenced in new_content.py
SES_OBSERVABLE_TYPE_MAPPING = {
"Unknown": 0,
"Hostname": 1,
Expand Down Expand Up @@ -135,6 +137,7 @@
"Impact": "TA0040"
}

# TODO (cmcginley): is this just for the transition testing?
RBA_OBSERVABLE_ROLE_MAPPING = {
"Attacker": 0,
"Victim": 1
Expand Down
142 changes: 68 additions & 74 deletions contentctl/objects/correlation_search.py

Large diffs are not rendered by default.

9 changes: 4 additions & 5 deletions contentctl/objects/detection_tags.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
from pydantic import (
BaseModel,
Field,
NonNegativeInt,
PositiveInt,
computed_field,
UUID4,
HttpUrl,
Expand Down Expand Up @@ -34,25 +32,26 @@
from contentctl.objects.atomic import AtomicEnrichment, AtomicTest
from contentctl.objects.annotated_types import MITRE_ATTACK_ID_TYPE, CVE_TYPE


class DetectionTags(BaseModel):
# detection spec

model_config = ConfigDict(validate_default=False, extra='forbid')
analytic_story: list[Story] = Field(...)
asset_type: AssetType = Field(...)
group: list[str] = []

mitre_attack_id: List[MITRE_ATTACK_ID_TYPE] = []
nist: list[NistCategory] = []

# TODO (cmcginley): observable should be removed as well, yes?
# TODO (#249): Add pydantic validator to ensure observables are unique within a detection
observable: List[Observable] = []
product: list[SecurityContentProductName] = Field(..., min_length=1)
throttling: Optional[Throttling] = None
security_domain: SecurityDomain = Field(...)
cve: List[CVE_TYPE] = []
atomic_guid: List[AtomicTest] = []


# enrichment
mitre_attack_enrichments: List[MitreAttackEnrichment] = Field([], validate_default=True)
Expand Down Expand Up @@ -84,7 +83,7 @@ def cis20(self) -> list[Cis18Value]:

# TODO (#268): Validate manual_test has length > 0 if not None
manual_test: Optional[str] = None

# The following validator is temporarily disabled pending further discussions
# @validator('message')
# def validate_message(cls,v,values):
Expand Down
1 change: 1 addition & 0 deletions contentctl/objects/drilldown.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class Drilldown(BaseModel):
"but it is NOT the default value and must be supplied explicitly.",
min_length= 1)

# TODO (cmcginley): @ljstella the drilldowns will need to be updated
@classmethod
def constructDrilldownsFromDetection(cls, detection: Detection) -> list[Drilldown]:
victim_observables = [o for o in detection.tags.observable if o.role[0] == "Victim"]
Expand Down
1 change: 1 addition & 0 deletions contentctl/objects/observable.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from pydantic import BaseModel, field_validator, ConfigDict
from contentctl.objects.constants import SES_OBSERVABLE_TYPE_MAPPING, RBA_OBSERVABLE_ROLE_MAPPING

# TODO (cmcginley): should this class be removed?

class Observable(BaseModel):
model_config = ConfigDict(extra="forbid")
Expand Down
Loading
Loading