Skip to content

Commit

Permalink
[Feature] dict query function (StarRocks#20961)
Browse files Browse the repository at this point in the history
* [Feature] dict query function

Implement function 'dict_mapping' support query a primary key table, and retrurn a numeric value column.

dict_mapping(dict_table, keys... [, value_column])

argument:
dict_table: STRING, with 'db.table' or 'table' format, need table exist and is primary key model.
keys: same as dict_table pk length, order and type, not support implicit type cast.
value_column: STRING optional, if don't specific value column, find first AUTO_INCREMENT column as value colmn, value column must be BIGINT type.

return:
BIGINT, actual value of value column.

Use case:
Create primary key table with auto-increment column as gloal dictionary.
use 'dict_mapping' function when load data to base table, mapping string type key to ordered number, then use BITMAP type create rollup mv speed up count distinct query.

Detail:
Intercept 'dict_mapping' function in sql planner, transfer dict table metadata in TExprNode to BE. DictQueryExpr do real work use TableReader multi_get api query records from dict table and return id.

Signed-off-by: liuyufei9527 <liuyufei9527@gmail.com>
  • Loading branch information
liuyufei9527 authored Aug 31, 2023
1 parent 1678cae commit 45bbf9d
Show file tree
Hide file tree
Showing 22 changed files with 867 additions and 43 deletions.
1 change: 1 addition & 0 deletions be/src/exprs/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ set(EXPR_FILES
map_apply_expr.cpp
map_expr.cpp
substring_index.cpp
dict_query_expr.cpp
)

add_library(Exprs ${EXPR_FILES})
140 changes: 140 additions & 0 deletions be/src/exprs/dict_query_expr.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "exprs/dict_query_expr.h"

#include "agent/master_info.h"
#include "column/chunk.h"
#include "column/column.h"
#include "column/column_helper.h"
#include "column/column_viewer.h"
#include "gutil/casts.h"
#include "runtime/client_cache.h"
#include "storage/chunk_helper.h"
#include "storage/table_reader.h"
#include "util/thrift_rpc_helper.h"

namespace starrocks {

DictQueryExpr::DictQueryExpr(const TExprNode& node) : Expr(node), _dict_query_expr(node.dict_query_expr) {}

StatusOr<ColumnPtr> DictQueryExpr::evaluate_checked(ExprContext* context, Chunk* ptr) {
Columns columns(children().size());
size_t size = ptr != nullptr ? ptr->num_rows() : 1;
for (int i = 0; i < _children.size(); ++i) {
columns[i] = _children[i]->evaluate(context, ptr);
}

ColumnPtr res;
for (auto& column : columns) {
if (column->is_constant()) {
column = ColumnHelper::unpack_and_duplicate_const_column(size, column);
}
}
ChunkPtr key_chunk = ChunkHelper::new_chunk(_key_slots, size);
key_chunk->reset();
for (int i = 0; i < _dict_query_expr.key_fields.size(); ++i) {
ColumnPtr key_column = columns[1 + i];
key_chunk->update_column_by_index(key_column, i);
}

for (auto& column : key_chunk->columns()) {
if (column->is_nullable()) {
column = ColumnHelper::update_column_nullable(false, column, column->size());
}
}

std::vector<bool> found;
ChunkPtr value_chunk = ChunkHelper::new_chunk(_value_slot, key_chunk->num_rows());

Status status = _table_reader->multi_get(*key_chunk, {_dict_query_expr.value_field}, found, *value_chunk);
if (!status.ok()) {
// todo retry
LOG(WARNING) << "fail to execute multi get: " << status.detailed_message();
return status;
}
res = ColumnHelper::create_column(_value_slot[0]->type(), true);

int res_idx = 0;
for (int idx = 0; idx < size; ++idx) {
if (found[idx]) {
res->append_datum(value_chunk->get_column_by_index(0)->get(res_idx));
res_idx++;
} else {
if (_dict_query_expr.strict_mode) {
return Status::NotFound("In strict mode, query failed if record not exist in dict table.");
}
res->append_nulls(1);
}
}

return res;
}

Status DictQueryExpr::prepare(RuntimeState* state, ExprContext* context) {
RETURN_IF_ERROR(Expr::prepare(state, context));
_runtime_state = state;
return Status::OK();
}

Status DictQueryExpr::open(RuntimeState* state, ExprContext* context, FunctionContext::FunctionStateScope scope) {
// init parent open
RETURN_IF_ERROR(Expr::open(state, context, scope));

TGetDictQueryParamRequest request;
request.__set_db_name(_dict_query_expr.db_name);
request.__set_table_name(_dict_query_expr.tbl_name);
TGetDictQueryParamResponse response;

TNetworkAddress master_addr = get_master_address();
RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
[&request, &response](FrontendServiceConnection& client) { client->getDictQueryParam(response, request); },
30000));

TableReaderParams params;
params.schema = response.schema;
params.partition_param = response.partition;
params.location_param = response.location;
params.nodes_info = response.nodes_info;
params.partition_versions = _dict_query_expr.partition_version;
params.timeout_ms = 30000;

_table_reader = std::make_shared<TableReader>();
RETURN_IF_ERROR(_table_reader->init(params));

_key_slots.resize(_dict_query_expr.key_fields.size());
for (int i = 0; i < _dict_query_expr.key_fields.size(); ++i) {
vector<TSlotDescriptor>& slot_descs = response.schema.slot_descs;
for (auto& slot : slot_descs) {
if (slot.colName == _dict_query_expr.key_fields[i]) {
_key_slots[i] = state->obj_pool()->add(new SlotDescriptor(slot));
}
}
}
_value_slot.resize(1);
for (auto& slot : response.schema.slot_descs) {
if (slot.colName == _dict_query_expr.value_field) {
_value_slot[0] = state->obj_pool()->add(new SlotDescriptor(slot));
}
}

return Status::OK();
}

