Skip to content

[ES|QL] COMPLETION command - Inference Operator implementation #127409

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 66 commits into from
Jun 5, 2025

Conversation

afoucret
Copy link
Contributor

@afoucret afoucret commented Apr 25, 2025

Description

This PR implements the inference operator for the completion command

Changes included:

  • Completion CSV tests: Added tests for completion inference on CSV datasets, and updated the inference test service to support completion use cases
  • New InferenceOperator: Introduced a shared operator to centralize and reuse logic across all inference operations (e.g., rerank, completion)
  • Throttled execution via BulkInferenceExecutor: Inference batches are now executed using the BulkInferenceExecutor, allowing for controlled concurrency and improved robustness.
  • RerankOperator refactor:
    • Migrated to use the new InferenceOperator
    • Fixed a flaky circuit breaker test
    • Improved memory efficiency by switching request handling to use an iterator
    • Better parallelism: big pages are now sliced in several parallel requests

Note: The completion command is currently available only in snapshot builds.

Related issue: elastic/elasticsearch#124405

@afoucret afoucret force-pushed the esql-completion-inference-operator branch from 54e4c85 to 42e14c1 Compare May 7, 2025 12:35
@afoucret afoucret force-pushed the esql-completion-inference-operator branch from 5f9829e to e0a14ae Compare May 21, 2025 08:31
afoucret and others added 25 commits May 22, 2025 17:25
* Specialize block parameters on AddInput

(cherry picked from commit a5855c1)

* Call the specific add() methods for eacj block type

(cherry picked from commit 5176663)

* Implement custom add in HashAggregationOperator

(cherry picked from commit fb670bd)

* Migrated everything to the new add() calls

* Update docs/changelog/127582.yaml

* Spotless format

* Remove unused ClassName for IntVectorBlock

* Fixed tests

* Randomize groupIds block types to check most AddInput cases

* Minor fix and added some docs

* Renamed BlockHashWrapper
@elasticsearchmachine
Copy link
Collaborator

Pinging @elastic/es-analytical-engine (Team:Analytics)

@@ -300,9 +300,6 @@ tests:
- class: org.elasticsearch.search.basic.SearchWithRandomDisconnectsIT
method: testSearchWithRandomDisconnects
issue: https://github.com/elastic/elasticsearch/issues/122707
- class: org.elasticsearch.xpack.esql.inference.RerankOperatorTests
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ️ Re-enable RerankOperator flaky tests because they are fixed right now.

@@ -254,7 +247,7 @@ protected boolean supportsInferenceTestService() {
}

