Skip to content

ES|QL: Support STATS after FORK #128745

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jun 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import static org.elasticsearch.xpack.esql.CsvTestUtils.isEnabled;
import static org.elasticsearch.xpack.esql.CsvTestsDataLoader.ENRICH_SOURCE_INDICES;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.classpathResources;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.FORK_V6;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.FORK_V7;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V2;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V7;
Expand Down Expand Up @@ -132,7 +132,7 @@ protected void shouldSkipTest(String testName) throws IOException {
assumeFalse("LOOKUP JOIN not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_LOOKUP_V12.capabilityName()));
// Unmapped fields require a coorect capability response from every cluster, which isn't currently implemented.
assumeFalse("UNMAPPED FIELDS not yet supported in CCS", testCase.requiredCapabilities.contains(UNMAPPED_FIELDS.capabilityName()));
assumeFalse("FORK not yet supported in CCS", testCase.requiredCapabilities.contains(FORK_V6.capabilityName()));
assumeFalse("FORK not yet supported in CCS", testCase.requiredCapabilities.contains(FORK_V7.capabilityName()));
}

@Override
Expand Down
89 changes: 75 additions & 14 deletions x-pack/plugin/esql/qa/testFixtures/src/main/resources/fork.csv-spec
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//

simpleFork
required_capability: fork_v6
required_capability: fork_v7

FROM employees
| FORK ( WHERE emp_no == 10001 )
Expand All @@ -18,7 +18,7 @@ emp_no:integer | _fork:keyword
;

forkWithWhereSortAndLimit
required_capability: fork_v6
required_capability: fork_v7

FROM employees
| FORK ( WHERE hire_date < "1985-03-01T00:00:00Z" | SORT first_name | LIMIT 5 )
Expand All @@ -38,7 +38,7 @@ emp_no:integer | first_name:keyword | _fork:keyword
;

fiveFork
required_capability: fork_v6
required_capability: fork_v7

FROM employees
| FORK ( WHERE emp_no == 10005 )
Expand All @@ -59,7 +59,7 @@ fork5 | 10001
;

forkWithWhereSortDescAndLimit
required_capability: fork_v6
required_capability: fork_v7

FROM employees
| FORK ( WHERE hire_date < "1985-03-01T00:00:00Z" | SORT first_name DESC | LIMIT 2 )
Expand All @@ -76,7 +76,7 @@ fork2 | 10087 | Xinglin
;

forkWithCommonPrefilter
required_capability: fork_v6
required_capability: fork_v7

FROM employees
| WHERE emp_no > 10050
Expand All @@ -94,7 +94,7 @@ fork2 | 10100
;

forkWithSemanticSearchAndScore
required_capability: fork_v6
required_capability: fork_v7
required_capability: semantic_text_field_caps
required_capability: metadata_score

Expand All @@ -114,7 +114,7 @@ fork2 | 6.093784261960139E18 | 2 | all we have to decide is w
;

forkWithEvals
required_capability: fork_v6
required_capability: fork_v7

FROM employees
| FORK (WHERE emp_no == 10048 OR emp_no == 10081 | EVAL x = "abc" | EVAL y = 1)
Expand All @@ -131,7 +131,7 @@ fork2 | 10087 | def | null | 2
;

forkWithStats
required_capability: fork_v6
required_capability: fork_v7

FROM employees
| FORK (WHERE emp_no == 10048 OR emp_no == 10081)
Expand All @@ -152,7 +152,7 @@ fork4 | null | 100 | 10001 | null
;

forkWithDissect
required_capability: fork_v6
required_capability: fork_v7

FROM employees
| WHERE emp_no == 10048 OR emp_no == 10081
Expand All @@ -172,7 +172,7 @@ fork2 | 10081 | Rosen | 10081 | null | Zhongwei
;

forkWithMixOfCommands
required_capability: fork_v6
required_capability: fork_v7

FROM employees
| WHERE emp_no == 10048 OR emp_no == 10081
Expand All @@ -197,7 +197,7 @@ fork4 | 10081 | abc | aaa | null | null
;

forkWithFiltersOnConstantValues
required_capability: fork_v6
required_capability: fork_v7

FROM employees
| EVAL z = 1
Expand All @@ -218,7 +218,7 @@ fork3 | null | 100 | 10100 | 10001
;

forkWithUnsupportedAttributes
required_capability: fork_v6
required_capability: fork_v7

FROM heights
| FORK (SORT description DESC | LIMIT 1 | EVAL x = length(description) )
Expand All @@ -232,7 +232,7 @@ Medium Height | null | null | fork2
;

forkAfterLookupJoin
required_capability: fork_v6
required_capability: fork_v7

FROM employees
| EVAL language_code = languages
Expand All @@ -253,7 +253,7 @@ fork3 | 10081 | 2 | Klingon
;

forkBeforeLookupJoin
required_capability: fork_v6
required_capability: fork_v7

FROM employees
| EVAL language_code = languages
Expand All @@ -272,3 +272,64 @@ fork2 | 10081 | 2 | French
fork2 | 10087 | 5 | null
fork3 | 10081 | 2 | French
;


forkBeforeStats
required_capability: fork_v7

FROM employees
| WHERE emp_no == 10048 OR emp_no == 10081
| FORK ( EVAL a = CONCAT(first_name, " ", emp_no::keyword, " ", last_name)
| DISSECT a "%{x} %{y} %{z}"
| EVAL y = y::keyword )
( STATS x = COUNT(*)::keyword, y = MAX(emp_no)::keyword, z = MIN(emp_no)::keyword )
( SORT emp_no ASC | LIMIT 2 | EVAL x = last_name )
( EVAL x = "abc" | EVAL y = "aaa" )
| STATS c = count(*), m = max(_fork)
;

c:long | m:keyword
7 | fork4
;

forkBeforeStatsWithWhere
required_capability: fork_v7

FROM employees
| WHERE emp_no == 10048 OR emp_no == 10081
| FORK ( EVAL a = CONCAT(first_name, " ", emp_no::keyword, " ", last_name)
| DISSECT a "%{x} %{y} %{z}"
| EVAL y = y::keyword )
( STATS x = COUNT(*)::keyword, y = MAX(emp_no)::keyword, z = MIN(emp_no)::keyword )
( SORT emp_no ASC | LIMIT 2 | EVAL x = last_name )
( EVAL x = "abc" | EVAL y = "aaa" )
| STATS a = count(*) WHERE _fork == "fork1",
b = max(_fork)
;

a:long | b:keyword
2 | fork4
;

forkBeforeStatsByWithWhere
required_capability: fork_v7

FROM employees
| WHERE emp_no == 10048 OR emp_no == 10081
| FORK ( EVAL a = CONCAT(first_name, " ", emp_no::keyword, " ", last_name)
| DISSECT a "%{x} %{y} %{z}"
| EVAL y = y::keyword )
( STATS x = COUNT(*)::keyword, y = MAX(emp_no)::keyword, z = MIN(emp_no)::keyword )
( SORT emp_no ASC | LIMIT 2 | EVAL x = last_name )
( EVAL x = "abc" | EVAL y = "aaa" )
| STATS a = count(*) WHERE emp_no > 10000,
b = max(x) WHERE _fork == "fork1" BY _fork
| SORT _fork
;

a:long | b:keyword | _fork:keyword
2 | Zhongwei | fork1
0 | null | fork2
2 | null | fork3
2 | null | fork4
;
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,39 @@ public void testWithStatsSimple() {
}
}

