
Make batched_update DAG resilient to task failures #2507

Closed
stacimc opened this issue Jun 29, 2023 · 0 comments · Fixed by #2570
Labels
💻 aspect: code Concerns the software code in the repository ✨ goal: improvement Improvement to an existing user-facing feature 🟧 priority: high Stalls work on the project or its dependents 🧱 stack: catalog Related to the catalog and Airflow DAGs

Comments


stacimc commented Jun 29, 2023

Current Situation

We plan to use the batched_update DAG to run popularity refreshes for each provider. For very large providers like Flickr, the update_batches task may run for a very long time (more than a week), so it is highly likely that we will need to run catalog deployments while the task is still in progress.

If the catalog is deployed while the task is running, the task will fail and then retry (assuming it has remaining retries) once Airflow is back up. Currently, this means that the update_batches task restarts its update from the beginning of the rows_to_update table. So while the DAG appears to still be running fine, the batched update has actually started over and is re-updating rows that were already updated.

We need a way to ensure that when the task retries, it picks up where it left off instead of starting over.

Note: A developer can do this manually by failing the DAG, noting in the logs where it left off (the last row_id processed), and then manually triggering a new DagRun with the same query parameters but with batch_start set to that row_id and resume_update enabled to prevent the update table from being recreated. However, this is a tedious manual process that may need to be repeated multiple times, for multiple providers, and could easily be forgotten.

Suggested Improvement

There are a few ways we could implement this:

Deleting records from the rows_to_update table as we go

The simplest fix would be to delete records from the rows_to_update table after their corresponding records are updated on the media table. Then when the task fails and is retried, anything that was already updated will be missing from the table.
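A rough sketch of what the per-batch loop might look like with the extra DELETE, assuming a PostgresHook connection and illustrative table/query names (this is not the actual batched_update implementation):

```python
from airflow.providers.postgres.hooks.postgres import PostgresHook


def update_batches(postgres_conn_id, temp_table, batch_size, update_query):
    pg = PostgresHook(postgres_conn_id=postgres_conn_id)
    while True:
        # Always take the lowest remaining row_ids; anything already updated
        # has been deleted from the temp table, so a retried task naturally
        # resumes with the first unprocessed batch.
        batch = pg.get_records(
            f"SELECT row_id FROM {temp_table} ORDER BY row_id LIMIT {batch_size};"
        )
        if not batch:
            break
        batch_start, batch_end = batch[0][0], batch[-1][0]
        # Apply the update to the media table for this batch (illustrative).
        pg.run(update_query.format(batch_start=batch_start, batch_end=batch_end))
        # The extra per-batch query: drop the rows that were just updated.
        pg.run(f"DELETE FROM {temp_table} WHERE row_id <= {batch_end};")
```

Note that the loop selects the minimum remaining row_id rather than counting up from 0, which is exactly the batch_start change described in the second con below.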

Cons:

  • Adds an extra query to each batch update. It should be a very quick query, but adding any overhead to this update is undesirable.
  • We would need to update the batch_start logic. Currently we assume that unless a custom start is provided, we start at row_id = 0, but that would no longer be the case.

Store the last processed row_id somewhere

We could persist a variable that tracks the last row_id processed, updating it each time a batch completes. Because this must persist through a task failure and retry, we'd need to store it in the database. When the update_batches task runs, it chooses where to begin the update in this order:

  • batch_start, if supplied, is always preferred
  • last_processed_row_id if it exists
  • defaults to 0, beginning from the start of the update table.

The simplest way might be to use an Airflow Variable named uniquely by the query_id (last_processed_row_id_{{query_id}}), which we can easily set in the task. The only hitch is cleaning up/deleting the variable when the DagRun finishes, but I believe we can do that with the CLI and a BashOperator (example).
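A rough sketch of that idea, where run_update_for_batch is a hypothetical stand-in for the existing per-batch update; Variable.get/set/delete and the @task decorator are standard Airflow APIs, and the cleanup task could just as easily be the CLI call in a BashOperator mentioned above:

```python
from airflow.decorators import task
from airflow.models import Variable


def run_update_for_batch(start, end):
    """Hypothetical stand-in for the existing per-batch UPDATE logic."""
    ...


@task(retries=3)
def update_batches(query_id, total_rows, batch_size, batch_start=None):
    checkpoint_key = f"last_processed_row_id_{query_id}"
    # Resolution order: an explicit batch_start always wins, then a stored
    # checkpoint from a previous try, then 0 (the start of rows_to_update).
    start = (
        batch_start
        if batch_start is not None
        else int(Variable.get(checkpoint_key, default_var=0))
    )
    while start < total_rows:
        end = min(start + batch_size, total_rows)
        run_update_for_batch(start, end)
        # Persist progress so a retried task instance resumes here, not at 0.
        Variable.set(checkpoint_key, end)
        start = end


@task
def delete_checkpoint(query_id):
    # Clean up once the DagRun finishes so the checkpoint doesn't leak into
    # a later run that reuses the same query_id.
    Variable.delete(f"last_processed_row_id_{query_id}")
```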

Dynamically generated batch update tasks, instead of a single update task

The current approach has a single update_batches task which iteratively runs the updates. We could instead use dynamic task mapping to generate a separate Airflow task per batch. Each task succeeds when it is done, so only the currently running batches will be retried if they are halted during a deployment.
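For illustration only, the shape of a mapped version might look something like the following; the batch bounds and the process_batch body are hypothetical, while .expand() and max_active_tasks are real Airflow features:

```python
from datetime import datetime

from airflow.decorators import dag, task


@task
def get_batches(total_rows, batch_size):
    # One (start, end) pair per batch; each pair becomes its own mapped task.
    return [
        (start, min(start + batch_size, total_rows))
        for start in range(0, total_rows, batch_size)
    ]


@task(retries=2)
def process_batch(bounds):
    start, end = bounds
    # Run the UPDATE for just this slice of rows_to_update (hypothetical).
    ...


@dag(schedule=None, start_date=datetime(2023, 1, 1), catchup=False, max_active_tasks=4)
def batched_update_mapped():
    # Each mapped task succeeds on its own, so only the batches that were in
    # flight during a deployment are retried when Airflow comes back up.
    process_batch.expand(bounds=get_batches(total_rows=500_000, batch_size=10_000))


batched_update_mapped()
```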

This was not implemented originally because large providers may require huge numbers of batches (50k batches for Flickr, for example). Airflow by default only allows a maximum of 1024 dynamic tasks per DAG at a time. As this number gets higher, the scheduler apparently becomes clogged and Airflow gets extremely slow.

I did have some ideas for handling this, but it's not really how Airflow is intended to be used. For example, we could have the batched_update DAG schedule the first 1024 batches and then, if not all batches were processed, use the TriggerDagRunOperator to trigger another DagRun of itself using the existing batch_start and resume_update params to get the next batches.
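As a sketch of that idea: TriggerDagRunOperator itself is standard, but the XCom key and the upstream gating that only runs this task when batches remain are hypothetical here.

```python
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

# Re-trigger the DAG itself with the next chunk's starting point. In practice
# this task would sit behind a branch or short-circuit so it only runs when
# not all batches have been processed yet.
trigger_next_chunk = TriggerDagRunOperator(
    task_id="trigger_next_chunk",
    trigger_dag_id="batched_update",
    conf={
        "query_id": "{{ params.query_id }}",
        # Hypothetical XCom recording where the last scheduled chunk ended.
        "batch_start": "{{ ti.xcom_pull(task_ids='get_remaining_batches', key='next_batch_start') }}",
        # Skip recreating the rows_to_update table in the new DagRun.
        "resume_update": True,
    },
)
```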

Cons:

  • Even with my solution to the task-number problem, this is introducing a lot of Airflow overhead and a big strain on the scheduler -- especially if multiple batched updates are running at the same time. This may considerably slow down an already slow process.
  • Dynamically mapped tasks are meant to run concurrently. We can use max_active_tasks to restrict how many run at one time, but critically there is no way to make the entire DAG fail as soon as one fails. If there's a problem with the SQL, we could expect a bombardment of 1024 task failures and Slack notifications. (It is possible to prevent the Slack notifications, but this is still unfortunate).

Benefit

Allows very long-running batched updates to continue through deployments and to resume from failure without manual intervention.

Additional context

#2331

@stacimc stacimc added 🟧 priority: high Stalls work on the project or its dependents ✨ goal: improvement Improvement to an existing user-facing feature 💻 aspect: code Concerns the software code in the repository 🧱 stack: catalog Related to the catalog and Airflow DAGs labels Jun 29, 2023
@stacimc stacimc self-assigned this Jun 29, 2023