Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[LOGSC-1681] Add a check to validate that logs assets are owned by logs team #17185

Merged
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datadog_checks_dev/changelog.d/17185.added
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added a check to the `ddev validate codeowners` to make sure that logs assets are owned by `@DatadDog/logs-backend`.
151 changes: 151 additions & 0 deletions datadog_checks_dev/datadog_checks/dev/tooling/codeowners.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
# This file is adapted from [sbdchd/codeowners] (https://github.com/sbdchd/codeowners),
# originally licensed under the MIT license. You can find the complete license text in the repository.

import re
from typing import Generator, List, Optional, Pattern, Tuple

from typing_extensions import Literal

OwnerTuple = Tuple[Literal["USERNAME", "TEAM", "EMAIL"], str]


TEAM = re.compile(r"^@\S+/\S+")
USERNAME = re.compile(r"^@\S+")
EMAIL = re.compile(r"^\S+@\S+")
MASK = "/" * 20


def path_to_regex(pattern: str) -> Pattern[str]:
txominpelu marked this conversation as resolved.
Show resolved Hide resolved
regex = ""

slash_pos = pattern.find("/")
anchored = slash_pos > -1 and slash_pos != len(pattern) - 1

regex += r"\A/" if anchored else r"(?:\A|/)"

matches_dir = pattern[-1] == "/"
matches_no_subdirs = pattern[-2:] == "/*"
pattern_trimmed = pattern.strip("/")

in_char_class = False
escaped = False

iterator = enumerate(pattern_trimmed)
for i, ch in iterator:
if escaped:
regex += re.escape(ch)
escaped = False
continue

if ch == "\\":
escaped = True
elif ch == "*":
if i + 1 < len(pattern_trimmed) and pattern_trimmed[i + 1] == "*":
left_anchored = i == 0
leading_slash = i > 0 and pattern_trimmed[i - 1] == "/"
right_anchored = i + 2 == len(pattern_trimmed)
trailing_slash = i + 2 < len(pattern_trimmed) and pattern_trimmed[i + 2] == "/"

if (left_anchored or leading_slash) and (right_anchored or trailing_slash):
regex += ".*"

next(iterator, None)
next(iterator, None)
continue
regex += "[^/]*"
elif ch == "?":
regex += "[^/]"
elif ch == "[":
in_char_class = True
regex += ch
elif ch == "]":
if in_char_class:
regex += ch
in_char_class = False
else:
regex += re.escape(ch)
else:
regex += re.escape(ch)

if in_char_class:
raise ValueError(f"unterminated character class in pattern {pattern}")

if matches_dir:
regex += "/"
elif matches_no_subdirs:
regex += r"\Z"
else:
regex += r"(?:\Z|/)"
return re.compile(regex)


def parse_owner(owner: str) -> Optional[OwnerTuple]:
if TEAM.match(owner):
return ("TEAM", owner)
if USERNAME.match(owner):
return ("USERNAME", owner)
if EMAIL.match(owner):
return ("EMAIL", owner)
return None


class CodeOwners:
def __init__(self, text: str) -> None:
section_name = None

paths: List[Tuple[Pattern[str], str, List[OwnerTuple], int, Optional[str]]] = []
for line_num, line in enumerate(text.splitlines(), start=1):
line = line.strip()
if line == "" or line.startswith("#"):
continue
# Track the GitLab section name (if used)
# https://docs.gitlab.com/ee/user/project/code_owners.html#code-owners-sections
elif line.startswith("[") and line.endswith("]"):
section_name = line[1:-1]
continue
elif line.startswith("^[") and line.endswith("]"):
section_name = line[2:-1]
continue

elements = iter(line.replace("\\ ", MASK).split())
path = next(elements, None)
if path is None:
continue
owners: List[OwnerTuple] = []
for owner in elements:
owner_res = parse_owner(owner)
if owner_res is not None:
owners.append(owner_res)
paths.append(
(
path_to_regex(path),
path.replace(MASK, "\\ "),
owners,
line_num,
section_name,
)
)
paths.reverse()
self.paths = paths

def matching_lines(
self, filepath: str
) -> Generator[Tuple[List[OwnerTuple], Optional[int], Optional[str], Optional[str]], None, None]:
for pattern, path, owners, line_num, section_name in self.paths:
if pattern.search(filepath.replace(" ", MASK)) is not None:
yield (owners, line_num, path, section_name)

def matching_line(self, filepath: str) -> Tuple[List[OwnerTuple], Optional[int], Optional[str], Optional[str]]:
return next(self.matching_lines(filepath), ([], None, None, None))

def section_name(self, filepath: str) -> Optional[str]:
"""
Find the section name of the specified file path.

None is returned when no matching section information
was found (or sections are not used in the CODEOWNERS file)
"""
return self.matching_line(filepath)[3]

def of(self, filepath: str) -> List[OwnerTuple]:
return self.matching_line(filepath)[0]
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
# (C) Datadog, Inc. 2020-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
import os
import re

import click

from ...codeowners import CodeOwners
from ...constants import get_root
from ...utils import get_codeowners, get_codeowners_file, get_valid_integrations
from ..console import CONTEXT_SETTINGS, abort, annotate_error, echo_failure, echo_success

DIRECTORY_REGEX = re.compile(r"\/(.*)\/$")

LOGS_TEAM = '@DataDog/logs-backend'

# Integrations that are known to be tiles and have email-based codeowners
IGNORE_TILES = {
'1e',
Expand All @@ -31,6 +36,27 @@
}


def create_codeowners_resolver():
"""Creates an object that resolves owners for files in a repo."""
owners_resolver = CodeOwners("\n".join(get_codeowners()))
return owners_resolver


def validate_logs_assets_codeowners():
"""Validate `CODEOWNERS` assigns logs as owner for all log assets."""

failed_integrations = []
owners_resolver = create_codeowners_resolver()
all_integrations = sorted(get_valid_integrations())
for integration in all_integrations:
logs_assets_owners = owners_resolver.of(f"/{integration}/assets/logs/")
path = os.path.join(get_root(), integration, 'assets', 'logs')
if ("TEAM", LOGS_TEAM) not in logs_assets_owners and os.path.exists(path):
failed_integrations.append(integration)

return failed_integrations


def create_codeowners_map():
"""Creates a mapping of integrations to codeowners entries"""
codeowners = get_codeowners()
Expand Down Expand Up @@ -58,6 +84,16 @@ def create_codeowners_map():
def codeowners(ctx):
"""Validate that every integration has an entry in the `CODEOWNERS` file."""

failed_integrations = validate_logs_assets_codeowners()
if failed_integrations:
for integration in failed_integrations:
echo_failure(f"/{integration}/assets/logs/ is not owned by {LOGS_TEAM}")
abort()
Copy link
Member

@FlorentClarret FlorentClarret Mar 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
abort()
has_failed = True

I would drop it so we can go through the rest of the validation after :)

else:
echo_success("All integrations have valid logs codeowners.")
iliakur marked this conversation as resolved.
Show resolved Hide resolved

codeowner_map = create_codeowners_map()

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
codeowner_map = create_codeowners_map()

That's initilialized in the if statement right after

has_failed = False
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and thus move this to the top 🙂

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean to the bottom ? Like, this commit: cc7deb8

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant has_failed = False, so it works if you put your code at the bottom 😄

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aaaah, haha. Makes sense, I did it even if I had not fully understood your comment :P

is_core_check = ctx.obj['repo_choice'] == 'core'

Expand Down
Loading