protected boolean requiresInferenceEndpoint() {
return Stream.of(SEMANTIC_TEXT_FIELD_CAPS.capabilityName(), RERANK.capabilityName())
return Stream.of(SEMANTIC_TEXT_FIELD_CAPS.capabilityName(), RERANK.capabilityName(), COMPLETION.capabilityName())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ️ Can not test completion in multi_cluster cause the inference test plugin is not available.

@@ -617,7 +617,7 @@ private LogicalPlan resolveCompletion(Completion p, List<Attribute> childrenOutp
Expression prompt = p.prompt();

if (targetField instanceof UnresolvedAttribute ua) {
targetField = new ReferenceAttribute(ua.source(), ua.name(), TEXT);
targetField = new ReferenceAttribute(ua.source(), ua.name(), KEYWORD);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ️ keyword is the recommended ES|QL type for non-analyzed text.

* 2.0.
*/

package org.elasticsearch.xpack.esql.inference;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ️ Moved to the org.elasticsearch.xpack.esql.inference.rerank package

Copy link
Member

@kderusso kderusso left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice work! 👏 Changes LGTM but will defer to others to accept.

public static final int DEFAULT_MAX_OUTSTANDING_REQUESTS = 50;

public static final BulkInferenceExecutionConfig DEFAULT = new BulkInferenceExecutionConfig(
DEFAULT_WORKERS,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should these be configurable at some point? (Maybe not in scope of this PR)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is something for later (anticipated through this config class)

private final int scoreChannel;

// Batch size used to group rows into a single inference request (currently fixed)
// TODO: make it configurable either in the command or as query pragmas
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// TODO: make it configurable either in the command or as query pragmas
// TODO: make it configurable either in the command or as query params

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See the QueryPragmas class to figure out why I chose this weird terminology 😆

@afoucret afoucret requested a review from dnhatn May 30, 2025 19:27
@afoucret afoucret force-pushed the esql-completion-inference-operator branch from 0e009c7 to 0a6ef3f Compare June 2, 2025 07:38
Copy link
Member

@carlosdelest carlosdelest left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall this LGTM, although I have some questions:

  • I wonder about the need of using LocalCheckpointTracker as the basis for inference result processing. I understand the benefits in terms of comparing with the latest processed number and then buffering responses - but the seq_no underlying abstraction threw me off for a while.
  • What are the drivers for the decisions on the number of workers / max outstanding requests?
  • Do you think the threadpool should be a ML based one instead of using an ESQL worker?

It's a 2k LOC change 😓 . I've done my best but I'm sure I won't be covering everything I should.

@afoucret
Copy link
Contributor Author

afoucret commented Jun 2, 2025

Hey @carlosdelest,

Few answers:

I wonder about the need of using LocalCheckpointTracker as the basis for inference result processing. I understand the benefits in terms of comparing with the latest processed number and then buffering responses - but the seq_no underlying abstraction threw me off for a while.

LocalCheckpointTracker allow to receive to reorder the inference responses that can be received out of order and to persist them. This is a tool provided by the ES framework and I did not want to reinvent such a component with all the complexity it involves (thread safety, ...)

What are the drivers for the decisions on the number of workers / max outstanding requests?

It is a mix of several aspects but mostly chosen to work on a small allocation without an error.

Do you think the threadpool should be a ML based one instead of using an ESQL worker?

I do not think so. The inference tasks are run using a the inference threadpool (through the client call) but the operator coordination related tasks and data handling should stay in the ES|QL threadpool.

required_capability: completion

ROW prompt="Who is Victor Hugo?"
| COMPLETION prompt WITH test_completion AS completion_output
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happens if prompt is a multi valued field? can COMPLETION handle it? can we get a test for this use case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the prompt is multi-valued, the PromptReader is joining the different values using a \n.

So the multi-value input: ["Translate this movie description in French", movie_description]

Will be translated into the following prompt:

Translate this movie description in French:
Long time ago....

I built this as a quite good alternative to concat in some case.

Also, I added a CSV test cases for it.

@@ -461,6 +475,17 @@ public static boolean clusterHasRerankInferenceEndpoint(RestClient client) throw
return true;
}

private static void deleteInferenceEndpoint(RestClient client, String inferenceId) throws IOException {
try {
client.performRequest(new Request("DELETE", "_inference/" + inferenceId));
Copy link
Contributor

@ioanatia ioanatia Jun 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we give the right path here? is it supposed to be DELETE _inference/rerank/test_reranker? do we need to pass the task type to the deleteInferenceEndpoint method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact, both endpoints are valid and can be used indifferently.


package org.elasticsearch.xpack.esql.inference.bulk;

public record BulkInferenceExecutionConfig(int workers, int maxOutstandingRequests) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't plan to make these configurable in the near future - do we ever use anything else than the DEFAULT?
I am not a fan of adding these types of record classes that are only used to store defaults that are not configurable.
To me this just adds a cognitive load for anyone looking into rerank/completion.
I'd rather have the DEFAULT_WORKERS and DEFAULT_MAX_OUTSTANDING_REQUESTS in the base operator that uses them, instead of carrying around these record objects.

Maybe we anticipate these will be configurable, but until then this serves no purpose. I am a huge believer that we shouldn't add abstractions/constructions like these until they are actually used.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’d prefer to keep the config centralized in one place rather than spreading it across multiple classes. It makes things easier to manage and reason about for me.

It also helps with testing flexibility. In BulkInferenceExecutorTests, where I vary config values (e.g. number of outstanding requests) to test that it works with a wide range of config. This was really helpful to make sure that the component was working at different scale and would be more cumbersome if the config were just a constant in the class.

Maybe I’m anticipating a bit, but even if we don’t plan to make these settings configurable for end users, having the config passed a parameter of the InferenceOperator could make our life easier if we later realize we need different configs for RERANK and COMPLETION.

So unless it is very important for you to change it, I would definitely prefer to keep this class here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's okay - we don't need to block merging this PR because of this one

package org.elasticsearch.xpack.esql.inference.bulk;

public record BulkInferenceExecutionConfig(int workers, int maxOutstandingRequests) {
public static final int DEFAULT_WORKERS = 10;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how did we arrive to this default value? I see that we are using the same threadPool for InferenceRunner in TransportEsqlQueryAction.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my response to Carlos here

public class RerankOperator extends InferenceOperator {

// Default number of rows to include per inference request
private static final int DEFAULT_BATCH_SIZE = 20;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we know if the rerank API has any limitations when sending large docs?
we might want to control the batch size not just by the number of rows, but on the total input size.
okay with me to follow up on this separately

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO, this is something to be discussed with ML folks.

If possible, I would prefer to keep a batch_size with a number of row and let the inference team handle large docs on their side. There is also the extract_snippet function that should be used with large docs.

I will log an issue to determine what our strategy should be for large documents.

@afoucret afoucret requested a review from ioanatia June 3, 2025 16:19
Copy link
Contributor

@ioanatia ioanatia left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚀 🚀 🚀
good work!

@afoucret afoucret merged commit 993090d into elastic:main Jun 5, 2025
18 checks passed
@afoucret afoucret deleted the esql-completion-inference-operator branch June 5, 2025 06:45
afoucret added a commit to afoucret/elasticsearch that referenced this pull request Jun 5, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
:Analytics/ES|QL AKA ESQL >non-issue Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) v9.1.0
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants