Skip to content

Commit

Permalink
Split batched_update DAG into automated and manual DAGs (#4552)
Browse files: browse the repository at this point in the history
* Split batched update DAG into two identical DAGs

* Update popularity refresh to use automated batched update DAG instead

* Update DAG doc generation to handle new case
  • Loading branch information
AetherUnbound authored Jul 13, 2024
1 parent 34e9b2d commit 0a741a6
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 5 deletions.
9 changes: 5 additions & 4 deletions catalog/dags/database/batched_update/batched_update_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,7 @@

logger = logging.getLogger(__name__)


@dag(
dag_id=constants.DAG_ID,
DAG_CONFIG = dict(
schedule=None,
start_date=constants.START_DATE,
tags=["database"],
Expand Down Expand Up @@ -170,6 +168,8 @@
),
},
)


def batched_update():
# Unique Airflow variable name for tracking the batch_start for this query
BATCH_START_VAR = "batched_update_start_{{ params.query_id }}"
Expand Down Expand Up @@ -269,4 +269,5 @@ def batched_update():
]


batched_update()
for dag_id in [constants.DAG_ID, constants.AUTOMATED_DAG_ID]:
dag(dag_id, **DAG_CONFIG)(batched_update)()
1 change: 1 addition & 0 deletions catalog/dags/database/batched_update/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@


DAG_ID = "batched_update"
AUTOMATED_DAG_ID = f"automated_{DAG_ID}"
START_DATE = datetime(2023, 5, 1)
SLACK_USERNAME = "Upstream Batched Update"
SLACK_ICON = ":database:"
Expand Down
2 changes: 1 addition & 1 deletion catalog/dags/popularity/popularity_refresh_dag_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

from common import slack
from common.constants import DAG_DEFAULT_ARGS, POSTGRES_CONN_ID
from database.batched_update.constants import DAG_ID as BATCHED_UPDATE_DAG_ID
from database.batched_update.constants import AUTOMATED_DAG_ID as BATCHED_UPDATE_DAG_ID
from popularity import sql
from popularity.popularity_refresh_types import (
POPULARITY_REFRESH_CONFIGS,
Expand Down
4 changes: 4 additions & 0 deletions catalog/utilities/dag_doc_gen/dag_doc_generation.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
"audio": "{media_type}",
"production": "{environment}",
"staging": "{environment}",
"automated": "",
}
# DAG IDs to ignore when collapsing reference documentation for a mapped term
DAG_IDS_TO_IGNORE_COLLAPSE = {
Expand Down Expand Up @@ -147,6 +148,9 @@ def determine_mapped_dag_id(dag_id: str) -> str:
for idx, part in enumerate(parts):
if part in DAG_ID_TERMS_TO_COLLAPSE:
parts[idx] = DAG_ID_TERMS_TO_COLLAPSE[part]
if not parts[idx]:
# If there's no replacement, remove the section entirely
parts.pop(idx)
return "_".join(parts)


Expand Down
2 changes: 2 additions & 0 deletions documentation/catalog/reference/DAGs.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ The following are DAGs grouped by their primary tag:

| DAG ID | Schedule Interval |
| -------------------------------------------------------------------------------------- | ----------------- |
| [`automated_batched_update`](#batched_update) | `None` |
| [`batched_update`](#batched_update) | `None` |
| [`decode_and_deduplicate_image_tags`](#decode_and_deduplicate_media_type_tags) | `None` |
| [`delete_records`](#delete_records) | `None` |
Expand Down Expand Up @@ -136,6 +137,7 @@ The following is documentation associated with each DAG (where available):
1. [`add_license_url`](#add_license_url)
1. [`airflow_log_cleanup`](#airflow_log_cleanup)
1. [`auckland_museum_workflow`](#auckland_museum_workflow)
1. [`automated_batched_update`](#batched_update)
1. [`batched_update`](#batched_update)
1. [`cc_mixter_workflow`](#cc_mixter_workflow)
1. [`check_silenced_dags`](#check_silenced_dags)
Expand Down

0 comments on commit 0a741a6

Please sign in to comment.