public void testWithStatsAfterFork() {
var query = """
FROM test
| FORK ( WHERE content:"fox" | EVAL a = 1)
( WHERE content:"cat" | EVAL b = 2 )
( WHERE content:"dog" | EVAL c = 3 )
| STATS c = count(*)
""";
try (var resp = run(query)) {
assertColumnNames(resp.columns(), List.of("c"));
assertColumnTypes(resp.columns(), List.of("long"));
Iterable<Iterable<Object>> expectedValues = List.of(List.of(7L));
assertValues(resp.values(), expectedValues);
}
}

public void testWithStatsWithWhereAfterFork() {
var query = """
FROM test
| FORK ( WHERE content:"fox" | EVAL a = 1)
( WHERE content:"cat" | EVAL b = 2 )
( WHERE content:"dog" | EVAL c = 3 )
| STATS c = count(*) WHERE _fork == "fork1"
""";
try (var resp = run(query)) {
assertColumnNames(resp.columns(), List.of("c"));
assertColumnTypes(resp.columns(), List.of("long"));

Iterable<Iterable<Object>> expectedValues = List.of(List.of(2L));
assertValues(resp.values(), expectedValues);
}
}

public void testWithConditionOnForkField() {
var query = """
FROM test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1035,7 +1035,7 @@ public enum Cap {
/**
* Support streaming of sub plan results
*/
FORK_V6(Build.current().isSnapshot()),
FORK_V7(Build.current().isSnapshot()),

/**
* Support for the {@code leading_zeros} named parameter.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ public final void test() throws Throwable {
);
assumeFalse(
"CSV tests cannot currently handle FORK",
testCase.requiredCapabilities.contains(EsqlCapabilities.Cap.FORK_V6.capabilityName())
testCase.requiredCapabilities.contains(EsqlCapabilities.Cap.FORK_V7.capabilityName())
);
assumeFalse(
"CSV tests cannot currently handle multi_match function that depends on Lucene",
Expand Down