Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
74ac4d2
Add Pre-Aggregation Gapfilling functionality.
weixiangsun Jan 14, 2022
2f7d16a
Revert "Add Pre-Aggregation Gapfilling functionality."
weixiangsun Jan 14, 2022
39df1d3
Merge branch 'apache:master' into master
weixiangsun Jan 27, 2022
eb92ba4
Merge branch 'apache:master' into master
weixiangsun Feb 4, 2022
de3e482
Merge branch 'apache:master' into master
weixiangsun Feb 7, 2022
75e5ac1
Merge branch 'apache:master' into master
weixiangsun Feb 11, 2022
e7fa7a8
Merge branch 'apache:master' into master
weixiangsun Feb 11, 2022
54a867c
Merge branch 'apache:master' into master
weixiangsun Feb 15, 2022
aa69201
Merge branch 'apache:master' into master
weixiangsun Feb 16, 2022
1620eb9
Merge branch 'apache:master' into master
weixiangsun Feb 21, 2022
2c854b9
Merge branch 'apache:master' into master
weixiangsun Feb 26, 2022
d00abbc
Merge branch 'apache:master' into master
weixiangsun Mar 7, 2022
0b25ddd
Merge branch 'apache:master' into master
weixiangsun Mar 13, 2022
d2e49c4
Merge branch 'apache:master' into master
weixiangsun Mar 15, 2022
e9a8962
Merge branch 'apache:master' into master
weixiangsun Mar 16, 2022
4e48979
Merge branch 'apache:master' into master
weixiangsun Mar 18, 2022
0f7b231
Merge branch 'apache:master' into master
weixiangsun Mar 18, 2022
a4d70ef
Merge branch 'apache:master' into master
weixiangsun Mar 21, 2022
b786f7d
Merge branch 'apache:master' into master
weixiangsun Mar 21, 2022
56550cb
Merge branch 'apache:master' into master
weixiangsun Mar 21, 2022
e625692
Merge branch 'apache:master' into master
weixiangsun Mar 21, 2022
cea55b2
Merge branch 'apache:master' into master
weixiangsun Mar 22, 2022
80b0d96
Merge branch 'apache:master' into master
weixiangsun Mar 22, 2022
aa1ec40
Merge branch 'apache:master' into master
weixiangsun Mar 24, 2022
128905f
Merge branch 'apache:master' into master
weixiangsun Mar 25, 2022
01d063e
Merge branch 'apache:master' into master
weixiangsun Mar 28, 2022
06594ed
Make Gapfill scalable for big data.
Mar 29, 2022
08dfcc5
Fix the bug.
Apr 4, 2022
2443c30
Avoid scanning the raw rows twice.
Apr 11, 2022
142f4f1
Merge branch 'apache:master' into weixiang-gapfill-scalability
weixiangsun Apr 13, 2022
423e206
Merge branch 'apache:master' into weixiang-gapfill-scalability
weixiangsun May 6, 2022
8e73a82
Make ScalabilityGapfillProcessor support the aggregation buckets.
May 6, 2022
f2d9984
Code refactoring.
May 6, 2022
c06454b
Code refactoring.
May 6, 2022
a8cc2c3
Fix the bug.
May 10, 2022
d7056cc
Code Refactoring.
May 12, 2022
2065e58
Merge branch 'apache:master' into weixiang-gapfill-scalability
weixiangsun May 12, 2022
91a0e39
Add more javadoc.
May 12, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.core.query.reduce;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.FunctionContext;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.data.table.Key;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.aggregation.function.AggregationFunctionFactory;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.util.GapfillUtils;
import org.apache.pinot.spi.data.DateTimeFormatSpec;
import org.apache.pinot.spi.data.DateTimeGranularitySpec;


/**
 * Helper class to reduce and set gap fill results into the BrokerResponseNative.
 *
 * <p>Base class for the gapfill processors: it extracts the GAPFILL(...) expression from the
 * query context, derives the time-bucket layout (start/end millis, gapfill bucket size,
 * post-gapfill aggregation bucket size, number of buckets), and drives the shared
 * {@link #process} flow. Concrete subclasses implement {@link #gapFillAndAggregate}.
 */
abstract class BaseGapfillProcessor {
  protected final QueryContext _queryContext;

