Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reverse merge #27

Merged
merged 18 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
591f193
Add broker API to run a query on both query engines and compare resul…
yashmayya Sep 30, 2024
546e8b1
Cleanup PluginManager's incorrect usage of environment variables vs s…
yashmayya Sep 30, 2024
dac174e
Fix license headers in pinot-kafka-3.0 (#14112)
yashmayya Sep 30, 2024
96abef8
Add a server level config for segment server upload to deep store. (#…
raghavyadav01 Sep 30, 2024
b56585b
Bump log4j.version from 2.24.0 to 2.24.1 (#14116)
dependabot[bot] Sep 30, 2024
b3721ac
Bump org.mockito:mockito-core from 5.13.0 to 5.14.1 (#14119)
dependabot[bot] Sep 30, 2024
249d5ce
Bump com.puppycrawl.tools:checkstyle from 10.18.1 to 10.18.2 (#14121)
dependabot[bot] Sep 30, 2024
4ba11e5
Part-5: Fix Offset Handling and Effective Time Filter + Enable Group-…
ankitsultana Sep 30, 2024
f07685d
#14095 update the contribution link (#14096)
engrravijain Sep 30, 2024
3452ef9
Improve Adaptive Server Selection to penalize servers returning serve…
kiruphabalu Sep 30, 2024
19b79f4
Polymorphic binary arithmetic scalar functions (#14089)
yashmayya Sep 30, 2024
56ccbc3
Revert "Make ingestion offset delay metric configurable (#14074)" (#1…
KKcorps Oct 1, 2024
8130f26
Bump testcontainers.version from 1.20.1 to 1.20.2 (#14129)
dependabot[bot] Oct 1, 2024
8a227e0
Bump com.nimbusds:nimbus-jose-jwt from 9.41.1 to 9.41.2 (#14131)
dependabot[bot] Oct 1, 2024
f7521d5
Bump com.azure:azure-sdk-bom from 1.2.27 to 1.2.28 (#14132)
dependabot[bot] Oct 1, 2024
7585bce
Bump com.google.errorprone:error_prone_annotations from 2.32.0 to 2.3…
dependabot[bot] Oct 1, 2024
f64178c
Bump software.amazon.awssdk:bom from 2.28.6 to 2.28.12 (#14130)
dependabot[bot] Oct 1, 2024
50249f3
Refactor ForwardIndexHandlerTest and avoid using setters for IndexLoa…
Jackie-Jiang Oct 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Part-5: Fix Offset Handling and Effective Time Filter + Enable Group-…
…By Expressions + Add Unit Tests and Minor Cleanup (apache#14104)
  • Loading branch information
ankitsultana authored Sep 30, 2024
commit 4ba11e52a76252082772529acb8fa37e2d12fb00
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,17 @@


public class TimeSeriesContext {
private final String _engine;
private final String _language;
private final String _timeColumn;
private final TimeUnit _timeUnit;
private final TimeBuckets _timeBuckets;
private final Long _offsetSeconds;
private final ExpressionContext _valueExpression;
private final AggInfo _aggInfo;

public TimeSeriesContext(String engine, String timeColumn, TimeUnit timeUnit, TimeBuckets timeBuckets,
public TimeSeriesContext(String language, String timeColumn, TimeUnit timeUnit, TimeBuckets timeBuckets,
Long offsetSeconds, ExpressionContext valueExpression, AggInfo aggInfo) {
_engine = engine;
_language = language;
_timeColumn = timeColumn;
_timeUnit = timeUnit;
_timeBuckets = timeBuckets;
Expand All @@ -43,8 +43,8 @@ public TimeSeriesContext(String engine, String timeColumn, TimeUnit timeUnit, Ti
_aggInfo = aggInfo;
}

public String getEngine() {
return _engine;
public String getLanguage() {
return _language;
}

public String getTimeColumn() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.core.operator.timeseries;

import com.google.common.collect.ImmutableList;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -61,7 +62,7 @@ public class TimeSeriesAggregationOperator extends BaseOperator<TimeSeriesResult
public TimeSeriesAggregationOperator(
String timeColumn,
TimeUnit timeUnit,
Long timeOffset,
Long timeOffsetSeconds,
AggInfo aggInfo,
ExpressionContext valueExpression,
List<String> groupByExpressions,
Expand All @@ -70,7 +71,7 @@ public TimeSeriesAggregationOperator(
TimeSeriesBuilderFactory seriesBuilderFactory) {
_timeColumn = timeColumn;
_storedTimeUnit = timeUnit;
_timeOffset = timeOffset;
_timeOffset = timeUnit.convert(Duration.ofSeconds(timeOffsetSeconds));
_aggInfo = aggInfo;
_valueExpression = valueExpression;
_groupByExpressions = groupByExpressions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ private BaseCombineOperator getCombineOperator() {

if (QueryContextUtils.isTimeSeriesQuery(_queryContext)) {
return new TimeSeriesCombineOperator(new TimeSeriesAggResultsBlockMerger(
TimeSeriesBuilderFactoryProvider.getSeriesBuilderFactory(_queryContext.getTimeSeriesContext().getEngine()),
TimeSeriesBuilderFactoryProvider.getSeriesBuilderFactory(_queryContext.getTimeSeriesContext().getLanguage()),
_queryContext.getTimeSeriesContext().getAggInfo()), operators, _queryContext, _executorService);
} else if (_streamer != null
&& QueryContextUtils.isSelectionOnlyQuery(_queryContext) && _queryContext.getLimit() != 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public TimeSeriesPlanNode(SegmentContext segmentContext, QueryContext queryConte
_queryContext = queryContext;
_timeSeriesContext = Objects.requireNonNull(queryContext.getTimeSeriesContext(),
"Missing time-series context in TimeSeriesPlanNode");
_seriesBuilderFactory = TimeSeriesBuilderFactoryProvider.getSeriesBuilderFactory(_timeSeriesContext.getEngine());
_seriesBuilderFactory = TimeSeriesBuilderFactoryProvider.getSeriesBuilderFactory(_timeSeriesContext.getLanguage());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public class QueryExecutorTest {
private static final int NUM_SEGMENTS_TO_GENERATE = 2;
private static final int NUM_EMPTY_SEGMENTS_TO_GENERATE = 2;
private static final ExecutorService QUERY_RUNNERS = Executors.newFixedThreadPool(20);
private static final String TIME_SERIES_ENGINE_NAME = "QueryExecutorTest";
private static final String TIME_SERIES_LANGUAGE_NAME = "QueryExecutorTest";
private static final String TIME_SERIES_TIME_COL_NAME = "orderCreatedTimestamp";
private static final Long TIME_SERIES_TEST_START_TIME = 1726228400L;

Expand Down Expand Up @@ -171,7 +171,7 @@ public void setUp()
_queryExecutor.init(new PinotConfiguration(queryExecutorConfig), instanceDataManager, ServerMetrics.get());

// Setup time series builder factory
TimeSeriesBuilderFactoryProvider.registerSeriesBuilderFactory(TIME_SERIES_ENGINE_NAME,
TimeSeriesBuilderFactoryProvider.registerSeriesBuilderFactory(TIME_SERIES_LANGUAGE_NAME,
new SimpleTimeSeriesBuilderFactory());
}

Expand Down Expand Up @@ -219,7 +219,7 @@ public void testMinQuery() {
public void testTimeSeriesSumQuery() {
TimeBuckets timeBuckets = TimeBuckets.ofSeconds(TIME_SERIES_TEST_START_TIME, Duration.ofMinutes(1), 100);
ExpressionContext valueExpression = ExpressionContext.forIdentifier("orderAmount");
TimeSeriesContext timeSeriesContext = new TimeSeriesContext(TIME_SERIES_ENGINE_NAME, TIME_SERIES_TIME_COL_NAME,
TimeSeriesContext timeSeriesContext = new TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME, TIME_SERIES_TIME_COL_NAME,
TimeUnit.SECONDS, timeBuckets, 0L /* offsetSeconds */, valueExpression, new AggInfo("SUM"));
QueryContext queryContext = getQueryContextForTimeSeries(timeSeriesContext);
ServerQueryRequest serverQueryRequest = new ServerQueryRequest(
Expand All @@ -235,7 +235,7 @@ public void testTimeSeriesSumQuery() {
public void testTimeSeriesMaxQuery() {
TimeBuckets timeBuckets = TimeBuckets.ofSeconds(TIME_SERIES_TEST_START_TIME, Duration.ofMinutes(1), 100);
ExpressionContext valueExpression = ExpressionContext.forIdentifier("orderItemCount");
TimeSeriesContext timeSeriesContext = new TimeSeriesContext(TIME_SERIES_ENGINE_NAME, TIME_SERIES_TIME_COL_NAME,
TimeSeriesContext timeSeriesContext = new TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME, TIME_SERIES_TIME_COL_NAME,
TimeUnit.SECONDS, timeBuckets, 0L /* offsetSeconds */, valueExpression, new AggInfo("MAX"));
QueryContext queryContext = getQueryContextForTimeSeries(timeSeriesContext);
ServerQueryRequest serverQueryRequest = new ServerQueryRequest(
Expand Down Expand Up @@ -267,7 +267,7 @@ public void testTimeSeriesMaxQuery() {
public void testTimeSeriesMinQuery() {
TimeBuckets timeBuckets = TimeBuckets.ofSeconds(TIME_SERIES_TEST_START_TIME, Duration.ofMinutes(1), 100);
ExpressionContext valueExpression = ExpressionContext.forIdentifier("orderItemCount");
TimeSeriesContext timeSeriesContext = new TimeSeriesContext(TIME_SERIES_ENGINE_NAME, TIME_SERIES_TIME_COL_NAME,
TimeSeriesContext timeSeriesContext = new TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME, TIME_SERIES_TIME_COL_NAME,
TimeUnit.SECONDS, timeBuckets, 0L /* offsetSeconds */, valueExpression, new AggInfo("MIN"));
QueryContext queryContext = getQueryContextForTimeSeries(timeSeriesContext);
ServerQueryRequest serverQueryRequest = new ServerQueryRequest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ public void initLeafPlanNode(BaseTimeSeriesPlanNode planNode, TimeSeriesExecutio
for (int index = 0; index < planNode.getChildren().size(); index++) {
BaseTimeSeriesPlanNode childNode = planNode.getChildren().get(index);
if (childNode instanceof LeafTimeSeriesPlanNode) {
LeafTimeSeriesPlanNode sfpNode = (LeafTimeSeriesPlanNode) childNode;
List<String> segments = context.getPlanIdToSegmentsMap().get(sfpNode.getId());
ServerQueryRequest serverQueryRequest = compileLeafServerQueryRequest(sfpNode, segments, context);
LeafTimeSeriesPlanNode leafNode = (LeafTimeSeriesPlanNode) childNode;
List<String> segments = context.getPlanIdToSegmentsMap().get(leafNode.getId());
ServerQueryRequest serverQueryRequest = compileLeafServerQueryRequest(leafNode, segments, context);
TimeSeriesPhysicalTableScan physicalTableScan = new TimeSeriesPhysicalTableScan(childNode.getId(),
serverQueryRequest, _queryExecutor, _executorService);
planNode.getChildren().set(index, physicalTableScan);
Expand All @@ -75,26 +75,24 @@ public void initLeafPlanNode(BaseTimeSeriesPlanNode planNode, TimeSeriesExecutio
}
}

public ServerQueryRequest compileLeafServerQueryRequest(LeafTimeSeriesPlanNode sfpNode, List<String> segments,
public ServerQueryRequest compileLeafServerQueryRequest(LeafTimeSeriesPlanNode leafNode, List<String> segments,
TimeSeriesExecutionContext context) {
return new ServerQueryRequest(compileQueryContext(sfpNode, context),
return new ServerQueryRequest(compileQueryContext(leafNode, context),
segments, /* TODO: Pass metadata from request */ Collections.emptyMap(), _serverMetrics);
}

public QueryContext compileQueryContext(LeafTimeSeriesPlanNode sfpNode, TimeSeriesExecutionContext context) {
public QueryContext compileQueryContext(LeafTimeSeriesPlanNode leafNode, TimeSeriesExecutionContext context) {
FilterContext filterContext =
RequestContextUtils.getFilter(CalciteSqlParser.compileToExpression(
sfpNode.getEffectiveFilter(context.getInitialTimeBuckets())));
List<ExpressionContext> groupByExpressions = sfpNode.getGroupByColumns().stream()
.map(ExpressionContext::forIdentifier).collect(Collectors.toList());
ExpressionContext valueExpression = RequestContextUtils.getExpression(sfpNode.getValueExpression());
leafNode.getEffectiveFilter(context.getInitialTimeBuckets())));
List<ExpressionContext> groupByExpressions = leafNode.getGroupByExpressions().stream()
.map(RequestContextUtils::getExpression).collect(Collectors.toList());
ExpressionContext valueExpression = RequestContextUtils.getExpression(leafNode.getValueExpression());
TimeSeriesContext timeSeriesContext = new TimeSeriesContext(context.getLanguage(),
sfpNode.getTimeColumn(),
sfpNode.getTimeUnit(), context.getInitialTimeBuckets(), sfpNode.getOffset(),
valueExpression,
sfpNode.getAggInfo());
leafNode.getTimeColumn(), leafNode.getTimeUnit(), context.getInitialTimeBuckets(), leafNode.getOffsetSeconds(),
valueExpression, leafNode.getAggInfo());
return new QueryContext.Builder()
.setTableName(sfpNode.getTableName())
.setTableName(leafNode.getTableName())
.setFilter(filterContext)
.setGroupByExpressions(groupByExpressions)
.setSelectExpressions(Collections.emptyList())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.query.runtime.timeseries;

import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.tsdb.spi.AggInfo;
import org.apache.pinot.tsdb.spi.TimeBuckets;
import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode;
import org.testng.annotations.Test;

import static org.testng.Assert.*;


/**
 * Unit tests for {@code PhysicalTimeSeriesPlanVisitor#compileQueryContext}, verifying that a
 * {@code LeafTimeSeriesPlanNode} is compiled into a {@code QueryContext} with the expected
 * language, offset, time column, value expression, group-by expressions, and effective time
 * filter (time bounds shifted by the offset and appended to the user filter).
 */
public class PhysicalTimeSeriesPlanVisitorTest {
  @Test
  public void testCompileQueryContext() {
    final String planId = "id";
    final String tableName = "orderTable";
    final String timeColumn = "orderTime";
    final AggInfo aggInfo = new AggInfo("SUM");
    final String filterExpr = "cityName = 'Chicago'";
    // Case-1: Without offset, simple column based group-by expression, simple column based value, and non-empty filter.
    {
      TimeSeriesExecutionContext context =
          new TimeSeriesExecutionContext("m3ql", TimeBuckets.ofSeconds(1000L, Duration.ofSeconds(10), 100),
              Collections.emptyMap());
      LeafTimeSeriesPlanNode leafNode =
          new LeafTimeSeriesPlanNode(planId, Collections.emptyList(), tableName, timeColumn, TimeUnit.SECONDS, 0L,
              filterExpr, "orderCount", aggInfo, Collections.singletonList("cityName"));
      QueryContext queryContext = PhysicalTimeSeriesPlanVisitor.INSTANCE.compileQueryContext(leafNode, context);
      assertNotNull(queryContext.getTimeSeriesContext());
      assertEquals(queryContext.getTimeSeriesContext().getLanguage(), "m3ql");
      assertEquals(queryContext.getTimeSeriesContext().getOffsetSeconds(), 0L);
      assertEquals(queryContext.getTimeSeriesContext().getTimeColumn(), timeColumn);
      assertEquals(queryContext.getTimeSeriesContext().getValueExpression().getIdentifier(), "orderCount");
      // Guard against an NPE masking the real assertion failure (consistent with Case-2 below).
      assertNotNull(queryContext.getFilter());
      assertEquals(queryContext.getFilter().toString(),
          "(cityName = 'Chicago' AND orderTime >= '1000' AND orderTime <= '2000')");
    }
    // Case-2: With offset, complex group-by expression, complex value, and non-empty filter
    {
      TimeSeriesExecutionContext context =
          new TimeSeriesExecutionContext("m3ql", TimeBuckets.ofSeconds(1000L, Duration.ofSeconds(10), 100),
              Collections.emptyMap());
      LeafTimeSeriesPlanNode leafNode =
          new LeafTimeSeriesPlanNode(planId, Collections.emptyList(), tableName, timeColumn, TimeUnit.SECONDS, 10L,
              filterExpr, "orderCount*2", aggInfo, Collections.singletonList("concat(cityName, stateName, '-')"));
      QueryContext queryContext = PhysicalTimeSeriesPlanVisitor.INSTANCE.compileQueryContext(leafNode, context);
      assertNotNull(queryContext);
      assertNotNull(queryContext.getGroupByExpressions());
      // TestNG's assertEquals takes (actual, expected) — keep the order consistent with the
      // other assertions in this test so failure messages read correctly.
      assertEquals(queryContext.getGroupByExpressions().get(0).toString(), "concat(cityName,stateName,'-')");
      assertNotNull(queryContext.getTimeSeriesContext());
      assertEquals(queryContext.getTimeSeriesContext().getLanguage(), "m3ql");
      assertEquals(queryContext.getTimeSeriesContext().getOffsetSeconds(), 10L);
      assertEquals(queryContext.getTimeSeriesContext().getTimeColumn(), timeColumn);
      assertEquals(queryContext.getTimeSeriesContext().getValueExpression().toString(), "times(orderCount,'2')");
      assertNotNull(queryContext.getFilter());
      // Offset of 10s shifts both time bounds back by 10: [1000, 2000] -> [990, 1990].
      assertEquals(queryContext.getFilter().toString(),
          "(cityName = 'Chicago' AND orderTime >= '990' AND orderTime <= '1990')");
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,30 +40,29 @@ public class LeafTimeSeriesPlanNode extends BaseTimeSeriesPlanNode {
private final String _tableName;
private final String _timeColumn;
private final TimeUnit _timeUnit;
private final Long _offset;
private final Long _offsetSeconds;
private final String _filterExpression;
private final String _valueExpression;
private final AggInfo _aggInfo;
private final List<String> _groupByColumns;
private final List<String> _groupByExpressions;

@JsonCreator
public LeafTimeSeriesPlanNode(
@JsonProperty("id") String id, @JsonProperty("children") List<BaseTimeSeriesPlanNode> children,
@JsonProperty("tableName") String tableName, @JsonProperty("timeColumn") String timeColumn,
@JsonProperty("timeUnit") TimeUnit timeUnit, @JsonProperty("offset") Long offset,
@JsonProperty("timeUnit") TimeUnit timeUnit, @JsonProperty("offsetSeconds") Long offsetSeconds,
@JsonProperty("filterExpression") String filterExpression,
@JsonProperty("valueExpression") String valueExpression,
@JsonProperty("aggInfo") AggInfo aggInfo, @JsonProperty("groupByColumns") List<String> groupByColumns) {
@JsonProperty("valueExpression") String valueExpression, @JsonProperty("aggInfo") AggInfo aggInfo,
@JsonProperty("groupByExpressions") List<String> groupByExpressions) {
super(id, children);
_tableName = tableName;
_timeColumn = timeColumn;
_timeUnit = timeUnit;
// TODO: This is broken technically. Adjust offset to meet TimeUnit resolution. For now use 0 offset.
_offset = offset;
_offsetSeconds = offsetSeconds;
_filterExpression = filterExpression;
_valueExpression = valueExpression;
_aggInfo = aggInfo;
_groupByColumns = groupByColumns;
_groupByExpressions = groupByExpressions;
}

@Override
Expand All @@ -78,7 +77,7 @@ public String getExplainName() {

@Override
public BaseTimeSeriesOperator run() {
throw new UnsupportedOperationException("");
throw new UnsupportedOperationException("Leaf plan node is replaced with a physical plan node at runtime");
}

public String getTableName() {
Expand All @@ -93,8 +92,8 @@ public TimeUnit getTimeUnit() {
return _timeUnit;
}

public Long getOffset() {
return _offset;
public Long getOffsetSeconds() {
return _offsetSeconds;
}

public String getFilterExpression() {
Expand All @@ -109,15 +108,16 @@ public AggInfo getAggInfo() {
return _aggInfo;
}

public List<String> getGroupByColumns() {
return _groupByColumns;
public List<String> getGroupByExpressions() {
return _groupByExpressions;
}

public String getEffectiveFilter(TimeBuckets timeBuckets) {
String filter = _filterExpression == null ? "" : _filterExpression;
// TODO: This is wrong. offset should be converted to seconds before arithmetic. For now use 0 offset.
long startTime = _timeUnit.convert(Duration.ofSeconds(timeBuckets.getStartTime() - _offset));
long endTime = _timeUnit.convert(Duration.ofSeconds(timeBuckets.getEndTime() - _offset));
long startTime = _timeUnit.convert(Duration.ofSeconds(timeBuckets.getStartTime() - _offsetSeconds));
long endTime =
_timeUnit.convert(Duration.ofSeconds(
timeBuckets.getEndTime() + timeBuckets.getBucketSize().toSeconds() - _offsetSeconds));
String addnFilter = String.format("%s >= %d AND %s <= %d", _timeColumn, startTime, _timeColumn, endTime);
if (filter.strip().isEmpty()) {
return addnFilter;
Expand Down
Loading