[Design Proposal] Offline Background Tasks #13554
Background
Offline/Background Nodes (RFC) to offload background work provide the segregation required to let core operations (ingestion/search) run and scale predictably on the dedicated online fleet (data nodes), while the provisioned Offline Nodes crunch through all the background work.
The Offline Nodes behave very differently from the existing node types in the cluster (Data/ClusterManager). They don't host any shards, are not eligible to become Cluster Manager, and cannot participate in its voting. They exist solely to perform background work.
Challenges
Introduction of Offline Nodes will require us to tackle multiple problems. There are two high-level problems we need to solve, covered in detail below: Task Management and Task Modelling.
Apart from that, we need to consider solutions for some lower-level problems which arise once Offline Nodes are in place.
We will share dedicated proposals for these lower-level problems separately.
To keep this discussion manageable, we restrict ourselves to the "Merge" background task to begin with. Similar approaches can later be followed for other background tasks.
Task Management
Requirements
Proposal
To ensure the above requirements are satisfied in a system like OpenSearch, any background "work" submission should go through a Persistent Task Store which acts as a Queue, allowing available "work"ers to perform work at their own pace without getting overwhelmed, and allowing the system to smoothly absorb spikes in the amount of work to be completed.
The Task Store/Queue can be implemented in multiple ways, and different implementations might appeal to different sets of users, each carrying a different set of complexities in terms of validations, security, etc. A few examples of a Queue implementation might be:
- PersistentTasksCustomMetadata, persisted in cluster state
- A dedicated system index with queuing logic built around it (discussed below)
Allowing users to build their preferred implementation of the Queue, one extensible enough to let them integrate seamlessly and cater to their use case, is of paramount importance. Keeping this in mind, we provide abstractions over Task Management which help users leverage Offline Nodes without compromising on their internal intricacies.
The above abstractions are provided as part of a new library in :libs (offline-tasks). Using these abstractions, each user is free to plug in their own implementation of Task Management.
A sample local implementation POC using PersistentTasksCustomMetadata is provided here. Similarly, a more scalable implementation could be put in place by using a dedicated system index and building queuing logic around it.
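To make the abstraction concrete, here is a minimal sketch of what the offline-tasks interfaces could look like. All names and signatures (Task, TaskStatus, TaskWorker, TaskClient) follow the terminology in this proposal but are illustrative assumptions, not the final API.

```java
import java.util.List;
import java.util.Map;

// Sketch only: each type would live in its own file; shown together for brevity.

/** A unit of background work, e.g. a segment merge for one shard. */
interface Task {
    String getId();
    String getType();                 // e.g. "merge", "snapshot"
    TaskStatus getStatus();
    Map<String, String> getParams();  // task-specific parameters
}

enum TaskStatus { UNASSIGNED, CLAIMED, RUNNING, SUCCEEDED, FAILED, ABANDONED }

/** Executes Tasks of one type; implementations are registered on Offline Nodes. */
interface TaskWorker {
    String getTaskType();
    void executeTask(Task task) throws Exception;
}

/** Facade over the TaskStore/Queue; hides the chosen queue implementation. */
interface TaskClient {
    void submitTask(Task task);
    boolean claimTask(String taskId, String nodeId); // at most one claimer may succeed
    void sendHeartbeat(String taskId, String nodeId);
    void completeTask(String taskId, TaskStatus finalStatus);
    List<Task> listUnassignedTasks(String taskType);
}
```

A PersistentTasksCustomMetadata-backed implementation would satisfy claimTask via the single-threaded Cluster Manager, while a system-index-backed one would need its own conditional-update semantics; the interface deliberately leaves that choice to the implementation.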
Task Modelling
Requirements
Proposal
To run operations on Offline Nodes, we need to extract each operation out and model it as a "Task". This allows independent execution of these operations, without the unnecessary dependencies/components they are entangled with today.
To achieve that, each operation (e.g. Merge, Snapshot, RemoteGC) would be extracted out as an independent Task unit in a separate Module/Plugin, preferably in a dedicated repository, to allow independent development, maintenance, and release. Each Offline Node installs the configured Task Modules/Plugins to run the submitted Tasks.
There is an ongoing effort to break apart the OpenSearch :server monolith, proposed in #5910, targeting to refactor the :server monolith and move components out to :libs, :modules, and :plugins. Since the vision of this proposal is to achieve segregation between core and background operations, we can align with the ongoing refactoring effort and strive to refactor and move all Merge components out to a separate Module or Plugin.
Here is an example, using Segment Merges, of how the above abstraction would work in an OpenSearch cluster with and without dedicated Offline Nodes.
NOTE: The actual interfaces of Merge may look different once #12726 completes, but the interaction and extension points for Task Management remain intact.
We can leverage an existing construct, the MergeScheduler, which handles all merges triggered by the index Engine. Once the Engine asks the MergeScheduler to schedule a merge, the MergeScheduler injected by our Merge Module would decide whether to perform the merge locally on the data node, or to send it to Offline Nodes via the TaskClient.
This decision making in MergeScheduler.runLocally() can be tweaked dynamically via a few cluster-level dynamic settings we register as part of the Merge Module.
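A minimal sketch of that decision point follows, assuming the hypothetical TaskClient above. The setting names, size threshold, and MergeTask type are illustrative, not proposed APIs.

```java
// Sketch of the offload decision inside a MergeScheduler variant.
public class OffloadAwareMergeScheduler {
    private final TaskClient taskClient;
    // Bound to hypothetical dynamic cluster settings registered by the Merge Module,
    // e.g. "offline_tasks.merge.enabled" and "offline_tasks.merge.min_size".
    private volatile boolean offloadEnabled = false;
    private volatile long minOffloadSizeBytes = 512 * 1024 * 1024;

    public OffloadAwareMergeScheduler(TaskClient taskClient) {
        this.taskClient = taskClient;
    }

    /** The decision point: run on this data node, or offload? */
    boolean runLocally(long estimatedMergeSizeBytes) {
        // Run locally when offloading is disabled or the merge is too
        // small to justify the enqueue/download round trip.
        return !offloadEnabled || estimatedMergeSizeBytes < minOffloadSizeBytes;
    }

    public void scheduleMerge(String shardId, long estimatedMergeSizeBytes) {
        if (runLocally(estimatedMergeSizeBytes)) {
            mergeLocally(shardId); // existing in-process merge path
        } else {
            // Hand the merge to Offline Nodes; TaskClient hides the queue details.
            taskClient.submitTask(new MergeTask(shardId)); // MergeTask: hypothetical Task impl
        }
    }

    private void mergeLocally(String shardId) { /* existing merge path */ }

    // Setters invoked by the (hypothetical) dynamic setting update consumers.
    void setOffloadEnabled(boolean enabled) { this.offloadEnabled = enabled; }
    void setMinOffloadSizeBytes(long bytes) { this.minOffloadSizeBytes = bytes; }
}
```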
The MergeModule would be installed on both Data Nodes and Offline Nodes. On the Offline Node it registers the MergeTaskWorker with the BackgroundTaskService, which coordinates the execution of the various Tasks that are to be executed by the current node.
The Merge Task sent by the Data Node via the TaskClient is eventually assigned to one of the Offline Nodes for execution and handed to the BackgroundTaskService on that particular node.
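One possible shape for that wiring, where the "offline" node role, the BackgroundTaskService.registerWorker API, and MergeTaskWorker are all assumptions:

```java
import org.opensearch.cluster.node.DiscoveryNode;

// Hypothetical module wiring; not the final plugin API.
public class MergeModule {

    public void onNodeStarted(DiscoveryNode localNode,
                              BackgroundTaskService taskService,
                              TaskClient taskClient) {
        if (isOfflineNode(localNode)) {
            // Offline Node: register the worker so the BackgroundTaskService
            // can dispatch claimed merge Tasks to it.
            taskService.registerWorker(new MergeTaskWorker()); // hypothetical TaskWorker impl
        }
        // On Data Nodes only the TaskClient is needed: the merge scheduler
        // uses it to enqueue merge Tasks (see the scheduler sketch above).
    }

    private static boolean isOfflineNode(DiscoveryNode node) {
        // Assumes a dedicated "offline" node role is introduced for these nodes.
        return node.getRoles().stream().anyMatch(r -> "offline".equals(r.roleName()));
    }
}
```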
Architecture
The lifecycle of a Task, from its conception to its completion, can be divided into 4 main stages. Two components participate throughout: the BackgroundTaskService, which runs on each Offline Node and periodically checks for available Tasks, and the TaskClient, which allows Nodes to interact with the TaskStore/Queue to perform various Task Management operations.

1. Task Submission - For Segment Merges, the MergeScheduler on the Data Node decides to offload the merge operation for a particular shard to Offline Nodes. The same TaskClient is provided on the Data Node to enqueue a new Task into our TaskQueue/Store, for Offline Nodes to pick up and execute asynchronously; the TaskClient hides all the implementation details of the TaskQueue/Store. Once the Task is submitted successfully, the BackgroundTaskService on an Offline Node receives this new Task, eligible to be executed. Since multiple nodes could have received this Task during polling, we need to ensure that no two nodes start executing it in parallel, duplicating work. To overcome this, each node which received the Task will try to claim it, and our Queue implementation ensures no two nodes are able to claim the same Task. Each Queue implementation would handle this differently, e.g. the single-threaded Cluster Manager in PersistentTasksCustomMetadata, or conditional writes in Apache Cassandra.
2. Task Execution (TaskWorker) - After the Merge Task gets picked up successfully by a node, the BackgroundTaskService finds the registered TaskWorker for the particular TaskType. For the Merge Task, the MergeTaskExecutor is invoked to perform the merge operation.
3. Heartbeats - During the execution of a Task, a periodic heartbeat is published to the TaskStore/Queue. This ensures the node is still actively performing the Task and has not abandoned it due to some failure. If the node is unable to publish the heartbeat for X minutes, the Task is considered Abandoned and becomes eligible to be picked up by another Offline Node.
4. Task Completion - Once the Task completes, the node updates the Task status in the TaskStore/Queue and publishes a Task Completed event, so that listeners on the Data Node can learn about the completion of the Task. Since all of this is asynchronous communication, the completion event might be lost due to transient failures and the submitting Data Node might never learn about Task completion; this happens today as well with any Cluster Manager event listeners. Each flow needs to handle such failures by either checking the Task state via the TaskClient, or by building redundancy into the TaskWorker implementation so that executing the same Task again is safe.
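Putting the four stages together, here is a minimal sketch of the Offline Node side, built on the hypothetical interfaces sketched earlier; the polling and heartbeat intervals are placeholders, not proposed values.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Sketch of the Offline Node execution loop; names and intervals are illustrative.
public class BackgroundTaskService {
    private final TaskClient taskClient;
    private final String nodeId;
    private final Map<String, TaskWorker> workersByType = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

    public BackgroundTaskService(TaskClient taskClient, String nodeId) {
        this.taskClient = taskClient;
        this.nodeId = nodeId;
        // Stage 1 (pick-up side): periodically poll the queue for unassigned Tasks.
        scheduler.scheduleWithFixedDelay(this::pollOnce, 0, 30, TimeUnit.SECONDS);
    }

    public void registerWorker(TaskWorker worker) {
        workersByType.put(worker.getTaskType(), worker);
    }

    private void pollOnce() {
        for (String type : workersByType.keySet()) {
            for (Task task : taskClient.listUnassignedTasks(type)) {
                // Claim before executing: the queue guarantees at most one node
                // wins the claim, so no two nodes duplicate the same work.
                if (taskClient.claimTask(task.getId(), nodeId)) {
                    execute(task);
                }
            }
        }
    }

    private void execute(Task task) {
        // Stage 3: heartbeat while the Task runs so it is not marked Abandoned.
        ScheduledFuture<?> heartbeat = scheduler.scheduleAtFixedRate(
            () -> taskClient.sendHeartbeat(task.getId(), nodeId), 0, 10, TimeUnit.SECONDS);
        try {
            // Stage 2: dispatch to the TaskWorker registered for this TaskType.
            workersByType.get(task.getType()).executeTask(task);
            // Stage 4: record terminal status; listeners on the Data Node react to it.
            taskClient.completeTask(task.getId(), TaskStatus.SUCCEEDED);
        } catch (Exception e) {
            taskClient.completeTask(task.getId(), TaskStatus.FAILED);
        } finally {
            heartbeat.cancel(false);
        }
    }
}
```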
Other Considerations
Additional Cost
There is obviously an added cost, directly dependent on the number of Offline Nodes provisioned (offset somewhat by the resources freed up on data nodes). Apart from that, with Offline Nodes there would be 2 additional downloads. Consider Segment Merges: the Offline Node would first need to download the shard's segments to be merged, and the merged segments would need to be downloaded again on the Data Node once the Task completes.
Not all users would want to spin up separate nodes for background operations, so however we choose to implement/execute this, we will ensure the status quo is maintained for clusters without Offline Nodes.
Resource consumption and throttling
Each Offline Node would monitor its resource consumption to stay informed about the current load on the node, and would pick up a Task only if there are enough resources to execute it. If not, it waits for the currently executing Tasks to complete and free up resources.
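One simple way to express that admission check, run before claiming a Task; the thresholds here are assumptions, not proposed defaults:

```java
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;

// Illustrative resource gate consulted before claiming a Task.
public class ResourceGate {
    private static final double MAX_HEAP_FRACTION = 0.75;
    private static final double MAX_CPU_FRACTION = 0.80;
    private final OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();

    /** Returns true only if the node has headroom to take on another Task. */
    public boolean canAcceptTask() {
        Runtime rt = Runtime.getRuntime();
        double heapUsed = (double) (rt.totalMemory() - rt.freeMemory()) / rt.maxMemory();
        // getSystemLoadAverage() returns -1 where unsupported; a negative value
        // passes the check, so the heap check alone decides in that case.
        double cpuLoad = os.getSystemLoadAverage() / rt.availableProcessors();
        return heapUsed < MAX_HEAP_FRACTION && cpuLoad < MAX_CPU_FRACTION;
    }
}
```

When the gate reports no headroom, the node simply skips that polling round; the unclaimed Task stays in the queue for this node, or any other Offline Node, to pick up later.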
Related component
Storage