Skip to content

ESQL: INLINESTATS implementation with multiple LogicalPlan updates #128917

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Conversation

astefan
Copy link
Contributor

@astefan astefan commented Jun 4, 2025

Part of #124715 and similar to #128476.
Different from #128476 in that it takes a "LogicalPlan" approach to running a sub-query, integrating its result back in the "main" LogicalPlan and continuing running the query.

@elasticsearchmachine
Copy link
Collaborator

Hi @astefan, I've created a changelog YAML for you.

| INLINESTATS min_scalerank=MIN(scalerank) BY type
| MV_EXPAND type
| WHERE scalerank == MV_MIN(scalerank);
| EVAL mvMin_scalerank = MV_MIN(scalerank)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't know the original intention with this test, so I've updated it to make some kind of sense and to also keep what I thought to be its original purpose.

// as first columns in the output followed by whatever the right hand side of join adds in this order: aggregates first,
// followed by groupings (this order should be preserved inside the rightFields() output)
output = mergeOutputAttributes(right, leftOutputWithoutMatchFields);
List<Attribute> leftOutputWithoutKeys = left.stream().filter(attr -> config().leftFields().contains(attr) == false).toList();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is taken from @alex-spies great suggestion here

@@ -207,14 +206,10 @@ private PhysicalPlan mapBinary(BinaryPlan bp) {
throw new EsqlIllegalArgumentException("unsupported join type [" + config.type() + "]");
}

if (join instanceof InlineJoin) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This temporary section has been removed; it was introduced with the first PR about reviving inlinestats, and it was also questioned about its usefulness. After changing the approach to use LogicalPlan rebuild (instead of PhysicalPlans) this part was not needed anymore.

@astefan astefan requested review from alex-spies and costin June 12, 2025 12:44
@astefan astefan marked this pull request as ready for review June 12, 2025 12:45
@astefan astefan added >enhancement and removed >bug labels Jun 12, 2025
@elasticsearchmachine elasticsearchmachine added the Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) label Jun 12, 2025
@elasticsearchmachine
Copy link
Collaborator

Pinging @elastic/es-analytical-engine (Team:Analytics)

@astefan astefan added the v9.0.3 label Jun 12, 2025
Copy link
Contributor

@bpintea bpintea left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice work, LGTM.

My only significant note, not directly PR-specific, is having INLINESTATS hit the same source (i.e. with same filters, sorting, etc. that come before the first INLINESTATS) multiple times (i.e. for each INLINESTATS). It'd be great if we could save this computation and reuse it, that might become heavy otherwise if there's denser logic than just Lucene (like more LOOKUP JOINs, ENRICHes, and then more INLINESTATs). I guess we could look into this at some later point.

// execute main plan
runner.run(physicalPlan, listener);
}
}

private LogicalPlanTuple firstSubPlan(LogicalPlan optimizedPlan) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually maybe add a comment (or rename it) that it actually returns the completed / "destubbed" righthand side of the bottommost InlineJoin that's still stubbed -- the method is very specific to this command.