  // LIMIT of the outermost (aggregated) result.
  protected final int _limitForAggregatedResult;
  // Granularity of one gapfill bucket (args[4] of the gapfill function).
  protected final DateTimeGranularitySpec _gapfillDateTimeGranularity;
  // Granularity of the post-gapfill aggregation bucket (args[5]); defaults to the gapfill one.
  protected final DateTimeGranularitySpec _postGapfillDateTimeGranularity;
  // Format used to convert the time column between its string form and epoch millis (args[1]).
  protected final DateTimeFormatSpec _dateTimeFormatter;
  // Gapfill window [_startMs, _endMs), both truncated via truncate().
  protected final long _startMs;
  protected final long _endMs;
  protected final long _gapfillTimeBucketSize;
  protected final long _postGapfillTimeBucketSize;
  protected final int _numOfTimeBuckets;
  // Column indexes (into the server result schema) forming the group-by key; filled in process().
  protected final List<Integer> _groupByKeyIndexes;
  // Last seen row per group key — available to subclasses for carrying values into gaps.
  protected final Map<Key, Object[]> _previousByGroupKey;
  protected long _count = 0;
  // Arguments of the TIMESERIESON(...) expression nested in the gapfill selection.
  protected final List<ExpressionContext> _timeSeries;
  // Index of the time bucket column in the server result schema.
  protected final int _timeBucketColumnIndex;
  // Lazily created by subclasses in gapFillAndAggregate(); null until then.
  protected GapfillFilterHandler _postGapfillFilterHandler = null;
  protected GapfillFilterHandler _postAggregateHavingFilterHandler = null;
  // Number of gapfill buckets folded into one post-gapfill aggregation bucket.
  protected final int _aggregationSize;
  protected final GapfillUtils.GapfillType _gapfillType;
  protected int _limitForGapfilledResult;
  // Parallel to the schema columns: true where the column is part of the group-by key.
  protected boolean[] _isGroupBySelections;
  protected ExpressionContext _gapFillSelection;

  /**
   * Parses the gapfill expression and pre-computes the bucket layout.
   *
   * <p>Positional arguments of the gapfill function as read below:
   * args[1] = time format, args[2] = window start, args[3] = window end,
   * args[4] = gapfill granularity, args[5] = optional post-gapfill granularity.
   */
  BaseGapfillProcessor(QueryContext queryContext, GapfillUtils.GapfillType gapfillType) {
    _queryContext = queryContext;
    _limitForAggregatedResult = queryContext.getLimit();
    _gapfillType = gapfillType;

    // For plain gapfill (with or without inner aggregation) the gapfilled result shares the
    // outer LIMIT; otherwise the subquery's LIMIT bounds the gapfilled rows.
    if (_gapfillType == GapfillUtils.GapfillType.AGGREGATE_GAP_FILL
        || _gapfillType == GapfillUtils.GapfillType.GAP_FILL) {
      _limitForGapfilledResult = queryContext.getLimit();
    } else {
      _limitForGapfilledResult = queryContext.getSubquery().getLimit();
    }

    _gapFillSelection = GapfillUtils.getGapfillExpressionContext(queryContext, gapfillType);
    _timeBucketColumnIndex = GapfillUtils.findTimeBucketColumnIndex(queryContext, gapfillType);

    List<ExpressionContext> args = _gapFillSelection.getFunction().getArguments();

    _dateTimeFormatter = new DateTimeFormatSpec(args.get(1).getLiteral());
    _gapfillDateTimeGranularity = new DateTimeGranularitySpec(args.get(4).getLiteral());
    if (args.get(5).getLiteral() == null) {
      // No explicit post-gapfill granularity: aggregate at the gapfill granularity itself.
      _postGapfillDateTimeGranularity = _gapfillDateTimeGranularity;
    } else {
      _postGapfillDateTimeGranularity = new DateTimeGranularitySpec(args.get(5).getLiteral());
    }
    String start = args.get(2).getLiteral();
    _startMs = truncate(_dateTimeFormatter.fromFormatToMillis(start));
    String end = args.get(3).getLiteral();
    _endMs = truncate(_dateTimeFormatter.fromFormatToMillis(end));
    _gapfillTimeBucketSize = _gapfillDateTimeGranularity.granularityToMillis();
    _postGapfillTimeBucketSize = _postGapfillDateTimeGranularity.granularityToMillis();
    _numOfTimeBuckets = (int) ((_endMs - _startMs) / _gapfillTimeBucketSize);

    // Assumes the post-gapfill bucket size is a multiple of the gapfill bucket size.
    // NOTE(review): a non-multiple would silently truncate here — confirm upstream validation.
    _aggregationSize = (int) (_postGapfillTimeBucketSize / _gapfillTimeBucketSize);

    _previousByGroupKey = new HashMap<>();
    _groupByKeyIndexes = new ArrayList<>();

    ExpressionContext timeseriesOn = GapfillUtils.getTimeSeriesOnExpressionContext(_gapFillSelection);
    _timeSeries = timeseriesOn.getFunction().getArguments();
  }

