Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@
import org.apache.pinot.core.data.table.Record;
import org.apache.pinot.core.data.table.Table;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunction;
import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
import org.apache.pinot.core.query.distinct.DistinctTable;
import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
import org.apache.pinot.spi.utils.ByteArray;
import org.apache.pinot.spi.utils.NullValueUtils;
Expand All @@ -52,6 +54,8 @@

/**
* The <code>IntermediateResultsBlock</code> class is the holder of the server side inter-segment results.
*
* TODO: Break it into segment level result and server level result
*/
@SuppressWarnings("rawtypes")
public class IntermediateResultsBlock implements Block {
Expand All @@ -61,8 +65,10 @@ public class IntermediateResultsBlock implements Block {
private AggregationFunction[] _aggregationFunctions;
private List<Object> _aggregationResult;
private AggregationGroupByResult _aggregationGroupByResult;
private List<ProcessingException> _processingExceptions;
private Collection<IntermediateRecord> _intermediateRecords;
private DistinctAggregationFunction _distinctFunction;
private DistinctTable _distinctTable;
private List<ProcessingException> _processingExceptions;
private long _numDocsScanned;
private long _numEntriesScannedInFilter;
private long _numEntriesScannedPostFilter;
Expand Down Expand Up @@ -129,6 +135,17 @@ public IntermediateResultsBlock(AggregationFunction[] aggregationFunctions,
_nullHandlingEnabled = nullHandlingEnabled;
}

/**
* Constructor for distinct result.
*/
public IntermediateResultsBlock(DistinctAggregationFunction distinctFunction, DistinctTable distinctTable) {
_distinctFunction = distinctFunction;
_distinctTable = distinctTable;
}

/**
* Constructor for server level group-by result.
*/
public IntermediateResultsBlock(Table table, boolean nullHandlingEnabled) {
_table = table;
_dataSchema = table.getDataSchema();
Expand Down Expand Up @@ -192,6 +209,25 @@ public AggregationGroupByResult getAggregationGroupByResult() {
return _aggregationGroupByResult;
}

@Nullable
public Collection<IntermediateRecord> getIntermediateRecords() {
return _intermediateRecords;
}

@Nullable
public DistinctAggregationFunction getDistinctFunction() {
return _distinctFunction;
}

@Nullable
public DistinctTable getDistinctTable() {
return _distinctTable;
}

public void setDistinctTable(DistinctTable distinctTable) {
_distinctTable = distinctTable;
}

@Nullable
public List<ProcessingException> getProcessingExceptions() {
return _processingExceptions;
Expand Down Expand Up @@ -321,14 +357,6 @@ public void setQueryHasMVSelectionOrderBy(boolean queryHasMVSelectionOrderBy) {
_queryHasMVSelectionOrderBy = queryHasMVSelectionOrderBy;
}

/**
* Get the collection of intermediate records
*/
@Nullable
public Collection<IntermediateRecord> getIntermediateRecords() {
return _intermediateRecords;
}

public DataTable getDataTable()
throws Exception {

Expand All @@ -345,6 +373,10 @@ public DataTable getDataTable()
return getAggregationResultDataTable();
}

if (_distinctTable != null) {
return getDistinctResultDataTable();
}

return getMetadataDataTable();
}

Expand All @@ -359,8 +391,7 @@ private DataTable getResultDataTable()
Object[] colDefaultNullValues = new Object[numColumns];
for (int colId = 0; colId < numColumns; colId++) {
if (storedColumnDataTypes[colId] != ColumnDataType.OBJECT) {
colDefaultNullValues[colId] =
NullValueUtils.getDefaultNullValue(storedColumnDataTypes[colId].toDataType());
colDefaultNullValues[colId] = NullValueUtils.getDefaultNullValue(storedColumnDataTypes[colId].toDataType());
}
nullBitmaps[colId] = new RoaringBitmap();
}
Expand Down Expand Up @@ -448,8 +479,8 @@ private void setDataTableColumn(ColumnDataType columnDataType, DataTableBuilder

private DataTable getSelectionResultDataTable()
throws Exception {
return attachMetadataToDataTable(SelectionOperatorUtils.getDataTableFromRows(
_selectionResult, _dataSchema, _nullHandlingEnabled));
return attachMetadataToDataTable(
SelectionOperatorUtils.getDataTableFromRows(_selectionResult, _dataSchema, _nullHandlingEnabled));
}

private DataTable getAggregationResultDataTable()
Expand Down Expand Up @@ -537,6 +568,18 @@ private DataTable getAggregationResultDataTable()
return attachMetadataToDataTable(dataTable);
}

private DataTable getDistinctResultDataTable()
throws IOException {
String[] columnNames = new String[]{_distinctFunction.getColumnName()};
ColumnDataType[] columnDataTypes = new ColumnDataType[]{ColumnDataType.OBJECT};
DataTableBuilder dataTableBuilder =
DataTableFactory.getDataTableBuilder(new DataSchema(columnNames, columnDataTypes));
dataTableBuilder.startRow();
dataTableBuilder.setColumn(0, _distinctTable);
dataTableBuilder.finishRow();
return attachMetadataToDataTable(dataTableBuilder.build());
}

private DataTable getMetadataDataTable() {
return attachMetadataToDataTable(DataTableFactory.getEmptyDataTable());
}
Expand All @@ -549,10 +592,10 @@ private DataTable attachMetadataToDataTable(DataTable dataTable) {
.put(MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER.getName(), String.valueOf(_numEntriesScannedPostFilter));
dataTable.getMetadata().put(MetadataKey.NUM_SEGMENTS_PROCESSED.getName(), String.valueOf(_numSegmentsProcessed));
dataTable.getMetadata().put(MetadataKey.NUM_SEGMENTS_MATCHED.getName(), String.valueOf(_numSegmentsMatched));
dataTable.getMetadata().put(MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName(),
String.valueOf(_numConsumingSegmentsProcessed));
dataTable.getMetadata().put(MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED.getName(),
String.valueOf(_numConsumingSegmentsMatched));
dataTable.getMetadata()
.put(MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName(), String.valueOf(_numConsumingSegmentsProcessed));
dataTable.getMetadata()
.put(MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED.getName(), String.valueOf(_numConsumingSegmentsMatched));
dataTable.getMetadata().put(MetadataKey.NUM_RESIZES.getName(), String.valueOf(_numResizes));
dataTable.getMetadata().put(MetadataKey.RESIZE_TIME_MS.getName(), String.valueOf(_resizeTimeMs));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pinot.core.operator.combine;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.pinot.core.common.Operator;
Expand All @@ -41,7 +40,6 @@ public DistinctCombineOperator(List<Operator> operators, QueryContext queryConte
_hasOrderBy = queryContext.getOrderByExpressions() != null;
}


@Override
public String toExplainString() {
return EXPLAIN_NAME;
Expand All @@ -52,29 +50,24 @@ protected boolean isQuerySatisfied(IntermediateResultsBlock resultsBlock) {
if (_hasOrderBy) {
return false;
}
List<Object> result = resultsBlock.getAggregationResult();
assert result != null && result.size() == 1 && result.get(0) instanceof DistinctTable;
DistinctTable distinctTable = (DistinctTable) result.get(0);
DistinctTable distinctTable = resultsBlock.getDistinctTable();
assert distinctTable != null;
return distinctTable.size() >= _queryContext.getLimit();
}

@Override
protected void mergeResultsBlocks(IntermediateResultsBlock mergedBlock, IntermediateResultsBlock blockToMerge) {
// TODO: Use a separate way to represent DISTINCT instead of aggregation.
List<Object> mergedResults = mergedBlock.getAggregationResult();
assert mergedResults != null && mergedResults.size() == 1 && mergedResults.get(0) instanceof DistinctTable;
DistinctTable mergedDistinctTable = (DistinctTable) mergedResults.get(0);

List<Object> resultsToMerge = blockToMerge.getAggregationResult();
assert resultsToMerge != null && resultsToMerge.size() == 1 && resultsToMerge.get(0) instanceof DistinctTable;
DistinctTable distinctTableToMerge = (DistinctTable) resultsToMerge.get(0);
DistinctTable mergedDistinctTable = mergedBlock.getDistinctTable();
DistinctTable distinctTableToMerge = blockToMerge.getDistinctTable();
assert mergedDistinctTable != null && distinctTableToMerge != null;

// Convert the merged table into a main table if necessary in order to merge other tables
if (!mergedDistinctTable.isMainTable()) {
DistinctTable mainDistinctTable = new DistinctTable(distinctTableToMerge.getDataSchema(),
_queryContext.getOrderByExpressions(), _queryContext.getLimit(), _queryContext.isNullHandlingEnabled());
DistinctTable mainDistinctTable =
new DistinctTable(distinctTableToMerge.getDataSchema(), _queryContext.getOrderByExpressions(),
_queryContext.getLimit(), _queryContext.isNullHandlingEnabled());
mainDistinctTable.mergeTable(mergedDistinctTable);
mergedBlock.setAggregationResults(Collections.singletonList(mainDistinctTable));
mergedBlock.setDistinctTable(mainDistinctTable);
mergedDistinctTable = mainDistinctTable;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.core.operator.ExecutionStatistics;
import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunction;
import org.apache.pinot.core.query.distinct.DistinctTable;
import org.apache.pinot.segment.spi.AggregationFunctionType;
Expand All @@ -56,28 +55,23 @@ public class DictionaryBasedDistinctOperator extends BaseOperator<IntermediateRe
public DictionaryBasedDistinctOperator(FieldSpec.DataType dataType,
DistinctAggregationFunction distinctAggregationFunction, Dictionary dictionary, int numTotalDocs,
boolean nullHandlingEnabled) {

_dataType = dataType;
_distinctAggregationFunction = distinctAggregationFunction;
_dictionary = dictionary;
_numTotalDocs = numTotalDocs;
_nullHandlingEnabled = nullHandlingEnabled;

List<OrderByExpressionContext> orderByExpressionContexts = _distinctAggregationFunction.getOrderByExpressions();

if (orderByExpressionContexts != null) {
OrderByExpressionContext orderByExpressionContext = orderByExpressionContexts.get(0);

_isAscending = orderByExpressionContext.isAsc();
_hasOrderBy = true;
}
}

@Override
protected IntermediateResultsBlock getNextBlock() {
DistinctTable distinctTable = buildResult();
return new IntermediateResultsBlock(new AggregationFunction[]{_distinctAggregationFunction},
Collections.singletonList(distinctTable), _nullHandlingEnabled);
return new IntermediateResultsBlock(_distinctAggregationFunction, buildResult());
}

/**
Expand Down Expand Up @@ -124,8 +118,9 @@ private DistinctTable buildResult() {
}
} else {
// DictionaryBasedDistinctOperator cannot handle nulls.
DistinctTable distinctTable = new DistinctTable(
dataSchema, _distinctAggregationFunction.getOrderByExpressions(), limit, _nullHandlingEnabled);
DistinctTable distinctTable =
new DistinctTable(dataSchema, _distinctAggregationFunction.getOrderByExpressions(), limit,
_nullHandlingEnabled);

_numDocsScanned = dictLength;
for (int i = 0; i < dictLength; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,9 @@
import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
import org.apache.pinot.core.operator.blocks.TransformBlock;
import org.apache.pinot.core.operator.transform.TransformOperator;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunction;
import org.apache.pinot.core.query.distinct.DistinctExecutor;
import org.apache.pinot.core.query.distinct.DistinctExecutorFactory;
import org.apache.pinot.core.query.distinct.DistinctTable;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.segment.spi.IndexSegment;

Expand All @@ -55,8 +53,8 @@ public DistinctOperator(IndexSegment indexSegment, DistinctAggregationFunction d
_distinctAggregationFunction = distinctAggregationFunction;
_transformOperator = transformOperator;
_queryContext = queryContext;
_distinctExecutor = DistinctExecutorFactory.getDistinctExecutor(
distinctAggregationFunction, transformOperator, _queryContext.isNullHandlingEnabled());
_distinctExecutor = DistinctExecutorFactory.getDistinctExecutor(distinctAggregationFunction, transformOperator,
_queryContext.isNullHandlingEnabled());
}

@Override
Expand All @@ -68,10 +66,7 @@ protected IntermediateResultsBlock getNextBlock() {
break;
}
}
DistinctTable distinctTable = _distinctExecutor.getResult();
// TODO: Use a separate way to represent DISTINCT instead of aggregation.
return new IntermediateResultsBlock(new AggregationFunction[]{_distinctAggregationFunction},
Collections.singletonList(distinctTable), _queryContext.isNullHandlingEnabled());
return new IntermediateResultsBlock(_distinctAggregationFunction, _distinctExecutor.getResult());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1190,11 +1190,9 @@ public void testNonAggGroupByRewriteToDistinctInnerSegment() {
*/
private DistinctTable getDistinctTableInnerSegment(String query) {
BaseOperator<IntermediateResultsBlock> distinctOperator = getOperator(query);
List<Object> operatorResult = distinctOperator.nextBlock().getAggregationResult();
assertNotNull(operatorResult);
assertEquals(operatorResult.size(), 1);
assertTrue(operatorResult.get(0) instanceof DistinctTable);
return (DistinctTable) operatorResult.get(0);
DistinctTable distinctTable = distinctOperator.nextBlock().getDistinctTable();
assertNotNull(distinctTable);
return distinctTable;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,12 @@
*/
package org.apache.pinot.queries;

import java.util.List;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.data.table.Record;
import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
import org.apache.pinot.core.operator.query.AggregationGroupByOrderByOperator;
import org.apache.pinot.core.operator.query.AggregationOperator;
import org.apache.pinot.core.operator.query.DistinctOperator;
import org.apache.pinot.core.operator.query.FilteredAggregationOperator;
import org.apache.pinot.core.query.distinct.DistinctTable;
import org.testng.annotations.Test;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;


@SuppressWarnings("ConstantConditions")
public class InnerSegmentAggregationSingleValueQueriesTest extends BaseSingleValueQueriesTest {
Expand Down Expand Up @@ -182,63 +172,4 @@ public void testVeryLargeAggregationGroupBy() {
1361199163, 178133991, 296467636, 788414092, 1719301234, "P", "MaztCmmxxgguBUxPti", 1284373442, 752388855
}, 1L, 1361199163L, 178133991, 296467636, 788414092L, 1L);
}

/**
* Test DISTINCT on single column single segment. Since the dataset
* is Avro files, the only thing we currently check
* for correctness is the actual number of DISTINCT
* records returned
*/
@Test
public void testSingleColumnDistinct() {
String query = "SELECT DISTINCT column1 FROM testTable LIMIT 1000000";
BaseOperator<IntermediateResultsBlock> distinctOperator = getOperator(query);
IntermediateResultsBlock resultsBlock = distinctOperator.nextBlock();
List<Object> operatorResult = resultsBlock.getAggregationResult();

assertEquals(operatorResult.size(), 1);
assertTrue(operatorResult.get(0) instanceof DistinctTable);

DistinctTable distinctTable = (DistinctTable) operatorResult.get(0);
assertEquals(distinctTable.size(), 6582);

DataSchema dataSchema = distinctTable.getDataSchema();
assertEquals(dataSchema.getColumnNames(), new String[]{"column1"});
assertEquals(dataSchema.getColumnDataTypes(), new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT});

for (Record record : distinctTable.getRecords()) {
assertNotNull(record);
assertEquals(record.getValues().length, 1);
}
}

/**
* Test DISTINCT on multiple column single segment. Since the dataset
* is Avro files, the only thing we currently check
* for correctness is the actual number of DISTINCT
* records returned
*/
@Test
public void testMultiColumnDistinct() {
String query = "SELECT DISTINCT column1, column3 FROM testTable LIMIT 1000000";
DistinctOperator distinctOperator = getOperator(query);
IntermediateResultsBlock resultsBlock = distinctOperator.nextBlock();
List<Object> operatorResult = resultsBlock.getAggregationResult();

assertEquals(operatorResult.size(), 1);
assertTrue(operatorResult.get(0) instanceof DistinctTable);

DistinctTable distinctTable = (DistinctTable) operatorResult.get(0);
assertEquals(distinctTable.size(), 21968);

DataSchema dataSchema = distinctTable.getDataSchema();
assertEquals(dataSchema.getColumnNames(), new String[]{"column1", "column3"});
assertEquals(dataSchema.getColumnDataTypes(),
new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT});

for (Record record : distinctTable.getRecords()) {
assertNotNull(record);
assertEquals(record.getValues().length, 2);
}
}
}
Loading