# Make batched_update DAG resilient to task failures #2507
Labels:

- 💻 aspect: code (concerns the software code in the repository)
- ✨ goal: improvement (improvement to an existing user-facing feature)
- 🟧 priority: high (stalls work on the project or its dependents)
- 🧱 stack: catalog (related to the catalog and Airflow DAGs)
## Current Situation
We plan to use the `batched_update` DAG to run popularity refreshes for each provider. For very large providers like Flickr, the `update_batches` task may be very long-running (more than a week), so it is highly likely that we will need to run catalog deployments while the task is still in progress.

If the catalog is deployed while the task is running, the task will fail and then retry (assuming it has remaining retries) once Airflow is back up. Currently, this means that the `update_batches` task will resume its update from the BEGINNING of the `rows_to_update` table. So while the DAG appears to still be running fine, the batched update has actually started over and is re-updating rows that were already updated.

We need a way to ensure that when the task retries, it picks up where it left off instead of starting over.
Note: a developer can do this manually by failing the DAG, noting in the logs where it left off (the last `row_id` processed), and then manually triggering a new DagRun with the same query parameters but with `batch_start` set to this `row_id` and `resume_update` enabled to prevent the update table from being recreated. However, this is a tedious manual process that may need to be done multiple times, for multiple providers, and could easily be forgotten.
## Suggested Improvement

There are a couple of ways we could implement this:
### Deleting records from the `rows_to_update` table as we go

The simplest fix would be to delete records from the `rows_to_update` table after their corresponding records are updated on the media table. Then when the task fails and is retried, anything that was already updated will be missing from the table (see the sketch after the list of cons below).

Cons:

- This complicates the `batch_start` logic. Currently we assume that, unless a custom start is provided, we start at `row_id = 0`, but that would no longer be the case.
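As a rough illustration, a minimal sketch of a delete-as-we-go batch, assuming a `PostgresHook` connection; the `UPDATE` statement is only a stand-in for the DAG's real (user-supplied) update SQL, and the `media` table and column names are assumptions:

```python
from airflow.providers.postgres.hooks.postgres import PostgresHook


def update_and_delete_batch(
    postgres_conn_id: str, batch_start: int, batch_size: int
) -> None:
    hook = PostgresHook(postgres_conn_id=postgres_conn_id)
    params = {"start": batch_start, "end": batch_start + batch_size}
    # Passing both statements to a single `run()` call executes them on one
    # connection and commits once at the end, so a failure between the UPDATE
    # and the DELETE rolls both back together.
    hook.run(
        [
            # Stand-in for the real update SQL applied to this batch.
            """
            UPDATE media
            SET standardized_popularity = rows_to_update.standardized_popularity
            FROM rows_to_update
            WHERE media.identifier = rows_to_update.identifier
              AND rows_to_update.row_id > %(start)s
              AND rows_to_update.row_id <= %(end)s;
            """,
            # Drop the applied batch, so a retry has nothing left to redo
            # for these rows.
            """
            DELETE FROM rows_to_update
            WHERE row_id > %(start)s AND row_id <= %(end)s;
            """,
        ],
        parameters=params,
    )
```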
### Store the last processed `row_id` somewhere

We could persist a variable keeping track of the last `row_id` processed, updating it each time a batch is processed. Because this must persist through a task failure and retry, we'd need to store it in the database. When the `update_batches` task runs, it chooses where to begin the update in this order:

1. `batch_start`, if supplied, is always preferred
2. `last_processed_row_id`, if it exists

The simplest way might be to use an Airflow Variable, using the `query_id` to uniquely name it (`last_processed_row_id_{{query_id}}`), which we can easily set in the task. The only hitch is cleaning up/deleting the Variable when the DagRun finishes, but I believe we can do that with the cli and a bash operator (example).
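A rough sketch of what this could look like, assuming `query_id`, `batch_start`, `batch_size`, and the total row count are passed in as params; `run_update_for_batch` is a hypothetical stand-in for the existing update logic:

```python
from airflow.decorators import task
from airflow.models import Variable
from airflow.operators.bash import BashOperator


def run_update_for_batch(start: int, end: int) -> None:
    """Hypothetical stand-in for the DAG's real per-batch update."""


@task
def update_batches(query_id: str, batch_start, batch_size: int, total_rows: int):
    # Checkpoint key, unique to this batched update via its query_id.
    var_key = f"last_processed_row_id_{query_id}"
    if batch_start is not None:
        # An explicitly supplied batch_start is always preferred.
        start = batch_start
    else:
        # Otherwise fall back to the persisted checkpoint, defaulting to 0.
        start = int(Variable.get(var_key, default_var=0))
    while start < total_rows:
        end = min(start + batch_size, total_rows)
        run_update_for_batch(start, end)
        # Persist progress in the metadata DB so it survives a failure/retry.
        Variable.set(var_key, end)
        start = end


# Clean up the checkpoint when the DagRun finishes, via the CLI as suggested
# above. `params.query_id` assumes query_id is available as a DAG param.
cleanup_checkpoint = BashOperator(
    task_id="cleanup_last_processed_row_id",
    bash_command="airflow variables delete last_processed_row_id_{{ params.query_id }}",
    trigger_rule="all_done",
)
```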
### Dynamically generated batch update tasks, instead of a single update task

The current approach has a single `update_batches` task which iteratively runs the updates. We could instead use dynamic task mapping to generate a separate Airflow task per batch (see the sketch below). Each task succeeds when it is done, so only the currently running batches would be retried if they are halted during a deployment.

This was not implemented originally because large providers may require huge numbers of batches (50k batches for Flickr, for example). Airflow by default only allows a maximum of 1024 dynamic tasks per DAG at a time, and as this number gets higher, the scheduler apparently becomes clogged and Airflow gets extremely slow.
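For reference, a minimal dynamic task mapping sketch; `run_update_for_batch` is again a hypothetical stand-in, and the row counts are purely illustrative:

```python
from airflow.decorators import task


@task
def get_batches(total_rows: int, batch_size: int) -> list[dict]:
    # One dict per batch; each becomes its own mapped task instance. 50k
    # batches would far exceed the default `core.max_map_length` of 1024.
    return [
        {"start": start, "end": min(start + batch_size, total_rows)}
        for start in range(0, total_rows, batch_size)
    ]


@task
def update_batch(batch: dict):
    run_update_for_batch(batch["start"], batch["end"])  # hypothetical helper


# Inside the DAG definition; numbers are illustrative only.
update_batch.expand(batch=get_batches(total_rows=5_000_000, batch_size=10_000))
```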
I did have some ideas for handling this, but it's not really how Airflow is intended to be used. For example, we could have the `batched_update` DAG schedule the first 1024 batches and then, if not all batches were processed, use the `TriggerDagRunOperator` to trigger another DagRun of itself using the existing `batch_start` and `resume_update` params to get the next batches (sketched after the list of cons below).

Cons:

- We can use `max_active_tasks` to restrict how many run at one time, but critically there is no way to make the entire DAG fail as soon as one task fails. If there's a problem with the SQL, we could expect a bombardment of 1024 task failures and Slack notifications. (It is possible to prevent the Slack notifications, but this is still unfortunate.)
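A sketch of the self-retriggering idea; the `get_next_batch_start` task id is hypothetical, and the upstream gating of whether any batches actually remain is omitted:

```python
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

# Re-trigger this DAG for the next chunk of batches, reusing the existing
# batch_start and resume_update params. `get_next_batch_start` is a
# hypothetical task that reports where the first 1024 batches stopped.
trigger_next_run = TriggerDagRunOperator(
    task_id="trigger_next_batched_update_run",
    trigger_dag_id="batched_update",
    conf={
        "query_id": "{{ params.query_id }}",
        "batch_start": "{{ ti.xcom_pull(task_ids='get_next_batch_start') }}",
        "resume_update": True,
    },
)
```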
## Benefit

Allows very long-running batched updates to continue through deployments and to resume from failure without manual intervention.
## Additional context
#2331