Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,17 @@ public class SelectionOrderByOperator extends BaseOperator<IntermediateResultsBl
private final TransformResultMetadata[] _orderByExpressionMetadata;
private final int _numRowsToKeep;
private final PriorityQueue<Object[]> _rows;
private final boolean _allOrderByColsPreSorted;

private int _numDocsScanned = 0;
private long _numEntriesScannedPostFilter = 0;

public SelectionOrderByOperator(IndexSegment indexSegment, QueryContext queryContext,
List<ExpressionContext> expressions, TransformOperator transformOperator) {
List<ExpressionContext> expressions, TransformOperator transformOperator, boolean allOrderByColsPreSorted) {
_indexSegment = indexSegment;
_expressions = expressions;
_transformOperator = transformOperator;
_allOrderByColsPreSorted = allOrderByColsPreSorted;

_orderByExpressions = queryContext.getOrderByExpressions();
assert _orderByExpressions != null;
Expand Down Expand Up @@ -180,13 +182,52 @@ private Comparator<Object[]> getComparator() {

/**
 * Produces the result block by delegating to one of the compute* helpers of this operator
 * ({@code computeAllPreSorted}, {@code computeAllOrdered} or {@code computePartiallyOrdered}).
 */
@Override
protected IntermediateResultsBlock getNextBlock() {
if (_expressions.size() == _orderByExpressions.size()) {
// The pre-sorted flag is checked first so that the early-terminating path wins over the other two
if (_allOrderByColsPreSorted) {
return computeAllPreSorted();
// Same number of output expressions and order-by expressions
} else if (_expressions.size() == _orderByExpressions.size()) {
return computeAllOrdered();
// Remaining case: the expression count differs from the order-by expression count
} else {
return computePartiallyOrdered();
}
}

/**
 * Helper method to compute the result when the operator was constructed with
 * {@code allOrderByColsPreSorted = true}.
 * <p>Rows are read from the transform operator block by block and the scan stops as soon as
 * {@code _numRowsToKeep} rows have been collected (or the transform operator is exhausted), instead of
 * scanning every matching document.
 * <p>NOTE(review): this relies on the caller having verified that the documents are already in the requested
 * order; nothing in this method re-checks that.
 *
 * @return the result block holding the data schema of the output expressions and the collected rows
 */
private IntermediateResultsBlock computeAllPreSorted() {
  int numExpressions = _expressions.size();

  // Fetch the expressions for at most _numRowsToKeep rows and insert them into the priority queue
  BlockValSet[] blockValSets = new BlockValSet[numExpressions];
  int numColumnsProjected = _transformOperator.getNumColumnsProjected();
  TransformBlock transformBlock;
  while (_numDocsScanned < _numRowsToKeep && (transformBlock = _transformOperator.nextBlock()) != null) {
    for (int i = 0; i < numExpressions; i++) {
      ExpressionContext expression = _expressions.get(i);
      blockValSets[i] = transformBlock.getBlockValueSet(expression);
    }
    RowBasedBlockValueFetcher blockValueFetcher = new RowBasedBlockValueFetcher(blockValSets);
    int numDocsFetched = transformBlock.getNumDocs();
    // Stop in the middle of a block once enough rows have been collected
    for (int i = 0; i < numDocsFetched && (_numDocsScanned < _numRowsToKeep); i++) {
      SelectionOperatorUtils.addToPriorityQueue(blockValueFetcher.getRow(i), _rows, _numRowsToKeep);
      _numDocsScanned++;
    }
  }
  // Widen to long before multiplying: both operands are ints, so the product would otherwise be computed in
  // int arithmetic and could silently overflow for a large number of rows to keep
  _numEntriesScannedPostFilter += (long) _numDocsScanned * numColumnsProjected;

  // Create the data schema
  String[] columnNames = new String[numExpressions];
  DataSchema.ColumnDataType[] columnDataTypes = new DataSchema.ColumnDataType[numExpressions];
  for (int i = 0; i < numExpressions; i++) {
    ExpressionContext expression = _expressions.get(i);
    columnNames[i] = expression.toString();
    TransformResultMetadata expressionMetadata = _transformOperator.getResultMetadata(expression);
    columnDataTypes[i] =
        DataSchema.ColumnDataType.fromDataType(expressionMetadata.getDataType(), expressionMetadata.isSingleValue());
  }

  DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);

  return new IntermediateResultsBlock(dataSchema, _rows);
}

/**
* Helper method to compute the result when all the output expressions are ordered.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,18 @@ public Operator<IntermediateResultsBlock> run() {
if (orderByExpressions == null) {
// Selection only
TransformOperator transformOperator = new TransformPlanNode(_indexSegment, _queryContext, expressions,
Math.min(limit, DocIdSetPlanNode.MAX_DOC_PER_CALL)).run();
Math.min(limit + _queryContext.getOffset(), DocIdSetPlanNode.MAX_DOC_PER_CALL)).run();
return new SelectionOnlyOperator(_indexSegment, _queryContext, expressions, transformOperator);
} else {
// Selection order-by
if (orderByExpressions.size() == expressions.size()) {
if (isAllOrderByColumnsSorted(orderByExpressions)) { // no sorting needed
TransformOperator transformOperator =
new TransformPlanNode(_indexSegment, _queryContext, expressions, DocIdSetPlanNode.MAX_DOC_PER_CALL).run();
return new SelectionOrderByOperator(_indexSegment, _queryContext, expressions, transformOperator, true);
} else if (orderByExpressions.size() == expressions.size()) { // Selection order-by
// All output expressions are ordered
TransformOperator transformOperator =
new TransformPlanNode(_indexSegment, _queryContext, expressions, DocIdSetPlanNode.MAX_DOC_PER_CALL).run();
return new SelectionOrderByOperator(_indexSegment, _queryContext, expressions, transformOperator);
return new SelectionOrderByOperator(_indexSegment, _queryContext, expressions, transformOperator, false);
} else {
// Not all output expressions are ordered, only fetch the order-by expressions and docId to avoid the
// unnecessary data fetch
Expand All @@ -76,7 +79,7 @@ public Operator<IntermediateResultsBlock> run() {
TransformOperator transformOperator =
new TransformPlanNode(_indexSegment, _queryContext, expressionsToTransform,
DocIdSetPlanNode.MAX_DOC_PER_CALL).run();
return new SelectionOrderByOperator(_indexSegment, _queryContext, expressions, transformOperator);
return new SelectionOrderByOperator(_indexSegment, _queryContext, expressions, transformOperator, false);
}
}
} else {
Expand All @@ -85,4 +88,31 @@ public Operator<IntermediateResultsBlock> run() {
return new EmptySelectionOperator(_indexSegment, expressions, transformOperator);
}
}

/**
 * Checks whether every expression in the order-by clause is an ascending identifier (plain column) whose data
 * source is marked as sorted in this segment.
 * This is used to optimize order-by with limit.
 * For example, a query like "select * from table order by col1, col2 limit 10"
 * would take all the n matching rows and add them to a priority queue of size 10.
 * This is an n*log(k) operation which can be quite expensive for a large n.
 * In the above example, if the docs in the segment are already sorted by col1 and col2 then there is no need for
 * sorting at all (only the limit needs to be applied).
 *
 * @param orderByExpressions the order-by expressions of the query
 * @return true if all columns in the order-by clause are sorted, false otherwise
 */
private boolean isAllOrderByColumnsSorted(List<OrderByExpressionContext> orderByExpressions) {
  // Every order-by expression has to be inspected: checking only the first one would wrongly skip sorting
  // when a later order-by column is unsorted, descending, or not a plain column
  for (OrderByExpressionContext orderByExpression : orderByExpressions) {
    ExpressionContext expression = orderByExpression.getExpression();
    // Only ascending order on a plain column can be served without sorting
    if (expression.getType() != ExpressionContext.Type.IDENTIFIER || !orderByExpression.isAsc()) {
      return false;
    }
    String column = expression.getIdentifier();
    if (!_indexSegment.getDataSource(column).getDataSourceMetadata().isSorted()) {
      return false;
    }
  }
  return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,8 @@ public void testSelectionOrderBy() {
// Should early-terminate after processing the result of the first segment. Each thread should process at most 1
// segment.
long numDocsScanned = combineResult.getNumDocsScanned();
assertTrue(numDocsScanned >= NUM_RECORDS_PER_SEGMENT
&& numDocsScanned <= CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY * NUM_RECORDS_PER_SEGMENT);
// The result size is 10, so each thread scans at most 10 docs.
assertTrue(numDocsScanned <= 10 * CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY);
assertEquals(combineResult.getNumEntriesScannedInFilter(), 0);
assertEquals(combineResult.getNumEntriesScannedPostFilter(), numDocsScanned);
assertEquals(combineResult.getNumSegmentsProcessed(), NUM_SEGMENTS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,58 @@ public void testSelectStarOrderBy() {
Assert.assertEquals(((Integer) lastRow[columnIndexMap.get("column1")]).intValue(), 462769197);
}

/**
 * Verifies that ordering by the sorted column5 with LIMIT 10 scans only 10 documents, both with and without
 * a filter, and still returns the expected schema and rows.
 */
@Test
public void testSelectStarOrderBySortedColumn() {
  // column5 is sorted
  String query = "SELECT * " + " FROM testTable" + " ORDER BY column5 LIMIT 10";

  // Run the query without a filter
  BaseOperator<IntermediateResultsBlock> operator = getOperatorForPqlQuery(query);
  IntermediateResultsBlock block = operator.nextBlock();
  ExecutionStatistics stats = operator.getExecutionStatistics();
  Assert.assertEquals(stats.getNumDocsScanned(), 10L);
  Assert.assertEquals(stats.getNumEntriesScannedInFilter(), 0L);
  // 10 rows * (1 order-by column + 10 non-order-by columns)
  Assert.assertEquals(stats.getNumEntriesScannedPostFilter(), 110L);
  Assert.assertEquals(stats.getNumTotalDocs(), 30000L);
  assertSortedColumn5SelectionResult(block);

  // Run the same query with a filter
  operator = getOperatorForPqlQueryWithFilter(query);
  block = operator.nextBlock();
  stats = operator.getExecutionStatistics();
  Assert.assertEquals(stats.getNumDocsScanned(), 10);
  Assert.assertEquals(stats.getNumEntriesScannedInFilter(), 84134L);
  // Same post-filter count as the unfiltered run: 10 rows * 11 columns
  Assert.assertEquals(stats.getNumEntriesScannedPostFilter(), 110);
  Assert.assertEquals(stats.getNumTotalDocs(), 30000L);
  assertSortedColumn5SelectionResult(block);
}

/**
 * Asserts the schema and the selected rows shared by both runs of the sorted-column query: 11 non-virtual
 * columns, a STRING column5, 10 rows, and "gFuH" as column5 of the row at the head of the priority queue.
 */
private void assertSortedColumn5SelectionResult(IntermediateResultsBlock block) {
  DataSchema schema = block.getDataSchema();
  Map<String, Integer> nameToIndex = computeColumnNameToIndexMap(schema);

  Assert.assertEquals(getVirtualColumns(schema), 0);
  Assert.assertEquals(schema.size(), 11);
  Assert.assertTrue(nameToIndex.containsKey("column5"));
  // Look the index up only after its presence has been asserted
  int column5Index = nameToIndex.get("column5");
  Assert.assertEquals(schema.getColumnDataType(column5Index), DataSchema.ColumnDataType.STRING);

  PriorityQueue<Object[]> rows = (PriorityQueue<Object[]>) block.getSelectionResult();
  Assert.assertEquals(rows.size(), 10);
  Object[] headRow = rows.peek();
  Assert.assertEquals(headRow.length, 11);
  Assert.assertEquals((headRow[column5Index]), "gFuH");
}

@Test
public void testSelectStarOrderByLargeOffsetLimit() {
String query = "SELECT * " + " FROM testTable" + ORDER_BY + " LIMIT 5000, 7000";
Expand Down