  /** Maps an epoch-millis timestamp to its 0-based gapfill bucket index relative to _startMs. */
  protected int findGapfillBucketIndex(long time) {
    return (int) ((time - _startMs) / _gapfillTimeBucketSize);
  }

  /**
   * Rewrites the schema's column names in place, replacing each selected expression's name with
   * its alias. The alias list is taken from the query level that produced the gapfilled columns,
   * which depends on how the gapfill is nested.
   */
  protected void replaceColumnNameWithAlias(DataSchema dataSchema) {
    QueryContext queryContext;
    if (_gapfillType == GapfillUtils.GapfillType.AGGREGATE_GAP_FILL_AGGREGATE) {
      queryContext = _queryContext.getSubquery().getSubquery();
    } else if (_gapfillType == GapfillUtils.GapfillType.GAP_FILL) {
      queryContext = _queryContext;
    } else {
      queryContext = _queryContext.getSubquery();
    }
    List<String> aliasList = queryContext.getAliasList();
    Map<String, String> columnNameToAliasMap = new HashMap<>();
    for (int i = 0; i < aliasList.size(); i++) {
      if (aliasList.get(i) != null) {
        ExpressionContext selection = queryContext.getSelectExpressions().get(i);
        if (GapfillUtils.isGapfill(selection)) {
          // The aliased column is the first argument wrapped by GAPFILL(...).
          selection = selection.getFunction().getArguments().get(0);
        }
        columnNameToAliasMap.put(selection.toString(), aliasList.get(i));
      }
    }
    // Mutates the schema's backing column-name array directly.
    for (int i = 0; i < dataSchema.getColumnNames().length; i++) {
      if (columnNameToAliasMap.containsKey(dataSchema.getColumnNames()[i])) {
        dataSchema.getColumnNames()[i] = columnNameToAliasMap.get(dataSchema.getColumnNames()[i]);
      }
    }
  }

  /**
   * Here are three things that happen
   * 1. Sort the result sets from all pinot servers based on timestamp
   * 2. Gapfill the data for missing entities per time bucket
   * 3. Aggregate the dataset per time bucket.
   *
   * <p>Replaces the broker response's result table in place. On an empty server result, the
   * table is replaced with an empty table carrying the final schema and processing stops.
   */
  public void process(BrokerResponseNative brokerResponseNative) {
    DataSchema dataSchema = brokerResponseNative.getResultTable().getDataSchema();
    DataSchema resultTableSchema = getResultTableDataSchema(dataSchema);
    if (brokerResponseNative.getResultTable().getRows().isEmpty()) {
      brokerResponseNative.setResultTable(new ResultTable(resultTableSchema, Collections.emptyList()));
      return;
    }

    String[] columns = dataSchema.getColumnNames();

    // Column name -> index in the server result schema.
    Map<String, Integer> indexes = new HashMap<>();
    for (int i = 0; i < columns.length; i++) {
      indexes.put(columns[i], i);
    }

    _isGroupBySelections = new boolean[dataSchema.getColumnDataTypes().length];

    // The first one argument of timeSeries is time column. The left ones are defining entity.
    // NOTE(review): this loop marks every TIMESERIESON argument, including the first (time)
    // column, as a group-by selection — confirm the time column is intentionally part of the key.
    for (ExpressionContext entityColum : _timeSeries) {
      int index = indexes.get(entityColum.getIdentifier());
      _isGroupBySelections[index] = true;
    }

    for (int i = 0; i < _isGroupBySelections.length; i++) {
      if (_isGroupBySelections[i]) {
        _groupByKeyIndexes.add(i);
      }
    }

    List<Object[]> rows = brokerResponseNative.getResultTable().getRows();
    replaceColumnNameWithAlias(dataSchema);
    List<Object[]> resultRows = gapFillAndAggregate(rows, dataSchema, resultTableSchema);
    brokerResponseNative.setResultTable(new ResultTable(resultTableSchema, resultRows));
  }

