Closed
Opened on Apr 17, 2024
Problem
Blocked by #4146, #4147, https://github.com/WordPress/openverse-infrastructure/issues/849
This issue tracks adding the orchestration steps for the distributed reindex to the new data refresh DAGs.
Description
In this step we will add tasks to the data refresh DAGs to orchestrate the distributed reindex. At the end of this step it will be possible to run a distributed reindex locally, but because the infrastructure work to create the Auto Scaling groups (ASGs) is not complete, it cannot be run in production yet. All of the following can be refactored from the existing code in `distributed_reindex_scheduler.py`.
- Use `describe_auto_scaling_groups` and filter by tags to select the appropriate ASG for the desired environment. (Skips in local env.)
- Use `set_desired_capacity` to raise the ASG's desired capacity to the number of workers required for the environment. This will cause the ASG to begin spinning up instances. (Skips in local env.)
- Use `describe_auto_scaling_groups` to poll the ASG until all instances have started, and get their EC2 instance IDs. (Skips in local env.)
- Use dynamic task mapping to distribute reindexing across the indexer workers. First calculate start and end indices that split the records in the media table into even portions, depending on the number of workers available in the given environment. Then, for each worker:
  - POST the `start_index` and `end_index` it should handle to the worker's `reindexing_task` endpoint.
  - Use a Sensor to ping the worker's `task/{task_id}` endpoint until the task is complete, logging the progress as it goes.
  - Use `terminate_instance_in_auto_scaling_group` to terminate the instance. Make sure to set `ShouldDecrementDesiredCapacity` to `True` so that the ASG does not try to replace the instance. This task should use the `NONE_SKIPPED` `TriggerRule` to ensure that the instances are terminated even if there are upstream failures. (Skips in local env.)
- Finally, after all tasks have finished (regardless of success or failure), a cleanup task should call `set_desired_capacity` with a capacity of 0. Generally this should be a no-op, but if an instance crashes during reindexing (rather than the reindex simply failing), the ASG will spin up a replacement that Airflow will not automatically clean up. This task ensures that any dangling instances are terminated.
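The ASG selection and scale-up steps could look roughly like the following minimal sketch. It assumes a boto3 Auto Scaling client (`boto3.client("autoscaling")`) is passed in; the tag key and value that identify each environment's worker ASG are hypothetical parameters, since the real tags come from the infrastructure work. In the DAG, both functions would run inside tasks that are skipped in the local environment.

```python
from typing import Any


def find_worker_asg(autoscaling: Any, tag_key: str, tag_value: str) -> str:
    """Return the name of the single ASG tagged ``tag_key=tag_value``.

    ``autoscaling`` is a boto3 Auto Scaling client. The tag used to mark each
    environment's worker ASG is an assumption, not a documented convention.
    """
    response = autoscaling.describe_auto_scaling_groups(
        Filters=[{"Name": f"tag:{tag_key}", "Values": [tag_value]}]
    )
    groups = response["AutoScalingGroups"]
    if len(groups) != 1:
        # Refuse to guess if the tags match zero or several groups.
        raise ValueError(
            f"Expected one ASG tagged {tag_key}={tag_value}, found {len(groups)}"
        )
    return groups[0]["AutoScalingGroupName"]


def scale_up(autoscaling: Any, asg_name: str, worker_count: int) -> None:
    """Raise the ASG's desired capacity so it launches ``worker_count`` workers."""
    autoscaling.set_desired_capacity(
        AutoScalingGroupName=asg_name,
        DesiredCapacity=worker_count,
        # Do not wait out a cooldown left over from a previous scaling action.
        HonorCooldown=False,
    )
```

The requested capacity has to lie within the group's configured minimum and maximum size; otherwise AWS rejects the `set_desired_capacity` call, so the per-environment worker counts and the ASG limits need to agree.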
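Polling the ASG until its workers are up might be sketched as below, again assuming a boto3 Auto Scaling client is passed in. The poll interval and timeout defaults are hypothetical. An `InService` lifecycle state only means the ASG considers the instance running; the indexer worker's HTTP server may need a little longer before it accepts requests.

```python
import time
from typing import Any


def wait_for_instances(
    autoscaling: Any,
    asg_name: str,
    worker_count: int,
    poke_interval: float = 30.0,  # hypothetical default
    timeout: float = 1800.0,  # hypothetical default
) -> list[str]:
    """Poll the ASG until ``worker_count`` instances are InService; return their IDs."""
    deadline = time.monotonic() + timeout
    while True:
        response = autoscaling.describe_auto_scaling_groups(
            AutoScalingGroupNames=[asg_name]
        )
        instances = response["AutoScalingGroups"][0]["Instances"]
        # Pending instances are still booting; only count running ones.
        ready = [
            instance["InstanceId"]
            for instance in instances
            if instance["LifecycleState"] == "InService"
        ]
        if len(ready) >= worker_count:
            return ready
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"{asg_name}: {len(ready)}/{worker_count} instances in service "
                f"after {timeout}s"
            )
        time.sleep(poke_interval)
```

Inside Airflow the same check would more naturally be a sensor, so that the wait does not occupy a worker slot for the whole boot time.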
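One way to compute the even portions for dynamic task mapping is sketched below: split `record_count` records into `worker_count` contiguous slices whose sizes differ by at most one. This assumes the worker treats `end_index` as exclusive and that records are addressed by a 0-based position; if the worker actually expects database ids or an inclusive end, the arithmetic needs adjusting.

```python
def split_index_ranges(record_count: int, worker_count: int) -> list[dict[str, int]]:
    """Split positions [0, record_count) into ``worker_count`` even, contiguous slices.

    Each slice is half-open: ``start_index`` is included, ``end_index`` is not.
    The first ``record_count % worker_count`` slices each take one extra record.
    """
    if worker_count < 1:
        raise ValueError("worker_count must be at least 1")
    if record_count < 0:
        raise ValueError("record_count must not be negative")
    base, remainder = divmod(record_count, worker_count)
    slices = []
    start = 0
    for worker in range(worker_count):
        size = base + (1 if worker < remainder else 0)
        slices.append({"start_index": start, "end_index": start + size})
        start += size
    return slices


# Example: 10 records over 3 workers.
print(split_index_ranges(10, 3))
# [{'start_index': 0, 'end_index': 4}, {'start_index': 4, 'end_index': 7}, {'start_index': 7, 'end_index': 10}]
```

Returning a list of keyword dictionaries fits Airflow's `expand_kwargs` (available in recent Airflow 2.x releases), which would create one mapped task instance per slice.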
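The per-worker trigger and status check could be sketched with only the standard library, as below. The request body keys, the `task_id` field in the response, and the `progress`, `error`, and `finished` fields of the status response are assumptions about the indexer worker's API, not its documented contract. The HTTP helpers are injectable so the logic can be exercised without a running worker.

```python
import json
import urllib.request
from typing import Any, Callable

JsonDict = dict[str, Any]


def post_json(url: str, payload: JsonDict, timeout: float = 30.0) -> JsonDict:
    """POST ``payload`` as JSON and return the decoded JSON response."""
    request = urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


def get_json(url: str, timeout: float = 30.0) -> JsonDict:
    """GET ``url`` and return the decoded JSON response."""
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


def trigger_reindex(
    worker_url: str,
    start_index: int,
    end_index: int,
    post: Callable[[str, JsonDict], JsonDict] = post_json,
) -> str:
    """Ask one worker to reindex its slice and return the worker's task id.

    Hypothetical contract: the body carries ``start_index``/``end_index`` and
    the response carries ``task_id``.
    """
    response = post(
        f"{worker_url}/reindexing_task",
        {"start_index": start_index, "end_index": end_index},
    )
    return response["task_id"]


def poke_task(
    worker_url: str,
    task_id: str,
    get: Callable[[str], JsonDict] = get_json,
    log: Callable[[str], None] = print,
) -> bool:
    """One sensor poke: log progress, fail fast on error, return True when finished.

    The ``progress``, ``error`` and ``finished`` fields are hypothetical.
    """
    status = get(f"{worker_url}/task/{task_id}")
    log(f"Reindex task {task_id} on {worker_url}: progress={status.get('progress')}")
    if status.get("error"):
        raise RuntimeError(f"Reindex task {task_id} failed: {status}")
    return bool(status.get("finished"))
```

In the DAG, `poke_task` could serve as the `python_callable` of a `PythonSensor`, for example with `mode="reschedule"` so that no Airflow worker slot is held between pokes; the sensor keeps poking until the callable returns `True`.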
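The teardown steps might be sketched as follows, assuming a boto3 Auto Scaling client is passed in; both function names are hypothetical. In the DAG, `terminate_worker` would run once per mapped worker with `trigger_rule=TriggerRule.NONE_SKIPPED`, and `drain_worker_asg` would run last with `TriggerRule.ALL_DONE`, which fires once every upstream task has finished regardless of whether it succeeded.

```python
from typing import Any


def terminate_worker(autoscaling: Any, instance_id: str) -> None:
    """Terminate one worker and shrink the ASG so the instance is not replaced."""
    autoscaling.terminate_instance_in_auto_scaling_group(
        InstanceId=instance_id,
        # Without this, the ASG would launch a replacement to restore capacity.
        ShouldDecrementDesiredCapacity=True,
    )


def drain_worker_asg(autoscaling: Any, asg_name: str) -> None:
    """Cleanup: force desired capacity to 0 so any stray replacement is terminated."""
    autoscaling.set_desired_capacity(
        AutoScalingGroupName=asg_name,
        DesiredCapacity=0,
        HonorCooldown=False,
    )
```

When every worker terminated cleanly, the desired capacity is already 0 and the cleanup call changes nothing; it only matters when a crashed instance was replaced by the ASG.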
Additional context
See this section of the implementation plan (IP).
Status: ✅ Done