diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 8f4f8065f1d9..9cd657605765 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -2147,9 +2147,10 @@ context). If query does have `maxQueuedBytes` in the context, then that value is ### TopN query config -|Property|Description|Default| -|--------|-----------|-------| -|`druid.query.topN.minTopNThreshold`|See [TopN Aliasing](../querying/topnquery.md#aliasing) for details.|1000| +|Property| Description | Default | +|--------|-------------------------------------------------------------------------------|---------| +|`druid.query.topN.minTopNThreshold`| See [TopN Aliasing](../querying/topnquery.md#aliasing) for details. | 1000 | +|`druid.query.topN.maxTopNAggregatorHeapSizeBytes`| The maximum amount of aggregator heap bytes a given segment runner can acrue. | 10MB | ### Search query config diff --git a/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountTopNQueryTest.java b/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountTopNQueryTest.java index 7e3690cc1cb6..27778d2ee06e 100644 --- a/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountTopNQueryTest.java +++ b/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountTopNQueryTest.java @@ -31,6 +31,7 @@ import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.topn.TopNQuery; import org.apache.druid.query.topn.TopNQueryBuilder; +import org.apache.druid.query.topn.TopNQueryConfig; import org.apache.druid.query.topn.TopNQueryEngine; import org.apache.druid.query.topn.TopNResultValue; import org.apache.druid.segment.IncrementalIndexSegment; @@ -133,6 +134,7 @@ public void testTopNWithDistinctCountAgg() throws Exception final Iterable> results = engine.query( query, + new TopNQueryConfig(), new IncrementalIndexSegment(index, SegmentId.dummy(QueryRunnerTestHelper.DATA_SOURCE)), null ).toList(); diff --git a/processing/src/main/java/org/apache/druid/query/QueryContexts.java b/processing/src/main/java/org/apache/druid/query/QueryContexts.java index f3fc1a80f83c..b4d959618370 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryContexts.java +++ b/processing/src/main/java/org/apache/druid/query/QueryContexts.java @@ -87,6 +87,7 @@ public class QueryContexts public static final String SERIALIZE_DATE_TIME_AS_LONG_INNER_KEY = "serializeDateTimeAsLongInner"; public static final String UNCOVERED_INTERVALS_LIMIT_KEY = "uncoveredIntervalsLimit"; public static final String MIN_TOP_N_THRESHOLD = "minTopNThreshold"; + public static final String MAX_TOP_N_AGGREGATOR_HEAP_SIZE_BYTES = "maxTopNAggregatorHeapSizeBytes"; public static final String CATALOG_VALIDATION_ENABLED = "catalogValidationEnabled"; // projection context keys diff --git a/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java index 4c0bb066eecb..8e1c6973ea2b 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java +++ b/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java @@ -41,8 +41,11 @@ public abstract class BaseTopNAlgorithm implements TopNAlgorithm { - public static Aggregator[] makeAggregators(Cursor cursor, List aggregatorSpecs) + + public static Aggregator[] makeAggregators(TopNQuery query, Cursor cursor) { + query.getAggregatorHelper().addAggregatorMemory(); + final List aggregatorSpecs = query.getAggregatorSpecs(); Aggregator[] aggregators = new Aggregator[aggregatorSpecs.size()]; int aggregatorIndex = 0; for (AggregatorFactory spec : aggregatorSpecs) { @@ -52,8 +55,10 @@ public static Aggregator[] makeAggregators(Cursor cursor, List aggregatorSpecs) + protected static BufferAggregator[] makeBufferAggregators(TopNQuery query, Cursor cursor) { + query.getAggregatorHelper().addAggregatorMemory(); + final List aggregatorSpecs = query.getAggregatorSpecs(); BufferAggregator[] aggregators = new BufferAggregator[aggregatorSpecs.size()]; int aggregatorIndex = 0; for (AggregatorFactory spec : aggregatorSpecs) { diff --git a/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java index d2ba16746218..f8b08a1c3fb0 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java +++ b/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java @@ -255,6 +255,8 @@ public int[] build() resultsBuf.clear(); final int numBytesToWorkWith = resultsBuf.remaining(); + + query.getAggregatorHelper().addAggregatorMemory(); final int[] aggregatorSizes = new int[query.getAggregatorSpecs().size()]; int numBytesPerRecord = 0; @@ -329,7 +331,7 @@ protected int[] updateDimValSelector(int[] dimValSelector, int numProcessed, int @Override protected BufferAggregator[] makeDimValAggregateStore(PooledTopNParams params) { - return makeBufferAggregators(params.getCursor(), query.getAggregatorSpecs()); + return makeBufferAggregators(query, params.getCursor()); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java index 0c79e7c8d31b..93bee1dc0e63 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java @@ -98,7 +98,7 @@ protected long scanAndAggregate( Aggregator[] theAggregators = aggregatesStore.computeIfAbsent( key, - k -> makeAggregators(cursor, query.getAggregatorSpecs()) + k -> makeAggregators(query, cursor) ); for (Aggregator aggregator : theAggregators) { diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNAggregatorResourceHelper.java b/processing/src/main/java/org/apache/druid/query/topn/TopNAggregatorResourceHelper.java new file mode 100644 index 000000000000..ad217e1dce60 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/topn/TopNAggregatorResourceHelper.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.topn; + +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.query.ResourceLimitExceededException; + +import java.util.concurrent.atomic.AtomicLong; + +public class TopNAggregatorResourceHelper +{ + public static class Config { + public final long maxAggregatorHeapSize; + public Config(final long maxAggregatorHeapSize) { + this.maxAggregatorHeapSize = maxAggregatorHeapSize; + } + } + + private final Config config; + private final long newAggregatorEstimatedMemorySize; + private final AtomicLong used = new AtomicLong(0); + + TopNAggregatorResourceHelper(final long newAggregatorEstimatedMemorySize, final Config config) { + this.newAggregatorEstimatedMemorySize = newAggregatorEstimatedMemorySize; + this.config = config; + } + + public void addAggregatorMemory() { + final long newTotal = used.addAndGet(newAggregatorEstimatedMemorySize); + if (newTotal > config.maxAggregatorHeapSize){ + throw new ResourceLimitExceededException(StringUtils.format("Query ran out of memory. Maximum allowed bytes=[%d], Hit bytes=[%d]", config.maxAggregatorHeapSize, newTotal)); + } + } +} diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQuery.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQuery.java index 3178ac15a183..3250f4909435 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TopNQuery.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQuery.java @@ -30,6 +30,7 @@ import org.apache.druid.query.PerSegmentQueryOptimizationContext; import org.apache.druid.query.Queries; import org.apache.druid.query.Query; +import org.apache.druid.query.QueryContexts; import org.apache.druid.query.Result; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.PostAggregator; @@ -60,6 +61,7 @@ public class TopNQuery extends BaseQuery> private final DimFilter dimFilter; private final List aggregatorSpecs; private final List postAggregatorSpecs; + private TopNAggregatorResourceHelper aggregatorHelper; @JsonCreator public TopNQuery( @@ -97,9 +99,18 @@ public TopNQuery( : postAggregatorSpecs ); + + final long expectedAllocBytes = aggregatorSpecs.stream().mapToLong(AggregatorFactory::getMaxIntermediateSizeWithNulls).sum(); + final long maxAggregatorHeapSizeBytes = this.context().getLong(QueryContexts.MAX_TOP_N_AGGREGATOR_HEAP_SIZE_BYTES, TopNQueryConfig.DEFAULT_MAX_AGGREGATOR_HEAP_SIZE_BYTES); + this.aggregatorHelper = new TopNAggregatorResourceHelper(expectedAllocBytes, new TopNAggregatorResourceHelper.Config(maxAggregatorHeapSizeBytes)); + topNMetricSpec.verifyPreconditions(this.aggregatorSpecs, this.postAggregatorSpecs); } + public TopNAggregatorResourceHelper getAggregatorHelper() { + return aggregatorHelper; + } + @Override public boolean hasFilters() { diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryConfig.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryConfig.java index 2793b270b8a1..a7582f702608 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryConfig.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryConfig.java @@ -28,6 +28,7 @@ public class TopNQueryConfig { public static final int DEFAULT_MIN_TOPN_THRESHOLD = 1000; + public static final long DEFAULT_MAX_AGGREGATOR_HEAP_SIZE_BYTES = 10 * (2 << 20); // 10mb @JsonProperty @Min(1) @@ -37,4 +38,13 @@ public int getMinTopNThreshold() { return minTopNThreshold; } + + @JsonProperty + @Min(0) + private long maxTopNAggregatorHeapSizeBytes = DEFAULT_MAX_AGGREGATOR_HEAP_SIZE_BYTES; + + public long getMaxTopNAggregatorHeapSizeBytes() + { + return maxTopNAggregatorHeapSizeBytes; + } } diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java index 202a852c2be0..32170debe993 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java @@ -21,6 +21,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableMap; import org.apache.druid.collections.NonBlockingPool; import org.apache.druid.collections.ResourceHolder; import org.apache.druid.java.util.common.granularity.Granularities; @@ -28,6 +29,7 @@ import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.query.ColumnSelectorPlus; import org.apache.druid.query.CursorGranularizer; +import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryMetrics; import org.apache.druid.query.Result; import org.apache.druid.query.aggregation.AggregatorFactory; @@ -75,6 +77,7 @@ public TopNQueryEngine(NonBlockingPool bufferPool) */ public Sequence> query( TopNQuery query, + TopNQueryConfig config, final Segment segment, @Nullable final TopNQueryMetrics queryMetrics ) @@ -86,6 +89,10 @@ public Sequence> query( ); } + if (!query.context().containsKey(QueryContexts.MAX_TOP_N_AGGREGATOR_HEAP_SIZE_BYTES)){ + query = query.withOverriddenContext(ImmutableMap.of(QueryContexts.MAX_TOP_N_AGGREGATOR_HEAP_SIZE_BYTES, config.getMaxTopNAggregatorHeapSizeBytes())); + } + final CursorBuildSpec buildSpec = makeCursorBuildSpec(query, queryMetrics); final CursorHolder cursorHolder = cursorFactory.makeCursorHolder(buildSpec); if (cursorHolder.isPreAggregated()) { diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java index 501684dce40e..1ef8254de2dd 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java @@ -552,6 +552,11 @@ public Sequence resultsAsArrays(TopNQuery query, Sequence> run( TopNQuery query = (TopNQuery) input.getQuery(); return queryEngine.query( query, + toolchest.getConfig(), segment, (TopNQueryMetrics) input.getQueryMetrics() ); diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/DoubleTopNColumnAggregatesProcessor.java b/processing/src/main/java/org/apache/druid/query/topn/types/DoubleTopNColumnAggregatesProcessor.java index d48f472d5869..ed8abaee0bc5 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/types/DoubleTopNColumnAggregatesProcessor.java +++ b/processing/src/main/java/org/apache/druid/query/topn/types/DoubleTopNColumnAggregatesProcessor.java @@ -50,7 +50,7 @@ Aggregator[] getValueAggregators( long key = Double.doubleToLongBits(selector.getDouble()); return aggregatesStore.computeIfAbsent( key, - k -> BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs()) + k -> BaseTopNAlgorithm.makeAggregators(query, cursor) ); } diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/FloatTopNColumnAggregatesProcessor.java b/processing/src/main/java/org/apache/druid/query/topn/types/FloatTopNColumnAggregatesProcessor.java index 8862663b7829..2000d88c23ac 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/types/FloatTopNColumnAggregatesProcessor.java +++ b/processing/src/main/java/org/apache/druid/query/topn/types/FloatTopNColumnAggregatesProcessor.java @@ -50,7 +50,7 @@ Aggregator[] getValueAggregators( int key = Float.floatToIntBits(selector.getFloat()); return aggregatesStore.computeIfAbsent( key, - k -> BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs()) + k -> BaseTopNAlgorithm.makeAggregators(query, cursor) ); } diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/LongTopNColumnAggregatesProcessor.java b/processing/src/main/java/org/apache/druid/query/topn/types/LongTopNColumnAggregatesProcessor.java index 8b1948c3d83b..1d9c8b758539 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/types/LongTopNColumnAggregatesProcessor.java +++ b/processing/src/main/java/org/apache/druid/query/topn/types/LongTopNColumnAggregatesProcessor.java @@ -46,7 +46,7 @@ Aggregator[] getValueAggregators(TopNQuery query, BaseLongColumnValueSelector se long key = selector.getLong(); return aggregatesStore.computeIfAbsent( key, - k -> BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs()) + k -> BaseTopNAlgorithm.makeAggregators(query, cursor) ); } diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/NullableNumericTopNColumnAggregatesProcessor.java b/processing/src/main/java/org/apache/druid/query/topn/types/NullableNumericTopNColumnAggregatesProcessor.java index 565ad036cea0..2aba7f967c9d 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/types/NullableNumericTopNColumnAggregatesProcessor.java +++ b/processing/src/main/java/org/apache/druid/query/topn/types/NullableNumericTopNColumnAggregatesProcessor.java @@ -97,7 +97,7 @@ public long scanAndAggregate( while (!cursor.isDone()) { if (hasNulls && selector.isNull()) { if (nullValueAggregates == null) { - nullValueAggregates = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs()); + nullValueAggregates = BaseTopNAlgorithm.makeAggregators(query, cursor); } for (Aggregator aggregator : nullValueAggregates) { aggregator.aggregate(); diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java b/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java index 9eca369fdc7e..7d6a976e1d38 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java +++ b/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java @@ -159,7 +159,7 @@ private long scanAndAggregateWithCardinalityKnown( final Object key = dimensionValueConverter.apply(selector.lookupName(dimIndex)); aggs = aggregatesStore.computeIfAbsent( key, - k -> BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs()) + k -> BaseTopNAlgorithm.makeAggregators(query, cursor) ); rowSelector[dimIndex] = aggs; } @@ -199,7 +199,7 @@ private long scanAndAggregateWithCardinalityUnknown( final Object key = dimensionValueConverter.apply(selector.lookupName(dimIndex)); Aggregator[] aggs = aggregatesStore.computeIfAbsent( key, - k -> BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs()) + k -> BaseTopNAlgorithm.makeAggregators(query, cursor) ); for (Aggregator aggregator : aggs) { aggregator.aggregate(); diff --git a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerFailureTest.java b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerFailureTest.java new file mode 100644 index 000000000000..48eb88c08528 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerFailureTest.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.topn; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import org.apache.druid.collections.CloseableStupidPool; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.query.QueryContexts; +import org.apache.druid.query.QueryPlus; +import org.apache.druid.query.QueryRunner; +import org.apache.druid.query.QueryRunnerTestHelper; +import org.apache.druid.query.ResourceLimitExceededException; +import org.apache.druid.query.Result; +import org.apache.druid.query.TestQueryRunners; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.DoubleMaxAggregatorFactory; +import org.apache.druid.query.aggregation.DoubleMinAggregatorFactory; +import org.apache.druid.query.aggregation.FloatMaxAggregatorFactory; +import org.apache.druid.query.aggregation.FloatMinAggregatorFactory; +import org.apache.druid.query.aggregation.firstlast.first.DoubleFirstAggregatorFactory; +import org.apache.druid.query.context.ResponseContext; +import org.apache.druid.segment.TestHelper; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + */ +@RunWith(Parameterized.class) +public class TopNQueryRunnerFailureTest extends InitializedNullHandlingTest +{ + private static final Closer RESOURCE_CLOSER = Closer.create(); + + @AfterClass + public static void teardown() throws IOException + { + RESOURCE_CLOSER.close(); + } + + @Parameterized.Parameters(name = "{7}") + public static Iterable constructorFeeder() + { + List>> retVal = queryRunners(); + List parameters = new ArrayList<>(); + final int nParam = 7; + for (int i = 0; i < 32; i++) { + for (QueryRunner> firstParameter : retVal) { + Object[] params = new Object[nParam]; + params[0] = firstParameter; + params[1] = (i & 1) != 0; + params[2] = (i & 2) != 0; + params[3] = (i & 4) != 0; + params[4] = (i & 8) != 0; + params[5] = QueryRunnerTestHelper.COMMON_DOUBLE_AGGREGATORS; + params[6] = firstParameter + " double aggs"; + Object[] params2 = Arrays.copyOf(params, nParam); + params2[5] = QueryRunnerTestHelper.COMMON_FLOAT_AGGREGATORS; + params2[6] = firstParameter + " float aggs"; + parameters.add(params); + parameters.add(params2); + } + } + return parameters; + } + + public static List>> queryRunners() + { + final CloseableStupidPool defaultPool = TestQueryRunners.createDefaultNonBlockingPool(); + final CloseableStupidPool customPool = new CloseableStupidPool<>( + "TopNQueryRunnerFactory-bufferPool", + () -> ByteBuffer.allocate(20000) + ); + + List>> retVal = new ArrayList<>(); + retVal.addAll( + QueryRunnerTestHelper.makeQueryRunnersToMerge( + new TopNQueryRunnerFactory( + defaultPool, + new TopNQueryQueryToolChest(new TopNQueryConfig()), + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ), + false + ) + ); + retVal.addAll( + QueryRunnerTestHelper.makeQueryRunnersToMerge( + new TopNQueryRunnerFactory( + customPool, + new TopNQueryQueryToolChest(new TopNQueryConfig()), + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ), + false + ) + ); + + RESOURCE_CLOSER.register(() -> { + // Verify that all objects have been returned to the pool. + Assert.assertEquals("defaultPool objects created", defaultPool.poolSize(), defaultPool.objectsCreatedCount()); + Assert.assertEquals("customPool objects created", customPool.poolSize(), customPool.objectsCreatedCount()); + defaultPool.close(); + customPool.close(); + }); + + return retVal; + } + + private final QueryRunner> runner; + private final List commonAggregators; + + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @SuppressWarnings("unused") + public TopNQueryRunnerFailureTest( + QueryRunner> runner, + boolean specializeGeneric1AggPooledTopN, + boolean specializeGeneric2AggPooledTopN, + boolean specializeHistorical1SimpleDoubleAggPooledTopN, + boolean specializeHistoricalSingleValueDimSelector1SimpleDoubleAggPooledTopN, + List commonAggregators, + String testName + ) + { + this.runner = runner; + PooledTopNAlgorithm.setSpecializeGeneric1AggPooledTopN(specializeGeneric1AggPooledTopN); + PooledTopNAlgorithm.setSpecializeGeneric2AggPooledTopN(specializeGeneric2AggPooledTopN); + PooledTopNAlgorithm.setSpecializeHistorical1SimpleDoubleAggPooledTopN( + specializeHistorical1SimpleDoubleAggPooledTopN + ); + PooledTopNAlgorithm.setSpecializeHistoricalSingleValueDimSelector1SimpleDoubleAggPooledTopN( + specializeHistoricalSingleValueDimSelector1SimpleDoubleAggPooledTopN + ); + this.commonAggregators = commonAggregators; + } + + private Sequence> assertExpectedResults( + Iterable> expectedResults, + TopNQuery query + ) + { + final Sequence> retval = runWithMerge(query);; + TestHelper.assertExpectedResults(expectedResults, retval); + return retval; + } + + private Sequence> runWithMerge(TopNQuery query) + { + return runWithMerge(query, ResponseContext.createEmpty()); + } + + private Sequence> runWithMerge(TopNQuery query, ResponseContext context) + { + return runner.run(QueryPlus.wrap(query), context); + } + + @Test + public void testEmptyTopN() + { + TopNQuery query = new TopNQueryBuilder() + .dataSource(QueryRunnerTestHelper.DATA_SOURCE) + .granularity(QueryRunnerTestHelper.ALL_GRAN) + .dimension(QueryRunnerTestHelper.MARKET_DIMENSION) + .metric(QueryRunnerTestHelper.INDEX_METRIC) + .threshold(4) + .intervals(QueryRunnerTestHelper.EMPTY_INTERVAL) + .aggregators( + Lists.newArrayList( + Iterables.concat( + commonAggregators, + Lists.newArrayList( + new DoubleMaxAggregatorFactory("maxIndex", "index"), + new DoubleMinAggregatorFactory("minIndex", "index"), + new DoubleFirstAggregatorFactory("first", "index", null) + ) + ) + ) + ) + .postAggregators(QueryRunnerTestHelper.ADD_ROWS_INDEX_CONSTANT) + .build(); + + List> expectedResults = ImmutableList.of( + new Result<>( + DateTimes.of("2020-04-02T00:00:00.000Z"), + TopNResultValue.create(ImmutableList.of()) + ) + ); + assertExpectedResults(expectedResults, query); + } + + @Test(expected = ResourceLimitExceededException.class) + public void testFullOnTopN() + { + Map context = new HashMap<>(); + context.put(QueryContexts.MAX_TOP_N_AGGREGATOR_HEAP_SIZE_BYTES, 1); + TopNQuery query = new TopNQueryBuilder() + .dataSource(QueryRunnerTestHelper.DATA_SOURCE) + .granularity(QueryRunnerTestHelper.ALL_GRAN) + .dimension(QueryRunnerTestHelper.MARKET_DIMENSION) + .metric(QueryRunnerTestHelper.INDEX_METRIC) + .threshold(4) + .context(context) + .intervals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC) + .aggregators( + Lists.newArrayList( + Iterables.concat( + commonAggregators, + Lists.newArrayList( + new DoubleMaxAggregatorFactory("maxIndex", "index"), + new DoubleMinAggregatorFactory("minIndex", "index") + ) + ) + ) + ) + .postAggregators(QueryRunnerTestHelper.ADD_ROWS_INDEX_CONSTANT) + .build(); + + List> expectedResults = Collections.emptyList(); + assertExpectedResults(expectedResults, + query.withAggregatorSpecs(Lists.newArrayList(Iterables.concat( + QueryRunnerTestHelper.COMMON_FLOAT_AGGREGATORS, + Lists.newArrayList( + new FloatMaxAggregatorFactory("maxIndex", "indexFloat"), + new FloatMinAggregatorFactory("minIndex", "indexFloat") + ) + ))) + ); + } +} diff --git a/processing/src/test/java/org/apache/druid/segment/CursorHolderPreaggTest.java b/processing/src/test/java/org/apache/druid/segment/CursorHolderPreaggTest.java index 262ecd500155..bad49241152d 100644 --- a/processing/src/test/java/org/apache/druid/segment/CursorHolderPreaggTest.java +++ b/processing/src/test/java/org/apache/druid/segment/CursorHolderPreaggTest.java @@ -42,6 +42,7 @@ import org.apache.druid.query.timeseries.TimeseriesResultValue; import org.apache.druid.query.topn.TopNQuery; import org.apache.druid.query.topn.TopNQueryBuilder; +import org.apache.druid.query.topn.TopNQueryConfig; import org.apache.druid.query.topn.TopNQueryEngine; import org.apache.druid.query.topn.TopNResultValue; import org.apache.druid.segment.column.ColumnCapabilities; @@ -218,6 +219,7 @@ public void testTopn() .build(); Sequence> results = topNQueryEngine.query( topNQuery, + new TopNQueryConfig(), segment, null ); diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexCursorFactoryTest.java b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexCursorFactoryTest.java index 36abb79be8c9..85584971d3ee 100644 --- a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexCursorFactoryTest.java +++ b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexCursorFactoryTest.java @@ -59,6 +59,7 @@ import org.apache.druid.query.groupby.GroupingEngine; import org.apache.druid.query.groupby.ResultRow; import org.apache.druid.query.topn.TopNQueryBuilder; +import org.apache.druid.query.topn.TopNQueryConfig; import org.apache.druid.query.topn.TopNQueryEngine; import org.apache.druid.query.topn.TopNResultValue; import org.apache.druid.segment.CloserRule; @@ -391,6 +392,7 @@ public void testSingleValueTopN() throws IOException .threshold(10) .aggregators(new LongSumAggregatorFactory("cnt", "cnt")) .build(), + new TopNQueryConfig(), new IncrementalIndexSegment(index, SegmentId.dummy("test")), null )