  /**
   * Constructs the DataSchema for the ResultTable.
   *
   * <p>For plain GAP_FILL the server schema is returned unchanged. Otherwise one column per
   * outer select expression is produced: identifiers keep the time-bucket column's data type
   * (they pass through the gapfill), while aggregation functions use their final result type.
   */
  protected DataSchema getResultTableDataSchema(DataSchema dataSchema) {
    if (_gapfillType == GapfillUtils.GapfillType.GAP_FILL) {
      return dataSchema;
    }

    int numOfColumns = _queryContext.getSelectExpressions().size();
    String[] columnNames = new String[numOfColumns];
    ColumnDataType[] columnDataTypes = new ColumnDataType[numOfColumns];
    for (int i = 0; i < numOfColumns; i++) {
      ExpressionContext expressionContext = _queryContext.getSelectExpressions().get(i);
      if (GapfillUtils.isGapfill(expressionContext)) {
        expressionContext = expressionContext.getFunction().getArguments().get(0);
      }
      if (expressionContext.getType() != ExpressionContext.Type.FUNCTION) {
        columnNames[i] = expressionContext.getIdentifier();
        // NOTE(review): every identifier column inherits the time bucket column's type — this
        // presumes non-function selections are the time column; verify for multi-column selects.
        columnDataTypes[i] = dataSchema.getColumnDataType(_timeBucketColumnIndex);
      } else {
        FunctionContext functionContext = expressionContext.getFunction();
        AggregationFunction aggregationFunction =
            AggregationFunctionFactory.getAggregationFunction(functionContext, _queryContext);
        columnDataTypes[i] = aggregationFunction.getFinalResultColumnType();
        columnNames[i] = functionContext.toString();
      }
    }
    return new DataSchema(columnNames, columnDataTypes);
  }

  /** Builds the group-by {@link Key} for a row from the columns marked in _groupByKeyIndexes. */
  protected Key constructGroupKeys(Object[] row) {
    Object[] groupKeys = new Object[_groupByKeyIndexes.size()];
    for (int i = 0; i < _groupByKeyIndexes.size(); i++) {
      groupKeys[i] = row[_groupByKeyIndexes.get(i)];
    }
    return new Key(groupKeys);
  }

  /**
   * Floors {@code epoch} to a multiple of the gapfill granularity's size.
   * NOTE(review): getSize() is the granularity's numeric size, not millis — confirm the
   * intended unit here matches the values passed in (fromFormatToMillis results).
   */
  protected long truncate(long epoch) {
    int sz = _gapfillDateTimeGranularity.getSize();
    return epoch / sz * sz;
  }

