Skip to content

Commit

Permalink
[Feature](Materialized-View) support duplicate base column for differe…
Browse files Browse the repository at this point in the history
…nt aggregate function (apache#15837)

support duplicate base column for different aggregate function
  • Loading branch information
BiteTheDDDDt authored Feb 2, 2023
1 parent e31913f commit 0d5b115
Show file tree
Hide file tree
Showing 44 changed files with 883 additions and 308 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,5 @@ tools/**/tpch-data/

# be-ut
data_test

/conf/log4j2-spring.xml
5 changes: 4 additions & 1 deletion be/src/olap/rowset/segment_v2/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,10 @@ Status SegmentWriter::init(const std::vector<uint32_t>& col_ids, bool has_key) {

Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_pos,
size_t num_rows) {
assert(block->columns() == _column_writers.size());
CHECK(block->columns() == _column_writers.size())
<< ", block->columns()=" << block->columns()
<< ", _column_writers.size()=" << _column_writers.size();

_olap_data_convertor->set_source_content(block, row_pos, num_rows);

// find all row pos for short key indexes
Expand Down
14 changes: 2 additions & 12 deletions be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include "gutil/integral_types.h"
#include "olap/merger.h"
#include "olap/olap_common.h"
#include "olap/row_cursor.h"
#include "olap/rowset/segment_v2/column_reader.h"
#include "olap/storage_engine.h"
#include "olap/tablet.h"
Expand All @@ -39,8 +38,6 @@
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"

using std::nothrow;

namespace doris {
using namespace ErrorCode;

Expand Down Expand Up @@ -218,13 +215,6 @@ ColumnMapping* RowBlockChanger::get_mutable_column_mapping(size_t column_index)

Status RowBlockChanger::change_block(vectorized::Block* ref_block,
vectorized::Block* new_block) const {
if (new_block->columns() != _schema_mapping.size()) {
LOG(WARNING) << "block does not match with schema mapping rules. "
<< "block_schema_size=" << new_block->columns()
<< ", mapping_schema_size=" << _schema_mapping.size();
return Status::Error<UNINITIALIZED>();
}

ObjectPool pool;
RuntimeState* state = pool.add(new RuntimeState());
state->set_desc_tbl(&_desc_tbl);
Expand Down Expand Up @@ -426,7 +416,7 @@ Status VSchemaChangeDirectly::_inner_process(RowsetReaderSharedPtr rowset_reader
auto ref_block = std::make_unique<vectorized::Block>(base_tablet_schema->create_block());

rowset_reader->next_block(ref_block.get());
if (ref_block->rows() < 1) {
if (ref_block->rows() == 0) {
break;
}

Expand Down Expand Up @@ -502,7 +492,7 @@ Status VSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_rea
do {
auto ref_block = std::make_unique<vectorized::Block>(base_tablet_schema->create_block());
rowset_reader->next_block(ref_block.get());
if (ref_block->rows() < 1) {
if (ref_block->rows() == 0) {
break;
}

Expand Down
6 changes: 4 additions & 2 deletions be/src/vec/exprs/vslot_ref.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,10 @@ Status VSlotRef::prepare(doris::RuntimeState* state, const doris::RowDescriptor&
}
_column_id = desc.get_column_id(_slot_id);
if (_column_id < 0) {
LOG(INFO) << "VSlotRef - invalid slot id: " << _slot_id << " desc:" << desc.debug_string();
return Status::InternalError("VSlotRef - invalid slot id {}", _slot_id);
return Status::InternalError(
"VSlotRef have invalid slot id: {}, desc: {}, slot_desc: {}, desc_tbl: {}",
*_column_name, _slot_id, desc.debug_string(), slot_desc->debug_string(),
state->desc_tbl().debug_string());
}
return Status::OK();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,18 +468,14 @@ private List<Column> checkAndPrepareMaterializedView(CreateMaterializedViewStmt
}

String mvColumnName = mvColumnItem.getBaseColumnNames().iterator().next();
Column baseColumn = olapTable.getColumn(mvColumnName);
Column mvColumn = mvColumnItem.toMVColumn(olapTable);
if (mvColumnItem.isKey()) {
++numOfKeys;
}
if (baseColumn == null) {
throw new DdlException("The mv column of agg or uniq table cannot be transformed "
+ "from original column[" + String.join(",", mvColumnItem.getBaseColumnNames()) + "]");
}
Preconditions.checkNotNull(baseColumn, "Column[" + mvColumnName + "] does not exist");
AggregateType baseAggregationType = baseColumn.getAggregationType();

AggregateType baseAggregationType = mvColumn.getAggregationType();
AggregateType mvAggregationType = mvColumnItem.getAggregationType();
if (baseColumn.isKey() && !mvColumnItem.isKey()) {
if (mvColumn.isKey() && !mvColumnItem.isKey()) {
throw new DdlException("The column[" + mvColumnName + "] must be the key of materialized view");
}
if (baseAggregationType != mvAggregationType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,26 @@
package org.apache.doris.analysis;

import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.FunctionSet;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.MaterializedIndexMeta;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.UserException;
import org.apache.doris.rewrite.mvrewrite.CountFieldToSum;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* Materialized view is performed to materialize the results of query.
Expand All @@ -60,6 +56,8 @@ public class CreateMaterializedViewStmt extends DdlStmt {
private static final Logger LOG = LogManager.getLogger(CreateMaterializedViewStmt.class);

public static final String MATERIALIZED_VIEW_NAME_PREFIX = "mv_";
public static final String MATERIALIZED_VIEW_AGGREGATE_NAME_PREFIX = "mva_";
public static final String MATERIALIZED_VIEW_AGGREGATE_NAME_LINK = "__";
public static final Map<String, MVColumnPattern> FN_NAME_TO_PATTERN;

static {
Expand Down Expand Up @@ -168,8 +166,6 @@ public void analyzeSelectClause(Analyzer analyzer) throws AnalysisException {
throw new AnalysisException("The materialized view must contain at least one column");
}
boolean meetAggregate = false;
// TODO(ml): support same column with different aggregation function
Set<String> mvColumnNameSet = Sets.newHashSet();
/**
* 1. The columns of mv must be a single column or a aggregate column without any calculate.
* Also the children of aggregate column must be a single column without any calculate.
Expand Down Expand Up @@ -209,10 +205,6 @@ public void analyzeSelectClause(Analyzer analyzer) throws AnalysisException {
List<SlotRef> slots = new ArrayList<>();
functionCallExpr.collect(SlotRef.class, slots);
Preconditions.checkArgument(slots.size() == 1);
String columnName = slots.get(0).getColumnName().toLowerCase();
if (!mvColumnNameSet.add(columnName)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_DUP_FIELDNAME, columnName);
}

if (beginIndexOfAggregation == -1) {
beginIndexOfAggregation = i;
Expand Down Expand Up @@ -358,26 +350,16 @@ private void supplyOrderColumn() throws AnalysisException {
private MVColumnItem buildMVColumnItem(Analyzer analyzer, FunctionCallExpr functionCallExpr)
throws AnalysisException {
String functionName = functionCallExpr.getFnName().getFunction();
List<SlotRef> slots = new ArrayList<>();
functionCallExpr.collect(SlotRef.class, slots);
Preconditions.checkArgument(slots.size() == 1);
SlotRef baseColumnRef = slots.get(0);
String baseColumnName = baseColumnRef.getColumnName().toLowerCase();
Column baseColumn = baseColumnRef.getColumn();
if (baseColumn == null) {
throw new AnalysisException("baseColumn is null");
}
Type baseType = baseColumn.getOriginType();
Expr functionChild0 = functionCallExpr.getChild(0);
String mvColumnName;
AggregateType mvAggregateType;
Expr defineExpr = baseColumnRef;
List<Expr> childs = functionCallExpr.getChildren();
Preconditions.checkArgument(childs.size() == 1);
Expr defineExpr = childs.get(0);
Type baseType = defineExpr.getType();
AggregateType mvAggregateType = null;
Type type;
switch (functionName.toLowerCase()) {
case "sum":
mvColumnName = mvColumnBuilder(baseColumnName);
mvAggregateType = AggregateType.valueOf(functionName.toUpperCase());
PrimitiveType baseColumnType = baseColumnRef.getType().getPrimitiveType();
PrimitiveType baseColumnType = baseType.getPrimitiveType();
if (baseColumnType == PrimitiveType.TINYINT || baseColumnType == PrimitiveType.SMALLINT
|| baseColumnType == PrimitiveType.INT) {
type = Type.BIGINT;
Expand All @@ -387,117 +369,80 @@ private MVColumnItem buildMVColumnItem(Analyzer analyzer, FunctionCallExpr funct
type = baseType;
}
if (type != baseType) {
defineExpr = new CastExpr(type, baseColumnRef);
defineExpr.analyze(analyzer);
defineExpr = new CastExpr(type, defineExpr);
if (analyzer != null) {
defineExpr.analyze(analyzer);
}
}
break;
case "min":
case "max":
mvColumnName = mvColumnBuilder(baseColumnName);
mvAggregateType = AggregateType.valueOf(functionName.toUpperCase());
type = baseType;
break;
case FunctionSet.BITMAP_UNION:
// Compatible aggregation models
if (baseColumnRef.getType().getPrimitiveType() == PrimitiveType.BITMAP) {
mvColumnName = mvColumnBuilder(baseColumnName);
} else {
mvColumnName = mvColumnBuilder(functionName, baseColumnName);
defineExpr = functionChild0;
}
mvAggregateType = AggregateType.valueOf(functionName.toUpperCase());
type = Type.BITMAP;
if (analyzer != null && !baseType.isBitmapType()) {
throw new AnalysisException(
"BITMAP_UNION need input a bitmap column, but input " + baseType.toString());
}
break;
case FunctionSet.HLL_UNION:
// Compatible aggregation models
if (baseColumnRef.getType().getPrimitiveType() == PrimitiveType.HLL) {
mvColumnName = mvColumnBuilder(baseColumnName);
} else {
mvColumnName = mvColumnBuilder(functionName, baseColumnName);
defineExpr = functionChild0;
}
mvAggregateType = AggregateType.valueOf(functionName.toUpperCase());
type = Type.HLL;
if (analyzer != null && !baseType.isHllType()) {
throw new AnalysisException("HLL_UNION need input a hll column, but input " + baseType.toString());
}
break;
case FunctionSet.COUNT:
mvColumnName = mvColumnBuilder(functionName, baseColumnName);
mvAggregateType = AggregateType.SUM;
defineExpr = new CaseExpr(null, Lists.newArrayList(new CaseWhenClause(
new IsNullPredicate(baseColumnRef, false),
new IntLiteral(0, Type.BIGINT))), new IntLiteral(1, Type.BIGINT));
defineExpr.analyze(analyzer);
defineExpr = CountFieldToSum.slotToCaseWhen(defineExpr);
if (analyzer != null) {
defineExpr.analyze(analyzer);
}
type = Type.BIGINT;
break;
default:
throw new AnalysisException("Unsupported function:" + functionName);
}
return new MVColumnItem(mvColumnName, type, mvAggregateType, false, defineExpr, baseColumnName);
if (mvAggregateType == null) {
mvAggregateType = AggregateType.valueOf(functionName.toUpperCase());
}
return new MVColumnItem(type, mvAggregateType, defineExpr, mvColumnBuilder(defineExpr.toSql()));
}

public Map<String, Expr> parseDefineExprWithoutAnalyze() throws AnalysisException {
Map<String, Expr> result = Maps.newHashMap();
SelectList selectList = selectStmt.getSelectList();
for (SelectListItem selectListItem : selectList.getItems()) {
Expr selectListItemExpr = selectListItem.getExpr();
Expr expr = selectListItemExpr;
String name = MaterializedIndexMeta.normalizeName(expr.toSql());
if (selectListItemExpr instanceof FunctionCallExpr) {
FunctionCallExpr functionCallExpr = (FunctionCallExpr) selectListItemExpr;

List<SlotRef> slots = new ArrayList<>();
functionCallExpr.collect(SlotRef.class, slots);
Preconditions.checkArgument(slots.size() == 1);
String baseColumnName = slots.get(0).getColumnName();
String functionName = functionCallExpr.getFnName().getFunction();
SlotRef baseSlotRef = slots.get(0);
switch (functionName.toLowerCase()) {
switch (functionCallExpr.getFnName().getFunction().toLowerCase()) {
case "sum":
case "min":
case "max":
result.put(baseColumnName, null);
break;
case FunctionSet.BITMAP_UNION:
if (functionCallExpr.getChild(0) instanceof FunctionCallExpr) {
CastExpr castExpr = new CastExpr(new TypeDef(Type.VARCHAR), baseSlotRef);
List<Expr> params = Lists.newArrayList();
params.add(castExpr);
FunctionCallExpr defineExpr = new FunctionCallExpr(FunctionSet.TO_BITMAP_WITH_CHECK,
params);
result.put(mvColumnBuilder(functionName, baseColumnName), defineExpr);
} else {
result.put(baseColumnName, null);
}
break;
case FunctionSet.HLL_UNION:
if (functionCallExpr.getChild(0) instanceof FunctionCallExpr) {
CastExpr castExpr = new CastExpr(new TypeDef(Type.VARCHAR), baseSlotRef);
List<Expr> params = Lists.newArrayList();
params.add(castExpr);
FunctionCallExpr defineExpr = new FunctionCallExpr(FunctionSet.HLL_HASH, params);
result.put(mvColumnBuilder(functionName, baseColumnName), defineExpr);
} else {
result.put(baseColumnName, null);
}
break;
case FunctionSet.COUNT:
Expr defineExpr = new CaseExpr(null, Lists.newArrayList(
new CaseWhenClause(new IsNullPredicate(slots.get(0), false),
new IntLiteral(0, Type.BIGINT))),
new IntLiteral(1, Type.BIGINT));
result.put(mvColumnBuilder(functionName, baseColumnName), defineExpr);
MVColumnItem item = buildMVColumnItem(null, functionCallExpr);
expr = item.getDefineExpr();
name = item.getName();
break;
default:
result.put(mvColumnBuilder(functionCallExpr.toSql()), functionCallExpr);
break;
}
} else {
result.put(mvColumnBuilder(selectListItemExpr.toSql()), selectListItemExpr);
}
result.put(name, expr);
}
return result;
}

// for bitmap_union(to_bitmap(column)) function, we should check value is not negative
// for bitmap_union(to_bitmap(column)) function, we should check value is not
// negative
// in vectorized schema_change mode, so we should rewrite the function to
// bitmap_union(to_bitmap_with_check(column))
private void rewriteToBitmapWithCheck() {
public void rewriteToBitmapWithCheck() {
for (SelectListItem item : selectStmt.getSelectList().getItems()) {
if (item.getExpr() instanceof FunctionCallExpr) {
String functionName = ((FunctionCallExpr) item.getExpr()).getFnName().getFunction();
Expand All @@ -519,10 +464,34 @@ public static String mvColumnBuilder(String functionName, String sourceColumnNam
.append(sourceColumnName).toString();
}

/**
 * Builds the name of an aggregated materialized-view column.
 *
 * The result is MATERIALIZED_VIEW_AGGREGATE_NAME_PREFIX, followed by
 * {@code aggregateType.toSql()}, followed by MATERIALIZED_VIEW_AGGREGATE_NAME_LINK,
 * followed by the source column name with any existing materialized-view
 * prefixes removed by {@link #mvColumnBreaker(String)}.
 *
 * @param aggregateType    aggregate type whose SQL form is embedded in the name
 * @param sourceColumnName column name the view column is derived from; may
 *                         already carry a materialized-view prefix
 * @return the aggregated materialized-view column name
 */
public static String mvColumnBuilder(AggregateType aggregateType, String sourceColumnName) {
    // Use the shared link constant rather than a literal so this builder stays
    // in sync with mvColumnBreaker, which locates the same separator when parsing.
    return new StringBuilder().append(MATERIALIZED_VIEW_AGGREGATE_NAME_PREFIX).append(aggregateType.toSql())
            .append(MATERIALIZED_VIEW_AGGREGATE_NAME_LINK)
            .append(mvColumnBreaker(sourceColumnName)).toString();
}

/**
 * Builds an aggregated materialized-view column name from a function name.
 *
 * The result is MATERIALIZED_VIEW_AGGREGATE_NAME_PREFIX, the upper-cased
 * function name, MATERIALIZED_VIEW_AGGREGATE_NAME_LINK, and the source column
 * name appended as given (it is not stripped of existing prefixes).
 *
 * @param functionName     aggregate function name; upper-cased before use
 * @param sourceColumnName column name appended verbatim after the link
 * @return the aggregated materialized-view column name
 */
public static String mvAggregateColumnBuilder(String functionName, String sourceColumnName) {
    final String upperFunctionName = functionName.toUpperCase();
    return MATERIALIZED_VIEW_AGGREGATE_NAME_PREFIX + upperFunctionName
            + MATERIALIZED_VIEW_AGGREGATE_NAME_LINK + sourceColumnName;
}

/**
 * Prepends MATERIALIZED_VIEW_NAME_PREFIX to the given name.
 *
 * @param name text to place after the prefix
 * @return the prefixed materialized-view column name
 */
public static String mvColumnBuilder(String name) {
    return MATERIALIZED_VIEW_NAME_PREFIX + name;
}

/**
 * Strips materialized-view naming prefixes from a column name.
 *
 * Names starting with MATERIALIZED_VIEW_AGGREGATE_NAME_PREFIX lose everything
 * up to and including the first MATERIALIZED_VIEW_AGGREGATE_NAME_LINK; names
 * starting with MATERIALIZED_VIEW_NAME_PREFIX lose that prefix. The remainder
 * is processed again, so stacked prefixes are removed as well.
 *
 * @param name column name, possibly carrying materialized-view prefixes
 * @return the name with recognized prefixes removed; the input itself when it
 *         has no recognized prefix, or when it starts with the aggregate
 *         prefix but contains no link separator
 */
public static String mvColumnBreaker(String name) {
    if (name.startsWith(MATERIALIZED_VIEW_AGGREGATE_NAME_PREFIX)) {
        int linkIndex = name.indexOf(MATERIALIZED_VIEW_AGGREGATE_NAME_LINK);
        if (linkIndex < 0) {
            // Not a well-formed aggregate name (no "__" separator). Without this
            // guard, indexOf's -1 would turn into substring(1) and silently chop
            // the first character off the name.
            return name;
        }
        // mva_SUM__k2 -> k2
        return mvColumnBreaker(name.substring(linkIndex + MATERIALIZED_VIEW_AGGREGATE_NAME_LINK.length()));
    } else if (name.startsWith(MATERIALIZED_VIEW_NAME_PREFIX)) {
        // mv_k2 -> k2
        return mvColumnBreaker(name.substring(MATERIALIZED_VIEW_NAME_PREFIX.length()));
    }
    return name;
}

@Override
public String toSql() {
return null;
Expand Down
Loading

0 comments on commit 0d5b115

Please sign in to comment.