
[RFC] Parallel & Batch Ingestion #12457

@chishui

Description


Is your feature request related to a problem? Please describe

Problem Statements

Today, users can use the bulk API to ingest multiple documents in a single request. All documents from this request are handled by one ingest node, and on this node, if an ingest pipeline is configured, documents are processed by the pipeline one at a time in sequential order (ref). An ingest pipeline is composed of a collection of processors, and a processor is the computing unit of a pipeline. Most processors are lightweight, such as append, uppercase, and lowercase, and whether multiple documents are processed one after another or in parallel would make no observable difference. But for time-consuming processors such as neural search processors, which by their nature require more time to compute, being able to run them in parallel could save users valuable ingest time. Apart from ingestion time, processors like neural search can benefit from processing documents in batches: batch APIs reduce the number of requests to remote ML services and help avoid hitting rate limit restrictions. (Feature request: opensearch-project/ml-commons#1840, rate limit example from OpenAI: https://platform.openai.com/docs/guides/rate-limits)

Because the ingest flow currently lacks parallel ingestion and batch ingestion capabilities, we propose the solution below to address both.

Describe the solution you'd like

Proposed Features

1. Batch Ingestion

An ingest pipeline is constructed from a list of processors, and a single document flows through each processor one by one before it can be stored into an index. Currently, both pipelines and processors can only handle one document at a time, and even with the bulk API, documents are iterated and handled in sequential order. As shown in figure 1, to ingest doc1, it would first flow through ingest pipeline 1, then through pipeline 2. Only then would the next document go through both pipelines.

[Figure 1: ingest-Page-1]

To support batch processing of documents, we'll add a batchExecute API to ingest pipelines and processors which takes multiple documents as input. We will provide a default implementation in the Processor interface that iteratively calls the existing execute API to process documents one by one, so most processors don't need to change; only processors that benefit from batch processing (e.g. the text embedding processor) would provide their own implementation. All others, even when receiving documents together, default to processing them one at a time.
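As an illustrative sketch (not the final OpenSearch interface; the exact signature, e.g. whether it is synchronous or handler-based, is an implementation detail), the default batchExecute could look like this:

import java.util.ArrayList;
import java.util.List;

import org.opensearch.ingest.IngestDocument;

public interface Processor {
    // Existing per-document API (simplified here for illustration).
    IngestDocument execute(IngestDocument document) throws Exception;

    // Proposed batch API. The default implementation iterates and delegates
    // to execute(), so processors that gain nothing from batching need no
    // change; processors like text embedding override it to batch their calls.
    default List<IngestDocument> batchExecute(List<IngestDocument> documents) throws Exception {
        List<IngestDocument> results = new ArrayList<>(documents.size());
        for (IngestDocument document : documents) {
            results.add(execute(document));
        }
        return results;
    }
}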

To batch process documents, users use the bulk API. We'll add two optional parameters to the bulk API that let users enable the batch feature and set the batch size. Based on the maximum_batch_size value, documents are split into batches.

Since in the bulk API different documents can be ingested into different indexes, and indexes can use the same pipelines but in a different order (e.g. index “movies” uses pipeline P1 as its default pipeline and P2 as its final pipeline, while index “musics” uses P2 as its default pipeline and P1 as its final pipeline), we batch documents at the index level to avoid the over-complexity of handling cross-index batching (topological sorting).
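A minimal sketch of that index-level batching, assuming the calculateBatches helper named in the pseudocode further below (reading the target index via IngestDocument.getFieldValue is illustrative):

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.opensearch.ingest.IngestDocument;

static List<List<IngestDocument>> calculateBatches(List<IngestDocument> documents, int maximumBatchSize) {
    // Group documents by target index so a batch never mixes indexes,
    // and therefore never mixes default/final pipeline orderings.
    Map<String, List<IngestDocument>> byIndex = new LinkedHashMap<>();
    for (IngestDocument doc : documents) {
        String index = doc.getFieldValue("_index", String.class);
        byIndex.computeIfAbsent(index, k -> new ArrayList<>()).add(doc);
    }
    // Split each per-index group into chunks of at most maximumBatchSize.
    List<List<IngestDocument>> batches = new ArrayList<>();
    for (List<IngestDocument> group : byIndex.values()) {
        for (int from = 0; from < group.size(); from += maximumBatchSize) {
            int to = Math.min(from + maximumBatchSize, group.size());
            batches.add(new ArrayList<>(group.subList(from, to)));
        }
    }
    return batches;
}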

2. Parallel Ingestion

Apart from batch ingestion, we also propose parallel ingestion to accompany batch ingestion and boost ingestion performance. When a user enables parallel ingestion, documents from the bulk API are split into batches based on the batch size, and the batches are then processed in parallel by threads managed by a thread pool. While the thread pool limits the maximum concurrency of parallel ingestion, it also protects host resources from being exhausted by batch ingestion threads.

[Figure 2: ingest-Page-2]

Ingest flow logic change

The current ingestion flow for documents is shown in the pseudocode below:

for (document in documents) {  
    for (pipeline in pipelines) {  
        for (processor in pipeline.processors) {  
            document = processor.execute(document)  
        }  
    }  
}

We'll change the flow to the logic shown below when the batch option is enabled.

if (enabledBatch) {
    batches = calculateBatches(documents);
    for (batch in batches) {
        for (pipeline in pipelines) {
            for (processor in pipeline.processors) {
                batch = processor.batchExecute(batch)
            }
        }
    }
} else if (enabledParallelBatch) {
    batches = calculateBatches(documents);
    for (batch in batches) {
        threadpool.execute(() -> {
            for (pipeline in pipelines) {
                for (processor in pipeline.processors) {
                    batch = processor.batchExecute(batch)
                }
            }
        });
    }
} else {
    // fall back to existing ingestion logic
}
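To make the parallel branch concrete, here is a sketch using a bounded executor and the illustrative Processor interface from above (the real implementation would use an OpenSearch-managed thread pool and asynchronous completion rather than a raw ExecutorService and CountDownLatch):

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.opensearch.ingest.IngestDocument;

// Each pipeline is represented here simply as its ordered processor list.
void executeParallel(List<List<IngestDocument>> batches, List<List<Processor>> pipelines) throws InterruptedException {
    // A fixed-size pool caps concurrency so batch threads cannot exhaust the host.
    ExecutorService threadPool = Executors.newFixedThreadPool(4);
    CountDownLatch done = new CountDownLatch(batches.size());
    for (List<IngestDocument> batch : batches) {
        threadPool.execute(() -> {
            try {
                List<IngestDocument> current = batch;
                for (List<Processor> pipeline : pipelines) {
                    for (Processor processor : pipeline) {
                        current = processor.batchExecute(current);
                    }
                }
            } catch (Exception e) {
                // Real code would surface the per-batch failure in the bulk response.
            } finally {
                done.countDown();
            }
        });
    }
    done.await(); // the bulk request completes only after every batch finishes
    threadPool.shutdown();
}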

Update to Bulk API

We propose the following new parameters for the bulk API; all of them are optional.

Parameter: batch_ingestion_option (String)
Description: Controls whether batch ingestion is enabled. It has three options: none, enable, and parallel; the default is none. When set to enable, batch ingestion is enabled and batches are processed in sequential order. When set to parallel, batch ingestion is enabled and batches are processed in parallel.

Parameter: maximum_batch_size (Integer)
Description: The maximum number of documents per batch. It only takes effect when batch_ingestion_option is set to enable or parallel. The default is 1.
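For illustration, a bulk request using the proposed parameters could look like the following (the parameters are the proposal above, not an existing API):

POST /_bulk?batch_ingestion_option=parallel&maximum_batch_size=10
{ "index": { "_index": "movies", "_id": "1" } }
{ "title": "Moneyball" }
{ "index": { "_index": "musics", "_id": "1" } }
{ "title": "Abbey Road" }

Here documents targeting “movies” and “musics” would be batched separately (index-level batching), with up to 10 documents per batch, and the batches would be processed in parallel.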

3. Split and Redistribute Bulk API

Users tend to use the bulk API to ingest many documents, which can sometimes be very time consuming. To achieve lower ingestion time, they have to use multiple clients to make multiple bulk requests with smaller document counts so that the requests can be distributed to different ingest nodes. To offload this burden from the user side, we can support the split-and-redistribute work on the server side and help distribute the ingest load more evenly.
Note: although brought up here, we think it's better to discuss this topic in a separate RFC doc, which will be published later.

Related component

Indexing:Performance

Describe alternatives you've considered

No response

Additional context

No response
