Skip to content

Commit

Permalink
[Refactor] add vpre_filter_expr for vectorized to improve performance (
Browse files Browse the repository at this point in the history
  • Loading branch information
xiepengcheng01 authored May 22, 2022
1 parent 0c4b477 commit 31e4019
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 24 deletions.
28 changes: 13 additions & 15 deletions be/src/exec/base_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,13 @@ Status BaseScanner::init_expr_ctxes() {
// preceding filter expr should be initialized by using `_row_desc`, which is the source row descriptor
if (!_pre_filter_texprs.empty()) {
if (_state->enable_vectorized_exec()) {
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(
_state->obj_pool(), _pre_filter_texprs, &_vpre_filter_ctxs));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_vpre_filter_ctxs, _state, *_row_desc,
_mem_tracker));
RETURN_IF_ERROR(vectorized::VExpr::open(_vpre_filter_ctxs, _state));
// for vectorized, preceding filter exprs should be compounded to one passed from fe.
DCHECK(_pre_filter_texprs.size() == 1);
_vpre_filter_ctx_ptr.reset(new doris::vectorized::VExprContext*);
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(
_state->obj_pool(), _pre_filter_texprs[0], _vpre_filter_ctx_ptr.get()));
RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->prepare(_state, *_row_desc, _mem_tracker));
RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->open(_state));
} else {
RETURN_IF_ERROR(Expr::create_expr_trees(_state->obj_pool(), _pre_filter_texprs,
&_pre_filter_ctxs));
Expand Down Expand Up @@ -302,14 +304,10 @@ Status BaseScanner::_fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool) {
Status BaseScanner::_filter_src_block() {
auto origin_column_num = _src_block.columns();
// filter block
if (!_vpre_filter_ctxs.empty()) {
for (auto _vpre_filter_ctx : _vpre_filter_ctxs) {
auto old_rows = _src_block.rows();
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_vpre_filter_ctx, &_src_block,
origin_column_num));
_counter->num_rows_unselected += old_rows - _src_block.rows();
}
}
auto old_rows = _src_block.rows();
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_vpre_filter_ctx_ptr, &_src_block,
origin_column_num));
_counter->num_rows_unselected += old_rows - _src_block.rows();
return Status::OK();
}

Expand Down Expand Up @@ -453,8 +451,8 @@ void BaseScanner::close() {
Expr::close(_pre_filter_ctxs, _state);
}

if (_state->enable_vectorized_exec() && !_vpre_filter_ctxs.empty()) {
vectorized::VExpr::close(_vpre_filter_ctxs, _state);
if (_vpre_filter_ctx_ptr) {
(*_vpre_filter_ctx_ptr)->close(_state);
}
}

Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/base_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class BaseScanner {
if (_state->enable_vectorized_exec()) {
vectorized::VExpr::close(_dest_vexpr_ctx, _state);
}
};
}

