Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pinot-integration-test-base/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,12 @@
<version>${project.version}</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.pinot</groupId>
<artifactId>pinot-query-runtime</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.pinot</groupId>
<artifactId>pinot-minion-builtin-tasks</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
import org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
import org.apache.pinot.query.runtime.QueryRunnerTestBase;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
Expand Down Expand Up @@ -700,6 +701,9 @@ private static void testQueryInternal(String pinotQuery, String queryResourceUrl
throws Exception {
// broker response
JsonNode pinotResponse;
if (useMultiStageQueryEngine) {
pinotQuery = QueryRunnerTestBase.appendLargeLimitClause(pinotQuery);
}
if (viaController) {
pinotResponse = ClusterTest.postQueryToController(pinotQuery, queryResourceUrl, headers, extraJsonProperties);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor;
import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractorConfig;
import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
import org.apache.pinot.query.runtime.QueryRunnerTestBase;
import org.apache.pinot.server.starter.helix.BaseServerStarter;
import org.apache.pinot.server.starter.helix.HelixServerStarter;
import org.apache.pinot.spi.config.table.TableType;
Expand Down Expand Up @@ -448,6 +449,9 @@ protected JsonNode getDebugInfo(final String uri)
*/
/**
 * Posts the given SQL query to the broker's query endpoint and returns the parsed
 * JSON broker response.
 *
 * @param query SQL query string to execute
 * @return broker response parsed as a {@link com.fasterxml.jackson.databind.JsonNode}
 * @throws Exception if the HTTP request to the broker fails
 */
protected JsonNode postQuery(String query)
    throws Exception {
  // When the multi-stage query engine is enabled, append a large LIMIT clause —
  // presumably to keep results from being truncated by the engine's default row
  // limit (TODO confirm against QueryRunnerTestBase.appendLargeLimitClause).
  if (useMultiStageQueryEngine()) {
    query = QueryRunnerTestBase.appendLargeLimitClause(query);
  }
  // Delegates to the overload that takes the broker URL and extra HTTP properties.
  return postQuery(query, getBrokerQueryApiUrl(getBrokerBaseApiUrl(), useMultiStageQueryEngine()), null,
      getExtraQueryProperties());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,9 @@ public void testGeneratedQueries()
@Test
public void testQueryOptions()
    throws Exception {
  // NOTE(review): this span is a flattened diff — it contains BOTH the removed
  // (pinotQuery/h2Query/testQueryWithMatchingRowCount) and the added
  // (query/postQuery/assertEquals) versions of the test body. Only one of the
  // two should exist in the real file; reconcile against the actual PR.
  // Old version: verified row count parity against H2 with a leaf-limit of 1.
  String pinotQuery = "SET multistageLeafLimit = 1; SELECT * FROM mytable;";
  String h2Query = "SELECT * FROM mytable limit 1";
  testQueryWithMatchingRowCount(pinotQuery, h2Query);
  // New version: self-join so the leaf limit (1 row per leaf) caps the join
  // output, then asserts the result table has exactly one row.
  String query = "SET multistageLeafLimit = 1; SELECT * FROM mytable a JOIN mytable b on a.AirlineID= b.AirlineID;";
  JsonNode jsonNode = postQuery(query);
  assertEquals(jsonNode.get("resultTable").get("rows").size(), 1);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.client.ResultSetGroup;
import org.apache.pinot.query.runtime.QueryRunnerTestBase;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.tools.utils.JarUtils;
Expand Down Expand Up @@ -178,6 +179,7 @@ public static Object[][] queryDataProvider() {
.getResourceAsStream(SSB_QUERY_SET_RESOURCE_NAME);
Map<String, List<String>> ssbQuerySet = yaml.load(inputStream);
List<String> ssbQueryList = ssbQuerySet.get("sqls");
return ssbQueryList.stream().map(s -> new Object[]{s}).toArray(Object[][]::new);
return ssbQueryList.stream().map(s -> new Object[]{QueryRunnerTestBase.appendLargeLimitClause(s)})
.toArray(Object[][]::new);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.pinot.client.ResultSetGroup;
import org.apache.pinot.query.runtime.QueryRunnerTestBase;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.tools.utils.JarUtils;
Expand Down Expand Up @@ -173,7 +174,8 @@ public static Object[][] queryDataProvider()
try (InputStream inputStream = TPCHQueryIntegrationTest.class.getClassLoader()
.getResourceAsStream(path)) {
queries[iter] = new Object[1];
queries[iter][0] = IOUtils.toString(Objects.requireNonNull(inputStream), Charset.defaultCharset());
queries[iter][0] = QueryRunnerTestBase.appendLargeLimitClause(
IOUtils.toString(Objects.requireNonNull(inputStream), Charset.defaultCharset()));
iter++;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,24 @@
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.hint.HintStrategyTable;
import org.apache.calcite.rel.hint.PinotHintStrategyTable;
import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.logical.LogicalCorrelate;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.logical.LogicalSort;
import org.apache.calcite.rel.rules.PinotQueryRuleSets;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.runtime.CalciteContextException;
import org.apache.calcite.sql.SqlExplain;
import org.apache.calcite.sql.SqlExplainFormat;
import org.apache.calcite.sql.SqlExplainLevel;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.fun.PinotOperatorTable;
import org.apache.calcite.sql.type.BasicSqlType;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.util.PinotChainedSqlOperatorTable;
import org.apache.calcite.sql2rel.PinotConvertletTable;
import org.apache.calcite.sql2rel.RelDecorrelator;
Expand Down Expand Up @@ -282,7 +289,39 @@ protected RelRoot compileQuery(SqlNode sqlNode, PlannerContext plannerContext)
RelRoot relation = toRelation(validated, plannerContext);
RelRoot decorrelated = decorrelateIfNeeded(relation);
RelNode optimized = optimize(decorrelated, plannerContext);
return relation.withRel(optimized);
RelNode appliedRelation = applyDefaultLimitIfNeeded(optimized);
return relation.withRel(appliedRelation);
}

/**
 * Wraps the optimized plan with a default {@code LIMIT 10} unless the query is an
 * aggregation-only query (a global aggregate produces at most one row, so a default
 * limit would be pointless) or already carries an explicit fetch on its top-level sort.
 *
 * @param relNode root of the optimized relational plan
 * @return the plan, possibly wrapped in (or patched with) a default fetch of 10 rows
 */
private RelNode applyDefaultLimitIfNeeded(RelNode relNode) {
  // Single default limit value, used both for the wrapping sort and the patched fetch.
  final int defaultLimit = 10;
  if (relNode instanceof LogicalAggregate) {
    LogicalAggregate logicalAggregate = (LogicalAggregate) relNode;
    // A single empty group set means a global aggregation (no GROUP BY) — at most
    // one output row, so skip the default limit.
    if (logicalAggregate.getGroupSets().size() == 1 && logicalAggregate.getGroupSets().get(0).isEmpty()) {
      return relNode;
    }
  }
  if (relNode instanceof LogicalProject) {
    LogicalProject logicalProject = (LogicalProject) relNode;
    if (logicalProject.getProjects().size() == 1) {
      RexNode project = logicalProject.getProjects().get(0);
      // BUG FIX: SqlKind.AGGREGATE is an EnumSet<SqlKind>, not a SqlKind, so the
      // original `project.getKind().equals(SqlKind.AGGREGATE)` compared a SqlKind
      // against an EnumSet and was always false, meaning this aggregation-only
      // guard never fired. RexNode.isA(Collection<SqlKind>) is the intended check.
      if (project.isA(SqlKind.AGGREGATE)) {
        // Don't apply default limit if the query is aggregation only.
        return relNode;
      }
    }
  }
  if (!(relNode instanceof LogicalSort)) {
    // No top-level sort at all: wrap the plan in a LIMIT-only sort (offset 0).
    relNode = RelBuilder.create(_config).push(relNode).limit(0, defaultLimit).build();
  } else {
    LogicalSort logicalSort = (LogicalSort) relNode;
    // Top-level sort without an explicit fetch: rebuild it with the default fetch,
    // preserving its collation and offset.
    if (logicalSort.fetch == null) {
      RexLiteral fetch = new RexBuilder(_typeFactory).makeLiteral(defaultLimit,
          new BasicSqlType(_typeFactory.getTypeSystem(), SqlTypeName.INTEGER));
      relNode = LogicalSort.create(logicalSort.getInput(), logicalSort.getCollation(), logicalSort.offset, fetch);
    }
  }
  return relNode;
}

private RelRoot decorrelateIfNeeded(RelRoot relRoot) {
Expand Down
Loading