Skip to content

Enable scripted fixing of detection metadata validation errors #405

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion contentctl/actions/inspect.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
DetectionMissingError,
MetadataValidationError,
VersionBumpingError,
VersionBumpingNotNeededError,
VersionBumpingTooFarError,
VersionDecrementedError,
)
Expand Down Expand Up @@ -427,19 +428,36 @@ def check_detection_metadata(self, config: inspect) -> None:
rule_name=rule_name,
current_version=current_stanza.metadata.detection_version,
previous_version=previous_stanza.metadata.detection_version,
expected_version=previous_stanza.metadata.detection_version + 1,
)
)

# Versions should never increase more than one version between releases
if (
current_stanza.metadata.detection_version
> previous_stanza.metadata.detection_version + 1
):
if current_stanza.hash != previous_stanza.hash:
validation_errors[rule_name].append(
VersionBumpingTooFarError(
rule_name=rule_name,
current_version=current_stanza.metadata.detection_version,
previous_version=previous_stanza.metadata.detection_version,
expected_version=previous_stanza.metadata.detection_version
+ 1,
)
)
# Versions should not be bumped if the stanza has not changed
if (current_stanza.hash == previous_stanza.hash) & (
current_stanza.metadata.detection_version
!= previous_stanza.metadata.detection_version
):
validation_errors[rule_name].append(
VersionBumpingTooFarError(
VersionBumpingNotNeededError(
rule_name=rule_name,
current_version=current_stanza.metadata.detection_version,
previous_version=previous_stanza.metadata.detection_version,
expected_version=previous_stanza.metadata.detection_version,
)
)

Expand All @@ -451,12 +469,25 @@ def check_detection_metadata(self, config: inspect) -> None:
# Report failure/success
print("\nDetection Metadata Validation:")
if len(validation_error_list) > 0:
with open("metadata_validation_errors.json", "w") as f:
fixable_errors = [
x
for x in validation_error_list
if type(x)
in [
VersionBumpingError,
VersionBumpingNotNeededError,
VersionBumpingTooFarError,
]
]
json.dump([x.toJSON() for x in fixable_errors], f, indent=4)
# Iterate over each rule and report the failures
for rule_name in validation_errors:
if len(validation_errors[rule_name]) > 0:
print(f"\t❌ {rule_name}")
for error in validation_errors[rule_name]:
print(f"\t\t🔸 {error.short_message}")

else:
# If no errors in the list, report success
print(
Expand Down
112 changes: 111 additions & 1 deletion contentctl/objects/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,16 @@ class VersioningError(MetadataValidationError, ABC):
previous_version: int

def __init__(
self, rule_name: str, current_version: int, previous_version: int, *args: object
self,
rule_name: str,
current_version: int,
previous_version: int,
*args: object,
) -> None:
self.rule_name = rule_name
self.current_version = current_version
self.previous_version = previous_version

super().__init__(self.long_message, *args)


Expand Down Expand Up @@ -176,6 +181,20 @@ class VersionBumpingError(VersioningError):
An error indicating the detection changed but its version wasn't bumped appropriately
"""

# The version expected in the current build, when possible
expected_version: int

def __init__(
self,
rule_name: str,
current_version: int,
previous_version: int,
expected_version: int,
*args: object,
) -> None:
self.expected_version = expected_version
super().__init__(rule_name, current_version, previous_version, *args)

@property
def long_message(self) -> str:
"""
Expand All @@ -196,12 +215,90 @@ def short_message(self) -> str:
"""
return f"Detection version in current build should be bumped to {self.previous_version + 1}."

def toJSON(self) -> dict[str, object]:
"""
Convert the error to a JSON-serializable dict
:returns: a dict, the error
"""
return {
"rule_name": self.rule_name,
"message": self.short_message,
"current_version": self.current_version,
"previous_version": self.previous_version,
"expected_version": self.expected_version,
}


class VersionBumpingNotNeededError(VersioningError):
"""
An error indicating the detection did not change but its version was bumped
"""

expected_version: int

def __init__(
self,
rule_name: str,
current_version: int,
previous_version: int,
expected_version: int,
*args: object,
) -> None:
self.expected_version = expected_version
super().__init__(rule_name, current_version, previous_version, *args)

@property
def long_message(self) -> str:
"""
A long-form error message
:returns: a str, the message
"""
return (
f"Rule '{self.rule_name}' has not changed in current build compared to previous "
"build (stanza hashes are the same); the detection version should not be bumped."
)

@property
def short_message(self) -> str:
"""
A short-form error message
:returns: a str, the message
"""
return "Detection version in current build should not be bumped."

def toJSON(self) -> dict[str, object]:
"""
Convert the error to a JSON-serializable dict
:returns: a dict, the error
"""
return {
"rule_name": self.rule_name,
"message": self.short_message,
"current_version": self.current_version,
"previous_version": self.previous_version,
"expected_version": self.expected_version,
}


class VersionBumpingTooFarError(VersioningError):
"""
An error indicating the detection changed but its version was bumped too far
"""

# The version expected in the current build, when possible
expected_version: int

def __init__(
self,
rule_name: str,
current_version: int,
previous_version: int,
expected_version: int,
*args: object,
) -> None:
self.expected_version = expected_version
super().__init__(rule_name, current_version, previous_version, *args)

@property
def long_message(self) -> str:
"""
Expand All @@ -221,3 +318,16 @@ def short_message(self) -> str:
:returns: a str, the message
"""
return f"Detection version in current build should be reduced to {self.previous_version + 1}."

def toJSON(self) -> dict[str, object]:
"""
Convert the error to a JSON-serializable dict
:returns: a dict, the error
"""
return {
"rule_name": self.rule_name,
"current_version": self.current_version,
"message": self.short_message,
"previous_version": self.previous_version,
"expected_version": self.expected_version,
}
Loading