Skip to content

Commit

Permalink
[LOGSC-1681] Add a check to validate that logs assets are owned by logs team
Browse files Browse the repository at this point in the history
  • Loading branch information
txominpelu committed Mar 15, 2024
1 parent 4cd1514 commit 35492d6
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 1 deletion.
2 changes: 1 addition & 1 deletion datadog_checks_dev/datadog_checks/dev/__about__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# (C) Datadog, Inc. 2018-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
__version__ = '31.0.0'
__version__ = '32.0.0'
162 changes: 162 additions & 0 deletions datadog_checks_dev/datadog_checks/dev/tooling/codeowners.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
import re
from typing import Generator, List, Optional, Pattern, Tuple

from typing_extensions import Literal

# An owner entry: the kind of owner paired with the owner token it was parsed from.
OwnerTuple = Tuple[Literal["USERNAME", "TEAM", "EMAIL"], str]


# Shapes of owner tokens. A team handle ("@org/team") also satisfies USERNAME,
# so TEAM has to be tried first when classifying a token.
TEAM = re.compile(r"^@\S+/\S+")
USERNAME = re.compile(r"^@\S+")
EMAIL = re.compile(r"^\S+@\S+")
# Placeholder swapped in for escaped spaces in patterns (and for plain spaces in
# file paths being matched) so that a line can be split on whitespace.
MASK = "/" * 20


def path_to_regex(pattern: str) -> Pattern[str]:
    """
    Compile a CODEOWNERS path pattern into a regular expression.

    This code is adapted from [sbdchd/codeowners] (https://github.com/sbdchd/codeowners),
    originally licensed under the MIT license. You can find the complete license text in the repository.

    The result is intended to be used with ``search``. A pattern containing a
    slash anywhere but its last position is anchored and only matches paths
    that begin with "/"; any other pattern may match at the start of the path
    or right after any "/".

    Args:
        pattern: A single, non-empty path pattern.

    Returns:
        The compiled regular expression.

    Raises:
        ValueError: If the pattern is empty or has an unterminated character class.
    """
    if not pattern:
        # Fail with a clear message instead of an IndexError on pattern[-1].
        raise ValueError("empty pattern")

    # A slash that is not the final character pins the pattern to the path root.
    slash_pos = pattern.find("/")
    anchored = slash_pos > -1 and slash_pos != len(pattern) - 1
    regex = r"\A/" if anchored else r"(?:\A|/)"

    matches_dir = pattern.endswith("/")
    matches_no_subdirs = pattern.endswith("/*")
    pattern_trimmed = pattern.strip("/")

    in_char_class = False
    escaped = False

    # The enumerate object is kept in a variable so the "**" branch can
    # consume extra characters from it.
    iterator = enumerate(pattern_trimmed)
    for i, ch in iterator:
        if escaped:
            # Character following a backslash is always taken literally.
            regex += re.escape(ch)
            escaped = False
            continue

        if ch == "\\":
            escaped = True
        elif ch == "*":
            if i + 1 < len(pattern_trimmed) and pattern_trimmed[i + 1] == "*":
                left_anchored = i == 0
                leading_slash = i > 0 and pattern_trimmed[i - 1] == "/"
                right_anchored = i + 2 == len(pattern_trimmed)
                trailing_slash = (
                    i + 2 < len(pattern_trimmed) and pattern_trimmed[i + 2] == "/"
                )

                # "**" is only special when it occupies a whole path segment.
                if (left_anchored or leading_slash) and (
                    right_anchored or trailing_slash
                ):
                    regex += ".*"

                    # Skip the second "*" and the "/" after it (if any).
                    next(iterator, None)
                    next(iterator, None)
                    continue
            # A lone "*" (or a "**" glued to other text) stays within one segment.
            regex += "[^/]*"
        elif ch == "?":
            regex += "[^/]"
        elif ch == "[":
            in_char_class = True
            regex += ch
        elif ch == "]":
            if in_char_class:
                regex += ch
                in_char_class = False
            else:
                regex += re.escape(ch)
        else:
            regex += re.escape(ch)

    if in_char_class:
        raise ValueError(f"unterminated character class in pattern {pattern}")

    if matches_dir:
        # Directory pattern: something must follow the directory name.
        regex += "/"
    elif matches_no_subdirs:
        # "dir/*" matches direct children only.
        regex += r"\Z"
    else:
        # Match the path itself or anything below it.
        regex += r"(?:\Z|/)"
    return re.compile(regex)


def parse_owner(owner: str) -> Optional[OwnerTuple]:
    """
    Classify an owner token as a team, a username or an e-mail address.

    Returns a ``(kind, owner)`` tuple, or ``None`` when the token matches none
    of the known owner shapes.
    """
    result: Optional[OwnerTuple] = None
    # TEAM is checked first because a team handle also matches USERNAME.
    if TEAM.match(owner):
        result = ("TEAM", owner)
    elif USERNAME.match(owner):
        result = ("USERNAME", owner)
    elif EMAIL.match(owner):
        result = ("EMAIL", owner)
    return result


