Skip to content

ESQL: Skip unused STATS groups by adding a Top N BlockHash implementation #127148

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 51 commits into from
Jun 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
3322f34
Add initial blockhash with TreeSet and basic operator test
ivancea Apr 16, 2025
fcdfbd4
Fix dedupe for nulls and multivalues
ivancea Apr 16, 2025
7282e98
Added tests for null order and sort order
ivancea Apr 16, 2025
b6d7265
Merge branch 'main' into esql-top-n-agg-grouping
ivancea Apr 21, 2025
4950f5f
Move generic BlockHash code to base TestCase
ivancea Apr 21, 2025
a425c5d
Initial TopNBlockHash test structure, WIP
ivancea Apr 21, 2025
665ace3
Added TopNBlockHash test without nulls, and fixed bug in ords
ivancea Apr 22, 2025
6616fdf
Fix BlockHashTestCase keys assertion early return not checking all keys
ivancea Apr 22, 2025
d838057
Added nulls tests, fixed dedupe lookup nulls handling and nulls error…
ivancea Apr 22, 2025
e78fac5
Added multivalue tests and fixed dedupe with multivalues and no matches
ivancea Apr 22, 2025
143d86f
Update docs/changelog/127148.yaml
ivancea Apr 22, 2025
fa3a779
[CI] Auto commit changes from spotless
elasticsearchmachine Apr 22, 2025
4b212cd
Remove TopNBlockHash abstract class
ivancea Apr 22, 2025
87eb135
Remove unused class
ivancea Apr 23, 2025
c9635f9
Merge branch 'main' into esql-top-n-agg-grouping
ivancea Apr 23, 2025
29fa4d1
Fix SeenGroupIds and added extra tests for it
ivancea Apr 23, 2025
7ca3ce3
Included TopNLongBlockHash into BlockHash.ubuild() and GroupSpec logi…
ivancea Apr 24, 2025
13900f6
[CI] Auto commit changes from spotless
elasticsearchmachine Apr 24, 2025
c778b3b
Keep the last value in the top while using TreeSet, for performance
ivancea Apr 25, 2025
270eb6b
Merge branch 'main' into esql-top-n-agg-grouping
ivancea Apr 25, 2025
5483dc2
Use LongBucketedSort and LongHash for unique seen values
ivancea Apr 25, 2025
3a775ac
Add field recording amount on values in top structure
ivancea Apr 25, 2025
2dde5c1
Fixed to have both last top value and value count
ivancea Apr 29, 2025
a5855c1
Specialize block parameters on AddInput
ivancea Apr 30, 2025
5176663
Call the specific add() methods for eacj block type
ivancea Apr 30, 2025
fb670bd
Implement custom add in HashAggregationOperator
ivancea Apr 30, 2025
ea433a2
Merge branch 'main' into esql-top-n-agg-grouping-tests
ivancea May 5, 2025
fd97a8c
Updated new aggs
ivancea May 5, 2025
ddb6735
Added custom sorted structure
ivancea May 5, 2025
3d83784
Improve limit migration logic
ivancea May 5, 2025
23195a8
Merge branch 'main' into esql-top-n-agg-grouping-tests
ivancea May 7, 2025
afff00e
Merge branch 'main' into esql-top-n-agg-grouping
ivancea May 20, 2025
a54ba97
Spotless
ivancea May 20, 2025
20f75c1
Merge branch 'main' into esql-top-n-agg-grouping-tests
ivancea May 21, 2025
8d101fb
Remove count var
ivancea May 21, 2025
c27faa0
Removed lastValue field
ivancea May 22, 2025
9cff744
Added LonghHash for early detection of existing values
ivancea May 22, 2025
22da89e
Revert "Added LonghHash for early detection of existing values"
ivancea May 22, 2025
9c12c35
Merge branch 'main' into esql-top-n-agg-grouping-tests
ivancea May 27, 2025
714aed9
Merge branch 'main' into esql-top-n-agg-grouping
ivancea May 28, 2025
9d8f749
Merge branch 'esql-top-n-agg-grouping' into esql-top-n-agg-grouping-t…
ivancea May 28, 2025
ed18e7c
Make operator tests not depend directly on the BlockHash implementation
ivancea May 28, 2025
16c88eb
Restore original BucketedSort
ivancea May 28, 2025
2b1f868
Improve migration and remove unused BucketedSort test
ivancea May 28, 2025
f0349f4
Remove fixed TO-DOs
ivancea May 28, 2025
e7ebd05
Fix potential memory leak on circuit breaker error
ivancea May 28, 2025
40b7682
Merge branch 'main' into esql-top-n-agg-grouping
ivancea May 29, 2025
76291a6
Renamed topN set and added tests
ivancea May 29, 2025
dd48d97
Remove unused code from TopNMultivalueDedupeLong
ivancea May 29, 2025
e63b73a
Fix missing success=true in constructor
ivancea May 29, 2025
b2c42e4
Merge branch 'main' into esql-top-n-agg-grouping
ivancea Jun 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public class AggregatorBenchmark {
static final int BLOCK_LENGTH = 8 * 1024;
private static final int OP_COUNT = 1024;
private static final int GROUPS = 5;
private static final int TOP_N_LIMIT = 3;

private static final BlockFactory blockFactory = BlockFactory.getInstance(
new NoopCircuitBreaker("noop"),
Expand All @@ -90,6 +91,7 @@ public class AggregatorBenchmark {
private static final String TWO_ORDINALS = "two_" + ORDINALS;
private static final String LONGS_AND_BYTES_REFS = LONGS + "_and_" + BYTES_REFS;
private static final String TWO_LONGS_AND_BYTES_REFS = "two_" + LONGS + "_and_" + BYTES_REFS;
private static final String TOP_N_LONGS = "top_n_" + LONGS;

private static final String VECTOR_DOUBLES = "vector_doubles";
private static final String HALF_NULL_DOUBLES = "half_null_doubles";
Expand Down Expand Up @@ -147,7 +149,8 @@ static void selfTest() {
TWO_BYTES_REFS,
TWO_ORDINALS,
LONGS_AND_BYTES_REFS,
TWO_LONGS_AND_BYTES_REFS }
TWO_LONGS_AND_BYTES_REFS,
TOP_N_LONGS }
)
public String grouping;

Expand All @@ -161,8 +164,7 @@ static void selfTest() {
public String filter;

private static Operator operator(DriverContext driverContext, String grouping, String op, String dataType, String filter) {

if (grouping.equals("none")) {
if (grouping.equals(NONE)) {
return new AggregationOperator(
List.of(supplier(op, dataType, filter).aggregatorFactory(AggregatorMode.SINGLE, List.of(0)).apply(driverContext)),
driverContext
Expand All @@ -188,6 +190,9 @@ private static Operator operator(DriverContext driverContext, String grouping, S
new BlockHash.GroupSpec(1, ElementType.LONG),
new BlockHash.GroupSpec(2, ElementType.BYTES_REF)
);
case TOP_N_LONGS -> List.of(
new BlockHash.GroupSpec(0, ElementType.LONG, false, new BlockHash.TopNDef(0, true, true, TOP_N_LIMIT))
);
default -> throw new IllegalArgumentException("unsupported grouping [" + grouping + "]");
};
return new HashAggregationOperator(
Expand Down Expand Up @@ -271,10 +276,14 @@ private static void checkGrouped(String prefix, String grouping, String op, Stri
case BOOLEANS -> 2;
default -> GROUPS;
};
int availableGroups = switch (grouping) {
case TOP_N_LONGS -> TOP_N_LIMIT;
default -> groups;
};
switch (op) {
case AVG -> {
DoubleBlock dValues = (DoubleBlock) values;
for (int g = 0; g < groups; g++) {
for (int g = 0; g < availableGroups; g++) {
long group = g;
long sum = LongStream.range(0, BLOCK_LENGTH).filter(l -> l % groups == group).sum();
long count = LongStream.range(0, BLOCK_LENGTH).filter(l -> l % groups == group).count();
Expand All @@ -286,7 +295,7 @@ private static void checkGrouped(String prefix, String grouping, String op, Stri
}
case COUNT -> {
LongBlock lValues = (LongBlock) values;
for (int g = 0; g < groups; g++) {
for (int g = 0; g < availableGroups; g++) {
long group = g;
long expected = LongStream.range(0, BLOCK_LENGTH).filter(l -> l % groups == group).count() * opCount;
if (lValues.getLong(g) != expected) {
Expand All @@ -296,7 +305,7 @@ private static void checkGrouped(String prefix, String grouping, String op, Stri
}
case COUNT_DISTINCT -> {
LongBlock lValues = (LongBlock) values;
for (int g = 0; g < groups; g++) {
for (int g = 0; g < availableGroups; g++) {
long group = g;
long expected = LongStream.range(0, BLOCK_LENGTH).filter(l -> l % groups == group).distinct().count();
long count = lValues.getLong(g);
Expand All @@ -310,15 +319,15 @@ private static void checkGrouped(String prefix, String grouping, String op, Stri
switch (dataType) {
case LONGS -> {
LongBlock lValues = (LongBlock) values;
for (int g = 0; g < groups; g++) {
for (int g = 0; g < availableGroups; g++) {
if (lValues.getLong(g) != (long) g) {
throw new AssertionError(prefix + "expected [" + g + "] but was [" + lValues.getLong(g) + "]");
}
}
}
case DOUBLES -> {
DoubleBlock dValues = (DoubleBlock) values;
for (int g = 0; g < groups; g++) {
for (int g = 0; g < availableGroups; g++) {
if (dValues.getDouble(g) != (long) g) {
throw new AssertionError(prefix + "expected [" + g + "] but was [" + dValues.getDouble(g) + "]");
}
Expand All @@ -331,7 +340,7 @@ private static void checkGrouped(String prefix, String grouping, String op, Stri
switch (dataType) {
case LONGS -> {
LongBlock lValues = (LongBlock) values;
for (int g = 0; g < groups; g++) {
for (int g = 0; g < availableGroups; g++) {
long group = g;
long expected = LongStream.range(0, BLOCK_LENGTH).filter(l -> l % groups == group).max().getAsLong();
if (lValues.getLong(g) != expected) {
Expand All @@ -341,7 +350,7 @@ private static void checkGrouped(String prefix, String grouping, String op, Stri
}
case DOUBLES -> {
DoubleBlock dValues = (DoubleBlock) values;
for (int g = 0; g < groups; g++) {
for (int g = 0; g < availableGroups; g++) {
long group = g;
long expected = LongStream.range(0, BLOCK_LENGTH).filter(l -> l % groups == group).max().getAsLong();
if (dValues.getDouble(g) != expected) {
Expand All @@ -356,7 +365,7 @@ private static void checkGrouped(String prefix, String grouping, String op, Stri
switch (dataType) {
case LONGS -> {
LongBlock lValues = (LongBlock) values;
for (int g = 0; g < groups; g++) {
for (int g = 0; g < availableGroups; g++) {
long group = g;
long expected = LongStream.range(0, BLOCK_LENGTH).filter(l -> l % groups == group).sum() * opCount;
if (lValues.getLong(g) != expected) {
Expand All @@ -366,7 +375,7 @@ private static void checkGrouped(String prefix, String grouping, String op, Stri
}
case DOUBLES -> {
DoubleBlock dValues = (DoubleBlock) values;
for (int g = 0; g < groups; g++) {
for (int g = 0; g < availableGroups; g++) {
long group = g;
long expected = LongStream.range(0, BLOCK_LENGTH).filter(l -> l % groups == group).sum() * opCount;
if (dValues.getDouble(g) != expected) {
Expand All @@ -391,6 +400,14 @@ private static void checkGroupingBlock(String prefix, String grouping, Block blo
}
}
}
case TOP_N_LONGS -> {
LongBlock groups = (LongBlock) block;
for (int g = 0; g < TOP_N_LIMIT; g++) {
if (groups.getLong(g) != (long) g) {
throw new AssertionError(prefix + "bad group expected [" + g + "] but was [" + groups.getLong(g) + "]");
}
}
}
case INTS -> {
IntBlock groups = (IntBlock) block;
for (int g = 0; g < GROUPS; g++) {
Expand Down Expand Up @@ -495,7 +512,7 @@ private static void checkUngrouped(String prefix, String op, String dataType, Pa

private static Page page(BlockFactory blockFactory, String grouping, String blockType) {
Block dataBlock = dataBlock(blockFactory, blockType);
if (grouping.equals("none")) {
if (grouping.equals(NONE)) {
return new Page(dataBlock);
}
List<Block> blocks = groupingBlocks(grouping, blockType);
Expand Down Expand Up @@ -564,7 +581,7 @@ private static Block groupingBlock(String grouping, String blockType) {
default -> throw new UnsupportedOperationException("bad grouping [" + grouping + "]");
};
return switch (grouping) {
case LONGS -> {
case TOP_N_LONGS, LONGS -> {
var builder = blockFactory.newLongBlockBuilder(BLOCK_LENGTH);
for (int i = 0; i < BLOCK_LENGTH; i++) {
for (int v = 0; v < valuesPerGroup; v++) {
Expand Down
5 changes: 5 additions & 0 deletions docs/changelog/127148.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 127148
summary: Skip unused STATS groups by adding a Top N `BlockHash` implementation
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public abstract class BinarySearcher {
/**
* @return the index who's underlying value is closest to the value being searched for.
*/
private int getClosestIndex(int index1, int index2) {
protected int getClosestIndex(int index1, int index2) {
if (distance(index1) < distance(index2)) {
return index1;
} else {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.ReleasableIterator;
import org.elasticsearch.index.analysis.AnalysisRegistry;
Expand Down Expand Up @@ -113,13 +114,30 @@ public abstract class BlockHash implements Releasable, SeenGroupIds {
@Override
public abstract BitArray seenGroupIds(BigArrays bigArrays);

/**
* Configuration for a BlockHash group spec that is later sorted and limited (Top-N).
* <p>
* Part of a performance improvement to avoid aggregating groups that will not be used.
* </p>
*
* @param order The order of this group in the sort, starting at 0
* @param asc True if this group will be sorted ascending. False if descending.
* @param nullsFirst True if the nulls should be the first elements in the TopN. False if they should be kept last.
* @param limit The number of elements to keep, including nulls.
*/
public record TopNDef(int order, boolean asc, boolean nullsFirst, int limit) {}

/**
* @param isCategorize Whether this group is a CATEGORIZE() or not.
* May be changed in the future when more stateful grouping functions are added.
*/
public record GroupSpec(int channel, ElementType elementType, boolean isCategorize) {
public record GroupSpec(int channel, ElementType elementType, boolean isCategorize, @Nullable TopNDef topNDef) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the connection between the QL and compute code. It's always null now; it should be filled with info from the nearest affecting TopN

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

public GroupSpec(int channel, ElementType elementType) {
this(channel, elementType, false);
this(channel, elementType, false, null);
}

public GroupSpec(int channel, ElementType elementType, boolean isCategorize) {
this(channel, elementType, isCategorize, null);
}
}

Expand All @@ -134,7 +152,12 @@ public GroupSpec(int channel, ElementType elementType) {
*/
public static BlockHash build(List<GroupSpec> groups, BlockFactory blockFactory, int emitBatchSize, boolean allowBrokenOptimizations) {
if (groups.size() == 1) {
return newForElementType(groups.get(0).channel(), groups.get(0).elementType(), blockFactory);
GroupSpec group = groups.get(0);
if (group.topNDef() != null && group.elementType() == ElementType.LONG) {
TopNDef topNDef = group.topNDef();
return new LongTopNBlockHash(group.channel(), topNDef.asc(), topNDef.nullsFirst(), topNDef.limit(), blockFactory);
}
return newForElementType(group.channel(), group.elementType(), blockFactory);
}
if (groups.stream().allMatch(g -> g.elementType == ElementType.BYTES_REF)) {
switch (groups.size()) {
Expand Down
Loading
Loading