  /**
   * Gapfills and aggregates the sorted server rows into the final result rows.
   * Subclasses must override; the base implementation always throws.
   *
   * @throws UnsupportedOperationException always, in this base implementation
   */
  protected List<Object[]> gapFillAndAggregate(
      List<Object[]> rows, DataSchema dataSchema, DataSchema resultTableSchema) {
    throw new UnsupportedOperationException("Not supported");
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,9 @@ public BrokerResponseNative reduceOnDataTable(BrokerRequest brokerRequest, Broke
} else {
queryContext = QueryContextConverterUtils.getQueryContext(brokerRequest.getPinotQuery());
GapfillUtils.GapfillType gapfillType = GapfillUtils.getGapfillType(queryContext);
if (gapfillType != null) {
GapfillProcessor gapfillProcessor = new GapfillProcessor(queryContext, gapfillType);
gapfillProcessor.process(brokerResponseNative);
}
BaseGapfillProcessor gapfillProcessor =
GapfillProcessorFactory.getGapfillProcessor(queryContext, gapfillType);
gapfillProcessor.process(brokerResponseNative);
}

updateAlias(queryContext, brokerResponseNative);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.core.query.reduce;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.data.table.Key;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.util.GapfillUtils;


/**
 * Helper class to reduce and set gap fill results into the BrokerResponseNative.
 *
 * <p>{@link CountGapfillProcessor} handles the special case where the only aggregation applied
 * on top of the gapfilled result is a count per time bucket. Unlike {@link GapfillProcessor},
 * it never materializes the gapfilled rows: it maintains a running count of group keys that
 * currently match the post-gapfill filter and emits one (time, count) row per aggregation
 * bucket.
 */
class CountGapfillProcessor extends BaseGapfillProcessor {
  // Group keys whose most recent row matched the post-gapfill filter; drives _count.
  protected final Set<Key> _filteredSet;

  CountGapfillProcessor(QueryContext queryContext, GapfillUtils.GapfillType gapfillType) {
    super(queryContext, gapfillType);
    _filteredSet = new HashSet<>();
  }

  /**
   * Walks the time-sorted rows bucket by bucket, keeping the running match count up to date,
   * and emits an aggregated (time, count) row at the end of every aggregation window.
   */
  @Override
  protected List<Object[]> gapFillAndAggregate(
      List<Object[]> rows, DataSchema dataSchema, DataSchema resultTableSchema) {
    DataSchema.ColumnDataType timeColumnDataType = resultTableSchema.getColumnDataTypes()[0];
    if (_queryContext.getSubquery() != null && _queryContext.getFilter() != null) {
      _postGapfillFilterHandler = new GapfillFilterHandler(_queryContext.getFilter(), dataSchema);
    }
    if (_queryContext.getHavingFilter() != null) {
      _postAggregateHavingFilterHandler =
          new GapfillFilterHandler(_queryContext.getHavingFilter(), resultTableSchema);
    }

    // Rows before the gapfill window only seed the running count; consume them up front.
    int cursor = 0;
    for (; cursor < rows.size(); cursor++) {
      Object[] earlyRow = rows.get(cursor);
      long earlyTimestamp = extractTimeColumn(earlyRow, timeColumnDataType);
      if (findGapfillBucketIndex(earlyTimestamp) >= 0) {
        break;
      }
      updateCounter(earlyRow);
    }

    List<Object[]> aggregatedRows = new ArrayList<>();

    long aggregatedCount = 0;
    for (long bucketTime = _startMs; bucketTime < _endMs; bucketTime += _gapfillTimeBucketSize) {
      // Fold every raw row belonging to this gapfill bucket into the running count.
      while (cursor < rows.size()) {
        Object[] row = rows.get(cursor);
        long rowTimestamp = extractTimeColumn(row, timeColumnDataType);
        if (rowTimestamp != bucketTime) {
          break;
        }
        updateCounter(row);
        cursor++;
      }
      aggregatedCount += _count;
      int bucketIndex = findGapfillBucketIndex(bucketTime);
      boolean endOfAggregationWindow = (bucketIndex + 1) % _aggregationSize == 0;
      if (aggregatedCount > 0 && endOfAggregationWindow) {
        // Emit one row per aggregation window: [windowStartTime, count].
        Object[] aggregatedRow = new Object[_queryContext.getSelectExpressions().size()];
        long windowStartMs = bucketTime - (_aggregationSize - 1) * _gapfillTimeBucketSize;
        if (timeColumnDataType == DataSchema.ColumnDataType.LONG) {
          aggregatedRow[0] = windowStartMs;
        } else {
          aggregatedRow[0] = _dateTimeFormatter.fromMillisToFormat(windowStartMs);
        }
        aggregatedRow[1] = aggregatedCount;
        aggregatedCount = 0;
        if (_postAggregateHavingFilterHandler == null || _postAggregateHavingFilterHandler.isMatch(aggregatedRow)) {
          aggregatedRows.add(aggregatedRow);
        }
        if (aggregatedRows.size() >= _limitForAggregatedResult) {
          return aggregatedRows;
        }
      }
    }
    return aggregatedRows;
  }

  /** Reads the row's time column as epoch millis, decoding string values via the formatter. */
  private long extractTimeColumn(Object[] row, DataSchema.ColumnDataType columnDataType) {
    Object timeValue = row[_timeBucketColumnIndex];
    return columnDataType == DataSchema.ColumnDataType.LONG
        ? (Long) timeValue : _dateTimeFormatter.fromFormatToMillis((String) timeValue);
  }

  /**
   * Records the latest filter verdict for the row's group key, adjusting the running count
   * only when the key's match state flips.
   */
  private void updateCounter(Object[] row) {
    Key groupKey = constructGroupKeys(row);
    boolean matches = _postGapfillFilterHandler == null || _postGapfillFilterHandler.isMatch(row);
    boolean wasMatching = _filteredSet.contains(groupKey);
    if (wasMatching != matches) {
      _count += matches ? 1 : -1;
    }
    if (matches) {
      _filteredSet.add(groupKey);
    } else {
      _filteredSet.remove(groupKey);
    }
  }
}
Loading