class CodeOwners:
    """
    Parsed representation of a CODEOWNERS file.

    Entries are stored in reverse file order, so the first entry that matches
    a path is the one that appears last in the file.
    """

    def __init__(self, text: str) -> None:
        current_section: Optional[str] = None
        entries: List[Tuple[Pattern[str], str, List[OwnerTuple], int, Optional[str]]] = []

        for lineno, raw_line in enumerate(text.splitlines(), start=1):
            stripped = raw_line.strip()
            # Blank lines and comments carry no ownership information.
            if not stripped or stripped.startswith("#"):
                continue

            # Track the GitLab section name (if used)
            # https://docs.gitlab.com/ee/user/project/code_owners.html#code-owners-sections
            if stripped.endswith("]"):
                if stripped.startswith("["):
                    current_section = stripped[1:-1]
                    continue
                if stripped.startswith("^["):
                    current_section = stripped[2:-1]
                    continue

            # Hide escaped spaces behind MASK so the line can be split on whitespace.
            tokens = stripped.replace("\\ ", MASK).split()
            if not tokens:
                continue
            path, owner_tokens = tokens[0], tokens[1:]

            # Tokens that are not recognised as owners are dropped.
            owners: List[OwnerTuple] = [
                parsed
                for parsed in map(parse_owner, owner_tokens)
                if parsed is not None
            ]
            entry = (
                path_to_regex(path),
                path.replace(MASK, "\\ "),
                owners,
                lineno,
                current_section,
            )
            entries.append(entry)

        self.paths = list(reversed(entries))

    def matching_lines(
        self, filepath: str
    ) -> Generator[
        Tuple[List[OwnerTuple], Optional[int], Optional[str], Optional[str]], None, None
    ]:
        """Yield ``(owners, line_num, path, section_name)`` for every entry matching ``filepath``."""
        # Spaces are masked the same way as in the patterns built by __init__.
        masked = filepath.replace(" ", MASK)
        for regex, raw_path, owners, line_num, section in self.paths:
            if regex.search(masked) is None:
                continue
            yield (owners, line_num, raw_path, section)

    def matching_line(
        self, filepath: str
    ) -> Tuple[List[OwnerTuple], Optional[int], Optional[str], Optional[str]]:
        """Return the first matching entry, or ``([], None, None, None)`` when nothing matches."""
        for match in self.matching_lines(filepath):
            return match
        return ([], None, None, None)

    def section_name(self, filepath: str) -> Optional[str]:
        """
        Find the section name of the specified file path.
        None is returned when no matching section information
        was found (or sections are not used in the CODEOWNERS file)
        """
        _, _, _, section = self.matching_line(filepath)
        return section

    def of(self, filepath: str) -> List[OwnerTuple]:
        """Return the owners of ``filepath`` (an empty list when no entry matches)."""
        owners, _, _, _ = self.matching_line(filepath)
        return owners
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@

from ...utils import get_codeowners, get_codeowners_file, get_valid_integrations
from ..console import CONTEXT_SETTINGS, abort, annotate_error, echo_failure, echo_success
from ...codeowners import CodeOwners

# Matches text ending in "/" and captures what lies between an earlier "/" and that final "/".
DIRECTORY_REGEX = re.compile(r"\/(.*)\/$")

# Team handle expected to own every integration's `assets/logs/` directory.
LOGS_TEAM = '@DataDog/logs-backend'

# Integrations that are known to be tiles and have email-based codeowners
IGNORE_TILES = {
'1e',
Expand All @@ -30,6 +33,25 @@
'squadcast',
}

def create_codeowners_resolver():
    """Build a `CodeOwners` resolver from the entries returned by `get_codeowners()`."""
    codeowners_text = "\n".join(get_codeowners())
    return CodeOwners(codeowners_text)

def validate_logs_assets_codeowners():
    """Validate `CODEOWNERS` assigns logs as owner for all log assets.

    A failure message is echoed for every integration whose
    `/<integration>/assets/logs/` path does not list the logs team as an owner.

    Returns:
        bool: `True` if at least one integration FAILED the check, `False`
        when every integration passed. Note the value is a failure flag.
    """
    has_failed = False
    expected_owner = ('TEAM', LOGS_TEAM)

    owners_resolver = create_codeowners_resolver()
    for integration in sorted(get_valid_integrations()):
        logs_assets_path = f"/{integration}/assets/logs/"
        if expected_owner not in owners_resolver.of(logs_assets_path):
            echo_failure(f"{logs_assets_path} is not owned by {LOGS_TEAM}")
            has_failed = True

    return has_failed

def create_codeowners_map():
"""Creates a mapping of integrations to codeowners entries"""
Expand Down Expand Up @@ -59,6 +81,10 @@ def codeowners():

has_failed = False
codeowner_map = create_codeowners_map()
owners_resolver = create_codeowners_resolver()
if not validate_logs_assets_codeowners():
echo_success("All integrations have valid logs codeowners.")

codeowners_file = get_codeowners_file()
for integration, codeowner in codeowner_map.items():
if not codeowner:
Expand Down

0 comments on commit 35492d6

Please sign in to comment.