[RFC] Search pipelines #80

Closed
@msfroh

Description

Search pipelines

This RFC is intended to replace #12.

Overview

We are proposing a set of new APIs to manage composable processors to transform search requests and search responses in OpenSearch. Expected transformations include (but are not limited to):

  • Reranking final search results using an external ranking service (which would be impractical to apply when collecting results per-shard).
  • Modifying a search request by calling a query understanding service.
  • Excluding search results based on some externally-configured filtering logic (without needing to modify and deploy changes to the search application).

The new APIs will aim to mirror the ingest APIs, which are responsible for transforming documents before they are indexed, to ensure that all documents going into the index are processed in a consistent way. The ingest API makes use of pipelines of processors. We will do the same, but for search.

Argument over alternatives

Everyone should just implement logic in their calling application

The most obvious counterargument to this proposal is “this logic belongs in the search application that calls OpenSearch”. That is a valid approach and this proposal does not prevent any developer from transforming search requests and responses in their application.

We believe that providing an API within OpenSearch will make it easier for developers to build and share components that perform common transformations, reducing duplicated effort in the calling search applications.

Put this logic in a library that people can use from their calling applications

In theory, we could provide a common “toolbox” of request and response processors as a library that application developers could use. That would mean building libraries for specific languages/runtimes. By including search processors in OpenSearch itself, any calling application (regardless of implementation) can benefit. In particular, it is possible to modify query processing behavior without modifying the application (by specifying a default search pipeline for the target index(es)).

Write search plugins

Search plugins can significantly impact how search requests are processed, both on the coordinator node and on individual shards. Each processor we can think of could be implemented as a search plugin that runs on the coordinator node. The challenges with that approach are that a) writing a whole search plugin, complete with parameter parsing, is fairly complicated, b) the order in which search plugins run is not immediately obvious to a user, and c) without some overarching framework providing guidelines, every search plugin may have its own style of taking parameters (especially with regard to default behavior).

Similarities with ingest pipelines

A built-in orchestrator can call out to processors defined in plugins

Ingest pipelines have a core orchestrator responsible for calling out to each ingest processor in the pipeline, but the processors themselves may be defined in separate ingest plugins. These plugins can implement specific transformations without needing to consider the broader pipeline execution. Similarly, search pipelines will run from the OpenSearch core, but may call out to named search processors registered via plugins.

Processed on entry (or exit)

Just as ingest pipelines operate before documents get routed to shards, search pipelines operate “on top of” the index when processing a search request. That is, a SearchRequest gets transformed on the coordinator node before being sent to individual shards, and the SearchResponse gets transformed on the coordinator node after being aggregated from the shard responses.

Processing that happens on each shard is out of scope for this proposal. The SearchPlugin API remains the appropriate extension point for per-shard processing.

Pipelines are named entities stored in the cluster

To use an ingest pipeline, you generally create or update the pipeline definition with a PUT request to a REST API. The body of that request defines the pipeline, with a description and a list of ingest processors. We will provide a similar API to define named search pipelines built from search processors.

Can be referenced per-request or per-index

When using the index document API or the bulk API, you can include a request parameter like ?pipeline=my-pipeline to indicate that the given request should be processed by a specific pipeline. Similarly, we will add a pipeline parameter to the search API and the multi-search API.

Generally, we want to apply the same pipeline to every document being added to an index. To simplify that, the index API has a setting, index.default_pipeline, that designates a pipeline to use if none is specified in an index document or bulk request. Similarly, we will add a setting, index.default_search_pipeline, to apply a pipeline by default to all search or multi-search requests against the given index.

Differences from ingest pipelines

Processing different things in different places

While an ingest processor only ever operates on a document, potentially modifying it, a search processor may operate on a search request, a search response, or both. We also assume that processing a search response requires information from the search request.

To support these different cases, we will provide different interfaces for search request processors, search response processors, and request + response (“bracket”) processors. The search pipeline definition will have separate sections for request and response processors. (Bracket processors must be specified in the request processor list, but may be referenced by ID in the response processor list to explicitly order them relative to response processors.)

The name “bracket processor” is chosen to indicate that they process things on the way in and on the way out, and must be balanced like brackets or parentheses. That is, given two bracket processors B1 and B2, we require that if B1 processes a search request before B2, then B1 processes the search response after B2.
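
To make the ordering concrete, here is a minimal sketch of how a pipeline might enforce this balancing. The class and method names here are invented for illustration and are not part of the proposed API; explicit re-ordering of a bracket processor by ID among the response processors is omitted for brevity.

package org.opensearch.search.pipeline;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;

/**
 * Hypothetical sketch: bracket processors' response halves run in the
 * reverse of their request order (LIFO), like matching parentheses.
 */
final class BalancedPipelineSketch {
  SearchResponse run(List<RequestProcessor> requestProcessors, SearchRequest request) {
    Deque<BracketProcessor> brackets = new ArrayDeque<>();
    for (RequestProcessor p : requestProcessors) {
      request = p.execute(request);
      if (p instanceof BracketProcessor) {
        brackets.push((BracketProcessor) p); // remember for the way out
      }
    }
    SearchResponse response = doSearch(request); // the actual search
    while (!brackets.isEmpty()) {
      response = brackets.pop().execute(request, response);
    }
    // By default, plain response processors (not shown) would run after the
    // brackets unwind, unless a bracket is re-ordered by ID among them.
    return response;
  }

  private SearchResponse doSearch(SearchRequest request) {
    throw new UnsupportedOperationException("placeholder for the real search");
  }
}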

Pipelines can be specified inline “for real”

The ingest API includes a “_simulate” endpoint that you can use to preview the behavior of a named pipeline or of a pipeline definition included in the request body (before creating a named pipeline). This makes sense, since we wouldn’t want to pollute the index with documents processed by a half-baked, untested pipeline.

Since search requests are read-only, we don’t need a separate API to test an ad hoc search pipeline definition. Instead, we will allow anonymous search pipelines to be defined inline as part of any search or multi-search request. In practice, we don’t expect this approach to be common in production, but it’s useful for ad hoc testing when creating or modifying a search pipeline.

API definition

Java search processor interfaces

package org.opensearch.search.pipeline;

// Copied from org.opensearch.ingest.Processor:
// https://github.com/opensearch-project/OpenSearch/blob/main/server/src/main/java/org/opensearch/ingest/Processor.java

interface Processor {
  /**
   * Gets the type of a processor
   */
  String getType();

  /**
   * Gets the tag of a processor.
   */
  String getTag();

  /**
   * Gets the description of a processor.
   */
  String getDescription();
}

/**
 * Processor that (potentially) modifies SearchRequests.
 */
interface RequestProcessor extends Processor {
  SearchRequest execute(SearchRequest request);
}

/**
 * Processor that (potentially) modifies SearchResponses. Behavior may be
 * influenced by parameters from the SearchRequest.
 */
interface ResponseProcessor extends Processor {
  SearchResponse execute(SearchRequest request, SearchResponse response);
}

/**
 * Processor that may modify the request, response, both, or neither.
 */
interface BracketProcessor extends RequestProcessor, ResponseProcessor {

  /**
   * May be specified in the request pipeline and referenced in the response
   * pipeline to determine the order of response processing. 
   */
  String getId();
}
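
To illustrate how these interfaces might be implemented, here is a hedged sketch of a trivial request processor. The class name, type string, and behavior are invented for this example and are not a proposed built-in processor.

package org.opensearch.search.pipeline;

import org.opensearch.action.search.SearchRequest;

/**
 * Illustrative only: caps the requested page size.
 */
public class MaxSizeRequestProcessor implements RequestProcessor {
  private final int maxSize;

  public MaxSizeRequestProcessor(int maxSize) {
    this.maxSize = maxSize;
  }

  @Override
  public String getType() {
    return "max_size";
  }

  @Override
  public String getTag() {
    return null; // tags are optional per-instance identifiers
  }

  @Override
  public String getDescription() {
    return "Caps the requested page size at " + maxSize;
  }

  @Override
  public SearchRequest execute(SearchRequest request) {
    // SearchSourceBuilder.size() returns -1 when unset.
    if (request.source() != null && request.source().size() > maxSize) {
      request.source().size(maxSize);
    }
    return request;
  }
}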

REST APIs

Search pipeline CRUD

// Create/update a search pipeline.
PUT /_search_processing/pipeline/my_pipeline
{
  "description": "A pipeline to apply custom synonyms, result post-filtering, an ML ranking model",
  "request_processors" : [
    {
      "external_synonyms" : {
        "service_url" : "https://my-synonym-service/"
      }
    },
    {
      "ml_ranker_bracket" : {
        "result_oversampling" : 2, // Request 2 * size results.
        "model_id" : "doc-features-20230109",
        "id" : "ml_ranker_identifier"
      }
    }
  ],
  "response_processors" : [
    {
      "result_blocker" : {
        "service_url" : "https://result-blocklist-service/"
      },
      "ml_ranker_bracket" : {
        // Placed here to indicate that it should run after result_blocker.
        // If not part of response_processors, it will run before result_blocker.
        "id" : "ml_ranker_identifier" 
      }
    }
  ]
}

// Return identifiers for all search pipelines.
GET /_search_processing/pipeline

// Return a single search pipeline definition.
GET /_search_processing/pipeline/my_pipeline

// Delete a search pipeline.
DELETE /_search_processing/pipeline/my_pipeline

Search API changes

// Apply a search pipeline to a search request.
POST /my-index/_search?pipeline=my_pipeline
{
  "query" : {
    "match" : {
      "text_field" : "some search text"
    }
  }
}

// Specify an ad hoc search pipeline as part of a search request.
POST /my-index/_search
{
  "query" : {
    "match" : {
      "text_field" : "some search text"
    }
  },
  "pipeline" : {
    "request_processors" : [
      {
        "external_synonyms" : {
          "service_url" : "https://my-synonym-service/"
        }
      },
      {
        "ml_ranker_bracket" : {
          "result_oversampling" : 2, // Request 2 * size results
          "model_id" : "doc-features-20230109",
          "id" : "ml_ranker_identifier"
        }
      }
    ],
    "response_processors" : [
      {
        "result_blocker" : {
          "service_url" : "https://result-blocklist-service/"
        },
        "ml_ranker_bracket" : {
          // Placed here to indicate that it should run after result_blocker.
          // If not part of response_processors, it will run before result_blocker.
          "id" : "ml_ranker_identifier" 
        }
      }
    ]
  }
}

Index settings

// Set default search pipeline for an existing index.
PUT /my-index/_settings
{
  "index" : {
    "default_search_pipeline" : "my_pipeline"
  }
}

// Remove default search pipeline for an index.
PUT /my-index/_settings
{
  "index" : {
    "default_search_pipeline" : "_none"
  }
}

// Create a new index with a default search pipeline.
PUT my-index
{
  "mappings" : {
    // ...index mappings...
  },
  "settings" : {
    "index" : {
      "default_search_pipeline" : "my_pipeline",
      // ... other settings ...
    }
  }
}

Proposed integrations

Kendra ranking

Our first implementation (already in the search-processor repository) provides connectivity to the Amazon Kendra Intelligent Ranking service. It will need to be reworked to match the BracketProcessor interface, because it modifies the SearchRequest as well as the SearchResponse. On the request side, the processor a) requests the top 25 search hits (if start is less than 25) and b) requests document source (to ensure that the body and title fields needed for reranking are available). On the response side, the top 25 results are preprocessed (to extract text passages) and sent to the Amazon Kendra Intelligent Ranking service, which returns a (potentially) reordered list of document IDs used to rerank those results. The originally requested range of results (by start and size) is then returned.
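
As a rough sketch only (not the actual plugin code), the rework might look like the following. The class name, constant, and the use of a stateless response half are all illustrative; a real implementation would need per-request state for the original from/size rather than none, and its response half would actually call the ranking service and rebuild the response.

package org.opensearch.search.pipeline;

import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.search.builder.SearchSourceBuilder;

/**
 * Illustrative sketch of the Kendra Intelligent Ranking rework.
 */
public class KendraRankingSketch implements BracketProcessor {
  private static final int RERANK_DEPTH = 25;

  @Override
  public SearchRequest execute(SearchRequest request) {
    SearchSourceBuilder source = request.source();
    // SearchSourceBuilder.from() returns -1 when unset.
    if (source != null && Math.max(source.from(), 0) < RERANK_DEPTH) {
      // Oversample: fetch the whole head of the result list so the
      // reranker can reorder it before the original range is returned.
      source.from(0).size(RERANK_DEPTH);
      // Ensure document source is fetched so body/title are available.
      source.fetchSource(true);
    }
    return request;
  }

  @Override
  public SearchResponse execute(SearchRequest request, SearchResponse response) {
    // Placeholder: rerank the top RERANK_DEPTH hits via the ranking service,
    // then slice out the originally requested range by start and size.
    return response;
  }

  @Override
  public String getType() {
    return "kendra_ranking";
  }

  @Override
  public String getTag() {
    return null;
  }

  @Override
  public String getDescription() {
    return "Sketch of Kendra Intelligent Ranking integration";
  }

  @Override
  public String getId() {
    return "kendra_ranking_sketch";
  }
}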

Metarank

To provide search results that learn from user interaction, we could implement a ResponseProcessor that calls out to Metarank.

Note that we would need to make sure that the SearchRequest API has the ability (via the ext property?) to carry additional metadata about the request, like user and session identifiers.
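
For example, carrying those identifiers through ext might look something like the following; the pipeline name and field names are illustrative, and this shape is not settled.

POST /my-index/_search?pipeline=metarank_pipeline
{
  "query" : {
    "match" : {
      "text_field" : "some search text"
    }
  },
  "ext" : {
    "metarank" : {
      "user_id" : "u-12345",
      "session_id" : "s-67890"
    }
  }
}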

Querqy

Search pipelines could be a convenient interface to integrate with Querqy.

Individual Querqy rewriters could be wrapped in adapters that implement the RequestProcessor interface and added to a search pipeline.

Script processor

Ingest pipelines support processing documents with scripts. We could provide a similar capability to allow users to modify their search request or response with a Painless or Mustache script.
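
A hypothetical pipeline using such a processor might look like the following. The processor name and the ctx binding are borrowed from the ingest script processor's conventions and are not a committed API.

PUT /_search_processing/pipeline/scripted_pipeline
{
  "description" : "Hypothetical scripted request processor",
  "request_processors" : [
    {
      "script" : {
        "lang" : "painless",
        // Illustrative: assumes the search request is exposed to the script.
        "source" : "ctx.request.size = Math.min(ctx.request.size, 20)"
      }
    }
  ]
}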

Block expensive query types

About 10 years ago, I worked on a search hosting service (based on Apache Solr) where we added a SearchComponent to our SearchHandler that rejected potentially expensive queries (e.g. leading wildcards, regexes) by default. We would lift the restrictions on request, and only after discussing the risks (usually we could explain why another option would be better). A similar RequestProcessor, installed as part of a default search pipeline for an index, could protect an OpenSearch cluster from users accidentally sending expensive queries.
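
A minimal sketch of such a processor, assuming we only inspect the root of the query tree; a real implementation would walk nested and boolean clauses too. The class name and type string are invented for this example.

package org.opensearch.search.pipeline;

import org.opensearch.action.search.SearchRequest;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.RegexpQueryBuilder;
import org.opensearch.index.query.WildcardQueryBuilder;

/**
 * Hypothetical sketch: rejects potentially expensive query types.
 */
public class ExpensiveQueryBlocker implements RequestProcessor {
  @Override
  public SearchRequest execute(SearchRequest request) {
    QueryBuilder query = request.source() == null ? null : request.source().query();
    if (query instanceof RegexpQueryBuilder || query instanceof WildcardQueryBuilder) {
      throw new IllegalArgumentException(
          "Query type '" + query.getWriteableName() + "' is blocked by the search pipeline");
    }
    return request;
  }

  @Override
  public String getType() {
    return "expensive_query_blocker";
  }

  @Override
  public String getTag() {
    return null;
  }

  @Override
  public String getDescription() {
    return "Rejects regexp and wildcard queries";
  }
}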

Proposed roadmap

Initial release (“soon”)

Based on feedback to this RFC, we intend to refactor the search-processor plugin to be similar to the APIs described above (with the assumption that there will be some changes required when imagination collides with reality). We should (hopefully?) be able to do this in time for the 2.6 release.

At this point, the REST APIs would still be considered “experimental” and we may break backwards compatibility (though we would like to avoid that if possible). The Java APIs may still be subject to change.

We would include additional processors in this repository.

Move to core

After getting some feedback from users of the plugin, we will move the pipeline execution logic into OpenSearch core, with individual processor implementations either in a “common” module (similar to ingest-common) or in separate plugins. Ideally, the OpenSearch SDK for Java will make it possible to implement search processors as extensions.

Search configurations

We’re thinking about using search pipelines as an initial model of “search configurations”, where the pipeline definition captures enough information about how a search request is processed from end-to-end to provide a reproducible configuration.

We can make it easier for application builders to run experiments, both offline and online, by running queries through one pipeline or another. For A/B testing, you could define a default search pipeline that randomly selects one of several search pipelines to process each request, then link user behavior to the pipeline used.

More complicated search processing

Just as ingest pipelines support conditional execution of processors and nested pipelines, we could add similar capabilities to search processors to effectively turn the pipeline into a directed acyclic graph. If that becomes the norm, we would likely want a visualization tool to view and edit a search pipeline (since nested JSON would be hard for a human to understand).

In the “middle” of the graph, there’s a component to call into OpenSearch to turn a SearchRequest into a SearchResponse. What if we want to use something other than OpenSearch, though? For example, we could precompute result sets for known one-word queries offline and do a lookup to return those results online.
