Skip to content

Fix field resolution for FORK #128193

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
May 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -571,22 +571,45 @@ private void resolveInferences(
}

static PreAnalysisResult fieldNames(LogicalPlan parsed, Set<String> enrichPolicyMatchFields, PreAnalysisResult result) {
if (false == parsed.anyMatch(plan -> plan instanceof Aggregate || plan instanceof Project)) {
if (false == parsed.anyMatch(EsqlSession::shouldCollectReferencedFields)) {
// no explicit columns selection, for example "from employees"
return result.withFieldNames(IndexResolver.ALL_FIELDS);
}

if (parsed.anyMatch(plan -> plan instanceof Fork)) {
return result.withFieldNames(IndexResolver.ALL_FIELDS);
}

Holder<Boolean> projectAll = new Holder<>(false);
parsed.forEachExpressionDown(UnresolvedStar.class, us -> {// explicit "*" fields selection
if (projectAll.get()) {
return;
}
projectAll.set(true);
});

if (projectAll.get()) {
return result.withFieldNames(IndexResolver.ALL_FIELDS);
}

Holder<Boolean> projectAfterFork = new Holder<>(false);
Holder<Boolean> hasFork = new Holder<>(false);

parsed.forEachDown(plan -> {
if (projectAll.get()) {
return;
}

if (hasFork.get() == false && shouldCollectReferencedFields(plan)) {
projectAfterFork.set(true);
}

if (plan instanceof Fork fork && projectAfterFork.get() == false) {
hasFork.set(true);
fork.children().forEach(child -> {
if (child.anyMatch(EsqlSession::shouldCollectReferencedFields) == false) {
projectAll.set(true);
}
});
}
});

if (projectAll.get()) {
return result.withFieldNames(IndexResolver.ALL_FIELDS);
}
Expand Down Expand Up @@ -703,6 +726,13 @@ static PreAnalysisResult fieldNames(LogicalPlan parsed, Set<String> enrichPolicy
}
}

/**
* Indicates whether the given plan gives an exact list of fields that we need to collect from field_caps.
*/
private static boolean shouldCollectReferencedFields(LogicalPlan plan) {
return plan instanceof Aggregate || plan instanceof Project;
}

/**
* Could a plan "accidentally" override aliases?
* Examples are JOIN and ENRICH, that _could_ produce fields with the same
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1796,6 +1796,121 @@ public void testDropWildcardFields_WithLookupJoin() {
);
}

public void testForkFieldsWithKeepAfterFork() {
assumeTrue("FORK available as snapshot only", EsqlCapabilities.Cap.FORK.isEnabled());

assertFieldNames("""
FROM test
| WHERE a > 2000
| EVAL b = a + 100
| FORK (WHERE c > 1 AND a < 10000 | EVAL d = a + 500)
(WHERE d > 1000 AND e == "aaa" | EVAL c = a + 200)
| WHERE x > y
| KEEP a, b, c, d, x
""", Set.of("a", "a.*", "c", "c.*", "d", "d.*", "e", "e.*", "x", "x.*", "y", "y.*"));
}

public void testForkFieldsWithKeepBeforeFork() {
assumeTrue("FORK available as snapshot only", EsqlCapabilities.Cap.FORK.isEnabled());

assertFieldNames("""
FROM test
| KEEP a, b, c, d, x
| WHERE a > 2000
| EVAL b = a + 100
| FORK (WHERE c > 1 AND a < 10000 | EVAL d = a + 500)
(WHERE d > 1000 AND e == "aaa" | EVAL c = a + 200)
| WHERE x > y
""", Set.of("a", "a.*", "b", "b.*", "c", "c.*", "d", "d.*", "e", "e.*", "x", "x.*", "y", "y.*"));
}

public void testForkFieldsWithNoProjection() {
assumeTrue("FORK available as snapshot only", EsqlCapabilities.Cap.FORK.isEnabled());

assertFieldNames("""
FROM test
| WHERE a > 2000
| EVAL b = a + 100
| FORK (WHERE c > 1 AND a < 10000 | EVAL d = a + 500)
(WHERE d > 1000 AND e == "aaa" | EVAL c = a + 200)
| WHERE x > y
""", ALL_FIELDS);
}

public void testForkFieldsWithStatsInOneBranch() {
assumeTrue("FORK available as snapshot only", EsqlCapabilities.Cap.FORK.isEnabled());

assertFieldNames("""
FROM test
| WHERE a > 2000
| EVAL b = a + 100
| FORK (WHERE c > 1 AND a < 10000 | EVAL d = a + 500)
(STATS x = count(*), y=min(z))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmmm - now that I am looking at this more - I think we should return all fields, because one branch is not bounded by any command like KEEP or STATS that resets the output to a known list of fields. will fix 🤔

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is one more query where I am confused, in part because of probably not knowing what is the expectation for fork.

FROM employees | FORK ( WHERE true | stats min(salary) by gender) ( WHERE true | LIMIT 3 )

This shows these columns:

min(salary) | gender | _fork | salary

Should these be a "union" kind of set of columns and fieldNames is only limiting it to what it "sees" in the fork's first branch? If that's true, this means that fieldNames should consider a union kind of field names from all the branches of fork. As a shortcut, the first branch that it finds that's the "widest" it should stop checking the rest.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The FORK output should be a union of the outputs of the fork branches.

If we detect that the same field exists in multiple fork branches outputs, but the field has different type, we fail with a validation error.

FORK will pad with null columns the output of FORK branches that are missing fields.
For example:

ROW a = [1, 2, 3], b = "foo"
| MV_EXPAND a
| FORK (WHERE true | LIMIT 2)
        (STATS x = count(*))
        (WHERE a % 2 == 0 | EVAL y = 2)
| SORT _fork, a
| KEEP _fork, a, b, x, y

Will produce:

     _fork     |       a       |       b       |       x       |       y       
---------------+---------------+---------------+---------------+---------------
fork1          |1              |foo            |null           |null           
fork1          |2              |foo            |null           |null           
fork2          |null           |null           |3              |null           
fork3          |2              |foo            |null           |2          

I think fixed the field resolution for this case.
In this test example here we should ask for all fields, since not all branches are bounded by an Aggregate or Project.

| WHERE x > y
""", ALL_FIELDS);
}

public void testForkFieldsWithEnrichAndLookupJoins() {
assumeTrue("FORK available as snapshot only", EsqlCapabilities.Cap.FORK.isEnabled());
assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V12.isEnabled());

assertFieldNames(
"""
FROM test
| KEEP a, b, abc, def, z, xyz
| ENRICH enrich_policy ON abc
| EVAL b = a + 100
| LOOKUP JOIN my_lookup_index ON def
| FORK (WHERE c > 1 AND a < 10000 | EVAL d = a + 500)
(STATS x = count(*), y=min(z))
| LOOKUP JOIN my_lookup_index ON xyz
| WHERE x > y OR _fork == "fork1"
""",
Set.of(
"x",
"y",
"_fork",
"a",
"c",
"abc",
"b",
"def",
"z",
"xyz",
"def.*",
"_fork.*",
"y.*",
"x.*",
"xyz.*",
"z.*",
"abc.*",
"a.*",
"c.*",
"b.*"
)
);
}

public void testForkWithStatsInAllBranches() {
assumeTrue("FORK available as snapshot only", EsqlCapabilities.Cap.FORK.isEnabled());

assertFieldNames("""
FROM test
| WHERE a > 2000
| EVAL b = a + 100
| FORK (WHERE c > 1 AND a < 10000 | STATS m = count(*))
(EVAL z = a * b | STATS m = max(z))
(STATS x = count(*), y=min(z))
| WHERE x > y
""", Set.of("a", "a.*", "b", "b.*", "c", "c.*", "z", "z.*"));
}

public void testForkWithStatsAndWhere() {
assumeTrue("FORK available as snapshot only", EsqlCapabilities.Cap.FORK.isEnabled());

assertFieldNames(" FROM employees | FORK ( WHERE true | stats min(salary) by gender) ( WHERE true | LIMIT 3 )", ALL_FIELDS);
}

private Set<String> fieldNames(String query, Set<String> enrichPolicyMatchFields) {
var preAnalysisResult = new EsqlSession.PreAnalysisResult(null);
return EsqlSession.fieldNames(parser.createStatement(query), enrichPolicyMatchFields, preAnalysisResult).fieldNames();
Expand Down