void DictQueryExpr::close(RuntimeState* state, ExprContext* context, FunctionContext::FunctionStateScope scope) {
Expr::close(state, context, scope);
}

} // namespace starrocks
44 changes: 44 additions & 0 deletions be/src/exprs/dict_query_expr.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once
#include <memory>

#include "common/object_pool.h"
#include "exprs/expr.h"
#include "runtime/runtime_state.h"
#include "storage/table_reader.h"

namespace starrocks {

class DictQueryExpr final : public Expr {
public:
DictQueryExpr(const TExprNode& node);

Expr* clone(ObjectPool* pool) const override { return pool->add(new DictQueryExpr(*this)); }
StatusOr<ColumnPtr> evaluate_checked(ExprContext* context, Chunk* ptr) override;
Status prepare(RuntimeState* state, ExprContext* context) override;
Status open(RuntimeState* state, ExprContext* context, FunctionContext::FunctionStateScope scope) override;
void close(RuntimeState* state, ExprContext* context, FunctionContext::FunctionStateScope scope) override;

private:
RuntimeState* _runtime_state;
TDictQueryExpr _dict_query_expr;

Schema _key_schema;
std::vector<SlotDescriptor*> _key_slots;
std::vector<SlotDescriptor*> _value_slot;
std::shared_ptr<TableReader> _table_reader;
};
} // namespace starrocks
4 changes: 4 additions & 0 deletions be/src/exprs/expr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
#include "exprs/column_ref.h"
#include "exprs/compound_predicate.h"
#include "exprs/condition_expr.h"
#include "exprs/dict_query_expr.h"
#include "exprs/dictmapping_expr.h"
#include "exprs/function_call_expr.h"
#include "exprs/in_predicate.h"
Expand Down Expand Up @@ -387,6 +388,9 @@ Status Expr::create_vectorized_expr(starrocks::ObjectPool* pool, const starrocks
case TExprNodeType::CLONE_EXPR:
*expr = pool->add(new CloneExpr(texpr_node));
break;
case TExprNodeType::DICT_QUERY_EXPR:
*expr = pool->add(new DictQueryExpr(texpr_node));
break;
case TExprNodeType::ARRAY_SLICE_EXPR:
case TExprNodeType::AGG_EXPR:
case TExprNodeType::TABLE_FUNCTION_EXPR:
Expand Down
3 changes: 2 additions & 1 deletion be/src/runtime/descriptors.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ class SlotDescriptor {

int32_t col_unique_id() const { return _col_unique_id; }

SlotDescriptor(const TSlotDescriptor& tdesc);

private:
friend class DescriptorTbl;
friend class TupleDescriptor;
Expand Down Expand Up @@ -136,7 +138,6 @@ class SlotDescriptor {
// @todo: replace _null_indicator_offset when remove _null_indicator_offset
const bool _is_nullable;

SlotDescriptor(const TSlotDescriptor& tdesc);
SlotDescriptor(const PSlotDescriptor& pdesc);
};

Expand Down
79 changes: 79 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/analysis/DictQueryExpr.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.


package com.starrocks.analysis;

import com.starrocks.catalog.Function;
import com.starrocks.catalog.FunctionSet;
import com.starrocks.catalog.Type;
import com.starrocks.sql.analyzer.SemanticException;
import com.starrocks.sql.ast.AstVisitor;
import com.starrocks.thrift.TDictQueryExpr;
import com.starrocks.thrift.TExprNode;
import com.starrocks.thrift.TExprNodeType;

import java.util.List;

// dict_mapping(STRING dict_table_name, ANY... keys [, STRING value_column_name])
public class DictQueryExpr extends FunctionCallExpr {

private TDictQueryExpr dictQueryExpr;

public DictQueryExpr(List<Expr> params) throws SemanticException {
super(FunctionSet.DICT_MAPPING, params);
}

public DictQueryExpr(List<Expr> params, TDictQueryExpr dictQueryExpr, Function fn) {
super(FunctionSet.DICT_MAPPING, params);
this.dictQueryExpr = dictQueryExpr;
this.fn = fn;
setType(fn.getReturnType());
}

protected DictQueryExpr(DictQueryExpr other) {
super(other);
this.dictQueryExpr = other.getDictQueryExpr();
}


@Override
protected void toThrift(TExprNode msg) {
msg.setNode_type(TExprNodeType.DICT_QUERY_EXPR);
msg.setDict_query_expr(dictQueryExpr);
}

@Override
public <R, C> R accept(AstVisitor<R, C> visitor, C context) {
return visitor.visitDictQueryExpr(this, context);
}

@Override
public boolean isAggregateFunction() {
return false;
}

@Override
public Expr clone() {
return new DictQueryExpr(this);
}

public TDictQueryExpr getDictQueryExpr() {
return dictQueryExpr;
}

public void setDictQueryExpr(TDictQueryExpr dictQueryExpr) {
this.dictQueryExpr = dictQueryExpr;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,9 @@ public class FunctionSet {
public static final Function JSON_QUERY_FUNC = new Function(
new FunctionName(JSON_QUERY), new Type[] {Type.JSON, Type.VARCHAR}, Type.JSON, false);

// dict query function
public static final String DICT_MAPPING = "dict_mapping";

private static final Logger LOGGER = LogManager.getLogger(FunctionSet.class);

private static final Set<Type> STDDEV_ARG_TYPE =
Expand Down
Loading

0 comments on commit 45bbf9d

Please sign in to comment.