Implement local distributed reindexing #4148

Closed

Problem

Blocked by #4146, #4147, https://github.com/WordPress/openverse-infrastructure/issues/849

This issue tracks adding the orchestration steps for the distributed reindex to the new data refresh DAGs.

Description

In this step we will add tasks to the data refresh DAGs to orchestrate the distributed reindex. At the end of this step, it will be possible to run a distributed reindex locally, but because the infrastructure work to create the ASGs is not complete, it cannot yet be run in production. The logic for the following steps can all be refactored from distributed_reindex_scheduler.py.

  • Use describe_auto_scaling_groups and filter by tags to select the appropriate ASG for the desired environment. (Skips in local env.)
  • Use set_desired_capacity to increase the desired capacity of the ASG to the desired number of workers, depending on the environment. This will cause the ASG to begin spinning up instances. (Skips in local env.)
  • Use describe_auto_scaling_groups to poll the ASG until all instances have been started, and get the EC2 instance IDs. (Skips in local env.)
  • Use dynamic task mapping to distribute reindexing across the indexer workers by first calculating start and end indices that will split the records in the media table into even portions, depending on the number of workers available in the given environment. Then:
    • POST to each worker’s reindexing_task endpoint the start_index and end_index it should handle.
    • Use a Sensor to poll the worker’s task/{task_id} endpoint until the task is complete, logging progress as it goes.
  • Use terminate_instance_in_auto_scaling_group to terminate the instance. Make sure to set ShouldDecrementDesiredCapacity to True to ensure that the ASG does not try to replace the instance. This task should use the NONE_SKIPPED TriggerRule to ensure that the instances are terminated, even if there are upstream failures. (Skips in local env.)
  • Finally, after all tasks have finished (regardless of success/failure), we should have a cleanup task that calls set_desired_capacity to set the desired capacity of the ASG to 0. Generally this should be a no-op, but if an instance crashes during reindexing (rather than simply failing during reindexing), the ASG will spin up a replacement and Airflow will not automatically clean it up. This task ensures that any dangling instances are terminated.
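
A rough sketch of the pure-Python pieces of the steps above, in case it helps when refactoring: selecting the ASG by tags from a describe_auto_scaling_groups response, checking whether all instances have started, and splitting the records into per-worker ranges. The function names, the tag keys in the usage lines, the choice to wait for the InService lifecycle state, and the half-open (start, end) convention are all assumptions for illustration, not what distributed_reindex_scheduler.py actually does. The response shape (AutoScalingGroups, Tags, Instances, InstanceId, LifecycleState) follows the boto3 describe_auto_scaling_groups response.

```python
from math import ceil
from typing import Optional


def select_asg(response: dict, required_tags: dict) -> dict:
    """Pick the single ASG whose tags include all of `required_tags`.

    `response` is shaped like a boto3 describe_auto_scaling_groups response.
    """
    matches = []
    for group in response.get("AutoScalingGroups", []):
        tags = {t["Key"]: t["Value"] for t in group.get("Tags", [])}
        if all(tags.get(k) == v for k, v in required_tags.items()):
            matches.append(group)
    if len(matches) != 1:
        raise ValueError(f"Expected exactly one matching ASG, found {len(matches)}")
    return matches[0]


def started_instance_ids(group: dict, desired_capacity: int) -> Optional[list]:
    """Return the instance IDs once all desired instances are InService.

    Returns None while the ASG is still spinning up, so a polling task
    (hypothetical here) can keep waiting.
    """
    ids = [
        i["InstanceId"]
        for i in group.get("Instances", [])
        if i.get("LifecycleState") == "InService"
    ]
    return ids if len(ids) >= desired_capacity else None


def split_into_ranges(total_records: int, worker_count: int) -> list:
    """Split [0, total_records) into `worker_count` near-even ranges.

    Assumption: each range is (start_index, end_index) with an exclusive end.
    """
    if worker_count < 1:
        raise ValueError("worker_count must be at least 1")
    if total_records < 0:
        raise ValueError("total_records must not be negative")
    chunk = ceil(total_records / worker_count)
    ranges = []
    for i in range(worker_count):
        start = min(i * chunk, total_records)
        end = min(start + chunk, total_records)
        ranges.append((start, end))
    return ranges


# Usage with made-up data; the tag keys and names are hypothetical.
example_response = {
    "AutoScalingGroups": [
        {
            "AutoScalingGroupName": "indexer-worker-staging",
            "Tags": [{"Key": "Environment", "Value": "staging"}],
            "Instances": [
                {"InstanceId": "i-aaa", "LifecycleState": "InService"},
                {"InstanceId": "i-bbb", "LifecycleState": "Pending"},
            ],
        }
    ]
}
asg = select_asg(example_response, {"Environment": "staging"})
print(started_instance_ids(asg, desired_capacity=2))  # still waiting on i-bbb
print(split_into_ranges(10, 3))
```

Each (start, end) pair would then be one mapped task instance under dynamic task mapping, with the pair sent to that worker’s reindexing_task endpoint.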

Additional context

See this section of the implementation plan (IP).

Metadata

Labels

  • ✨ goal: improvement (Improvement to an existing user-facing feature)
  • 💻 aspect: code (Concerns the software code in the repository)
  • 🟨 priority: medium (Not blocking but should be addressed soon)
  • 🧱 stack: catalog (Related to the catalog and Airflow DAGs)

Projects

  • Status

    ✅ Done
