[ES|QL] COMPLETION command - Inference Operator implementation #127409

Merged
merged 66 commits into from
Jun 5, 2025
66 commits
b85e704
InferenceOperator refactoring.
afoucret Apr 11, 2025
e6ac175
CompletionOperator skeleton.
afoucret Apr 11, 2025
7fe8adc
CSV tests inference refactoring
afoucret Apr 11, 2025
757fbe4
Draft CompletionOperator.
afoucret Apr 23, 2025
39ad919
Draft CompletionOperator.
afoucret Apr 23, 2025
6f5a8b3
Move inference result type check to the InferenceOperator
afoucret Apr 23, 2025
bbd1f69
Refactored inference operator.
afoucret Apr 24, 2025
71ad3b8
Restore removed code.
afoucret Apr 24, 2025
33a289d
Refactored bulk inference execution.
afoucret Apr 25, 2025
d5797d3
Rollback muted tests changes.
afoucret Apr 25, 2025
62f39eb
Fix some tests for the rerank operator.
afoucret Apr 29, 2025
2229c94
Bulk inference refactoring.
afoucret Apr 30, 2025
3a59b96
[CI] Auto commit changes from spotless
elasticsearchmachine Apr 25, 2025
7423cfc
BulkInferenceExecutor unit tests.
afoucret May 5, 2025
7acd6dd
Fix a memory leak in the InferenceOperator
afoucret May 6, 2025
1c1c003
Lint.
afoucret May 6, 2025
505dbdc
Finished refactoring the inference operator implementation.
afoucret May 7, 2025
815d479
[CI] Auto commit changes from spotless
elasticsearchmachine May 7, 2025
1657d2c
Fixing some tests.
afoucret May 7, 2025
3d48b05
Improving BulkInferenceExecutorTests
afoucret May 7, 2025
06b99ed
Another refactoring
afoucret May 7, 2025
a5ea05a
ESQL: Specialize aggs AddInput for each block type (#127582)
ivancea May 7, 2025
66563b6
One more refactoring.
afoucret May 7, 2025
5384db0
Code simplification.
afoucret May 9, 2025
04f0d36
Lint
afoucret May 9, 2025
e0020e2
Revert useless changes.
afoucret May 9, 2025
9e57399
Fix error in completion tests
afoucret May 21, 2025
65da3ef
Better handling of release in the InferenceOperator
afoucret May 21, 2025
9f88520
Improve code reliability
afoucret May 21, 2025
cb20ce7
Fix circuit breaker tests.
afoucret May 21, 2025
bf73b40
Lint
afoucret May 21, 2025
1698f5f
Remove useless change.
afoucret May 21, 2025
d107767
Fix circuit breaker errors.
afoucret May 22, 2025
e77a8bb
Move input block building outside the async portion.
afoucret May 22, 2025
58aa070
Code simplification.
afoucret May 22, 2025
1c858c6
Change completion type to keyword instead of text
afoucret May 22, 2025
c37d3dc
Create a new service to test completion
afoucret May 22, 2025
b9c22bd
Init CSV test for the completion command
afoucret May 22, 2025
89cb900
First version of the CSV tests.
afoucret May 22, 2025
190f7d7
Fix a test error.
afoucret May 22, 2025
72541c0
Remove useless SuppressWarnings
afoucret May 22, 2025
1d486b6
[CI] Auto commit changes from spotless
elasticsearchmachine May 22, 2025
88a63a8
Code improvement.
afoucret May 22, 2025
c70b0a1
[CI] Auto commit changes from spotless
elasticsearchmachine May 22, 2025
1d6b589
Add completion_test_service to InferenceGetServicesIT
afoucret May 23, 2025
3d0819d
Ensure CSV tests are not run in multi_cluster environment
afoucret May 23, 2025
115ee49
Minor code improvement.
afoucret May 23, 2025
1e95722
Fix inference throttling.
afoucret May 23, 2025
340c189
Update throttling mechanism
afoucret May 27, 2025
3a8422d
Use a bounded size queue for inference tasks.
afoucret May 28, 2025
d3f7a0e
Fix out of memory error in the CompletionOperatorTests
afoucret May 28, 2025
d3a47a2
Adding more tests for RerankOperatorOutputBuilder and RerankOperatorR…
afoucret May 30, 2025
e64b81c
Do not use a fixed size queue anymore (avoid the thread to be blocked).
afoucret May 30, 2025
1901bde
Add comments.
afoucret May 30, 2025
12ab742
Restore fixed size queue
afoucret May 30, 2025
d99971f
Reduce input size in BulkInferenceExecutorTests
afoucret May 30, 2025
7463dad
Improved input management.
afoucret May 30, 2025
63f47a8
Merge branch 'main' of https://github.com/elastic/elasticsearch into …
afoucret May 30, 2025
20773db
Moving tests to the right place.
afoucret May 30, 2025
2387d14
Lint.
afoucret May 30, 2025
035ae04
Ensure InferenceOperatorTestCase execute InferenceOperation out of or…
afoucret May 30, 2025
0a6ef3f
Minor improvements
afoucret Jun 2, 2025
cc7d8fa
Merge branch 'main' of https://github.com/elastic/elasticsearch into …
afoucret Jun 2, 2025
04191cb
Adding a test case for multivalued prompt.
afoucret Jun 3, 2025
7582956
Merge branch 'main' of https://github.com/elastic/elasticsearch into …
afoucret Jun 3, 2025
2f751cc
Merge branch 'main' into esql-completion-inference-operator
afoucret Jun 4, 2025
9 changes: 0 additions & 9 deletions muted-tests.yml
Original file line number Diff line number Diff line change
@@ -300,9 +300,6 @@ tests:
- class: org.elasticsearch.search.basic.SearchWithRandomDisconnectsIT
method: testSearchWithRandomDisconnects
issue: https://github.com/elastic/elasticsearch/issues/122707
- class: org.elasticsearch.xpack.esql.inference.RerankOperatorTests
Review comment (PR author): ℹ️ Re-enable the flaky RerankOperator tests now that they are fixed.

method: testSimpleCircuitBreaking
issue: https://github.com/elastic/elasticsearch/issues/124337
- class: org.elasticsearch.index.engine.ThreadPoolMergeSchedulerTests
method: testSchedulerCloseWaitsForRunningMerge
issue: https://github.com/elastic/elasticsearch/issues/125236
@@ -384,9 +381,6 @@ tests:
- class: org.elasticsearch.packaging.test.DockerTests
method: test024InstallPluginFromArchiveUsingConfigFile
issue: https://github.com/elastic/elasticsearch/issues/126936
- class: org.elasticsearch.xpack.esql.qa.multi_node.EsqlSpecIT
method: test {rerank.Reranker before a limit ASYNC}
issue: https://github.com/elastic/elasticsearch/issues/127051
- class: org.elasticsearch.packaging.test.DockerTests
method: test026InstallBundledRepositoryPlugins
issue: https://github.com/elastic/elasticsearch/issues/127081
@@ -399,9 +393,6 @@ tests:
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
method: test {p0=ml/data_frame_analytics_cat_apis/Test cat data frame analytics all jobs with header}
issue: https://github.com/elastic/elasticsearch/issues/127625
- class: org.elasticsearch.xpack.esql.qa.multi_node.EsqlSpecIT
method: test {rerank.Reranker using another sort order ASYNC}
issue: https://github.com/elastic/elasticsearch/issues/127638
- class: org.elasticsearch.xpack.search.CrossClusterAsyncSearchIT
method: testCancellationViaTimeoutWithAllowPartialResultsSetToFalse
issue: https://github.com/elastic/elasticsearch/issues/127096
Original file line number Diff line number Diff line change
@@ -67,14 +67,11 @@
import static org.elasticsearch.xpack.esql.CsvTestUtils.isEnabled;
import static org.elasticsearch.xpack.esql.CsvTestUtils.loadCsvSpecValues;
import static org.elasticsearch.xpack.esql.CsvTestsDataLoader.availableDatasetsForEs;
import static org.elasticsearch.xpack.esql.CsvTestsDataLoader.clusterHasInferenceEndpoint;
import static org.elasticsearch.xpack.esql.CsvTestsDataLoader.clusterHasRerankInferenceEndpoint;
import static org.elasticsearch.xpack.esql.CsvTestsDataLoader.createInferenceEndpoint;
import static org.elasticsearch.xpack.esql.CsvTestsDataLoader.createRerankInferenceEndpoint;
import static org.elasticsearch.xpack.esql.CsvTestsDataLoader.deleteInferenceEndpoint;
import static org.elasticsearch.xpack.esql.CsvTestsDataLoader.deleteRerankInferenceEndpoint;
import static org.elasticsearch.xpack.esql.CsvTestsDataLoader.createInferenceEndpoints;
import static org.elasticsearch.xpack.esql.CsvTestsDataLoader.deleteInferenceEndpoints;
import static org.elasticsearch.xpack.esql.CsvTestsDataLoader.loadDataSetIntoEs;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.classpathResources;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.COMPLETION;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.METRICS_COMMAND;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.RERANK;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.SEMANTIC_TEXT_FIELD_CAPS;
@@ -138,12 +135,8 @@ protected EsqlSpecTestCase(

@Before
public void setup() throws IOException {
if (supportsInferenceTestService() && clusterHasInferenceEndpoint(client()) == false) {
createInferenceEndpoint(client());
}

if (supportsInferenceTestService() && clusterHasRerankInferenceEndpoint(client()) == false) {
createRerankInferenceEndpoint(client());
if (supportsInferenceTestService()) {
createInferenceEndpoints(adminClient());
}

boolean supportsLookup = supportsIndexModeLookup();
@@ -164,8 +157,8 @@ public static void wipeTestData() throws IOException {
}
}

deleteInferenceEndpoint(client());
deleteRerankInferenceEndpoint(client());
deleteInferenceEndpoints(adminClient());

}

public boolean logResults() {
@@ -254,7 +247,7 @@ protected boolean supportsInferenceTestService() {
}

protected boolean requiresInferenceEndpoint() {
return Stream.of(SEMANTIC_TEXT_FIELD_CAPS.capabilityName(), RERANK.capabilityName())
return Stream.of(SEMANTIC_TEXT_FIELD_CAPS.capabilityName(), RERANK.capabilityName(), COMPLETION.capabilityName())
Review comment (PR author): ℹ️ Completion cannot be tested in multi_cluster because the inference test plugin is not available.

.anyMatch(testCase.requiredCapabilities::contains);
}

@@ -372,6 +365,11 @@ private Object valueMapper(CsvTestUtils.Type type, Object value) {
return new BigDecimal(s).round(new MathContext(7, RoundingMode.DOWN)).doubleValue();
}
}
if (type == CsvTestUtils.Type.TEXT || type == CsvTestUtils.Type.KEYWORD || type == CsvTestUtils.Type.SEMANTIC_TEXT) {
if (value instanceof String s) {
value = s.replaceAll("\\\\n", "\n");
}
}
return value.toString();
}
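
The newline handling added to valueMapper above can be tried in isolation. The following is a minimal sketch, not the project's code: the class name NewlineUnescapeSketch and the method unescapeNewlines are hypothetical, and only String.replaceAll from the JDK is used. It illustrates that the regex literal "\\\\n" matches a backslash followed by the letter n, so an expected value written in a CSV spec with an escaped \n ends up containing a real line break.

```java
final class NewlineUnescapeSketch {

    // Hypothetical helper mirroring the replaceAll call in the diff above.
    // The Java literal "\\\\n" is the regex \\n, which matches a literal
    // backslash followed by 'n'. The replacement "\n" is a real newline.
    static String unescapeNewlines(String value) {
        return value.replaceAll("\\\\n", "\n");
    }

    public static void main(String[] args) {
        // The input contains the two characters '\' and 'n', not a line break.
        String escaped = "ANSWER THE FOLLOWING QUESTION:\\nWHO IS VICTOR HUGO?";
        System.out.println(unescapeNewlines(escaped));
    }
}
```

A string that already contains a real line break passes through unchanged, since the pattern only matches the two-character escape.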

Original file line number Diff line number Diff line change
@@ -27,6 +27,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.inference.TaskType;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.test.rest.ESRestTestCase;
@@ -317,7 +318,7 @@ public static Set<TestDataset> availableDatasetsForEs(
boolean supportsIndexModeLookup,
boolean supportsSourceFieldMapping
) throws IOException {
boolean inferenceEnabled = clusterHasInferenceEndpoint(client);
boolean inferenceEnabled = clusterHasSparseEmbeddingInferenceEndpoint(client);

Set<TestDataset> testDataSets = new HashSet<>();

@@ -379,77 +380,90 @@ private static void loadDataSetIntoEs(
}
}

public static void createInferenceEndpoints(RestClient client) throws IOException {
Review comment (PR author): ℹ️ Added a new inference endpoint, "test_completion", available in CSV tests.

if (clusterHasSparseEmbeddingInferenceEndpoint(client) == false) {
createSparseEmbeddingInferenceEndpoint(client);
}

if (clusterHasRerankInferenceEndpoint(client) == false) {
createRerankInferenceEndpoint(client);
}

if (clusterHasCompletionInferenceEndpoint(client) == false) {
createCompletionInferenceEndpoint(client);
}
}

public static void deleteInferenceEndpoints(RestClient client) throws IOException {
deleteSparseEmbeddingInferenceEndpoint(client);
deleteRerankInferenceEndpoint(client);
deleteCompletionInferenceEndpoint(client);
}

/** The semantic_text mapping type requires an inference endpoint that needs to be set up before creating the index. */
public static void createInferenceEndpoint(RestClient client) throws IOException {
Request request = new Request("PUT", "_inference/sparse_embedding/test_sparse_inference");
request.setJsonEntity("""
public static void createSparseEmbeddingInferenceEndpoint(RestClient client) throws IOException {
createInferenceEndpoint(client, TaskType.SPARSE_EMBEDDING, "test_sparse_inference", """
{
"service": "test_service",
"service_settings": {
"model": "my_model",
"api_key": "abc64"
},
"task_settings": {
}
"service_settings": { "model": "my_model", "api_key": "abc64" },
"task_settings": { }
}
""");
client.performRequest(request);
}

public static void deleteInferenceEndpoint(RestClient client) throws IOException {
try {
client.performRequest(new Request("DELETE", "_inference/test_sparse_inference"));
} catch (ResponseException e) {
// 404 here means the endpoint was not created
if (e.getResponse().getStatusLine().getStatusCode() != 404) {
throw e;
}
}
public static void deleteSparseEmbeddingInferenceEndpoint(RestClient client) throws IOException {
deleteInferenceEndpoint(client, "test_sparse_inference");
}

public static boolean clusterHasInferenceEndpoint(RestClient client) throws IOException {
Request request = new Request("GET", "_inference/sparse_embedding/test_sparse_inference");
try {
client.performRequest(request);
} catch (ResponseException e) {
if (e.getResponse().getStatusLine().getStatusCode() == 404) {
return false;
}
throw e;
}
return true;
public static boolean clusterHasSparseEmbeddingInferenceEndpoint(RestClient client) throws IOException {
return clusterHasInferenceEndpoint(client, TaskType.SPARSE_EMBEDDING, "test_sparse_inference");
}

public static void createRerankInferenceEndpoint(RestClient client) throws IOException {
Request request = new Request("PUT", "_inference/rerank/test_reranker");
request.setJsonEntity("""
createInferenceEndpoint(client, TaskType.RERANK, "test_reranker", """
{
"service": "test_reranking_service",
"service_settings": {
"model_id": "my_model",
"api_key": "abc64"
},
"task_settings": {
"use_text_length": true
}
"service_settings": { "model_id": "my_model", "api_key": "abc64" },
"task_settings": { "use_text_length": true }
}
""");
client.performRequest(request);
}

public static void deleteRerankInferenceEndpoint(RestClient client) throws IOException {
try {
client.performRequest(new Request("DELETE", "_inference/rerank/test_reranker"));
} catch (ResponseException e) {
// 404 here means the endpoint was not created
if (e.getResponse().getStatusLine().getStatusCode() != 404) {
throw e;
}
}
deleteInferenceEndpoint(client, "test_reranker");
}

public static boolean clusterHasRerankInferenceEndpoint(RestClient client) throws IOException {
Request request = new Request("GET", "_inference/rerank/test_reranker");
return clusterHasInferenceEndpoint(client, TaskType.RERANK, "test_reranker");
}

public static void createCompletionInferenceEndpoint(RestClient client) throws IOException {
createInferenceEndpoint(client, TaskType.COMPLETION, "test_completion", """
{
"service": "completion_test_service",
"service_settings": { "model": "my_model", "api_key": "abc64" },
"task_settings": { "temperature": 3 }
}
""");
}

public static void deleteCompletionInferenceEndpoint(RestClient client) throws IOException {
deleteInferenceEndpoint(client, "test_completion");
}

public static boolean clusterHasCompletionInferenceEndpoint(RestClient client) throws IOException {
return clusterHasInferenceEndpoint(client, TaskType.COMPLETION, "test_completion");
}

private static void createInferenceEndpoint(RestClient client, TaskType taskType, String inferenceId, String modelSettings)
throws IOException {
Request request = new Request("PUT", "_inference/" + taskType.name() + "/" + inferenceId);
request.setJsonEntity(modelSettings);
client.performRequest(request);
}

private static boolean clusterHasInferenceEndpoint(RestClient client, TaskType taskType, String inferenceId) throws IOException {
Request request = new Request("GET", "_inference/" + taskType.name() + "/" + inferenceId);
try {
client.performRequest(request);
} catch (ResponseException e) {
@@ -461,6 +475,17 @@ public static boolean clusterHasRerankInferenceEndpoint(RestClient client) throw
return true;
}

private static void deleteInferenceEndpoint(RestClient client, String inferenceId) throws IOException {
try {
client.performRequest(new Request("DELETE", "_inference/" + inferenceId));
Review comment (ioanatia, Jun 2, 2025): Do we give the right path here? Is it supposed to be DELETE _inference/rerank/test_reranker? Do we need to pass the task type to the deleteInferenceEndpoint method?

Reply (PR author): In fact, both paths are valid and can be used interchangeably.
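
To make the two request paths from this exchange concrete, here is a small sketch. The class InferencePathSketch and both helper names are hypothetical; they only build strings and do not call Elasticsearch. That both forms are accepted for DELETE is taken from the reply above, not demonstrated by this code.

```java
final class InferencePathSketch {

    // Hypothetical helper: path that addresses an endpoint by its ID only,
    // the form used by deleteInferenceEndpoint in the diff.
    static String byId(String inferenceId) {
        return "_inference/" + inferenceId;
    }

    // Hypothetical helper: path that also names the task type,
    // the form the reviewer asked about.
    static String byTaskTypeAndId(String taskType, String inferenceId) {
        return "_inference/" + taskType + "/" + inferenceId;
    }

    public static void main(String[] args) {
        System.out.println("DELETE " + byId("test_reranker"));
        System.out.println("DELETE " + byTaskTypeAndId("rerank", "test_reranker"));
    }
}
```

Using the ID-only form keeps the delete helper free of a task type parameter, which appears to be why the diff takes that route.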

} catch (ResponseException e) {
// 404 here means the endpoint was not created
if (e.getResponse().getStatusLine().getStatusCode() != 404) {
throw e;
}
}
}

private static void loadEnrichPolicy(RestClient client, String policyName, String policyFileName, Logger logger) throws IOException {
URL policyMapping = getResource("/" + policyFileName);
String entity = readTextFile(policyMapping);
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Note:
Review comment (PR author): ℹ️ Added CSV tests for the completion command.

// The "test_completion" service returns the prompt in uppercase, making the output easy to guess.


completion using a ROW source operator
required_capability: completion

ROW prompt="Who is Victor Hugo?"
| COMPLETION prompt WITH test_completion AS completion_output
Review comment (contributor): What happens if prompt is a multi-valued field? Can COMPLETION handle it? Can we get a test for this use case?

Reply (PR author): If the prompt is multi-valued, the PromptReader joins the values with a \n.

So the multi-valued input ["Translate this movie description in French", movie_description]

is turned into the following prompt:

Translate this movie description in French:
Long time ago....

I built this as a convenient alternative to CONCAT in some cases.

I also added a CSV test case for it.
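
The behavior described in this reply can be sketched as follows. This is an illustration under stated assumptions, not the PromptReader implementation, which is not shown in this diff: the class PromptJoinSketch and its methods are hypothetical, the join is modeled with String.join, and fakeCompletion stands in for the test service that returns the prompt in uppercase.

```java
import java.util.List;
import java.util.Locale;

final class PromptJoinSketch {

    // Assumption: a multi-valued prompt is flattened by joining its values
    // with a newline, as the reply above describes.
    static String joinPrompt(List<String> values) {
        return String.join("\n", values);
    }

    // Stand-in for the "test_completion" service, which returns the
    // prompt in uppercase according to the CSV spec note.
    static String fakeCompletion(String prompt) {
        return prompt.toUpperCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        String prompt = joinPrompt(List.of("Answer the following question:", "Who is Victor Hugo?"));
        // Prints two lines:
        // ANSWER THE FOLLOWING QUESTION:
        // WHO IS VICTOR HUGO?
        System.out.println(fakeCompletion(prompt));
    }
}
```

The result lines up with the expected value in the multi-valued CSV test, where the line break is written as an escaped \n.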

;

prompt:keyword | completion_output:keyword
Who is Victor Hugo? | WHO IS VICTOR HUGO?
;


completion using a ROW source operator and prompt is a multi-valued field
required_capability: completion

ROW prompt=["Answer the following question:", "Who is Victor Hugo?"]
| COMPLETION prompt WITH test_completion AS completion_output
;

prompt:keyword | completion_output:keyword
[Answer the following question:, Who is Victor Hugo?] | ANSWER THE FOLLOWING QUESTION:\nWHO IS VICTOR HUGO?
;


completion after a search
required_capability: completion
required_capability: match_operator_colon

FROM books METADATA _score
| WHERE title:"war and peace" AND author:"Tolstoy"
| SORT _score DESC
| LIMIT 2
| COMPLETION title WITH test_completion
| KEEP title, completion
;

title:text | completion:keyword
War and Peace | WAR AND PEACE
War and Peace (Signet Classics) | WAR AND PEACE (SIGNET CLASSICS)
;

completion using a function as a prompt
required_capability: completion
required_capability: match_operator_colon

FROM books METADATA _score
| WHERE title:"war and peace" AND author:"Tolstoy"
| SORT _score DESC
| LIMIT 2
| COMPLETION CONCAT("This is a prompt: ", title) WITH test_completion
| KEEP title, completion
;

title:text | completion:keyword
War and Peace | THIS IS A PROMPT: WAR AND PEACE
War and Peace (Signet Classics) | THIS IS A PROMPT: WAR AND PEACE (SIGNET CLASSICS)
;
Original file line number Diff line number Diff line change
@@ -617,7 +617,7 @@ private LogicalPlan resolveCompletion(Completion p, List<Attribute> childrenOutp
Expression prompt = p.prompt();

if (targetField instanceof UnresolvedAttribute ua) {
targetField = new ReferenceAttribute(ua.source(), ua.name(), TEXT);
targetField = new ReferenceAttribute(ua.source(), ua.name(), KEYWORD);
Review comment (PR author): ℹ️ keyword is the recommended ES|QL type for non-analyzed text.

}

if (prompt.resolved() == false) {