virtual Status init_expr_ctxes();
// Open this scanner, will initialize information need to
Expand Down Expand Up @@ -138,7 +138,7 @@ class BaseScanner {

// for vectorized load
std::vector<vectorized::VExprContext*> _dest_vexpr_ctx;
std::vector<vectorized::VExprContext*> _vpre_filter_ctxs;
std::unique_ptr<vectorized::VExprContext*> _vpre_filter_ctx_ptr;
vectorized::Block _src_block;
int _num_of_columns_from_file;

Expand Down
103 changes: 99 additions & 4 deletions be/test/vec/exec/vbroker_scanner_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,6 @@ TEST_F(VBrokerScannerTest, normal) {
range.file_type = TFileType::FILE_LOCAL;
range.format_type = TFileFormatType::FORMAT_CSV_PLAIN;
ranges.push_back(range);

VBrokerScanner scanner(&_runtime_state, _profile, _params, ranges, _addresses, _pre_filter,
&_counter);
auto st = scanner.open();
Expand All @@ -376,7 +375,6 @@ TEST_F(VBrokerScannerTest, normal) {
ASSERT_TRUE(eof);
auto columns = block->get_columns();
ASSERT_EQ(columns.size(), 3);

ASSERT_EQ(columns[0]->get_int(0), 1);
ASSERT_EQ(columns[0]->get_int(1), 4);
ASSERT_EQ(columns[0]->get_int(2), 8);
Expand All @@ -390,6 +388,105 @@ TEST_F(VBrokerScannerTest, normal) {
ASSERT_EQ(columns[2]->get_int(2), 10);
}

TEST_F(VBrokerScannerTest, normal_with_pre_filter) {
std::vector<TBrokerRangeDesc> ranges;
TBrokerRangeDesc range;
range.path = "./be/test/exec/test_data/broker_scanner/normal.csv";
range.start_offset = 0;
range.size = -1;
range.splittable = true;
range.file_type = TFileType::FILE_LOCAL;
range.format_type = TFileFormatType::FORMAT_CSV_PLAIN;
ranges.push_back(range);

// init pre_filter expr: k1 < '8'
TTypeDesc int_type;
{
TTypeNode node;
node.__set_type(TTypeNodeType::SCALAR);
TScalarType scalar_type;
scalar_type.__set_type(TPrimitiveType::INT);
node.__set_scalar_type(scalar_type);
int_type.types.push_back(node);
}
TTypeDesc varchar_type;
{
TTypeNode node;
node.__set_type(TTypeNodeType::SCALAR);
TScalarType scalar_type;
scalar_type.__set_type(TPrimitiveType::VARCHAR);
scalar_type.__set_len(5000);
node.__set_scalar_type(scalar_type);
varchar_type.types.push_back(node);
}

TExpr filter_expr;
{
TExprNode expr_node;
expr_node.__set_node_type(TExprNodeType::BINARY_PRED);
expr_node.type = gen_type_desc(TPrimitiveType::BOOLEAN);
expr_node.__set_num_children(2);
expr_node.__isset.opcode = true;
expr_node.__set_opcode(TExprOpcode::LT);
expr_node.__isset.vector_opcode = true;
expr_node.__set_vector_opcode(TExprOpcode::LT);
expr_node.__isset.fn = true;
expr_node.fn.name.function_name = "lt";
expr_node.fn.binary_type = TFunctionBinaryType::BUILTIN;
expr_node.fn.ret_type = int_type;
expr_node.fn.has_var_args = false;
filter_expr.nodes.push_back(expr_node);
}
{
TExprNode expr_node;
expr_node.__set_node_type(TExprNodeType::SLOT_REF);
expr_node.type = varchar_type;
expr_node.__set_num_children(0);
expr_node.__isset.slot_ref = true;
TSlotRef slot_ref;
slot_ref.__set_slot_id(4);
slot_ref.__set_tuple_id(1);
expr_node.__set_slot_ref(slot_ref);
expr_node.__isset.output_column = true;
expr_node.__set_output_column(0);
filter_expr.nodes.push_back(expr_node);
}
{
TExprNode expr_node;
expr_node.__set_node_type(TExprNodeType::STRING_LITERAL);
expr_node.type = varchar_type;
expr_node.__set_num_children(0);
expr_node.__isset.string_literal = true;
TStringLiteral string_literal;
string_literal.__set_value("8");
expr_node.__set_string_literal(string_literal);
filter_expr.nodes.push_back(expr_node);
}
_pre_filter.push_back(filter_expr);
VBrokerScanner scanner(&_runtime_state, _profile, _params, ranges, _addresses, _pre_filter,
&_counter);
auto st = scanner.open();
ASSERT_TRUE(st.ok());

std::unique_ptr<vectorized::Block> block(new vectorized::Block());
bool eof = false;
// end of file
st = scanner.get_next(block.get(), &eof);
ASSERT_TRUE(st.ok());
ASSERT_TRUE(eof);
auto columns = block->get_columns();
ASSERT_EQ(columns.size(), 3);

ASSERT_EQ(columns[0]->get_int(0), 1);
ASSERT_EQ(columns[0]->get_int(1), 4);

ASSERT_EQ(columns[1]->get_int(0), 2);
ASSERT_EQ(columns[1]->get_int(1), 5);

ASSERT_EQ(columns[2]->get_int(0), 3);
ASSERT_EQ(columns[2]->get_int(1), 6);
}

TEST_F(VBrokerScannerTest, normal2) {
std::vector<TBrokerRangeDesc> ranges;

Expand All @@ -406,7 +503,6 @@ TEST_F(VBrokerScannerTest, normal2) {
range.start_offset = 0;
range.size = 4;
ranges.push_back(range);

VBrokerScanner scanner(&_runtime_state, _profile, _params, ranges, _addresses, _pre_filter,
&_counter);
auto st = scanner.open();
Expand Down Expand Up @@ -440,7 +536,6 @@ TEST_F(VBrokerScannerTest, normal5) {
range.file_type = TFileType::FILE_LOCAL;
range.format_type = TFileFormatType::FORMAT_CSV_PLAIN;
ranges.push_back(range);

VBrokerScanner scanner(&_runtime_state, _profile, _params, ranges, _addresses, _pre_filter,
&_counter);
auto st = scanner.open();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.rewrite.ExprRewriter;
Expand Down Expand Up @@ -213,8 +214,12 @@ protected void toThrift(TPlanNode planNode) {
planNode.setNodeType(TPlanNodeType.BROKER_SCAN_NODE);
TBrokerScanNode brokerScanNode = new TBrokerScanNode(desc.getId().asInt());
if (!preFilterConjuncts.isEmpty()) {
for (Expr e : preFilterConjuncts) {
brokerScanNode.addToPreFilterExprs(e.treeToThrift());
if (Config.enable_vectorized_load && vpreFilterConjunct != null) {
brokerScanNode.addToPreFilterExprs(vpreFilterConjunct.treeToThrift());
} else {
for (Expr e : preFilterConjuncts) {
brokerScanNode.addToPreFilterExprs(e.treeToThrift());
}
}
}
planNode.setBrokerScanNode(brokerScanNode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
// 4. Filter data by using "conjuncts".
protected List<Expr> preFilterConjuncts = Lists.newArrayList();

protected Expr vpreFilterConjunct = null;

// Fragment that this PlanNode is executed in. Valid only after this PlanNode has been
// assigned to a fragment. Set and maintained by enclosing PlanFragment.
protected PlanFragment fragment;
Expand Down Expand Up @@ -904,6 +906,11 @@ public void convertToVectoriezd() {
initCompoundPredicate(vconjunct);
}

if (!preFilterConjuncts.isEmpty()) {
vpreFilterConjunct = convertConjunctsToAndCompoundPredicate(preFilterConjuncts);
initCompoundPredicate(vpreFilterConjunct);
}

for (PlanNode child : children) {
child.convertToVectoriezd();
}
Expand Down
2 changes: 1 addition & 1 deletion gensrc/thrift/PlanNodes.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ struct TBrokerScanNode {
// Partition info used to process partition select in broker load
2: optional list<Exprs.TExpr> partition_exprs
3: optional list<Partitions.TRangePartition> partition_infos
4: optional list<Exprs.TExpr> pre_filter_exprs
4: optional list<Exprs.TExpr> pre_filter_exprs
}

struct TEsScanNode {
Expand Down

0 comments on commit 31e4019

Please sign in to comment.