Skip to content

Commit

Permalink
Do not show a load completion message if all steps failed (#1055)
Browse files Browse the repository at this point in the history
* Replace unneeded comprehension with generator

`any` allows us to pass in a generator rather than a collection.
Doing so removes a pair of brackets, making the intent slightly clearer.
It will also have lazy evaluation advantages as the whole list is not generated.

* Use f'' strings

* Merge isinstance calls for better readability

* Added `skip_report_completion` function to determine whether to skip slack notifications

* Fixed test failures by fixing datatypes

* Reorder tests for clarity

* Add check for condition where all record counts by media type are None

* Lint

---------

Co-authored-by: Madison Swain-Bowden <bowdenm@spu.edu>
  • Loading branch information
rsubra13 and AetherUnbound authored Apr 11, 2023
1 parent d92e6b6 commit bbb8d97
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 16 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ dmypy.json

# Editors and IDE's
.vscode/
.history/
.idea/
*.sublime-workspace

Expand Down
36 changes: 29 additions & 7 deletions openverse_catalog/dags/common/loader/reporting.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from collections.abc import Sequence
from typing import NamedTuple

from airflow.exceptions import AirflowSkipException

from common.slack import send_message


Expand Down Expand Up @@ -53,17 +55,17 @@ def humanize_time_duration(seconds: float | int) -> str:
for unit, div in TIME_DURATION_UNITS:
amount, seconds = divmod(int(seconds), div)
if amount > 0:
parts.append("{} {}{}".format(amount, unit, "" if amount == 1 else "s"))
parts.append(f'{amount} {unit}{"" if amount == 1 else "s"}')
return ", ".join(parts)


def clean_duration(duration: float | list[float] | None) -> str | None:
    """
    Normalize a raw duration into a human-readable string.

    ``duration`` may be a single number of seconds, a list of per-task
    durations (e.g. one per mapped task), or ``None``.

    Returns a humanized string (via ``humanize_time_duration``) when a
    numeric value is available, otherwise passes ``None`` through.
    """
    # If a list of duration values is provided, sum all truthy entries
    # (skipping None and 0); `sum` accepts a generator directly.
    if isinstance(duration, list):
        duration = sum(x for x in duration if x)

    # Humanize the numeric value if one was provided; a None input is
    # returned unchanged so callers can detect "no duration reported".
    if isinstance(duration, (float, int)):
        duration = humanize_time_duration(duration)

    return duration
Expand All @@ -72,19 +74,35 @@ def clean_duration(duration: float | list[float]):
def clean_record_counts(
    record_counts_by_media_type: MediaTypeRecordMetrics | list[MediaTypeRecordMetrics],
    media_types: Sequence[str],
) -> dict[str, RecordMetrics]:
    """
    Collapse record counts into a single mapping of media type -> RecordMetrics.

    When a list of per-attempt count mappings is provided, the metrics for
    each media type are summed; a single mapping is returned unchanged.
    """
    if isinstance(record_counts_by_media_type, list):
        # Sum the per-media-type metrics across all provided mappings,
        # starting from zeroed metrics. NOTE(review): this assumes
        # RecordMetrics supports `+` (defined elsewhere in this module) —
        # plain tuple addition would concatenate instead.
        return {
            media_type: sum(
                (x[media_type] for x in record_counts_by_media_type),
                RecordMetrics(0, 0, 0, 0),
            )
            for media_type in media_types
        }
    return record_counts_by_media_type


def skip_report_completion(
    duration: str | None,
    record_counts_by_media_type: dict[str, RecordMetrics],
) -> bool:
    """
    Determine whether the load-completion notification should be skipped.

    Skip only when BOTH signals indicate nothing was loaded:
    the duration is missing or degenerate ("inf" / "less than 1 sec"),
    AND there are no usable record counts (mapping missing/empty, or
    every per-media-type entry is None).
    """
    # Duration must be provided and represent more than 1 second.
    duration_indicates_skip = duration is None or duration in (
        "inf",
        "less than 1 sec",
    )
    # Record counts must be provided and at least one value must be
    # truthy (i.e. not None). `all` accepts a generator — no need to
    # materialize a list.
    counts_indicate_skip = not record_counts_by_media_type or all(
        val is None for val in record_counts_by_media_type.values()
    )
    return duration_indicates_skip and counts_indicate_skip


def report_completion(
dag_id: str,
media_types: Sequence[str],
Expand Down Expand Up @@ -121,6 +139,10 @@ def report_completion(
record_counts_by_media_type = clean_record_counts(
record_counts_by_media_type, media_types
)
if skip_report_completion(duration, record_counts_by_media_type):
raise AirflowSkipException(
"An upstream failure occured and no rows were loaded"
)

# List record count per media type
media_type_reports = ""
Expand All @@ -130,7 +152,7 @@ def report_completion(
else:
upserted_human_readable = f"{counts.upserted:,}"
media_type_reports += f" - `{media_type}`: {upserted_human_readable}"
if counts is None or any([count is None for count in counts]):
if counts is None or any(count is None for count in counts):
# Can't make calculation without data
continue
extras = []
Expand Down
60 changes: 51 additions & 9 deletions tests/dags/common/loader/test_reporting.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
from unittest import mock

import pytest
from airflow.exceptions import AirflowSkipException

from common.loader.reporting import (
RecordMetrics,
clean_duration,
clean_record_counts,
report_completion,
skip_report_completion,
)


Expand Down Expand Up @@ -154,15 +156,21 @@ def test_report_completion_contents(
expected_date_range,
):
with mock.patch("common.loader.reporting.send_message"):
message = report_completion(
"jamendo_workflow",
["audio", "image"],
None,
{**audio_data, **image_data},
dated,
date_range_start,
date_range_end,
)
record_counts_by_media_type = {**audio_data, **image_data}
should_skip = skip_report_completion(None, record_counts_by_media_type)
try:
message = report_completion(
"jamendo_workflow",
["audio", "image"],
None,
record_counts_by_media_type,
dated,
date_range_start,
date_range_end,
)
except AirflowSkipException:
assert should_skip, "AirflowSkipException raised unexpectedly"
return
for expected in [audio_expected, image_expected]:
assert (
expected in message
Expand Down Expand Up @@ -234,6 +242,7 @@ def test_report_completion_contents_with_lists(
"seconds, expected",
[
(0.1, "less than 1 sec"),
(None, None),
(1, "1 sec"),
(10, "10 secs"),
(100, "1 min, 40 secs"),
Expand All @@ -256,9 +265,42 @@ def test_clean_time_duration(seconds, expected):
assert actual == expected


# Cross-product of duration cases x record-count cases: the report should
# be skipped only when BOTH inputs independently indicate a skip.
@pytest.mark.parametrize(
    "duration, duration_expected",
    [
        # Missing or degenerate durations vote "skip"
        (None, True),
        ("less than 1 sec", True),
        ("inf", True),
        # Real durations vote "report"
        ("10 secs", False),
        ("16 weeks, 3 days, 17 hours, 46 mins, 40 secs", False),
    ],
)
@pytest.mark.parametrize(
    "record_counts_by_media_type, expected_counts",
    [
        # Missing/empty mappings and all-None values vote "skip"
        (None, True),
        ({}, True),
        ({"image": None, "audio": None}, True),
        # Any real metrics vote "report"
        (
            {"image": RecordMetrics(1, 2, 3, 4), "audio": RecordMetrics(1, 2, 0, 0)},
            False,
        ),
    ],
)
def test_skip_report_completion(
    duration, duration_expected, record_counts_by_media_type, expected_counts
) -> None:
    actual = skip_report_completion(duration, record_counts_by_media_type)
    # Skip is the conjunction of the two per-input expectations.
    assert actual == (duration_expected and expected_counts)


@pytest.mark.parametrize(
"record_counts_by_media_type, media_types, expected",
[
# Empty values
({}, [], {}),
# Null values
(None, None, None),
# Single, one media type
(
{"image": RecordMetrics(1, 2, 3, 4)},
Expand Down

0 comments on commit bbb8d97

Please sign in to comment.