-
Notifications
You must be signed in to change notification settings - Fork 25.3k
ESQL: INLINESTATS implementation with multiple LogicalPlan updates #128917
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ESQL: INLINESTATS implementation with multiple LogicalPlan updates #128917
Conversation
coordinated from the EsqlSession.
Hi @astefan, I've created a changelog YAML for you. |
the inlinestats JOIN
…support_multi_inlinestats_logicalPlan_approach
…support_multi_inlinestats_logicalPlan_approach
| INLINESTATS min_scalerank=MIN(scalerank) BY type | ||
| MV_EXPAND type | ||
| WHERE scalerank == MV_MIN(scalerank); | ||
| EVAL mvMin_scalerank = MV_MIN(scalerank) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't know the original intention with this test, so I've updated it to make some kind of sense and to also keep what I thought to be its original purpose.
// as first columns in the output followed by whatever the right hand side of join adds in this order: aggregates first, | ||
// followed by groupings (this order should be preserved inside the rightFields() output) | ||
output = mergeOutputAttributes(right, leftOutputWithoutMatchFields); | ||
List<Attribute> leftOutputWithoutKeys = left.stream().filter(attr -> config().leftFields().contains(attr) == false).toList(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is taken from @alex-spies great suggestion here
@@ -207,14 +206,10 @@ private PhysicalPlan mapBinary(BinaryPlan bp) { | |||
throw new EsqlIllegalArgumentException("unsupported join type [" + config.type() + "]"); | |||
} | |||
|
|||
if (join instanceof InlineJoin) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This temporary section has been removed; it was introduced with the first PR about reviving inlinestats, and it was also questioned about its usefulness. After changing the approach to use LogicalPlan rebuild (instead of PhysicalPlans) this part was not needed anymore.
…support_multi_inlinestats_logicalPlan_approach
Pinging @elastic/es-analytical-engine (Team:Analytics) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice work, LGTM.
My only significant note, not directly PR-specific, is having INLINESTATS hit the same source (i.e. with same filters, sorting, etc. that come before the first INLINESTATS) multiple times (i.e. for each INLINESTATS). It'd be great if we could save this computation and reuse it, that might become heavy otherwise if there's denser logic than just Lucene (like more LOOKUP JOINs, ENRICHes, and then more INLINESTATs). I guess we could look into this at some later point.
// execute main plan | ||
runner.run(physicalPlan, listener); | ||
} | ||
} | ||
|
||
private LogicalPlanTuple firstSubPlan(LogicalPlan optimizedPlan) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually maybe add a comment (or rename it) that it actually returns the completed / "destubbed" righthand side of the bottommost InlineJoin
that's still stubbed -- the method is very specific to this command.
ActionListener<Result> listener | ||
) { | ||
PlanTuple tuple = subPlanIterator.next(); | ||
// Create a physical plan out of the logical sub-plan | ||
var physicalSubPlan = logicalPlanToPhysicalPlan(subPlans.nonStubbedSubPlan, request); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Abs. optional, but these logs helped a bit go through the execution.
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java
index d8118b0e0f4e..7fac395ddfb4 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java
@@ -285,6 +285,7 @@ public class EsqlSession {
EsqlQueryRequest request,
ActionListener<Result> listener
) {
+ LOGGER.debug("Executing subplan:\n{}", subPlans.nonStubbedSubPlan);
// Create a physical plan out of the logical sub-plan
var physicalSubPlan = logicalPlanToPhysicalPlan(subPlans.nonStubbedSubPlan, request);
@@ -300,10 +301,12 @@ public class EsqlSession {
ij -> ij.right() == subPlans.originalSubPlan ? InlineJoin.inlineData(ij, resultWrapper) : ij
);
newLogicalPlan.setOptimized();
+ LOGGER.debug("Plan after previous subplan execution:\n{}", newLogicalPlan);
// look for the next inlinejoin plan
var newSubPlan = firstSubPlan(newLogicalPlan);
if (newSubPlan == null) {// run the final "main" plan
+ LOGGER.debug("Executing final plan:\n{}", newLogicalPlan);
var newPhysicalPlan = logicalPlanToPhysicalPlan(newLogicalPlan, request);
runner.run(newPhysicalPlan, next.delegateFailureAndWrap((finalListener, finalResult) -> {
completionInfoAccumulator.accumulate(finalResult.completionInfo());
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've added them
@@ -217,8 +217,8 @@ public void executeOptimizedPlan( | |||
LogicalPlan optimizedPlan, | |||
ActionListener<Result> listener | |||
) { | |||
PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan, request); | |||
if (explainMode) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: would it be worth extracting this case into an own method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've added a TODO to revisit this. explain
in EsqlSession needs a second look, some things do not feel like they belong here.
@@ -231,85 +231,88 @@ public void executeOptimizedPlan( | |||
values.add(List.of("coordinator", "optimizedPhysicalPlan", physicalPlanString)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might be nice to be able to provide multiple rows of optimized logical plans, as each InlineJoin righthand side would get executed, since the outcome is predictable (a LocalRelation
), but I guess we can leave a comment as a TODO (or an issue) for later (when/if EXPLAIN stabilizes).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this PR wants to be a solid base for further parallel branches of work on inlinestats
. Modelling the InlineJoin properly is one of those follow up things.
} | ||
|
||
private record PlanTuple(PhysicalPlan physical, LogicalPlan logical) {} | ||
private record LogicalPlanTuple(LogicalPlan nonStubbedSubPlan, LogicalPlan originalSubPlan) {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A logistical suggestion would be to isolate all the InlineJoin logic into its own class (could be nested), as there are a few methods here (plus this record) that are dedicated just for this feature.
@@ -1001,3 +1108,171 @@ FROM hosts METADATA _index | |||
description:text | host:keyword | ip0:ip | _index:keyword | x:ip | ip1:long | host_group:text | card:keyword | |||
alpha db server |alpha |127.0.0.1 |hosts |127.0.0.1|1 |DB servers |eth0 | |||
; | |||
|
|||
doubleShadowing |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd maybe add a similar test with multiple INLINESTATS and shaddowing with no groups and another one w/o shadowing, but still no groups.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The "no groups" part is the problematic one now (as in what I am still working on as part of the same PR that's still evolving).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did give it a try and it worked fine. But haven't gone crazy with it, so more work is welcome, if needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added the fix in the PruneColumns
rule. It's about removing the inlinestats
completely if its output is not needed. What happened in some queries was that the aggregation of the inlinestats
was removed and the inlinejoin
was left in a completely messed up state.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Second round - focused on the execution in EsqlSession this time.
Looks like we're going back in the direction of the initial approach to inline stats, where each phase had a full optimizer run. That's the right approach IMO.
I want to have another look next week to better understand the changes to the optimizer rules. But please go ahead and merge whenever you're happy (I see Bogdan already 👍'd the PR); if I find something I find important, I'll leave a comment after the merge.
} | ||
|
||
private record PlanTuple(PhysicalPlan physical, LogicalPlan logical) {} | ||
private record LogicalPlanTuple(LogicalPlan nonStubbedSubPlan, LogicalPlan originalSubPlan) {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: a comment explaining the purpose wouldn't hurt; it's not clear without reading more what the non stubbed plan is, and the name LogicalPlanTuple
is also rather generic.
}); | ||
LogicalPlan newLogicalPlan = optimizedPlan.transformUp( | ||
InlineJoin.class, | ||
ij -> ij.right() == subPlans.originalSubPlan ? InlineJoin.inlineData(ij, resultWrapper) : ij |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice! I think it's important we use object equality here - regular equality will not suffice because it ignores e.g. name ids.
Maybe worth a comment?
InlineJoin.class, | ||
ij -> ij.right() == subPlans.originalSubPlan ? InlineJoin.inlineData(ij, resultWrapper) : ij | ||
); | ||
newLogicalPlan.setOptimized(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is surprising. Shouldn't we re-optimize the plan now that we were able to replace the stub with an actual result?
After the stub was replaced, we can actually do more stuff, like push down limits (which before that would be wrong as it would have affected the stats).
If I understand correctly, this is something we might want to improve later, right? If so, let's leave a comment; maybe a TODO
to make the intention clear.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, a TODO is right.
I had my doubts with this thing. I feel like there are things here that were left hanging (things do still seem they can be optimized further), but at this stage of the inlinestats
progress, I think it's worth ignoring it. But TODO is needed, because it's something we need to think about a little.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, I thought this was sufficient. The approach basically gradually executes righthand sides of (inline) joins, always resulting in a LocalRelation
. Ending up with a join with whatever was before on the left and this local relation on the right. Not sure if we can further optimise this (and haven't done it before - the LIMIT makes it past the InlineJoin into the lefthand side already, since InlineJoin preserves the rows count).
But a TODO can re-eval things later, 👍 .
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, we can further optimize it alright :)
The limit being pushed/copied down into the left hand side is a bug - that should only happen in subsequent passes. See my comment here.
Also, if the INLINESTATS
has no BY
clause, it can be turned into an EVAL
with literals in subsequent phases, which can trigger more optimizations (like constant folding, filter pushdown, optimizing away checks against constant keyword fields). Some of this could probably be somehow hacked into the first optimization pass, but currently I think it's more natural (or at least easier) to just have another optimizer pass per query phase.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The limit being pushed/copied down into the left hand side is a bug - that should only happen in subsequent passes.
Actually, very true!
Also, if the INLINESTATS has no BY clause, it can be turned into an EVAL with literals in subsequent phases
I see. Yes, this could be done, but not with the current shape of the planning (i.e. rerunning the optimiser as it is now won't replan). But yes, you're right, this could still be evolved.
Aggregation. Move two methods from EsqlSession to InlineJoin. Address reviews Add more comments
…support_multi_inlinestats_logicalPlan_approach
…support_multi_inlinestats_logicalPlan_approach
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Heya, all done now.
This is a nice advancement of INLINE STATS, let's go! I think there are a couple things to be looked at in follow ups, but this is fine to merge as-is IMO; we can continue iterating in follow-ups as the changes here do not seem to affect the code health of the non-INLINE STATS code paths.
Most notable follow-up items IMO:
- Correctness of optimization rules w.r.t. stubs, esp. the limit pushdown.
- More logical plan optimizer tests for PruneColumns and a good hard look at PruneColumns (maybe I'm misreading things though, see comments below)
@@ -60,7 +59,16 @@ public static LogicalPlan stubSource(UnaryPlan sourcePlan, LogicalPlan target) { | |||
* Replaces the stubbed source with the actual source. | |||
*/ | |||
public static LogicalPlan replaceStub(LogicalPlan source, LogicalPlan stubbed) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can we add a note that this will replace all stubs with the new source? In case of a plan with 2 stubs (dual inlinestats in the query), this method should normally be avoided to avoid chaos.
@@ -63,14 +67,18 @@ public LogicalPlan apply(LogicalPlan plan) { | |||
return p; | |||
} | |||
|
|||
if (p instanceof InlineJoin ij) { | |||
inlineJoinRightOutput.addAll(ij.right().outputSet()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be nice to have a logical plan optimizer test that demonstrates the necessity for tracking the right outputs of inline stats.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this will be followups. Added TODO
@@ -111,7 +119,7 @@ public LogicalPlan apply(LogicalPlan plan) { | |||
// Normally, pruning EsRelation has no effect because InsertFieldExtraction only extracts the required fields, anyway. | |||
// However, InsertFieldExtraction can't be currently used in LOOKUP JOIN right index, | |||
// it works differently as we extract all fields (other than the join key) that the EsRelation has. | |||
var remaining = removeUnused(esr.output(), used); | |||
var remaining = removeUnused(esr.output(), used, inlineJoinRightOutput); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is odd. The right outputs should be computed from STATS
, so the only attribute that could also occur here is a looked up field attr that's later used in the inlinestats' BY
clause.
But this should also be contained in the InlineJoin
's .references
, i.e. should end up in the used
set.
@@ -97,7 +105,7 @@ public LogicalPlan apply(LogicalPlan plan) { | |||
} | |||
} | |||
} else if (p instanceof Eval eval) { | |||
var remaining = removeUnused(eval.fields(), used); | |||
var remaining = removeUnused(eval.fields(), used, inlineJoinRightOutput); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some aggs leave downstream EVAL
s, like AVG
. Does this prevent them from being pruned, even if they're not needed downstream at all?
E.g. in INLINESTATS x = avg(y), a = count_distinct(field) BY z | EVAL x = z
I'd expect that making the inline join's right hand side "unpruneable" prevents this rule from correctly pruning unused columns.
// remember used | ||
boolean recheck; | ||
// analyze the unused items against dedicated 'producer' nodes such as Eval and Aggregate | ||
// perform a loop to retry checking if the current node is completely eliminated | ||
do { | ||
recheck = false; | ||
if (p instanceof Aggregate aggregate) { | ||
var remaining = removeUnused(aggregate.aggregates(), used); | ||
var remaining = removeUnused(aggregate.aggregates(), used, inlineJoinRightOutput); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems to protect the computed aggs in an inlinestats, no matter what?
I don't think this is correct long term; unused aggs in inlinestats need to be pruned, too. If the problematic case is when the inline stats ends up with 0 aggs, then I think we need an extra case that prunes the whole inline join. As inline joins preserve row count, this should be correct.
Can we at least leave a TODO? I think we'll need to revisit this and add a bunch of tests to make sure all works as intended.
x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec
Show resolved
Hide resolved
// TODO: InlineStats - prune ONLY the unused output columns from it? In other words, don't perform more aggs | ||
// if they will not be used anyway |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
++
@@ -104,6 +104,17 @@ public LogicalPlan apply(LogicalPlan plan) { | |||
p = aggregate.with(aggregate.groupings(), remaining); | |||
} | |||
} | |||
} else if (p instanceof InlineJoin ij) {// TODO: InlineStats - add unit tests for this IJ removal | |||
var remaining = removeUnused(ij.right().output(), used, inlineJoinRightOutput); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Aren't all of the attributes in ij.right().output()
protected because they're contained in inlineJoinRightOutput
?
@@ -39,15 +40,18 @@ public final class PruneColumns extends Rule<LogicalPlan, LogicalPlan> { | |||
public LogicalPlan apply(LogicalPlan plan) { | |||
// track used references | |||
var used = plan.outputSet().asBuilder(); | |||
// track inlinestats' own aggregation output (right-hand side of the join) so that any other plan on the left-hand side of the | |||
// inline join won't have its columns pruned due to the lack of "visibility" into the right hand side output/Attributes | |||
var inlineJoinRightOutput = new ArrayList<Attribute>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm confused how this works in general. Expressions in the inline join's left side generally cannot depend on the stats output, resp. the inline join's right hand side. The dependency is converse - the IJ's right hand side depends on expressions in the left, because it will normally contain a stub.
Also, LOOKUP JOIN doesn't require handling the right hand side output attributes especially, which is suspicious.
Maybe I'm just not seeing it and need an example, though :)
The complication that I would expect: expressions on the left may get pruned because they don't appear in used
. Without having tried, something like:
EVAL x = 2*y | INLINE STATS x = avg(x)
The first x
is only required in the stats in the right hand side of the inline join, so we may optimize this away on accident if PruneColumns
first descends into the left hand side and thus didn't yet add the x
to the used
attributes.
I didn't check if this actually happens, or if we maybe descend first into the right after all. Still, probably worth a test.
…support_multi_inlinestats_logicalPlan_approach
Asking some feedback from @nik9000 as well, especially related to CopyingLocalSupplier and whatever I am doing in there related to Blocks :-). I didn't change the ImmediateLocalSupplier to always behave like CopyingLocalSupplier because I wanted this behavior only for |
…support_multi_inlinestats_logicalPlan_approach
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have some opinions on the serialization change. Compared to the rest of the work they are super minor, but I think we should resolve my questions before merging.
...esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/local/CopyingLocalSupplier.java
Show resolved
Hide resolved
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOCAL_RELATION_WITH_NEW_BLOCKS)) { | ||
this.supplier = in.readNamedWriteable(LocalSupplier.class); | ||
} else { | ||
this.supplier = LocalSupplier.readFrom((PlanStreamInput) in); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's important this method say that it's for the legacy serialization format somehow. Maybe just rename it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or, what about moving it to right here?
|
||
@Override | ||
public void writeTo(StreamOutput out) throws IOException { | ||
out.writeVInt(0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should probably be empty instead of compatible with the previous serialization.
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOCAL_RELATION_WITH_NEW_BLOCKS)) { | ||
out.writeNamedWriteable(supplier); | ||
} else { | ||
supplier.writeTo(out); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This feels like it's subtle in a way that folks aren't likely to catch by reading it. I did because you told me to look here. But I don't trust myself to see it six months from now.
You made this work by forcing each variant to serialize in a way that's compatible with the original serialization. I think it'd be less surprising if instead you explicitly made an ImmediateSupplier
or even just copied the old serialization code to here.
new PlanStreamOutput(output, null).writeNamedWriteable(instance); | ||
} else { | ||
instance.writeTo(new PlanStreamOutput(output, null)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It'd be more traditional to make this abstract and make a subclass for each LocalSupplier
subclass. This works as is though.
…support_multi_inlinestats_logicalPlan_approach
…support_multi_inlinestats_logicalPlan_approach
…support_multi_inlinestats_logicalPlan_approach
Would Inline stats provide the result that this issue presents? |
…lastic#128917) Part of elastic#124715 and similar to elastic#128476. Different from elastic#128476 in that it takes a "LogicalPlan" approach to running a sub-query, integrating its result back in the "main" LogicalPlan and continuing running the query.
Part of #124715 and similar to #128476.
Different from #128476 in that it takes a "LogicalPlan" approach to running a sub-query, integrating its result back in the "main" LogicalPlan and continuing running the query.