From 45bbf9d523f598c83696171851747ac3c5d94e95 Mon Sep 17 00:00:00 2001 From: Yufei Liu <51392299+liuyufei9527@users.noreply.github.com> Date: Thu, 31 Aug 2023 10:19:31 +0800 Subject: [PATCH] [Feature] dict query function (#20961) * [Feature] dict query function Implement function 'dict_mapping', which supports querying a primary key table and returning a numeric value column. dict_mapping(dict_table, keys... [, value_column]) arguments: dict_table: STRING, in 'db.table' or 'table' format; the table must exist and use the primary key model. keys: must match the dict_table primary key in length, order and type; implicit type casts are not supported. value_column: STRING, optional; if the value column is not specified, the first AUTO_INCREMENT column is used as the value column. The value column must be of BIGINT type. return: BIGINT, the actual value of the value column. Use case: Create a primary key table with an auto-increment column as a global dictionary. Use the 'dict_mapping' function when loading data into the base table to map string-type keys to ordered numbers, then use the BITMAP type to create a rollup mv that speeds up count-distinct queries. Detail: Intercept the 'dict_mapping' function in the SQL planner and transfer the dict table metadata in TExprNode to the BE. DictQueryExpr does the real work: it uses the TableReader multi_get API to query records from the dict table and return the id. 
Signed-off-by: liuyufei9527 --- be/src/exprs/CMakeLists.txt | 1 + be/src/exprs/dict_query_expr.cpp | 140 ++++++++++++ be/src/exprs/dict_query_expr.h | 44 ++++ be/src/exprs/expr.cpp | 4 + be/src/runtime/descriptors.h | 3 +- .../com/starrocks/analysis/DictQueryExpr.java | 79 +++++++ .../com/starrocks/catalog/FunctionSet.java | 3 + .../com/starrocks/planner/OlapTableSink.java | 44 ++-- .../service/FrontendServiceImpl.java | 52 ++++- .../sql/analyzer/AggregationAnalyzer.java | 6 + .../sql/analyzer/AstToStringBuilder.java | 6 + .../sql/analyzer/ExpressionAnalyzer.java | 168 ++++++++++++++ .../com/starrocks/sql/ast/AstVisitor.java | 5 + .../operator/scalar/DictQueryOperator.java | 48 ++++ .../scalar/ScalarOperatorVisitor.java | 4 + .../SqlToScalarOperatorTranslator.java | 11 + .../com/starrocks/sql/parser/AstBuilder.java | 6 + .../sql/plan/ScalarOperatorToExpr.java | 10 + .../starrocks/planner/OlapTableSinkTest.java | 14 +- .../sql/analyzer/DictQueryFunctionTest.java | 210 ++++++++++++++++++ gensrc/thrift/Exprs.thrift | 12 + gensrc/thrift/FrontendService.thrift | 40 ++-- 22 files changed, 867 insertions(+), 43 deletions(-) create mode 100644 be/src/exprs/dict_query_expr.cpp create mode 100644 be/src/exprs/dict_query_expr.h create mode 100644 fe/fe-core/src/main/java/com/starrocks/analysis/DictQueryExpr.java create mode 100644 fe/fe-core/src/main/java/com/starrocks/sql/optimizer/operator/scalar/DictQueryOperator.java create mode 100644 fe/fe-core/src/test/java/com/starrocks/sql/analyzer/DictQueryFunctionTest.java diff --git a/be/src/exprs/CMakeLists.txt b/be/src/exprs/CMakeLists.txt index 0fb2cc55f8f246..40ecba70689712 100644 --- a/be/src/exprs/CMakeLists.txt +++ b/be/src/exprs/CMakeLists.txt @@ -97,6 +97,7 @@ set(EXPR_FILES map_apply_expr.cpp map_expr.cpp substring_index.cpp + dict_query_expr.cpp ) add_library(Exprs ${EXPR_FILES}) diff --git a/be/src/exprs/dict_query_expr.cpp b/be/src/exprs/dict_query_expr.cpp new file mode 100644 index 00000000000000..ef53868cd6d29e --- 
/dev/null +++ b/be/src/exprs/dict_query_expr.cpp @@ -0,0 +1,140 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "exprs/dict_query_expr.h" + +#include "agent/master_info.h" +#include "column/chunk.h" +#include "column/column.h" +#include "column/column_helper.h" +#include "column/column_viewer.h" +#include "gutil/casts.h" +#include "runtime/client_cache.h" +#include "storage/chunk_helper.h" +#include "storage/table_reader.h" +#include "util/thrift_rpc_helper.h" + +namespace starrocks { + +DictQueryExpr::DictQueryExpr(const TExprNode& node) : Expr(node), _dict_query_expr(node.dict_query_expr) {} + +StatusOr DictQueryExpr::evaluate_checked(ExprContext* context, Chunk* ptr) { + Columns columns(children().size()); + size_t size = ptr != nullptr ? 
ptr->num_rows() : 1; + for (int i = 0; i < _children.size(); ++i) { + columns[i] = _children[i]->evaluate(context, ptr); + } + + ColumnPtr res; + for (auto& column : columns) { + if (column->is_constant()) { + column = ColumnHelper::unpack_and_duplicate_const_column(size, column); + } + } + ChunkPtr key_chunk = ChunkHelper::new_chunk(_key_slots, size); + key_chunk->reset(); + for (int i = 0; i < _dict_query_expr.key_fields.size(); ++i) { + ColumnPtr key_column = columns[1 + i]; + key_chunk->update_column_by_index(key_column, i); + } + + for (auto& column : key_chunk->columns()) { + if (column->is_nullable()) { + column = ColumnHelper::update_column_nullable(false, column, column->size()); + } + } + + std::vector found; + ChunkPtr value_chunk = ChunkHelper::new_chunk(_value_slot, key_chunk->num_rows()); + + Status status = _table_reader->multi_get(*key_chunk, {_dict_query_expr.value_field}, found, *value_chunk); + if (!status.ok()) { + // todo retry + LOG(WARNING) << "fail to execute multi get: " << status.detailed_message(); + return status; + } + res = ColumnHelper::create_column(_value_slot[0]->type(), true); + + int res_idx = 0; + for (int idx = 0; idx < size; ++idx) { + if (found[idx]) { + res->append_datum(value_chunk->get_column_by_index(0)->get(res_idx)); + res_idx++; + } else { + if (_dict_query_expr.strict_mode) { + return Status::NotFound("In strict mode, query failed if record not exist in dict table."); + } + res->append_nulls(1); + } + } + + return res; +} + +Status DictQueryExpr::prepare(RuntimeState* state, ExprContext* context) { + RETURN_IF_ERROR(Expr::prepare(state, context)); + _runtime_state = state; + return Status::OK(); +} + +Status DictQueryExpr::open(RuntimeState* state, ExprContext* context, FunctionContext::FunctionStateScope scope) { + // init parent open + RETURN_IF_ERROR(Expr::open(state, context, scope)); + + TGetDictQueryParamRequest request; + request.__set_db_name(_dict_query_expr.db_name); + 
request.__set_table_name(_dict_query_expr.tbl_name); + TGetDictQueryParamResponse response; + + TNetworkAddress master_addr = get_master_address(); + RETURN_IF_ERROR(ThriftRpcHelper::rpc( + master_addr.hostname, master_addr.port, + [&request, &response](FrontendServiceConnection& client) { client->getDictQueryParam(response, request); }, + 30000)); + + TableReaderParams params; + params.schema = response.schema; + params.partition_param = response.partition; + params.location_param = response.location; + params.nodes_info = response.nodes_info; + params.partition_versions = _dict_query_expr.partition_version; + params.timeout_ms = 30000; + + _table_reader = std::make_shared(); + RETURN_IF_ERROR(_table_reader->init(params)); + + _key_slots.resize(_dict_query_expr.key_fields.size()); + for (int i = 0; i < _dict_query_expr.key_fields.size(); ++i) { + vector& slot_descs = response.schema.slot_descs; + for (auto& slot : slot_descs) { + if (slot.colName == _dict_query_expr.key_fields[i]) { + _key_slots[i] = state->obj_pool()->add(new SlotDescriptor(slot)); + } + } + } + _value_slot.resize(1); + for (auto& slot : response.schema.slot_descs) { + if (slot.colName == _dict_query_expr.value_field) { + _value_slot[0] = state->obj_pool()->add(new SlotDescriptor(slot)); + } + } + + return Status::OK(); +} + +void DictQueryExpr::close(RuntimeState* state, ExprContext* context, FunctionContext::FunctionStateScope scope) { + Expr::close(state, context, scope); +} + +} // namespace starrocks diff --git a/be/src/exprs/dict_query_expr.h b/be/src/exprs/dict_query_expr.h new file mode 100644 index 00000000000000..79ffe3941d3ee5 --- /dev/null +++ b/be/src/exprs/dict_query_expr.h @@ -0,0 +1,44 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once +#include + +#include "common/object_pool.h" +#include "exprs/expr.h" +#include "runtime/runtime_state.h" +#include "storage/table_reader.h" + +namespace starrocks { + +class DictQueryExpr final : public Expr { +public: + DictQueryExpr(const TExprNode& node); + + Expr* clone(ObjectPool* pool) const override { return pool->add(new DictQueryExpr(*this)); } + StatusOr evaluate_checked(ExprContext* context, Chunk* ptr) override; + Status prepare(RuntimeState* state, ExprContext* context) override; + Status open(RuntimeState* state, ExprContext* context, FunctionContext::FunctionStateScope scope) override; + void close(RuntimeState* state, ExprContext* context, FunctionContext::FunctionStateScope scope) override; + +private: + RuntimeState* _runtime_state; + TDictQueryExpr _dict_query_expr; + + Schema _key_schema; + std::vector _key_slots; + std::vector _value_slot; + std::shared_ptr _table_reader; +}; +} // namespace starrocks diff --git a/be/src/exprs/expr.cpp b/be/src/exprs/expr.cpp index b0ce091be4aa9f..c2d668871d2164 100644 --- a/be/src/exprs/expr.cpp +++ b/be/src/exprs/expr.cpp @@ -54,6 +54,7 @@ #include "exprs/column_ref.h" #include "exprs/compound_predicate.h" #include "exprs/condition_expr.h" +#include "exprs/dict_query_expr.h" #include "exprs/dictmapping_expr.h" #include "exprs/function_call_expr.h" #include "exprs/in_predicate.h" @@ -387,6 +388,9 @@ Status Expr::create_vectorized_expr(starrocks::ObjectPool* pool, const starrocks case TExprNodeType::CLONE_EXPR: *expr = pool->add(new CloneExpr(texpr_node)); break; + case 
TExprNodeType::DICT_QUERY_EXPR: + *expr = pool->add(new DictQueryExpr(texpr_node)); + break; case TExprNodeType::ARRAY_SLICE_EXPR: case TExprNodeType::AGG_EXPR: case TExprNodeType::TABLE_FUNCTION_EXPR: diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h index 445ed9a0c2eb51..1badd525c53a94 100644 --- a/be/src/runtime/descriptors.h +++ b/be/src/runtime/descriptors.h @@ -109,6 +109,8 @@ class SlotDescriptor { int32_t col_unique_id() const { return _col_unique_id; } + SlotDescriptor(const TSlotDescriptor& tdesc); + private: friend class DescriptorTbl; friend class TupleDescriptor; @@ -136,7 +138,6 @@ class SlotDescriptor { // @todo: replace _null_indicator_offset when remove _null_indicator_offset const bool _is_nullable; - SlotDescriptor(const TSlotDescriptor& tdesc); SlotDescriptor(const PSlotDescriptor& pdesc); }; diff --git a/fe/fe-core/src/main/java/com/starrocks/analysis/DictQueryExpr.java b/fe/fe-core/src/main/java/com/starrocks/analysis/DictQueryExpr.java new file mode 100644 index 00000000000000..f13dee5674029f --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/analysis/DictQueryExpr.java @@ -0,0 +1,79 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ + +package com.starrocks.analysis; + +import com.starrocks.catalog.Function; +import com.starrocks.catalog.FunctionSet; +import com.starrocks.catalog.Type; +import com.starrocks.sql.analyzer.SemanticException; +import com.starrocks.sql.ast.AstVisitor; +import com.starrocks.thrift.TDictQueryExpr; +import com.starrocks.thrift.TExprNode; +import com.starrocks.thrift.TExprNodeType; + +import java.util.List; + +// dict_mapping(STRING dict_table_name, ANY... keys [, STRING value_column_name]) +public class DictQueryExpr extends FunctionCallExpr { + + private TDictQueryExpr dictQueryExpr; + + public DictQueryExpr(List params) throws SemanticException { + super(FunctionSet.DICT_MAPPING, params); + } + + public DictQueryExpr(List params, TDictQueryExpr dictQueryExpr, Function fn) { + super(FunctionSet.DICT_MAPPING, params); + this.dictQueryExpr = dictQueryExpr; + this.fn = fn; + setType(fn.getReturnType()); + } + + protected DictQueryExpr(DictQueryExpr other) { + super(other); + this.dictQueryExpr = other.getDictQueryExpr(); + } + + + @Override + protected void toThrift(TExprNode msg) { + msg.setNode_type(TExprNodeType.DICT_QUERY_EXPR); + msg.setDict_query_expr(dictQueryExpr); + } + + @Override + public R accept(AstVisitor visitor, C context) { + return visitor.visitDictQueryExpr(this, context); + } + + @Override + public boolean isAggregateFunction() { + return false; + } + + @Override + public Expr clone() { + return new DictQueryExpr(this); + } + + public TDictQueryExpr getDictQueryExpr() { + return dictQueryExpr; + } + + public void setDictQueryExpr(TDictQueryExpr dictQueryExpr) { + this.dictQueryExpr = dictQueryExpr; + } +} diff --git a/fe/fe-core/src/main/java/com/starrocks/catalog/FunctionSet.java b/fe/fe-core/src/main/java/com/starrocks/catalog/FunctionSet.java index 9b4999d61d82ba..21f9f8d31de398 100644 --- a/fe/fe-core/src/main/java/com/starrocks/catalog/FunctionSet.java +++ b/fe/fe-core/src/main/java/com/starrocks/catalog/FunctionSet.java @@ -449,6 +449,9 @@ 
public class FunctionSet { public static final Function JSON_QUERY_FUNC = new Function( new FunctionName(JSON_QUERY), new Type[] {Type.JSON, Type.VARCHAR}, Type.JSON, false); + // dict query function + public static final String DICT_MAPPING = "dict_mapping"; + private static final Logger LOGGER = LogManager.getLogger(FunctionSet.class); private static final Set STDDEV_ARG_TYPE = diff --git a/fe/fe-core/src/main/java/com/starrocks/planner/OlapTableSink.java b/fe/fe-core/src/main/java/com/starrocks/planner/OlapTableSink.java index 9ad07389544d87..b8658a9a9272b6 100644 --- a/fe/fe-core/src/main/java/com/starrocks/planner/OlapTableSink.java +++ b/fe/fe-core/src/main/java/com/starrocks/planner/OlapTableSink.java @@ -234,12 +234,12 @@ public void complete() throws UserException { } tSink.setNum_replicas(numReplicas); tSink.setNeed_gen_rollup(dstTable.shouldLoadToNewRollup()); - tSink.setSchema(createSchema(tSink.getDb_id(), dstTable)); - tSink.setPartition(createPartition(tSink.getDb_id(), dstTable)); - tSink.setLocation(createLocation(dstTable)); + tSink.setSchema(createSchema(tSink.getDb_id(), dstTable, tupleDescriptor)); + tSink.setPartition(createPartition(tSink.getDb_id(), dstTable, enableAutomaticPartition, partitionIds)); + tSink.setLocation(createLocation(dstTable, clusterId, partitionIds, enableReplicatedStorage)); tSink.setNodes_info(GlobalStateMgr.getCurrentState().createNodesInfo(clusterId)); tSink.setPartial_update_mode(this.partialUpdateMode); - if (canUseColocateMVIndex()) { + if (canUseColocateMVIndex(dstTable)) { tSink.setEnable_colocate_mv_index(true); } } @@ -269,7 +269,7 @@ public TDataSink toThrift() { return tDataSink; } - private TOlapTableSchemaParam createSchema(long dbId, OlapTable table) { + public static TOlapTableSchemaParam createSchema(long dbId, OlapTable table, TupleDescriptor tupleDescriptor) { TOlapTableSchemaParam schemaParam = new TOlapTableSchemaParam(); schemaParam.setDb_id(dbId); schemaParam.setTable_id(table.getId()); @@ -300,7 
+300,7 @@ private TOlapTableSchemaParam createSchema(long dbId, OlapTable table) { return schemaParam; } - private List getDistColumns(DistributionInfo distInfo, OlapTable table) throws UserException { + private static List getDistColumns(DistributionInfo distInfo, OlapTable table) throws UserException { List distColumns = Lists.newArrayList(); switch (distInfo.getType()) { case HASH: { @@ -319,7 +319,9 @@ private List getDistColumns(DistributionInfo distInfo, OlapTable table) return distColumns; } - private TOlapTablePartitionParam createPartition(long dbId, OlapTable table) throws UserException { + public static TOlapTablePartitionParam createPartition(long dbId, OlapTable table, + boolean enableAutomaticPartition, + List partitionIds) throws UserException { TOlapTablePartitionParam partitionParam = new TOlapTablePartitionParam(); partitionParam.setDb_id(dbId); partitionParam.setTable_id(table.getId()); @@ -400,20 +402,20 @@ private TOlapTablePartitionParam createPartition(long dbId, OlapTable table) thr return partitionParam; } - private List literalExprsToTExprNodes(List values) { + private static List literalExprsToTExprNodes(List values) { return values.stream() .map(value -> value.treeToThrift().getNodes().get(0)) .collect(Collectors.toList()); } - private void setListPartitionValues(ListPartitionInfo listPartitionInfo, Partition partition, + private static void setListPartitionValues(ListPartitionInfo listPartitionInfo, Partition partition, TOlapTablePartition tPartition) { List> inKeysExprNodes = new ArrayList<>(); List> multiValues = listPartitionInfo.getMultiLiteralExprValues().get(partition.getId()); if (multiValues != null && !multiValues.isEmpty()) { inKeysExprNodes = multiValues.stream() - .map(this::literalExprsToTExprNodes) + .map(OlapTableSink::literalExprsToTExprNodes) .collect(Collectors.toList()); tPartition.setIn_keys(inKeysExprNodes); } @@ -421,7 +423,7 @@ private void setListPartitionValues(ListPartitionInfo listPartitionInfo, Partiti List 
values = listPartitionInfo.getLiteralExprValues().get(partition.getId()); if (values != null && !values.isEmpty()) { inKeysExprNodes = values.stream() - .map(value -> this.literalExprsToTExprNodes(Lists.newArrayList(value))) + .map(value -> OlapTableSink.literalExprsToTExprNodes(Lists.newArrayList(value))) .collect(Collectors.toList()); } @@ -435,7 +437,7 @@ private void setListPartitionValues(ListPartitionInfo listPartitionInfo, Partiti } } - private void setRangeKeys(RangePartitionInfo rangePartitionInfo, Partition partition, + private static void setRangeKeys(RangePartitionInfo rangePartitionInfo, Partition partition, TOlapTablePartition tPartition) { int partColNum = rangePartitionInfo.getPartitionColumns().size(); Range range = rangePartitionInfo.getRange(partition.getId()); @@ -459,7 +461,7 @@ private void setRangeKeys(RangePartitionInfo rangePartitionInfo, Partition parti } } - private void setIndexAndBucketNums(Partition partition, TOlapTablePartition tPartition) { + private static void setIndexAndBucketNums(Partition partition, TOlapTablePartition tPartition) { for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) { tPartition.addToIndexes(new TOlapTableIndexTablets(index.getId(), Lists.newArrayList( index.getTablets().stream().map(Tablet::getId).collect(Collectors.toList())))); @@ -467,7 +469,7 @@ private void setIndexAndBucketNums(Partition partition, TOlapTablePartition tPar } } - private DistributionInfo setDistributedColumns(TOlapTablePartitionParam partitionParam, + private static DistributionInfo setDistributedColumns(TOlapTablePartitionParam partitionParam, DistributionInfo selectedDistInfo, Partition partition, OlapTable table) throws UserException { DistributionInfo distInfo = partition.getDistributionInfo(); @@ -483,7 +485,8 @@ private DistributionInfo setDistributedColumns(TOlapTablePartitionParam partitio return selectedDistInfo; } - private TOlapTableLocationParam createLocation(OlapTable table) throws 
UserException { + public static TOlapTableLocationParam createLocation(OlapTable table, int clusterId, List partitionIds, + boolean enableReplicatedStorage) throws UserException { TOlapTableLocationParam locationParam = new TOlapTableLocationParam(); // replica -> path hash Multimap allBePathsMap = HashMultimap.create(); @@ -519,7 +522,7 @@ private TOlapTableLocationParam createLocation(OlapTable table) throws UserExcep List replicas = Lists.newArrayList(bePathsMap.keySet()); if (enableReplicatedStorage) { - int lowUsageIndex = findPrimaryReplica(bePrimaryMap, infoService, index, + int lowUsageIndex = findPrimaryReplica(table, bePrimaryMap, infoService, index, selectedBackedIds, idx, replicas); if (lowUsageIndex != -1) { bePrimaryMap.put(replicas.get(lowUsageIndex).getBackendId(), @@ -553,14 +556,15 @@ private TOlapTableLocationParam createLocation(OlapTable table) throws UserExcep return locationParam; } - private int findPrimaryReplica(Map bePrimaryMap, + private static int findPrimaryReplica(OlapTable table, + Map bePrimaryMap, SystemInfoService infoService, MaterializedIndex index, List selectedBackedIds, int idx, List replicas) { // TODO: Check different index's tablet with the same `idx` must be colocate? 
- if (canUseColocateMVIndex() && selectedBackedIds.size() == index.getTablets().size()) { + if (canUseColocateMVIndex(table) && selectedBackedIds.size() == index.getTablets().size()) { for (int i = 0; i < replicas.size(); i++) { if (replicas.get(i).getBackendId() == selectedBackedIds.get(idx)) { return i; @@ -587,8 +591,8 @@ private int findPrimaryReplica(Map bePrimaryMap, return lowUsageIndex; } - private boolean canUseColocateMVIndex() { - return Config.enable_colocate_mv_index && dstTable.isEnableColocateMVIndex(); + private static boolean canUseColocateMVIndex(OlapTable table) { + return Config.enable_colocate_mv_index && table.isEnableColocateMVIndex(); } public boolean canUsePipeLine() { diff --git a/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java index 52aef61e73b940..c310695c4e2215 100644 --- a/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java @@ -43,7 +43,11 @@ import com.google.common.collect.Range; import com.google.common.collect.Sets; import com.starrocks.analysis.LiteralExpr; +import com.starrocks.analysis.SlotDescriptor; +import com.starrocks.analysis.SlotId; import com.starrocks.analysis.TableName; +import com.starrocks.analysis.TupleDescriptor; +import com.starrocks.analysis.TupleId; import com.starrocks.authentication.AuthenticationMgr; import com.starrocks.catalog.Column; import com.starrocks.catalog.Database; @@ -72,6 +76,7 @@ import com.starrocks.common.Config; import com.starrocks.common.DdlException; import com.starrocks.common.DuplicatedRequestException; +import com.starrocks.common.IdGenerator; import com.starrocks.common.LabelAlreadyUsedException; import com.starrocks.common.MetaNotFoundException; import com.starrocks.common.Pair; @@ -113,6 +118,7 @@ import com.starrocks.mysql.privilege.TablePrivEntry; import 
com.starrocks.mysql.privilege.UserPrivTable; import com.starrocks.persist.AutoIncrementInfo; +import com.starrocks.planner.OlapTableSink; import com.starrocks.planner.StreamLoadPlanner; import com.starrocks.privilege.AccessDeniedException; import com.starrocks.privilege.PrivilegeBuiltinConstants; @@ -173,6 +179,8 @@ import com.starrocks.thrift.TGetDBPrivsResult; import com.starrocks.thrift.TGetDbsParams; import com.starrocks.thrift.TGetDbsResult; +import com.starrocks.thrift.TGetDictQueryParamRequest; +import com.starrocks.thrift.TGetDictQueryParamResponse; import com.starrocks.thrift.TGetGrantsToRolesOrUserRequest; import com.starrocks.thrift.TGetGrantsToRolesOrUserResponse; import com.starrocks.thrift.TGetLoadTxnStatusRequest; @@ -1261,7 +1269,7 @@ public TLoadTxnBeginResult loadTxnBegin(TLoadTxnBeginRequest request) throws TEx } else { result.setTimeout(0); } - + try { result.setTxnId(loadTxnBeginImpl(request, clientAddr)); } catch (DuplicatedRequestException e) { @@ -2432,4 +2440,46 @@ public TReleaseSlotResponse releaseSlot(TReleaseSlotRequest request) throws TExc return res; } + + @Override + public TGetDictQueryParamResponse getDictQueryParam(TGetDictQueryParamRequest request) throws TException { + Database db = GlobalStateMgr.getCurrentState().getDb(request.getDb_name()); + if (db == null) { + throw new SemanticException("Database %s is not found", request.getDb_name()); + } + Table table = db.getTable(request.getTable_name()); + if (table == null) { + throw new SemanticException("dict table %s is not found", request.getTable_name()); + } + if (!(table instanceof OlapTable)) { + throw new SemanticException("dict table type is not OlapTable, type=" + table.getClass()); + } + OlapTable dictTable = (OlapTable) table; + TupleDescriptor tupleDescriptor = new TupleDescriptor(TupleId.createGenerator().getNextId()); + IdGenerator slotIdIdGenerator = SlotId.createGenerator(); + + for (Column column : dictTable.getBaseSchema()) { + SlotDescriptor slotDescriptor = new 
SlotDescriptor(slotIdIdGenerator.getNextId(), tupleDescriptor); + slotDescriptor.setColumn(column); + slotDescriptor.setIsMaterialized(true); + tupleDescriptor.addSlot(slotDescriptor); + } + + TGetDictQueryParamResponse response = new TGetDictQueryParamResponse(); + response.setSchema(OlapTableSink.createSchema(db.getId(), dictTable, tupleDescriptor)); + try { + List allPartitions = dictTable.getAllPartitionIds(); + response.setPartition( + OlapTableSink.createPartition( + db.getId(), dictTable, dictTable.supportedAutomaticPartition(), allPartitions)); + response.setLocation(OlapTableSink.createLocation( + dictTable, dictTable.getClusterId(), allPartitions, dictTable.enableReplicatedStorage())); + response.setNodes_info(GlobalStateMgr.getCurrentState().createNodesInfo(dictTable.getClusterId())); + } catch (UserException e) { + SemanticException semanticException = new SemanticException("build DictQueryParams error in dict_query_expr."); + semanticException.initCause(e); + throw semanticException; + } + return response; + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/AggregationAnalyzer.java b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/AggregationAnalyzer.java index 5849f9b1be4465..f1ef89d6cc1d59 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/AggregationAnalyzer.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/AggregationAnalyzer.java @@ -26,6 +26,7 @@ import com.starrocks.analysis.CloneExpr; import com.starrocks.analysis.CollectionElementExpr; import com.starrocks.analysis.CompoundPredicate; +import com.starrocks.analysis.DictQueryExpr; import com.starrocks.analysis.ExistsPredicate; import com.starrocks.analysis.Expr; import com.starrocks.analysis.FunctionCallExpr; @@ -335,5 +336,10 @@ public Boolean visitTimestampArithmeticExpr(TimestampArithmeticExpr node, Void c public Boolean visitCloneExpr(CloneExpr node, Void context) { return visit(node.getChild(0)); } + + @Override + public Boolean 
visitDictQueryExpr(DictQueryExpr node, Void context) { + return node.getChildren().stream().allMatch(this::visit); + } } } diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/AstToStringBuilder.java b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/AstToStringBuilder.java index baeca065713989..c39cb990b2fd82 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/AstToStringBuilder.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/AstToStringBuilder.java @@ -27,6 +27,7 @@ import com.starrocks.analysis.CollectionElementExpr; import com.starrocks.analysis.CompoundPredicate; import com.starrocks.analysis.DecimalLiteral; +import com.starrocks.analysis.DictQueryExpr; import com.starrocks.analysis.ExistsPredicate; import com.starrocks.analysis.Expr; import com.starrocks.analysis.FunctionCallExpr; @@ -1177,6 +1178,11 @@ public String visitGroupByClause(GroupByClause node, Void context) { return strBuilder.toString(); } + @Override + public String visitDictQueryExpr(DictQueryExpr node, Void context) { + return visitFunctionCall(node, context); + } + private String visitAstList(List contexts) { return Joiner.on(", ").join(contexts.stream().map(this::visit).collect(toList())); } diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/ExpressionAnalyzer.java b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/ExpressionAnalyzer.java index 66a8b3b873da94..ebeae0fa551b8f 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/ExpressionAnalyzer.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/ExpressionAnalyzer.java @@ -26,15 +26,18 @@ import com.starrocks.analysis.ArrowExpr; import com.starrocks.analysis.BetweenPredicate; import com.starrocks.analysis.BinaryPredicate; +import com.starrocks.analysis.BoolLiteral; import com.starrocks.analysis.CaseExpr; import com.starrocks.analysis.CastExpr; import com.starrocks.analysis.CloneExpr; import com.starrocks.analysis.CollectionElementExpr; import 
com.starrocks.analysis.CompoundPredicate; +import com.starrocks.analysis.DictQueryExpr; import com.starrocks.analysis.ExistsPredicate; import com.starrocks.analysis.Expr; import com.starrocks.analysis.ExprId; import com.starrocks.analysis.FunctionCallExpr; +import com.starrocks.analysis.FunctionName; import com.starrocks.analysis.GroupingFunctionCallExpr; import com.starrocks.analysis.InPredicate; import com.starrocks.analysis.InformationFunction; @@ -52,17 +55,25 @@ import com.starrocks.analysis.StringLiteral; import com.starrocks.analysis.SubfieldExpr; import com.starrocks.analysis.Subquery; +import com.starrocks.analysis.TableName; import com.starrocks.analysis.TimestampArithmeticExpr; import com.starrocks.analysis.VariableExpr; import com.starrocks.catalog.AggregateFunction; import com.starrocks.catalog.ArrayType; +import com.starrocks.catalog.Column; +import com.starrocks.catalog.Database; import com.starrocks.catalog.Function; import com.starrocks.catalog.FunctionSet; +import com.starrocks.catalog.KeysType; import com.starrocks.catalog.MapType; +import com.starrocks.catalog.MaterializedView; +import com.starrocks.catalog.OlapTable; import com.starrocks.catalog.PrimitiveType; import com.starrocks.catalog.ScalarFunction; +import com.starrocks.catalog.ScalarType; import com.starrocks.catalog.StructField; import com.starrocks.catalog.StructType; +import com.starrocks.catalog.Table; import com.starrocks.catalog.TableFunction; import com.starrocks.catalog.Type; import com.starrocks.cluster.ClusterNamespace; @@ -92,12 +103,17 @@ import com.starrocks.sql.optimizer.rewrite.ScalarOperatorEvaluator; import com.starrocks.sql.optimizer.transformer.ExpressionMapping; import com.starrocks.sql.optimizer.transformer.SqlToScalarOperatorTranslator; +import com.starrocks.thrift.TDictQueryExpr; +import com.starrocks.thrift.TFunctionBinaryType; import java.math.BigInteger; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; 
import java.util.List; +import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.regex.PatternSyntaxException; import java.util.stream.Collectors; @@ -1624,6 +1640,158 @@ public Void visitDefaultValueExpr(DefaultValueExpr node, Scope context) { public Void visitCloneExpr(CloneExpr node, Scope context) { return null; } + + @Override + public Void visitDictQueryExpr(DictQueryExpr node, Scope context) { + List params = node.getParams().exprs(); + if (!(params.get(0) instanceof StringLiteral)) { + throw new SemanticException("dict_mapping function first param table_name should be string literal"); + } + String[] dictTableFullName = ((StringLiteral) params.get(0)).getStringValue().split("\\."); + TableName tableName; + if (dictTableFullName.length == 1) { + tableName = new TableName(null, dictTableFullName[0]); + tableName.normalization(session); + } else if (dictTableFullName.length == 2) { + tableName = new TableName(dictTableFullName[0], dictTableFullName[1]); + } else { + throw new SemanticException("dict_mapping function first param table_name should be 'db.tbl' or 'tbl' format"); + } + + Database db = GlobalStateMgr.getCurrentState().getDb(tableName.getDb()); + if (db == null) { + throw new SemanticException("Database %s is not found", tableName.getDb()); + } + Table table = db.getTable(tableName.getTbl()); + if (table == null) { + throw new SemanticException("dict table %s is not found", tableName.getTbl()); + } + if (!(table instanceof OlapTable)) { + throw new SemanticException("dict table type is not OlapTable, type=" + table.getClass()); + } + if (table instanceof MaterializedView) { + throw new SemanticException("dict table can't be materialized view"); + } + OlapTable dictTable = (OlapTable) table; + + if (dictTable.getKeysType() != KeysType.PRIMARY_KEYS) { + throw new SemanticException("dict table " + tableName + " should be primary key table"); + } + + // verify keys length and type + List keyColumns = new ArrayList<>(); + 
Column valueColumn = null; + for (Column column : dictTable.getBaseSchema()) { + if (column.isKey()) { + keyColumns.add(column); + } + if (column.isAutoIncrement()) { + valueColumn = column; + } + } + // (table, keys..., value_column, strict_mode) + int valueColumnIdx; + int strictModeIdx; + if (params.size() == keyColumns.size() + 1) { + valueColumnIdx = -1; + strictModeIdx = -1; + } else if (params.size() == keyColumns.size() + 2) { + if (params.get(params.size() - 1).getType().getPrimitiveType().isStringType()) { + valueColumnIdx = params.size() - 1; + strictModeIdx = -1; + } else { + strictModeIdx = params.size() - 1; + valueColumnIdx = -1; + } + } else if (params.size() == keyColumns.size() + 3) { + valueColumnIdx = params.size() - 2; + strictModeIdx = params.size() - 1; + } else { + throw new SemanticException(String.format("dict_mapping function param size should be %d - %d", + keyColumns.size() + 1, keyColumns.size() + 3)); + } + + String valueField; + if (valueColumnIdx >= 0) { + Expr valueFieldExpr = params.get(valueColumnIdx); + if (!(valueFieldExpr instanceof StringLiteral)) { + throw new SemanticException("dict_mapping function value_column param should be STRING constant"); + } + valueField = ((StringLiteral) valueFieldExpr).getStringValue(); + valueColumn = dictTable.getBaseColumn(valueField); + if (valueColumn == null) { + throw new SemanticException("dict_mapping function can't find value column " + valueField + " in dict table"); + } + } else { + if (valueColumn == null) { + throw new SemanticException("dict_mapping function can't find auto increment column in dict table"); + } + valueField = valueColumn.getName(); + } + + boolean strictMode = false; + if (strictModeIdx >= 0) { + Expr strictModeExpr = params.get(strictModeIdx); + if (!(strictModeExpr instanceof BoolLiteral)) { + throw new SemanticException("dict_mapping function strict_mode param should be bool constant"); + } + strictMode = ((BoolLiteral) strictModeExpr).getValue(); + } + + List 
expectTypes = new ArrayList<>(); + expectTypes.add(Type.VARCHAR); + for (Column keyColumn : keyColumns) { + expectTypes.add(ScalarType.createType(keyColumn.getType().getPrimitiveType())); + } + if (valueColumnIdx >= 0) { + expectTypes.add(Type.VARCHAR); + } + if (strictModeIdx >= 0) { + expectTypes.add(Type.BOOLEAN); + } + List actualTypes = params.stream() + .map(expr -> ScalarType.createType(expr.getType().getPrimitiveType())).collect(Collectors.toList()); + if (!Objects.equals(expectTypes, actualTypes)) { + List expectTypeNames = new ArrayList<>(); + expectTypeNames.add("VARCHAR dict_table"); + for (int i = 0; i < keyColumns.size(); i++) { + expectTypeNames.add(expectTypes.get(i + 1).canonicalName() + " " + keyColumns.get(i).getName()); + } + if (valueColumnIdx >= 0) { + expectTypeNames.add("VARCHAR value_field_name"); + } + if (strictModeIdx >= 0) { + expectTypeNames.add("BOOLEAN strict_mode"); + } + List actualTypeNames = actualTypes.stream().map(Type::canonicalName).collect(Collectors.toList()); + throw new SemanticException( + String.format("dict_mapping function params not match expected,\nExpect: %s\nActual: %s", + String.join(", ", expectTypeNames), String.join(", ", actualTypeNames))); + } + + Type valueType = ScalarType.createType(valueColumn.getType().getPrimitiveType()); + + final TDictQueryExpr dictQueryExpr = new TDictQueryExpr(); + dictQueryExpr.setDb_name(tableName.getDb()); + dictQueryExpr.setTbl_name(tableName.getTbl()); + + Map partitionVersion = new HashMap<>(); + dictTable.getPartitions().forEach(p -> partitionVersion.put(p.getId(), p.getVisibleVersion())); + dictQueryExpr.setPartition_version(partitionVersion); + + List keyFields = keyColumns.stream().map(Column::getName).collect(Collectors.toList()); + dictQueryExpr.setKey_fields(keyFields); + dictQueryExpr.setValue_field(valueField); + dictQueryExpr.setStrict_mode(strictMode); + node.setType(valueType); + + Function fn = new Function(FunctionName.createFnName(FunctionSet.DICT_MAPPING), 
actualTypes, valueType, false); + fn.setBinaryType(TFunctionBinaryType.BUILTIN); + node.setFn(fn); + + node.setDictQueryExpr(dictQueryExpr); + return null; + } } static class IgnoreSlotVisitor extends Visitor { diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/ast/AstVisitor.java b/fe/fe-core/src/main/java/com/starrocks/sql/ast/AstVisitor.java index 64da0eb3f0b099..b4bc0579c1214d 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/ast/AstVisitor.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/ast/AstVisitor.java @@ -25,6 +25,7 @@ import com.starrocks.analysis.CloneExpr; import com.starrocks.analysis.CollectionElementExpr; import com.starrocks.analysis.CompoundPredicate; +import com.starrocks.analysis.DictQueryExpr; import com.starrocks.analysis.ExistsPredicate; import com.starrocks.analysis.Expr; import com.starrocks.analysis.FunctionCallExpr; @@ -1195,4 +1196,8 @@ public R visitOrderByElement(OrderByElement node, C context) { public R visitGroupByClause(GroupByClause node, C context) { return null; } + + public R visitDictQueryExpr(DictQueryExpr node, C context) { + return visitExpression(node, context); + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/operator/scalar/DictQueryOperator.java b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/operator/scalar/DictQueryOperator.java new file mode 100644 index 00000000000000..f380519ed8ce51 --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/operator/scalar/DictQueryOperator.java @@ -0,0 +1,48 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + + +package com.starrocks.sql.optimizer.operator.scalar; + +import com.starrocks.catalog.Function; +import com.starrocks.catalog.FunctionSet; +import com.starrocks.catalog.Type; +import com.starrocks.thrift.TDictQueryExpr; + +import java.util.List; + +public class DictQueryOperator extends CallOperator { + + private final TDictQueryExpr dictQueryExpr; + private final Function fn; + + public DictQueryOperator(List arguments, TDictQueryExpr dictQueryExpr, Function fn) { + super(FunctionSet.DICT_MAPPING, Type.BIGINT, arguments); + this.dictQueryExpr = dictQueryExpr; + this.fn = fn; + } + + @Override + public R accept(ScalarOperatorVisitor visitor, C context) { + return visitor.visitDictQueryOperator(this, context); + } + + public TDictQueryExpr getDictQueryExpr() { + return dictQueryExpr; + } + + public Function getFn() { + return fn; + } +} diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/operator/scalar/ScalarOperatorVisitor.java b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/operator/scalar/ScalarOperatorVisitor.java index b43515248c7e97..7093d0eeba9165 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/operator/scalar/ScalarOperatorVisitor.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/operator/scalar/ScalarOperatorVisitor.java @@ -114,4 +114,8 @@ public R visitCloneOperator(CloneOperator operator, C context) { public R visitSubqueryOperator(SubqueryOperator operator, C context) { return visit(operator, context); } + + public R visitDictQueryOperator(DictQueryOperator operator, C context) 
{ + return visit(operator, context); + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/transformer/SqlToScalarOperatorTranslator.java b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/transformer/SqlToScalarOperatorTranslator.java index 25764f1e262b19..b5cfebc3be1f46 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/transformer/SqlToScalarOperatorTranslator.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/transformer/SqlToScalarOperatorTranslator.java @@ -27,6 +27,7 @@ import com.starrocks.analysis.CloneExpr; import com.starrocks.analysis.CollectionElementExpr; import com.starrocks.analysis.CompoundPredicate; +import com.starrocks.analysis.DictQueryExpr; import com.starrocks.analysis.ExistsPredicate; import com.starrocks.analysis.Expr; import com.starrocks.analysis.FunctionCallExpr; @@ -82,6 +83,7 @@ import com.starrocks.sql.optimizer.operator.scalar.ColumnRefOperator; import com.starrocks.sql.optimizer.operator.scalar.CompoundPredicateOperator; import com.starrocks.sql.optimizer.operator.scalar.ConstantOperator; +import com.starrocks.sql.optimizer.operator.scalar.DictQueryOperator; import com.starrocks.sql.optimizer.operator.scalar.ExistsPredicateOperator; import com.starrocks.sql.optimizer.operator.scalar.InPredicateOperator; import com.starrocks.sql.optimizer.operator.scalar.IsNullPredicateOperator; @@ -821,6 +823,15 @@ public ScalarOperator visitSubquery(Subquery node, Context context) { public ScalarOperator visitCloneExpr(CloneExpr node, Context context) { return new CloneOperator(visit(node.getChild(0), context.clone(node))); } + + @Override + public ScalarOperator visitDictQueryExpr(DictQueryExpr node, Context context) { + List arguments = node.getChildren() + .stream() + .map(child -> visit(child, context.clone(node))) + .collect(Collectors.toList()); + return new DictQueryOperator(arguments, node.getDictQueryExpr(), node.getFn()); + } } static class IgnoreSlotVisitor extends Visitor { diff --git 
a/fe/fe-core/src/main/java/com/starrocks/sql/parser/AstBuilder.java b/fe/fe-core/src/main/java/com/starrocks/sql/parser/AstBuilder.java index 53f6332e585b4c..c620cb5dfddc1e 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/parser/AstBuilder.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/parser/AstBuilder.java @@ -36,6 +36,7 @@ import com.starrocks.analysis.CompoundPredicate; import com.starrocks.analysis.DateLiteral; import com.starrocks.analysis.DecimalLiteral; +import com.starrocks.analysis.DictQueryExpr; import com.starrocks.analysis.ExistsPredicate; import com.starrocks.analysis.Expr; import com.starrocks.analysis.FloatLiteral; @@ -5671,6 +5672,11 @@ public ParseNode visitSimpleFunctionCall(StarRocksParser.SimpleFunctionCallConte } } + if (functionName.equals(FunctionSet.DICT_MAPPING)) { + List params = visit(context.expression(), Expr.class); + return new DictQueryExpr(params); + } + FunctionCallExpr functionCallExpr = new FunctionCallExpr(fnName, new FunctionParams(false, visit(context.expression(), Expr.class)), pos); if (context.over() != null) { diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/plan/ScalarOperatorToExpr.java b/fe/fe-core/src/main/java/com/starrocks/sql/plan/ScalarOperatorToExpr.java index b9dc87f46e1862..8c70c5a74d1c9d 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/plan/ScalarOperatorToExpr.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/plan/ScalarOperatorToExpr.java @@ -31,6 +31,7 @@ import com.starrocks.analysis.DateLiteral; import com.starrocks.analysis.DecimalLiteral; import com.starrocks.analysis.DictMappingExpr; +import com.starrocks.analysis.DictQueryExpr; import com.starrocks.analysis.Expr; import com.starrocks.analysis.FloatLiteral; import com.starrocks.analysis.FunctionCallExpr; @@ -70,6 +71,7 @@ import com.starrocks.sql.optimizer.operator.scalar.CompoundPredicateOperator; import com.starrocks.sql.optimizer.operator.scalar.ConstantOperator; import 
com.starrocks.sql.optimizer.operator.scalar.DictMappingOperator; +import com.starrocks.sql.optimizer.operator.scalar.DictQueryOperator; import com.starrocks.sql.optimizer.operator.scalar.ExistsPredicateOperator; import com.starrocks.sql.optimizer.operator.scalar.InPredicateOperator; import com.starrocks.sql.optimizer.operator.scalar.IsNullPredicateOperator; @@ -597,6 +599,14 @@ public Expr visitSubqueryOperator(SubqueryOperator operator, FormatterContext co subquery.setUseSemiAnti(operator.getApplyOperator().isUseSemiAnti()); return subquery; } + + @Override + public Expr visitDictQueryOperator(DictQueryOperator operator, FormatterContext context) { + List arg = operator.getChildren().stream() + .map(expr -> buildExpr.build(expr, context)) + .collect(Collectors.toList()); + return new DictQueryExpr(arg, operator.getDictQueryExpr(), operator.getFn()); + } } static class IgnoreSlotFormatter extends Formatter { diff --git a/fe/fe-core/src/test/java/com/starrocks/planner/OlapTableSinkTest.java b/fe/fe-core/src/test/java/com/starrocks/planner/OlapTableSinkTest.java index ab653886ec9e6f..a1715d0d48477f 100644 --- a/fe/fe-core/src/test/java/com/starrocks/planner/OlapTableSinkTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/planner/OlapTableSinkTest.java @@ -199,7 +199,7 @@ public void testRangeUnknownPartition( @Test public void testCreateLocationWithLocalTablet(@Mocked GlobalStateMgr globalStateMgr, - @Mocked SystemInfoService systemInfoService) { + @Mocked SystemInfoService systemInfoService) throws Exception { long dbId = 1L; long tableId = 2L; long partitionId = 3L; @@ -263,9 +263,8 @@ public void testCreateLocationWithLocalTablet(@Mocked GlobalStateMgr globalState } }; - OlapTableSink sink = new OlapTableSink(table, null, Lists.newArrayList(partitionId), - TWriteQuorumType.MAJORITY, false, false, false); - TOlapTableLocationParam param = (TOlapTableLocationParam) Deencapsulation.invoke(sink, "createLocation", table); + TOlapTableLocationParam param = 
OlapTableSink.createLocation( + table, table.getClusterId(), Lists.newArrayList(partitionId), false); System.out.println(param); // Check @@ -280,7 +279,7 @@ public void testCreateLocationWithLocalTablet(@Mocked GlobalStateMgr globalState @Test public void testReplicatedStorageWithLocalTablet(@Mocked GlobalStateMgr globalStateMgr, - @Mocked SystemInfoService systemInfoService) { + @Mocked SystemInfoService systemInfoService) throws Exception { long dbId = 1L; long tableId = 2L; long partitionId = 3L; @@ -347,9 +346,8 @@ public void testReplicatedStorageWithLocalTablet(@Mocked GlobalStateMgr globalSt } }; - OlapTableSink sink = new OlapTableSink(table, null, Lists.newArrayList(partitionId), - TWriteQuorumType.MAJORITY, true, false, false); - TOlapTableLocationParam param = (TOlapTableLocationParam) Deencapsulation.invoke(sink, "createLocation", table); + TOlapTableLocationParam param = OlapTableSink.createLocation( + table, table.getClusterId(), Lists.newArrayList(partitionId), true); System.out.println(param); // Check diff --git a/fe/fe-core/src/test/java/com/starrocks/sql/analyzer/DictQueryFunctionTest.java b/fe/fe-core/src/test/java/com/starrocks/sql/analyzer/DictQueryFunctionTest.java new file mode 100644 index 00000000000000..196596620b1a1a --- /dev/null +++ b/fe/fe-core/src/test/java/com/starrocks/sql/analyzer/DictQueryFunctionTest.java @@ -0,0 +1,210 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.sql.analyzer; + +import com.starrocks.qe.ConnectContext; +import com.starrocks.utframe.StarRocksAssert; +import com.starrocks.utframe.UtFrameUtils; +import org.junit.BeforeClass; +import org.junit.Test; + +public class DictQueryFunctionTest { + + private static final String TEST_DICT_DATABASE = "dict"; + + public static ConnectContext connectContext; + public static StarRocksAssert starRocksAssert; + + @BeforeClass + public static void beforeClass() throws Exception { + UtFrameUtils.createMinStarRocksCluster(); + AnalyzeTestUtil.init(); + connectContext = UtFrameUtils.createDefaultCtx(); + starRocksAssert = new StarRocksAssert(connectContext); + + + starRocksAssert.withDatabase(TEST_DICT_DATABASE); + starRocksAssert.useDatabase(TEST_DICT_DATABASE); + + + String dictTable = "" + + "CREATE TABLE dict_table( \n" + + " key_varchar VARCHAR NOT NULL,\n" + + " key_dt DATETIME NOT NULL,\n" + + " value BIGINT(20) NOT NULL AUTO_INCREMENT \n" + + ") ENGINE=OLAP \n" + + "PRIMARY KEY(`key_varchar`, `key_dt`)\n" + + "PARTITION BY RANGE(`key_dt`)\n" + + "(PARTITION p20230506 VALUES [(\"2023-05-06 00:00:00\"), (\"2023-05-07 00:00:00\")))\n" + + "DISTRIBUTED BY HASH(`key_varchar`) BUCKETS 12\n" + + "PROPERTIES (\n" + + " \"replication_num\" = \"1\",\n" + + " \"enable_persistent_index\" = \"false\",\n" + + " \"replicated_storage\" = \"true\"\n" + + ");"; + + String dictTableDup = "" + + "CREATE TABLE dict_table_dup( \n" + + " key_varchar VARCHAR NOT NULL,\n" + + " key_dt DATETIME NOT NULL,\n" + + " value BIGINT(20) NOT NULL \n" + + ") ENGINE=OLAP \n" + + "DUPLICATE KEY(`key_varchar`, `key_dt`)\n" + + "PARTITION BY RANGE(`key_dt`)\n" + + "(PARTITION p20230506 VALUES [(\"2023-05-06 00:00:00\"), (\"2023-05-07 00:00:00\")))\n" + + "DISTRIBUTED BY HASH(`key_varchar`) BUCKETS 12\n" + + "PROPERTIES (\n" + + " \"replication_num\" = \"1\",\n" + + " 
\"enable_persistent_index\" = \"false\",\n" + + " \"replicated_storage\" = \"true\"\n" + + ");"; + + String dictTableNoAutoInc = "" + + "CREATE TABLE dict_table_without_inc( \n" + + " key_varchar VARCHAR NOT NULL,\n" + + " key_dt DATETIME NOT NULL,\n" + + " value STRING NOT NULL \n" + + ") ENGINE=OLAP \n" + + "PRIMARY KEY(`key_varchar`, `key_dt`)\n" + + "PARTITION BY RANGE(`key_dt`)\n" + + "(PARTITION p20230506 VALUES [(\"2023-05-06 00:00:00\"), (\"2023-05-07 00:00:00\")))\n" + + "DISTRIBUTED BY HASH(`key_varchar`) BUCKETS 12\n" + + "PROPERTIES (\n" + + " \"replication_num\" = \"1\",\n" + + " \"enable_persistent_index\" = \"false\",\n" + + " \"replicated_storage\" = \"true\"\n" + + ");"; + starRocksAssert.withTable(dictTable); + starRocksAssert.withTable(dictTableDup); + starRocksAssert.withTable(dictTableNoAutoInc); + } + + @Test + public void testFunction() throws Exception { + new ExceptionChecker("SELECT dict_mapping('dict.dict_table', 'key', CAST('2023-05-06' AS DATETIME))") + .ok(); + + new ExceptionChecker("SELECT dict_mapping('dict_table', 'key', CAST('2023-05-06' AS DATETIME))") + .ok(); + + new ExceptionChecker("SELECT dict_mapping('dict.dict_table', 'key', CAST('2023-05-06' AS DATETIME), 'value')") + .ok(); + + new ExceptionChecker("SELECT dict_mapping('dict.dict_table', 'key', CAST('2023-05-06' AS DATETIME), 'value', true)") + .ok(); + + } + + @Test + public void testFunctionWithException() throws Exception { + testDictMappingFunction( + "SELECT dict_mapping('catalog.db.tbl', 'k');", + "dict_mapping function first param table_name should be 'db.tbl' or 'tbl' format."); + + testDictMappingFunction( + "SELECT dict_mapping('not_exist.dict_table', 'key', '2023-05-06');", + "Database not_exist is not found."); + + testDictMappingFunction( + "SELECT dict_mapping('dict.dict_table_not_exist', 'key', '2023-05-06');", + "dict table dict_table_not_exist is not found."); + + testDictMappingFunction( + "SELECT dict_mapping('dict.dict_table_dup', 'key', '2023-05-06', 
'value');", + "dict table dict.dict_table_dup should be primary key table."); + + testDictMappingFunction( + "SELECT dict_mapping('dict.dict_table', 'key', '2023-05-06', 'value', 'extra');", + "dict_mapping function strict_mode param should be bool constant."); + + testDictMappingFunction( + "SELECT dict_mapping('dict.dict_table', 'key', CAST('2023-05-06' AS datetime), 'v');", + "dict_mapping function can't find value column v in dict table."); + + testDictMappingFunction( + "SELECT dict_mapping('dict.dict_table_without_inc', 'key', CAST('2023-05-06' AS datetime));", + "dict_mapping function can't find auto increment column in dict table."); + + testDictMappingFunction( + "SELECT dict_mapping('dict.dict_table', 'key', '2023-05-06');", + "dict_mapping function params not match expected"); + + testDictMappingFunction( + "SELECT dict_mapping('dict.dict_table', 'key', '2023-05-06', 'value', 'extra', true);", + "dict_mapping function param size should be 3 - 5."); + + testDictMappingFunction( + "SELECT dict_mapping('dict.dict_table', 'key', null, 'value', true);", + "dict_mapping function params not match expected"); + } + + private void testDictMappingFunction(String sql, String expectException) { + new ExceptionChecker(sql) + .withException(SemanticException.class) + .containMessage(expectException) + .ok(); + } + + static class ExceptionChecker { + private final String stmt; + private Class expectException = null; + private String expectMsg = null; + + + ExceptionChecker(String stmt) { + this.stmt = stmt; + } + + public ExceptionChecker withException(Class expectException) { + this.expectException = expectException; + return this; + } + + public ExceptionChecker containMessage(String expectMsg) { + this.expectMsg = expectMsg; + return this; + } + + public void ok() { + try { + starRocksAssert.query(stmt).explainQuery(); + } catch (Exception e) { + if (expectException != null) { + if (!e.getClass().isAssignableFrom(expectException)) { + throw new RuntimeException( + 
String.format("expect exception: %s, actual: %s", expectException.getName(), e.getClass().getName()), + e); + } + if (expectMsg != null) { + if (!e.getMessage().contains(expectMsg)) { + throw new RuntimeException( + String.format("expect exception message: %s, actual: %s", expectMsg, e.getMessage()), + e); + } else { + return; + } + } + } + throw new RuntimeException("expect no exception, actual: " + e.getMessage(), e); + } + if (expectException != null) { + throw new RuntimeException(String.format("expect exception: %s, actual no exception", expectException.getName())); + } + } + + } + + +} diff --git a/gensrc/thrift/Exprs.thrift b/gensrc/thrift/Exprs.thrift index f5f78b0d3bc321..27e704cd8ca560 100644 --- a/gensrc/thrift/Exprs.thrift +++ b/gensrc/thrift/Exprs.thrift @@ -82,6 +82,7 @@ enum TExprNodeType { MAP_ELEMENT_EXPR, BINARY_LITERAL, MAP_EXPR, + DICT_QUERY_EXPR, } struct TAggregateExpr { @@ -171,6 +172,15 @@ struct TFunctionCallExpr { 2: optional i32 vararg_start_idx } +struct TDictQueryExpr { + 1: required string db_name + 2: required string tbl_name + 3: required map partition_version + 4: required list key_fields + 5: required string value_field + 6: required bool strict_mode +} + // This is essentially a union over the subclasses of Expr. 
struct TExprNode { 1: required TExprNodeType node_type @@ -219,6 +229,8 @@ struct TExprNode { 52: optional bool is_nullable 53: optional Types.TTypeDesc child_type_desc 54: optional bool is_monotonic + + 55: optional TDictQueryExpr dict_query_expr } struct TPartitionLiteral { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 2e6dd1b3bc7eaf..3d3a8b98ba8ad2 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -99,7 +99,7 @@ struct TDescribeTableParams { 6: optional i64 limit // If not set, match default_catalog - 7: optional string catalog_name + 7: optional string catalog_name } // Results of a call to describeTable() @@ -371,8 +371,8 @@ struct TMaterializedViewStatus { 1: optional string id 2: optional string database_name 3: optional string name - 4: optional string refresh_type - 5: optional string is_active + 4: optional string refresh_type + 5: optional string is_active 6: optional string last_refresh_start_time 7: optional string last_refresh_finished_time 8: optional string last_refresh_duration @@ -382,7 +382,7 @@ struct TMaterializedViewStatus { 12: optional string text 13: optional string rows - 14: optional string partition_type + 14: optional string partition_type 15: optional string last_refresh_force_refresh 16: optional string last_refresh_start_partition 17: optional string last_refresh_end_partition @@ -406,7 +406,7 @@ struct TListPipesInfo { // pipe entity 1: optional i64 pipe_id 2: optional string pipe_name - + // schema info 10: optional string database_name @@ -431,8 +431,8 @@ struct TListPipeFilesInfo { // pipe entity 1: optional i64 pipe_id 2: optional string pipe_name - 3: optional string database_name - + 3: optional string database_name + // file entity 10: optional string file_name @@ -441,12 +441,12 @@ struct TListPipeFilesInfo { 13: optional i64 file_size 14: optional i64 file_rows 15: optional string last_modified - + // load status 20: optional 
string staged_time 21: optional string start_load 22: optional string finish_load - + // error information 30: optional string first_error_msg 31: optional i64 error_count @@ -1314,7 +1314,7 @@ struct TAuthInfo { 2: optional string user // deprecated 3: optional string user_ip // deprecated 4: optional Types.TUserIdentity current_user_ident // to replace the user and user ip - + // If not set, match default_catalog 5: optional string catalog_name } @@ -1405,7 +1405,7 @@ struct TRequireSlotRequest { } struct TRequireSlotResponse { - + } struct TFinishSlotRequirementRequest { @@ -1523,6 +1523,18 @@ struct TGetProfileResponse { 2: optional list query_result } +struct TGetDictQueryParamRequest { + 1: optional string db_name + 2: optional string table_name +} + +struct TGetDictQueryParamResponse { + 1: required Descriptors.TOlapTableSchemaParam schema + 2: required Descriptors.TOlapTablePartitionParam partition + 3: required Descriptors.TOlapTableLocationParam location + 4: required Descriptors.TNodesInfo nodes_info +} + service FrontendService { TGetDbsResult getDbNames(1:TGetDbsParams params) TGetTablesResult getTableNames(1:TGetTablesParams params) @@ -1593,7 +1605,7 @@ service FrontendService { TUpdateResourceUsageResponse updateResourceUsage(1: TUpdateResourceUsageRequest request) TGetWarehousesResponse getWarehouses(1: TGetWarehousesRequest request) - + // For Materialized View MVMaintenance.TMVReportEpochResponse mvReport(1: MVMaintenance.TMVMaintenanceTasks request) @@ -1607,7 +1619,9 @@ service FrontendService { TRequireSlotResponse requireSlotAsync(1: TRequireSlotRequest request) TFinishSlotRequirementResponse finishSlotRequirement(1: TFinishSlotRequirementRequest request) TReleaseSlotResponse releaseSlot(1: TReleaseSlotRequest request) - + TGetLoadTxnStatusResult getLoadTxnStatus(1: TGetLoadTxnStatusRequest request) + + TGetDictQueryParamResponse getDictQueryParam(1: TGetDictQueryParamRequest request) }