ActionListener<Result> listener
) {
PlanTuple tuple = subPlanIterator.next();
// Create a physical plan out of the logical sub-plan
var physicalSubPlan = logicalPlanToPhysicalPlan(subPlans.nonStubbedSubPlan, request);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Abs. optional, but these logs helped a bit go through the execution.

diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java
index d8118b0e0f4e..7fac395ddfb4 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java
@@ -285,6 +285,7 @@ public class EsqlSession {
         EsqlQueryRequest request,
         ActionListener<Result> listener
     ) {
+        LOGGER.debug("Executing subplan:\n{}", subPlans.nonStubbedSubPlan);
         // Create a physical plan out of the logical sub-plan
         var physicalSubPlan = logicalPlanToPhysicalPlan(subPlans.nonStubbedSubPlan, request);

@@ -300,10 +301,12 @@ public class EsqlSession {
                     ij -> ij.right() == subPlans.originalSubPlan ? InlineJoin.inlineData(ij, resultWrapper) : ij
                 );
                 newLogicalPlan.setOptimized();
+                LOGGER.debug("Plan after previous subplan execution:\n{}", newLogicalPlan);
                 // look for the next inlinejoin plan
                 var newSubPlan = firstSubPlan(newLogicalPlan);

                 if (newSubPlan == null) {// run the final "main" plan
+                    LOGGER.debug("Executing final plan:\n{}", newLogicalPlan);
                     var newPhysicalPlan = logicalPlanToPhysicalPlan(newLogicalPlan, request);
                     runner.run(newPhysicalPlan, next.delegateFailureAndWrap((finalListener, finalResult) -> {
                         completionInfoAccumulator.accumulate(finalResult.completionInfo());

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added them

@@ -217,8 +217,8 @@ public void executeOptimizedPlan(
LogicalPlan optimizedPlan,
ActionListener<Result> listener
) {
PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan, request);
if (explainMode) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: would it be worth extracting this case into an own method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a TODO to revisit this. explain in EsqlSession needs a second look, some things do not feel like they belong here.

@@ -231,85 +231,88 @@ public void executeOptimizedPlan(
values.add(List.of("coordinator", "optimizedPhysicalPlan", physicalPlanString));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be nice to be able to provide multiple rows of optimized logical plans, as each InlineJoin righthand side would get executed, since the outcome is predictable (a LocalRelation), but I guess we can leave a comment as a TODO (or an issue) for later (when/if EXPLAIN stabilizes).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this PR wants to be a solid base for further parallel branches of work on inlinestats. Modelling the InlineJoin properly is one of those follow up things.

}

private record PlanTuple(PhysicalPlan physical, LogicalPlan logical) {}
private record LogicalPlanTuple(LogicalPlan nonStubbedSubPlan, LogicalPlan originalSubPlan) {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A logistical suggestion would be to isolate all the InlineJoin logic into its own class (could be nested), as there are a few methods here (plus this record) that are dedicated just for this feature.

@@ -1001,3 +1108,171 @@ FROM hosts METADATA _index
description:text | host:keyword | ip0:ip | _index:keyword | x:ip | ip1:long | host_group:text | card:keyword
alpha db server |alpha |127.0.0.1 |hosts |127.0.0.1|1 |DB servers |eth0
;

doubleShadowing
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd maybe add a similar test with multiple INLINESTATS and shaddowing with no groups and another one w/o shadowing, but still no groups.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The "no groups" part is the problematic one now (as in what I am still working on as part of the same PR that's still evolving).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did give it a try and it worked fine. But haven't gone crazy with it, so more work is welcome, if needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the fix in the PruneColumns rule. It's about removing the inlinestats completely if its output is not needed. What happened in some queries was that the aggregation of the inlinestats was removed and the inlinejoin was left in a completely messed up state.

Copy link
Contributor

@alex-spies alex-spies left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Second round - focused on the execution in EsqlSession this time.

Looks like we're going back in the direction of the initial approach to inline stats, where each phase had a full optimizer run. That's the right approach IMO.

I want to have another look next week to better understand the changes to the optimizer rules. But please go ahead and merge whenever you're happy (I see Bogdan already 👍'd the PR); if I find something I find important, I'll leave a comment after the merge.

}

private record PlanTuple(PhysicalPlan physical, LogicalPlan logical) {}
private record LogicalPlanTuple(LogicalPlan nonStubbedSubPlan, LogicalPlan originalSubPlan) {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: a comment explaining the purpose wouldn't hurt; it's not clear without reading more what the non stubbed plan is, and the name LogicalPlanTuple is also rather generic.

});
LogicalPlan newLogicalPlan = optimizedPlan.transformUp(
InlineJoin.class,
ij -> ij.right() == subPlans.originalSubPlan ? InlineJoin.inlineData(ij, resultWrapper) : ij
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! I think it's important we use object equality here - regular equality will not suffice because it ignores e.g. name ids.

Maybe worth a comment?

InlineJoin.class,
ij -> ij.right() == subPlans.originalSubPlan ? InlineJoin.inlineData(ij, resultWrapper) : ij
);
newLogicalPlan.setOptimized();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is surprising. Shouldn't we re-optimize the plan now that we were able to replace the stub with an actual result?

After the stub was replaced, we can actually do more stuff, like push down limits (which before that would be wrong as it would have affected the stats).

If I understand correctly, this is something we might want to improve later, right? If so, let's leave a comment; maybe a TODO to make the intention clear.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, a TODO is right.

I had my doubts with this thing. I feel like there are things here that were left hanging (things do still seem they can be optimized further), but at this stage of the inlinestats progress, I think it's worth ignoring it. But TODO is needed, because it's something we need to think about a little.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I thought this was sufficient. The approach basically gradually executes righthand sides of (inline) joins, always resulting in a LocalRelation. Ending up with a join with whatever was before on the left and this local relation on the right. Not sure if we can further optimise this (and haven't done it before - the LIMIT makes it past the InlineJoin into the lefthand side already, since InlineJoin preserves the rows count).
But a TODO can re-eval things later, 👍 .

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, we can further optimize it alright :)

The limit being pushed/copied down into the left hand side is a bug - that should only happen in subsequent passes. See my comment here.

Also, if the INLINESTATS has no BY clause, it can be turned into an EVAL with literals in subsequent phases, which can trigger more optimizations (like constant folding, filter pushdown, optimizing away checks against constant keyword fields). Some of this could probably be somehow hacked into the first optimization pass, but currently I think it's more natural (or at least easier) to just have another optimizer pass per query phase.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The limit being pushed/copied down into the left hand side is a bug - that should only happen in subsequent passes.

Actually, very true!

Also, if the INLINESTATS has no BY clause, it can be turned into an EVAL with literals in subsequent phases

I see. Yes, this could be done, but not with the current shape of the planning (i.e. rerunning the optimiser as it is now won't replan). But yes, you're right, this could still be evolved.

astefan added 4 commits June 27, 2025 19:21
Aggregation.
Move two methods from EsqlSession to InlineJoin.
Address reviews
Add more comments
…support_multi_inlinestats_logicalPlan_approach
…support_multi_inlinestats_logicalPlan_approach
Copy link
Contributor

@alex-spies alex-spies left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Heya, all done now.

This is a nice advancement of INLINE STATS, let's go! I think there are a couple things to be looked at in follow ups, but this is fine to merge as-is IMO; we can continue iterating in follow-ups as the changes here do not seem to affect the code health of the non-INLINE STATS code paths.

Most notable follow-up items IMO:

  • Correctness of optimization rules w.r.t. stubs, esp. the limit pushdown.
  • More logical plan optimizer tests for PruneColumns and a good hard look at PruneColumns (maybe I'm misreading things though, see comments below)

@@ -60,7 +59,16 @@ public static LogicalPlan stubSource(UnaryPlan sourcePlan, LogicalPlan target) {
* Replaces the stubbed source with the actual source.
*/
public static LogicalPlan replaceStub(LogicalPlan source, LogicalPlan stubbed) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we add a note that this will replace all stubs with the new source? In case of a plan with 2 stubs (dual inlinestats in the query), this method should normally be avoided to avoid chaos.

@@ -63,14 +67,18 @@ public LogicalPlan apply(LogicalPlan plan) {
return p;
}

if (p instanceof InlineJoin ij) {
inlineJoinRightOutput.addAll(ij.right().outputSet());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to have a logical plan optimizer test that demonstrates the necessity for tracking the right outputs of inline stats.

Copy link
Contributor Author

@astefan astefan Jun 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this will be followups. Added TODO

@@ -111,7 +119,7 @@ public LogicalPlan apply(LogicalPlan plan) {
// Normally, pruning EsRelation has no effect because InsertFieldExtraction only extracts the required fields, anyway.
// However, InsertFieldExtraction can't be currently used in LOOKUP JOIN right index,
// it works differently as we extract all fields (other than the join key) that the EsRelation has.
var remaining = removeUnused(esr.output(), used);
var remaining = removeUnused(esr.output(), used, inlineJoinRightOutput);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is odd. The right outputs should be computed from STATS, so the only attribute that could also occur here is a looked up field attr that's later used in the inlinestats' BY clause.

But this should also be contained in the InlineJoin's .references, i.e. should end up in the used set.

@@ -97,7 +105,7 @@ public LogicalPlan apply(LogicalPlan plan) {
}
}
} else if (p instanceof Eval eval) {
var remaining = removeUnused(eval.fields(), used);
var remaining = removeUnused(eval.fields(), used, inlineJoinRightOutput);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some aggs leave downstream EVALs, like AVG. Does this prevent them from being pruned, even if they're not needed downstream at all?

E.g. in INLINESTATS x = avg(y), a = count_distinct(field) BY z | EVAL x = z I'd expect that making the inline join's right hand side "unpruneable" prevents this rule from correctly pruning unused columns.

// remember used
boolean recheck;
// analyze the unused items against dedicated 'producer' nodes such as Eval and Aggregate
// perform a loop to retry checking if the current node is completely eliminated
do {
recheck = false;
if (p instanceof Aggregate aggregate) {
var remaining = removeUnused(aggregate.aggregates(), used);
var remaining = removeUnused(aggregate.aggregates(), used, inlineJoinRightOutput);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to protect the computed aggs in an inlinestats, no matter what?

I don't think this is correct long term; unused aggs in inlinestats need to be pruned, too. If the problematic case is when the inline stats ends up with 0 aggs, then I think we need an extra case that prunes the whole inline join. As inline joins preserve row count, this should be correct.

Can we at least leave a TODO? I think we'll need to revisit this and add a bunch of tests to make sure all works as intended.

Comment on lines +115 to +116
// TODO: InlineStats - prune ONLY the unused output columns from it? In other words, don't perform more aggs
// if they will not be used anyway
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++

@@ -104,6 +104,17 @@ public LogicalPlan apply(LogicalPlan plan) {
p = aggregate.with(aggregate.groupings(), remaining);
}
}
} else if (p instanceof InlineJoin ij) {// TODO: InlineStats - add unit tests for this IJ removal
var remaining = removeUnused(ij.right().output(), used, inlineJoinRightOutput);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aren't all of the attributes in ij.right().output() protected because they're contained in inlineJoinRightOutput?

@@ -39,15 +40,18 @@ public final class PruneColumns extends Rule<LogicalPlan, LogicalPlan> {
public LogicalPlan apply(LogicalPlan plan) {
// track used references
var used = plan.outputSet().asBuilder();
// track inlinestats' own aggregation output (right-hand side of the join) so that any other plan on the left-hand side of the
// inline join won't have its columns pruned due to the lack of "visibility" into the right hand side output/Attributes
var inlineJoinRightOutput = new ArrayList<Attribute>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused how this works in general. Expressions in the inline join's left side generally cannot depend on the stats output, resp. the inline join's right hand side. The dependency is converse - the IJ's right hand side depends on expressions in the left, because it will normally contain a stub.

Also, LOOKUP JOIN doesn't require handling the right hand side output attributes especially, which is suspicious.

Maybe I'm just not seeing it and need an example, though :)

The complication that I would expect: expressions on the left may get pruned because they don't appear in used. Without having tried, something like:

EVAL x = 2*y | INLINE STATS x = avg(x)

The first x is only required in the stats in the right hand side of the inline join, so we may optimize this away on accident if PruneColumns first descends into the left hand side and thus didn't yet add the x to the used attributes.

I didn't check if this actually happens, or if we maybe descend first into the right after all. Still, probably worth a test.

@astefan astefan requested a review from nik9000 June 30, 2025 16:53
@astefan
Copy link
Contributor Author

astefan commented Jun 30, 2025

Asking some feedback from @nik9000 as well, especially related to CopyingLocalSupplier and whatever I am doing in there related to Blocks :-). I didn't change the ImmediateLocalSupplier to always behave like CopyingLocalSupplier because I wanted this behavior only for row commands where we know that blocks will always be released on a single node (the coordinator) whereas a LocalRelation might exist also on data nodes. I wanted to separate these two use cases completely.

Copy link
Member

@nik9000 nik9000 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have some opinions on the serialization change. Compared to the rest of the work they are super minor, but I think we should resolve my questions before merging.

if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOCAL_RELATION_WITH_NEW_BLOCKS)) {
this.supplier = in.readNamedWriteable(LocalSupplier.class);
} else {
this.supplier = LocalSupplier.readFrom((PlanStreamInput) in);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's important this method say that it's for the legacy serialization format somehow. Maybe just rename it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or, what about moving it to right here?


@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(0);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should probably be empty instead of compatible with the previous serialization.

if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOCAL_RELATION_WITH_NEW_BLOCKS)) {
out.writeNamedWriteable(supplier);
} else {
supplier.writeTo(out);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels like it's subtle in a way that folks aren't likely to catch by reading it. I did because you told me to look here. But I don't trust myself to see it six months from now.

You made this work by forcing each variant to serialize in a way that's compatible with the original serialization. I think it'd be less surprising if instead you explicitly made an ImmediateSupplier or even just copied the old serialization code to here.

new PlanStreamOutput(output, null).writeNamedWriteable(instance);
} else {
instance.writeTo(new PlanStreamOutput(output, null));
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'd be more traditional to make this abstract and make a subclass for each LocalSupplier subclass. This works as is though.

@astefan astefan removed the v9.0.4 label Jul 7, 2025
…support_multi_inlinestats_logicalPlan_approach
@astefan astefan removed the auto-backport Automatically create backport pull requests when merged label Jul 8, 2025
@astefan astefan changed the title ESQL: Adopt a "LogicalPlan" approach to running multiple sub-queries (with INLINESTATS so far) ESQL: INLINESTATS implementation with multiple LogicalPlan updates Jul 8, 2025
@astefan astefan added the auto-merge-without-approval Automatically merge pull request when CI checks pass (NB doesn't wait for reviews!) label Jul 8, 2025
@elasticsearchmachine elasticsearchmachine merged commit c9a4206 into elastic:main Jul 8, 2025
33 checks passed
@astefan astefan deleted the support_multi_inlinestats_logicalPlan_approach branch July 8, 2025 09:52
@nicpenning
Copy link

Would Inline stats provide the result that this issue presents?

#128612

julian-elastic pushed a commit to julian-elastic/elasticsearch that referenced this pull request Jul 8, 2025
…lastic#128917)

Part of elastic#124715 and
similar to elastic#128476.
Different from elastic#128476 in
that it takes a "LogicalPlan" approach to running a sub-query,
integrating its result back in the "main" LogicalPlan and continuing
running the query.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
:Analytics/ES|QL AKA ESQL auto-merge-without-approval Automatically merge pull request when CI checks pass (NB doesn't wait for reviews!) >enhancement Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) v9.2.0
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants