From bbb8d97b23aea25df3edb716065a910f5ed1865c Mon Sep 17 00:00:00 2001 From: Ramki Subramanian <5551887+rsubra13@users.noreply.github.com> Date: Tue, 11 Apr 2023 10:59:43 -0700 Subject: [PATCH] Do not show a load completion message if all steps failed (#1055) * Replace unneeded comprehension with generator `any` allows us to pass in a generator rather than a collection. Doing so removes a pair of brackets, making the intent slightly clearer. It will also have lazy evaluation advantages as the whole list is not generated. * Use f'' strings * Merge isinstance calls for better readability * Added `skip_report_completion` function to determine whether to skip Slack notifications * Fixed test failures by fixing datatypes * Reorder tests for clarity * Add check for condition where all record counts by media type are None * Lint --------- Co-authored-by: Madison Swain-Bowden --- .gitignore | 1 + .../dags/common/loader/reporting.py | 36 ++++++++--- tests/dags/common/loader/test_reporting.py | 60 ++++++++++++++++--- 3 files changed, 81 insertions(+), 16 deletions(-) diff --git a/.gitignore b/.gitignore index e859539e8ca..6378e82b3a6 100644 --- a/.gitignore +++ b/.gitignore @@ -126,6 +126,7 @@ dmypy.json # Editors and IDE's .vscode/ +.history/ .idea/ *.sublime-workspace diff --git a/openverse_catalog/dags/common/loader/reporting.py b/openverse_catalog/dags/common/loader/reporting.py index 4d3e1d4f089..08625054ae0 100644 --- a/openverse_catalog/dags/common/loader/reporting.py +++ b/openverse_catalog/dags/common/loader/reporting.py @@ -4,6 +4,8 @@ from collections.abc import Sequence from typing import NamedTuple +from airflow.exceptions import AirflowSkipException + from common.slack import send_message @@ -53,17 +55,17 @@ def humanize_time_duration(seconds: float | int) -> str: for unit, div in TIME_DURATION_UNITS: amount, seconds = divmod(int(seconds), div) if amount > 0: - parts.append("{} {}{}".format(amount, unit, "" if amount == 1 else "s")) + parts.append(f'{amount} 
{unit}{"" if amount == 1 else "s"}') return ", ".join(parts) -def clean_duration(duration: float | list[float]): +def clean_duration(duration: float | list[float] | None) -> str | None: # If a list of duration values is provided, get the sum of all non-None values if isinstance(duration, list): - duration = sum([x for x in duration if x]) + duration = sum(x for x in duration if x) # Truncate the duration value if it's provided - if isinstance(duration, float) or isinstance(duration, int): + if isinstance(duration, (float, int)): duration = humanize_time_duration(duration) return duration @@ -72,12 +74,12 @@ def clean_duration(duration: float | list[float]): def clean_record_counts( record_counts_by_media_type: MediaTypeRecordMetrics | list[MediaTypeRecordMetrics], media_types: Sequence[str], -): +) -> dict[str, RecordMetrics]: # If a list of record_counts dicts is provided, sum all of the individual values if isinstance(record_counts_by_media_type, list): return { media_type: sum( - [x[media_type] for x in record_counts_by_media_type], + (x[media_type] for x in record_counts_by_media_type), RecordMetrics(0, 0, 0, 0), ) for media_type in media_types @@ -85,6 +87,22 @@ def clean_record_counts( return record_counts_by_media_type +def skip_report_completion( + duration: str | None, + record_counts_by_media_type: dict[str, RecordMetrics], +) -> bool: + return ( + # Duration must be provided and be a value greater than 1 second + duration is None + or duration in ("inf", "less than 1 sec") + ) and ( + # Record counts by media type must be provided and at least one value must + # be truthy (i.e. 
not None) + not record_counts_by_media_type + or all([val is None for val in record_counts_by_media_type.values()]) + ) + + def report_completion( dag_id: str, media_types: Sequence[str], @@ -121,6 +139,10 @@ def report_completion( record_counts_by_media_type = clean_record_counts( record_counts_by_media_type, media_types ) + if skip_report_completion(duration, record_counts_by_media_type): + raise AirflowSkipException( + "An upstream failure occured and no rows were loaded" + ) # List record count per media type media_type_reports = "" @@ -130,7 +152,7 @@ def report_completion( else: upserted_human_readable = f"{counts.upserted:,}" media_type_reports += f" - `{media_type}`: {upserted_human_readable}" - if counts is None or any([count is None for count in counts]): + if counts is None or any(count is None for count in counts): # Can't make calculation without data continue extras = [] diff --git a/tests/dags/common/loader/test_reporting.py b/tests/dags/common/loader/test_reporting.py index 10261713834..cbe54f17a07 100644 --- a/tests/dags/common/loader/test_reporting.py +++ b/tests/dags/common/loader/test_reporting.py @@ -1,12 +1,14 @@ from unittest import mock import pytest +from airflow.exceptions import AirflowSkipException from common.loader.reporting import ( RecordMetrics, clean_duration, clean_record_counts, report_completion, + skip_report_completion, ) @@ -154,15 +156,21 @@ def test_report_completion_contents( expected_date_range, ): with mock.patch("common.loader.reporting.send_message"): - message = report_completion( - "jamendo_workflow", - ["audio", "image"], - None, - {**audio_data, **image_data}, - dated, - date_range_start, - date_range_end, - ) + record_counts_by_media_type = {**audio_data, **image_data} + should_skip = skip_report_completion(None, record_counts_by_media_type) + try: + message = report_completion( + "jamendo_workflow", + ["audio", "image"], + None, + record_counts_by_media_type, + dated, + date_range_start, + date_range_end, + ) + 
except AirflowSkipException: + assert should_skip, "AirflowSkipException raised unexpectedly" + return for expected in [audio_expected, image_expected]: assert ( expected in message @@ -234,6 +242,7 @@ def test_report_completion_contents_with_lists( "seconds, expected", [ (0.1, "less than 1 sec"), + (None, None), (1, "1 sec"), (10, "10 secs"), (100, "1 min, 40 secs"), @@ -256,9 +265,42 @@ def test_clean_time_duration(seconds, expected): assert actual == expected +@pytest.mark.parametrize( + "duration, duration_expected", + [ + (None, True), + ("less than 1 sec", True), + ("inf", True), + ("10 secs", False), + ("16 weeks, 3 days, 17 hours, 46 mins, 40 secs", False), + ], +) +@pytest.mark.parametrize( + "record_counts_by_media_type, expected_counts", + [ + (None, True), + ({}, True), + ({"image": None, "audio": None}, True), + ( + {"image": RecordMetrics(1, 2, 3, 4), "audio": RecordMetrics(1, 2, 0, 0)}, + False, + ), + ], +) +def test_skip_report_completion( + duration, duration_expected, record_counts_by_media_type, expected_counts +) -> None: + actual = skip_report_completion(duration, record_counts_by_media_type) + assert actual == (duration_expected and expected_counts) + + @pytest.mark.parametrize( "record_counts_by_media_type, media_types, expected", [ + # Empty values + ({}, [], {}), + # Null values + (None, None, None), # Single, one media type ( {"image": RecordMetrics(1, 2, 3, 4)},