Distribute scheduling and experiment with throttling #422

Draft · wants to merge 7 commits into main

Changes from all commits
contentctl/objects/abstract_security_content_objects/detection_abstract.py
@@ -25,6 +25,7 @@
from contentctl.objects.config import CustomApp

import datetime
import random
from functools import cached_property

from contentctl.enrichments.cve_enrichment import CveEnrichmentObj
@@ -52,12 +53,17 @@
from contentctl.objects.rba import RBAObject, RiskScoreValue_Type
from contentctl.objects.security_content_object import SecurityContentObject
from contentctl.objects.test_group import TestGroup
from contentctl.objects.throttling import Throttling
from contentctl.objects.unit_test import UnitTest

# The AnalyticsTypes that we do not test via contentctl
SKIPPED_ANALYTICS_TYPES: set[str] = {AnalyticsType.Correlation}


# Round-robin counter used by calculated_cron to spread start minutes across searches
GLOBAL_COUNTER = 0
random.seed(42) # For reproducibility in tests


class Detection_Abstract(SecurityContentObject):
    name: str = Field(..., max_length=CONTENTCTL_MAX_SEARCH_NAME_LENGTH)
    # contentType: SecurityContentType = SecurityContentType.detections
@@ -70,6 +76,80 @@ class Detection_Abstract(SecurityContentObject):
    known_false_positives: str = Field(..., min_length=4)
    rba: Optional[RBAObject] = Field(default=None)

    @computed_field
    @property
    def statistically_disabled(self) -> str:
        """
        Returns a string that indicates whether the detection is statistically disabled.
        This is used to determine, in test app builds for the purposes of
        performance testing, whether this detection should be enabled by default.
        """
        # Draw a random integer in [0, 99], letting us set the probability of this
        # search being enabled anywhere between 0 and 100 percent.
        PERCENT_OF_SEARCHES_TO_ENABLE = 0

        # Remember, the name of the generated field is "disabled", so "false" means
        # the search is enabled and "true" means it is disabled. It feels backwards.
        if random.randint(0, 99) < PERCENT_OF_SEARCHES_TO_ENABLE:
            return "false"
        else:
            return "true"

    @computed_field
    @property
    def calculated_cron(self) -> str:
        """
        Returns the cron expression for the detection.
        Read the docs here for a better understanding of which cron
        expressions are skewable (and good or bad candidates for skewing):
        https://docs.splunk.com/Documentation/SplunkCloud/latest/Report/Skewscheduledreportstarttimes#How_the_search_schedule_affects_the_potential_schedule_offset
        """
        global GLOBAL_COUNTER

        # An earlier, hash-based implementation, kept for reference:
        #
        #     # Convert the UUID, which is unique per detection, to an integer.
        #     uuid_as_int = self.id.int
        #     name_hash = hash(self.name)
        #
        #     # Then, mod this by 60. This should give us a fairly random
        #     # distribution from 0-59.
        #     MIN_TIME = 0
        #     MAX_TIME = 59
        #     TIME_DIFF = (MAX_TIME + 1) - MIN_TIME
        #
        #     # We do this instead of simply using randrange or similar because
        #     # deriving the minute from the name makes generation of the cron
        #     # schedule deterministic, which is useful for testing different
        #     # windows. (Note that hash() of a str varies across processes
        #     # unless PYTHONHASHSEED is fixed.) For example, there is a good
        #     # chance we may get another request to not have things start
        #     # within the first 5 minutes, given that many other searches in
        #     # ES are scheduled to kick off at that time.
        #     new_start_minute = name_hash % TIME_DIFF
        #
        #     # Every cron schedule for an ESCU search is "0 * * * *", so we
        #     # will just substitute what we generated above, ignoring what is
        #     # actually in the deployment.

        # The spacing of the above implementation winds up being quite poor,
        # perhaps because our sample size is too small to approach a uniform
        # distribution. So just use a module-level counter and mod it.
        MIN_TIME = 0
        MAX_TIME = 14
        TIME_DIFF = (MAX_TIME + 1) - MIN_TIME
        new_start_minute = GLOBAL_COUNTER % TIME_DIFF
        GLOBAL_COUNTER = GLOBAL_COUNTER + 1

        if self.type is AnalyticsType.TTP:
            # new_start_minute is already in 0-14, so the mod below is defensive
            minute_start = new_start_minute % 15
            minute_stop = minute_start + 45

            return self.deployment.scheduling.cron_schedule.format(
                minute_range=f"{minute_start}-{minute_stop}"
            )

        # return "0 * * * *"

        return self.deployment.scheduling.cron_schedule.format(minute=new_start_minute)
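
As a quick illustration of the distribution this produces, here is a standalone sketch; the cron templates shown are assumptions for illustration, not the actual deployment config:

GLOBAL_COUNTER = 0

def next_start_minute() -> int:
    """Round-robin over the 15 slots 0-14, exactly as calculated_cron does."""
    global GLOBAL_COUNTER
    minute = GLOBAL_COUNTER % 15
    GLOBAL_COUNTER += 1
    return minute

minutes = [next_start_minute() for _ in range(17)]
print(minutes)  # [0, 1, 2, ..., 13, 14, 0, 1] -- wraps after 15 searches

# With an assumed non-TTP template of "{minute} * * * *", slot 3 renders as
# "3 * * * *"; with an assumed TTP template of "{minute_range} * * * *",
# slot 3 renders as "3-48 * * * *".
print("{minute} * * * *".format(minute=minutes[3]))
print("{minute_range} * * * *".format(minute_range=f"{minutes[3]}-{minutes[3] + 45}"))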

    @computed_field
    @property
    def risk_score(self) -> RiskScoreValue_Type:
@@ -804,22 +884,40 @@ def addTags_nist(self):
        return self

    @model_validator(mode="after")
    def automaticallyCreateThrottling(self, default_throttling_period: str = "86400s"):
        """
        If throttling is not explicitly configured, then automatically create
        it from the risk and threat objects defined in the RBA config.

        For throttling to work properly, the fields to throttle on MUST exist
        in the search itself. If they do not, then we cannot apply the throttling.
        """
        if self.tags.throttling is None:
            # No throttling explicitly configured for this detection, so derive
            # it from the RBA config.
            if self.rba is None:
                # Cannot add any throttling because there is no RBA config either
                return self

            self.tags.throttling = Throttling(
                fields=[ro.field for ro in self.rba.risk_objects]  # type: ignore
                + [to.field for to in self.rba.threat_objects],  # type: ignore
                period=default_throttling_period,  # default period of 1 day
            )

        missing_fields: list[str] = [
            field for field in self.tags.throttling.fields if field not in self.search
        ]
        if len(missing_fields) > 0:
            print(
                f"\nThe following throttle fields were missing from the search [{self.name}]. "
                f"This is just a warning for now since this is an experimental feature: {missing_fields}\n"
            )
            # raise ValueError(
            #     f"The following throttle fields were missing from the search [{self.name}]: {missing_fields}"
            # )

        # All throttling fields (if any) are present in the search
        return self
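
A hedged, self-contained sketch of the field-derivation rule above; the real RBAObject and Throttling models live elsewhere in contentctl, so the shapes here are simplified stand-ins:

from dataclasses import dataclass

@dataclass
class FakeRiskObject:
    field: str

@dataclass
class FakeThreatObject:
    field: str

risk_objects = [FakeRiskObject("user"), FakeRiskObject("dest")]
threat_objects = [FakeThreatObject("parent_process_name")]

# Throttle on every risk and threat field; alerts that agree on all of these
# fields are suppressed for the default period of one day (86400s).
throttle_fields = [ro.field for ro in risk_objects] + [to.field for to in threat_objects]
print(throttle_fields)  # ['user', 'dest', 'parent_process_name']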
54 changes: 54 additions & 0 deletions contentctl/objects/baseline.py
@@ -15,6 +15,9 @@
    model_serializer,
)

from contentctl.objects.abstract_security_content_objects import (
    detection_abstract,  # GLOBAL_COUNTER lives here; import the module so the counter is shared
)
from contentctl.objects.baseline_tags import BaselineTags
from contentctl.objects.config import CustomApp
from contentctl.objects.constants import (
@@ -39,6 +42,57 @@ class Baseline(SecurityContentObject):
    deployment: Deployment = Field({})
    status: ContentStatus

    @computed_field
    @property
    def calculated_cron(self) -> str:
        """
        Returns the cron expression for the baseline.
        Read the docs here for a better understanding of which cron
        expressions are skewable (and good or bad candidates for skewing):
        https://docs.splunk.com/Documentation/SplunkCloud/latest/Report/Skewscheduledreportstarttimes#How_the_search_schedule_affects_the_potential_schedule_offset
        """
        # An earlier hash-based implementation (and the reasoning for replacing
        # it with a shared counter) is preserved in
        # Detection_Abstract.calculated_cron; the logic below mirrors it.
        MIN_TIME = 0
        MAX_TIME = 14
        TIME_DIFF = (MAX_TIME + 1) - MIN_TIME
        new_start_minute = detection_abstract.GLOBAL_COUNTER % TIME_DIFF
        detection_abstract.GLOBAL_COUNTER += 1

        return self.deployment.scheduling.cron_schedule.format(minute=new_start_minute)
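
The module (rather than the bare name) is imported above because a from-import binds a copy of the value into this module, so incrementing it here would never advance the counter that detection_abstract uses. A tiny runnable demonstration of the difference, using a synthetic module rather than contentctl itself:

import types

counter_mod = types.ModuleType("counter_mod")
counter_mod.COUNTER = 0

# "from counter_mod import COUNTER" is equivalent to this local binding:
COUNTER = counter_mod.COUNTER
COUNTER += 1                # rebinds only this module's name
print(counter_mod.COUNTER)  # 0 -- the shared value never moved

counter_mod.COUNTER += 1    # attribute access mutates the shared state
print(counter_mod.COUNTER)  # 1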

    @field_validator("status", mode="after")
    @classmethod
    def NarrowStatus(cls, status: ContentStatus) -> ContentStatus:
5 changes: 4 additions & 1 deletion contentctl/objects/throttling.py
@@ -1,6 +1,7 @@
-from pydantic import BaseModel, Field, field_validator
+from typing import Annotated
+
+from pydantic import BaseModel, Field, computed_field, field_validator


# Alert Suppression/Throttling settings have been taken from
# https://docs.splunk.com/Documentation/Splunk/9.2.2/Admin/Savedsearchesconf
@@ -28,6 +29,8 @@ def no_spaces_in_fields(cls, v: list[str]) -> list[str]:
                )
        return v

    @computed_field
    @property
    def conf_formatted_fields(self) -> str:
        """
        TODO:
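
The docstring above is cut off in this view. Given how the template consumes the property (alert.suppress.fields in savedsearches.conf takes a comma-separated field list), a plausible sketch of what conf_formatted_fields returns, assuming that behavior:

fields = ["user", "dest", "parent_process_name"]
print(",".join(fields))  # user,dest,parent_process_name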
2 changes: 1 addition & 1 deletion contentctl/output/templates/savedsearches_baselines.j2
@@ -17,7 +17,7 @@ action.escu.analytic_story = {{ objectListToNameList(detection.tags.analytic_story) }}
action.escu.analytic_story = []
{% endif %}
action.escu.data_models = {{ detection.datamodel | tojson }}
-cron_schedule = {{ detection.deployment.scheduling.cron_schedule }}
+cron_schedule = {{ detection.calculated_cron }}
enableSched = 1
dispatch.earliest_time = {{ detection.deployment.scheduling.earliest_time }}
dispatch.latest_time = {{ detection.deployment.scheduling.latest_time }}
6 changes: 3 additions & 3 deletions contentctl/output/templates/savedsearches_detections.j2
@@ -31,7 +31,7 @@ action.risk.param._risk = {{ detection.risk | tojson }}
action.risk.param._risk_score = 0
action.risk.param.verbose = 0
{% endif %}
-cron_schedule = {{ detection.deployment.scheduling.cron_schedule }}
+cron_schedule = {{ detection.calculated_cron }}
dispatch.earliest_time = {{ detection.deployment.scheduling.earliest_time }}
dispatch.latest_time = {{ detection.deployment.scheduling.latest_time }}
action.correlationsearch.enabled = 1
@@ -70,7 +70,7 @@ action.sendtophantom.param.sensitivity = {{ detection.deployment.alert_action.phantom.sensitivity | custom_jinja2_enrichment_filter(detection) }}
action.sendtophantom.param.severity = {{ detection.deployment.alert_action.phantom.severity | custom_jinja2_enrichment_filter(detection) }}
{% endif %}
alert.digest_mode = 1
-disabled = {{ (not detection.enabled_by_default) | lower }}
+disabled = {{ detection.statistically_disabled }}
enableSched = 1
allow_skew = 100%
counttype = number of events
@@ -80,7 +80,7 @@ realtime_schedule = 0
is_visible = false
{% if detection.tags.throttling %}
alert.suppress = true
-alert.suppress.fields = {{ detection.tags.throttling.conf_formatted_fields() }}
+alert.suppress.fields = {{ detection.tags.throttling.conf_formatted_fields }}
alert.suppress.period = {{ detection.tags.throttling.period }}
{% endif %}
search = {{ detection.search | escapeNewlines() }}
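
For reference, here is a hypothetical stanza this template might render for one detection (every value below is assumed, not taken from a real build):

disabled = true
enableSched = 1
allow_skew = 100%
cron_schedule = 7 * * * *
realtime_schedule = 0
alert.suppress = true
alert.suppress.fields = user,dest,parent_process_name
alert.suppress.period = 86400s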