String functions support global dict optimization (StarRocks#688)
kangkaisen authored Oct 26, 2021
1 parent ffde56e commit c706efe
Showing 32 changed files with 889 additions and 349 deletions.
19 changes: 0 additions & 19 deletions fe/fe-core/src/main/java/com/starrocks/analysis/LikePredicate.java
@@ -22,11 +22,7 @@
package com.starrocks.analysis;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.starrocks.catalog.Function;
import com.starrocks.catalog.FunctionSet;
import com.starrocks.catalog.ScalarFunction;
import com.starrocks.catalog.Type;
import com.starrocks.common.AnalysisException;
import com.starrocks.sql.analyzer.ExprVisitor;
import com.starrocks.sql.analyzer.SemanticException;
@@ -58,21 +54,6 @@ public String toString() {
}
}

public static void initBuiltins(FunctionSet functionSet) {
functionSet.addBuiltin(ScalarFunction.createBuiltin(
Operator.LIKE.name(), Lists.<Type>newArrayList(Type.VARCHAR, Type.VARCHAR),
false, Type.BOOLEAN,
"_ZN9starrocks13LikePredicate4likeEPN13starrocks_udf15FunctionContextERKNS1_9StringValES6_",
"_ZN9starrocks13LikePredicate12like_prepareEPN13starrocks_udf15FunctionContextENS2_18FunctionStateScopeE",
"_ZN9starrocks13LikePredicate10like_closeEPN13starrocks_udf15FunctionContextENS2_18FunctionStateScopeE", true));
functionSet.addBuiltin(ScalarFunction.createBuiltin(
Operator.REGEXP.name(), Lists.<Type>newArrayList(Type.VARCHAR, Type.VARCHAR),
false, Type.BOOLEAN,
"_ZN9starrocks13LikePredicate5regexEPN13starrocks_udf15FunctionContextERKNS1_9StringValES6_",
"_ZN9starrocks13LikePredicate13regex_prepareEPN13starrocks_udf15FunctionContextENS2_18FunctionStateScopeE",
"_ZN9starrocks13LikePredicate11regex_closeEPN13starrocks_udf15FunctionContextENS2_18FunctionStateScopeE", true));
}

private final Operator op;

public LikePredicate(Operator op, Expr e1, Expr e2) {
28 changes: 13 additions & 15 deletions fe/fe-core/src/main/java/com/starrocks/catalog/Function.java
@@ -30,8 +30,6 @@
import com.starrocks.common.io.Writable;
import com.starrocks.thrift.TFunction;
import com.starrocks.thrift.TFunctionBinaryType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
@@ -45,8 +43,6 @@
* Base class for all functions.
*/
public class Function implements Writable {
private static final Logger LOG = LogManager.getLogger(Function.class);

// Enum for how to compare function signatures.
// For decimal types, the type in the function can be a wildcard, i.e. decimal(*,*).
// The wildcard can *only* exist as function type, the caller will always be a
@@ -84,7 +80,6 @@ public enum CompareMode {
IS_MATCHABLE
}

public static final long UNIQUE_FUNCTION_ID = 0;
// Function id; every function has a unique id. Currently, all built-in functions' ids are 0.
private long id = 0;
// User specified function name e.g. "Add"
@@ -109,14 +104,15 @@ public enum CompareMode {
// library's checksum to make sure all backends use one library to serve user's request
protected String checksum = "";

// for vectorized engine
private boolean isVectorized = false;

// for vectorized engine, function-id
private long functionId;

private boolean isPolymorphic = false;

// For a low-cardinality string column with a global dict, some string functions
// can be evaluated on the dict content alone instead of on all the string column data.
private boolean couldApplyDictOptimize = false;

// Only used for serialization
protected Function() {
}
@@ -140,7 +136,6 @@ public Function(long functionId, FunctionName name, List<Type> argTypes, Type re
this.argTypes = argTypes.toArray(new Type[argTypes.size()]);
}
this.retType = retType;
this.isVectorized = isVectorized;
this.isPolymorphic = Arrays.stream(this.argTypes).anyMatch(Type::isPseudoType);
}

@@ -236,10 +231,6 @@ public boolean isPolymorphic() {
return isPolymorphic;
}

public void setIsVectorized(boolean vectorized) {
isVectorized = vectorized;
}

public long getFunctionId() {
return functionId;
}
@@ -287,6 +278,14 @@ public String signatureString() {
return sb.toString();
}

public boolean isCouldApplyDictOptimize() {
return couldApplyDictOptimize;
}

public void setCouldApplyDictOptimize(boolean couldApplyDictOptimize) {
this.couldApplyDictOptimize = couldApplyDictOptimize;
}

// Compares this to 'other' for mode.
public boolean compare(Function other, CompareMode mode) {
switch (mode) {
@@ -515,6 +514,7 @@ public TFunction toThrift() {
if (!checksum.isEmpty()) {
fn.setChecksum(checksum);
}
fn.setCould_apply_dict_optimize(couldApplyDictOptimize);
return fn;
}

@@ -687,8 +687,6 @@ public static FunctionType read(DataInput input) throws IOException {
}
}

;

protected void writeFields(DataOutput output) throws IOException {
output.writeLong(id);
name.write(output);
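
The new couldApplyDictOptimize flag added above captures the optimization in a nutshell: when a string column is low-cardinality and dict-encoded, a deterministic string function needs only one evaluation per distinct dict entry instead of one per row. Below is a minimal standalone sketch of that idea, using a plain Map<Integer, String> as a stand-in for the engine's real dict representation; the names are hypothetical, not StarRocks APIs.

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

public class DictEvalSketch {
    // Rewrite the dict instead of scanning the column: apply the string
    // function once per distinct entry, leaving the int codes untouched.
    static Map<Integer, String> evalOnDict(Map<Integer, String> dict, UnaryOperator<String> fn) {
        Map<Integer, String> rewritten = new LinkedHashMap<>();
        for (Map.Entry<Integer, String> e : dict.entrySet()) {
            rewritten.put(e.getKey(), fn.apply(e.getValue()));
        }
        return rewritten;
    }

    public static void main(String[] args) {
        Map<Integer, String> cityDict = Map.of(0, "beijing", 1, "shanghai");
        // upper(city) costs two evaluations here, no matter how many rows exist.
        System.out.println(evalOnDict(cityDict, String::toUpperCase));
    }
}
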
@@ -143,10 +143,15 @@ public class FunctionSet {
* to row function when init.
*/
private final Map<String, List<Function>> vectorizedFunctions;
// cmy: This does not contain any user defined functions. All UDFs handle null values by themselves.
// This does not contain any user defined functions. All UDFs handle null values by themselves.
private final ImmutableSet<String> nonNullResultWithNullParamFunctions = ImmutableSet.of("if", "hll_hash",
"concat_ws", "ifnull", "nullif", "null_or_empty", "coalesce");

// For a low-cardinality string column with a global dict, some string functions
// can be evaluated on the dict content alone instead of on all the string column data.
public final ImmutableSet<String> couldApplyDictOptimizationFunctions = ImmutableSet.of(
"like", "substr", "substring", "upper", "lower");

public FunctionSet() {
vectorizedFunctions = Maps.newHashMap();
}
@@ -290,6 +295,7 @@ private void addVectorizedBuiltin(Function fn) {
if (findVectorizedFunction(fn) != null) {
return;
}
fn.setCouldApplyDictOptimize(couldApplyDictOptimizationFunctions.contains(fn.functionName()));

List<Function> fns = vectorizedFunctions.computeIfAbsent(fn.functionName(), k -> Lists.newArrayList());
fns.add(fn);
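
The whitelist above is consulted once, at builtin registration time: addVectorizedBuiltin stamps the flag onto each function, and Function.toThrift() then ships it to the backend. A condensed sketch of that flow, with a cut-down, hypothetical stand-in for the real Function class:

import java.util.Set;

public class DictFlagSketch {
    // Minimal stand-in for com.starrocks.catalog.Function.
    static final class Fn {
        final String name;
        boolean couldApplyDictOptimize;
        Fn(String name) { this.name = name; }
    }

    static final Set<String> DICT_OPTIMIZABLE =
            Set.of("like", "substr", "substring", "upper", "lower");

    // Mirrors addVectorizedBuiltin: set the flag at registration so it is
    // serialized with the function and visible to the backend.
    static void register(Fn fn) {
        fn.couldApplyDictOptimize = DICT_OPTIMIZABLE.contains(fn.name);
    }

    public static void main(String[] args) {
        Fn upper = new Fn("upper");
        Fn concat = new Fn("concat");
        register(upper);
        register(concat);
        System.out.println(upper.couldApplyDictOptimize);  // true
        System.out.println(concat.couldApplyDictOptimize); // false
    }
}
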
@@ -31,8 +31,7 @@
import com.starrocks.thrift.TFunction;
import com.starrocks.thrift.TFunctionBinaryType;
import com.starrocks.thrift.TScalarFunction;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.util.Strings;

import java.io.DataInput;
import java.io.DataOutput;
@@ -47,7 +46,6 @@
* Internal representation of a scalar function.
*/
public class ScalarFunction extends Function {
private static final Logger LOG = LogManager.getLogger(ScalarFunction.class);
// The name inside the binary at location_ that contains this particular
// function. e.g. org.example.MyUdf.class.
private String symbolName;
@@ -267,7 +265,7 @@ public TFunction toThrift() {
TFunction fn = super.toThrift();
if (symbolName == null) {
// For vector engine, the symbol field is required
symbolName = "";
symbolName = Strings.EMPTY;
}
fn.setScalar_fn(new TScalarFunction());
fn.getScalar_fn().setSymbol(symbolName);
@@ -38,6 +38,7 @@
import com.starrocks.common.LoadException;
import com.starrocks.common.MetaNotFoundException;
import com.starrocks.common.NotImplementedException;
import com.starrocks.common.Pair;
import com.starrocks.common.UserException;
import com.starrocks.common.util.DebugUtil;
import com.starrocks.load.BrokerFileGroup;
@@ -50,6 +51,8 @@
import com.starrocks.planner.PlanNodeId;
import com.starrocks.planner.ScanNode;
import com.starrocks.qe.ConnectContext;
import com.starrocks.sql.optimizer.statistics.ColumnDict;
import com.starrocks.sql.optimizer.statistics.IDictManager;
import com.starrocks.thrift.TBrokerFileStatus;
import com.starrocks.thrift.TUniqueId;
import org.apache.logging.log4j.LogManager;
@@ -98,25 +101,26 @@ public LoadingTaskPlanner(Long loadJobId, long txnId, long dbId, OlapTable table
this.timeoutS = timeoutS;
this.parallelInstanceNum = Config.load_parallel_instance_num;

/*
* TODO(cmy): UDF currently belongs to a database. Therefore, before using UDF,
* we need to check whether the user has corresponding permissions on this database.
* But here we have lost user information and therefore cannot check permissions.
* So here we first prohibit users from using UDF in load. If necessary, improve it later.
*/
this.analyzer.setUDFAllowed(false);
}

public void plan(TUniqueId loadId, List<List<TBrokerFileStatus>> fileStatusesList, int filesAdded)
throws UserException {
// Generate tuple descriptor
TupleDescriptor tupleDesc = descTable.createTupleDescriptor("DestTableTuple");
List<Pair<Integer, ColumnDict>> globalDicts = Lists.newArrayList();
// use full schema to fill the descriptor table
for (Column col : table.getFullSchema()) {
SlotDescriptor slotDesc = descTable.addSlotDescriptor(tupleDesc);
slotDesc.setIsMaterialized(true);
slotDesc.setColumn(col);
slotDesc.setIsNullable(col.isAllowNull());

if (col.getType().isVarchar() && IDictManager.getInstance().hasGlobalDict(table.getId(),
col.getName())) {
ColumnDict dict = IDictManager.getInstance().getGlobalDict(table.getId(), col.getName());
globalDicts.add(new Pair<>(slotDesc.getId().asInt(), dict));
}
}
if (table.getKeysType() == KeysType.PRIMARY_KEYS) {
// add op type column
@@ -148,6 +152,9 @@ public void plan(TUniqueId loadId, List<List<TBrokerFileStatus>> fileStatusesLis
PlanFragment sinkFragment = new PlanFragment(new PlanFragmentId(0), scanNode, DataPartition.RANDOM);
sinkFragment.setSink(olapTableSink);
sinkFragment.setParallelExecNum(parallelInstanceNum);
// After data loading, check whether the global dict of each low-cardinality
// string column needs to be updated.
sinkFragment.setGlobalDicts(globalDicts);

fragments.add(sinkFragment);

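
This collection pattern appears twice in the commit (broker load here, stream load further below): while building slot descriptors, look up the current global dict for every varchar column that has one, and attach the (slot id, dict) pairs to the sink fragment so that, after loading, it can be checked whether each dict needs updating. A schematic sketch of that step, with simplified stand-ins for IDictManager, ColumnDict, and Pair (the real classes live under com.starrocks):

import java.util.ArrayList;
import java.util.List;

public class DictCollectSketch {
    static class ColumnDict { }                        // stand-in for the real ColumnDict

    interface DictManager {                            // stand-in for IDictManager
        boolean hasGlobalDict(long tableId, String columnName);
        ColumnDict getGlobalDict(long tableId, String columnName);
    }

    static class SlotDict {                            // stand-in for Pair<Integer, ColumnDict>
        final int slotId;
        final ColumnDict dict;
        SlotDict(int slotId, ColumnDict dict) { this.slotId = slotId; this.dict = dict; }
    }

    // Mirrors the loop added to plan(): one (slotId, dict) pair per varchar
    // column that already has a global dict; the result is what gets handed
    // to sinkFragment.setGlobalDicts(...).
    static List<SlotDict> collectGlobalDicts(DictManager mgr, long tableId,
                                             List<String> varcharColumns,
                                             List<Integer> slotIds) {
        List<SlotDict> globalDicts = new ArrayList<>();
        for (int i = 0; i < varcharColumns.size(); i++) {
            String col = varcharColumns.get(i);
            if (mgr.hasGlobalDict(tableId, col)) {
                globalDicts.add(new SlotDict(slotIds.get(i), mgr.getGlobalDict(tableId, col)));
            }
        }
        return globalDicts;
    }
}
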
7 changes: 0 additions & 7 deletions fe/fe-core/src/main/java/com/starrocks/persist/EditLog.java
@@ -71,7 +71,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.File;
import java.io.IOException;
import java.util.List;

@@ -829,12 +828,6 @@ public synchronized void close() throws IOException {
journal.close();
}

public synchronized void createEditLogFile(File name) throws IOException {
EditLogOutputStream editLogOutputStream = new EditLogFileOutputStream(name);
editLogOutputStream.create();
editLogOutputStream.close();
}

public void open() {
journal.open();
}
31 changes: 25 additions & 6 deletions fe/fe-core/src/main/java/com/starrocks/planner/DecodeNode.java
@@ -2,26 +2,32 @@

package com.starrocks.planner;

import com.starrocks.analysis.TupleId;
import com.starrocks.analysis.Expr;
import com.starrocks.analysis.SlotId;
import com.starrocks.analysis.TupleDescriptor;
import com.starrocks.thrift.TDecodeNode;
import com.starrocks.thrift.TExplainLevel;
import com.starrocks.thrift.TPlanNode;
import com.starrocks.thrift.TPlanNodeType;

import java.util.ArrayList;
import java.util.Map;

public class DecodeNode extends PlanNode {
// Maps dict-encoded int column ids to the string column ids they decode to
private final Map<Integer, Integer> dictIdToStringIds;
// The string functions to which global dict optimization has been applied
private final Map<SlotId, Expr> stringFunctions;

public DecodeNode(PlanNodeId id,
ArrayList<TupleId> tupleIds,
TupleDescriptor tupleDescriptor,
PlanNode child,
Map<Integer, Integer> dictIdToStringIds) {
super(id, tupleIds, "Decode");
Map<Integer, Integer> dictIdToStringIds,
Map<SlotId, Expr> stringFunctions) {
super(id, tupleDescriptor.getId().asList(), "Decode");
addChild(child);
this.tblRefIds = child.tblRefIds;
this.tupleIds.addAll(child.tblRefIds);
this.dictIdToStringIds = dictIdToStringIds;
this.stringFunctions = stringFunctions;
}

@Override
@@ -34,6 +40,7 @@ protected void toThrift(TPlanNode msg) {
msg.node_type = TPlanNodeType.DECODE_NODE;
msg.decode_node = new TDecodeNode();
msg.decode_node.setDict_id_to_string_ids(dictIdToStringIds);
stringFunctions.forEach((key, value) -> msg.decode_node.putToString_functions(key.asInt(), value.treeToThrift()));
}

@Override
@@ -47,6 +54,18 @@ protected String getNodeExplainString(String prefix, TExplainLevel detailLevel)
append("<string id ").append(kv.getValue()).append(">").
append("\n");
}
if (!stringFunctions.isEmpty()) {
output.append(prefix);
output.append("string functions:\n");
for (Map.Entry<SlotId, Expr> kv : stringFunctions.entrySet()) {
output.append(prefix);
output.append("<function id ").
append(kv.getKey()).
append("> : ").
append(kv.getValue().toSql()).
append("\n");
}
}
return output.toString();
}
}
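
A DecodeNode undoes the dict encoding at the top of an optimized subtree: each dictIdToStringIds entry maps a dict-encoded int column back to its string column, and the attached stringFunctions are the rewritten expressions whose results are likewise produced from dict content. A minimal sketch of the per-column decode step, assuming a simple code-to-string dict rather than the backend's actual columnar implementation:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class DecodeSketch {
    // Decode one dict-encoded column: replace each int code with the string
    // it stands for, as the DecodeNode does per dictIdToStringIds entry.
    static List<String> decodeColumn(List<Integer> codes, Map<Integer, String> dict) {
        List<String> strings = new ArrayList<>(codes.size());
        for (int code : codes) {
            strings.add(dict.get(code));
        }
        return strings;
    }

    public static void main(String[] args) {
        Map<Integer, String> dict = Map.of(0, "beijing", 1, "shanghai");
        System.out.println(decodeColumn(List.of(1, 0, 1), dict)); // [shanghai, beijing, shanghai]
    }
}
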
@@ -505,10 +505,6 @@ public Map<Integer, RuntimeFilterDescription> getProbeRuntimeFilters() {
return probeRuntimeFilters;
}

public List<Pair<Integer, ColumnDict>> getGlobalDicts() {
return globalDicts;
}

public void setGlobalDicts(List<Pair<Integer, ColumnDict>> dicts) {
this.globalDicts = dicts;
}
@@ -39,9 +39,12 @@
import com.starrocks.common.DdlException;
import com.starrocks.common.ErrorCode;
import com.starrocks.common.ErrorReport;
import com.starrocks.common.Pair;
import com.starrocks.common.UserException;
import com.starrocks.load.Load;
import com.starrocks.load.LoadErrorHub;
import com.starrocks.sql.optimizer.statistics.ColumnDict;
import com.starrocks.sql.optimizer.statistics.IDictManager;
import com.starrocks.task.StreamLoadTask;
import com.starrocks.thrift.InternalServiceVersion;
import com.starrocks.thrift.TExecPlanFragmentParams;
@@ -103,6 +106,7 @@ public TExecPlanFragmentParams plan(TUniqueId loadId) throws UserException {
// construct tuple descriptor, used for scanNode and dataSink
TupleDescriptor tupleDesc = descTable.createTupleDescriptor("DstTableTuple");
boolean negative = streamLoadTask.getNegative();
List<Pair<Integer, ColumnDict>> globalDicts = Lists.newArrayList();
// here we should use the full schema to fill the descriptor table
for (Column col : destTable.getFullSchema()) {
SlotDescriptor slotDesc = descTable.addSlotDescriptor(tupleDesc);
@@ -112,6 +116,12 @@ public TExecPlanFragmentParams plan(TUniqueId loadId) throws UserException {
if (negative && !col.isKey() && col.getAggregationType() != AggregateType.SUM) {
throw new DdlException("Column is not SUM AggreateType. column:" + col.getName());
}

if (col.getType().isVarchar() && IDictManager.getInstance().hasGlobalDict(destTable.getId(),
col.getName())) {
ColumnDict dict = IDictManager.getInstance().getGlobalDict(destTable.getId(), col.getName());
globalDicts.add(new Pair<>(slotDesc.getId().asInt(), dict));
}
}
if (destTable.getKeysType() == KeysType.PRIMARY_KEYS) {
// add op type column
@@ -141,6 +151,9 @@ public TExecPlanFragmentParams plan(TUniqueId loadId) throws UserException {
// OlapTableSink can dispatch data to corresponding node.
PlanFragment fragment = new PlanFragment(new PlanFragmentId(0), scanNode, DataPartition.UNPARTITIONED);
fragment.setSink(olapTableSink);
// After data loading, check whether the global dict of each low-cardinality
// string column needs to be updated.
fragment.setGlobalDicts(globalDicts);

fragment.finalize(null, false);
