Skip to content

Commit

Permalink
[enhancement](vec) Support outer join for vectorized exec engine (apa…
Browse files Browse the repository at this point in the history
…che#11068)

Hash join node adds three new attributes.
The following will take an SQL as an example to illustrate the meaning of these three attributes

```
select t1. a from t1 left join t2 on t1. a=t2. b;
```
1. vOutputTupleDesc:Tuple2(a'')

2. vIntermediateTupleDescList: Tuple1(a', b'<nullable>)

2. vSrcToOutputSMap: <Tuple1(a'), Tuple2(a'')>

The slot in intermediatetuple corresponds to the slot in output tuple one by one through the expr calculation of the left child in vsrctooutputsmap.

This code mainly merges the contents of two PRs:
1.  [fix](vectorized) Support outer join for vectorized exec engine (apache#10323)
2. [Fix](Join) Fix the bug of outer join function under vectorization apache#9954

The following is the specific description of the first PR
In a vectorized scenario, the query plan will generate a new tuple for the join node.
This tuple mainly describes the output schema of the join node.
Adding this tuple mainly solves the problem that the input schema of the join node is different from the output schema.
For example:
1. The case where the null side column caused by outer join is converted to nullable.
2. The projection of the outer tuple.

The following is the specific description of the second PR
This pr mainly fixes the following problems:
1. Solve the query combined with inline view and outer join. After adding a tuple to the join operator, the position of the `tupleisnull` function is inconsistent with the row storage. Currently the vectorized `tupleisnull` will be calculated in the HashJoinNode.computeOutputTuple() function.
2. Column nullable property error problem. At present, once the outer join occurs, the column on the null-side side will be planned to be nullable in the semantic parsing stage.

For example:
```
select * from (select a as k1 from test) tmp right join b on tmp.k1=b.k1
```
At this time, the nullable property of column k1 in the `tmp` inline view should be true.

In the vectorized code, the virtual `tableRef` of tmp will be used in constructing the output tuple of HashJoinNode (specifically, the function HashJoinNode.computeOutputTuple()). So the **correctness** of the column nullable property of this tableRef is very important.
In the above case, since the tmp table needs to perform a right join with the b table, as a null-side tmp side, it is necessary to change the column attributes involved in the tmp table to nullable.

In non-vectorized code, since the virtual tableRef tmp is not used at all, it uses the `TupleIsNull` function in `outputsmp` to ensure data correctness.
That is to say, the a column of the original table test is still non-null, and it does not affect the correctness of the result.

The vectorized nullable attribute requirements are very strict.
Outer join will change the nullable attribute of the join column, thereby changing the nullable attribute of the column in the upper operator layer by layer.
Since FE has no mechanism to modify the nullable attribute in the upper operator tuple layer by layer after the analyzer.
So at present, we can only preset the attributes before the lower join as nullable in the analyzer stage in advance, so as to avoid the problem.
(At the same time, be also wrote some evasive code in order to deal with the problem of null to non-null.)

Co-authored-by: EmmyMiao87
Co-authored-by: HappenLee
Co-authored-by: morrySnow

Co-authored-by: EmmyMiao87 <522274284@qq.com>
  • Loading branch information
morningman and EmmyMiao87 authored Jul 21, 2022
1 parent 4d158f9 commit 7e3fc0d
Show file tree
Hide file tree
Showing 32 changed files with 890 additions and 295 deletions.
4 changes: 2 additions & 2 deletions be/src/exec/exec_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,9 @@ Status ExecNode::prepare(RuntimeState* state) {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());

if (_vconjunct_ctx_ptr) {
RETURN_IF_ERROR((*_vconjunct_ctx_ptr)->prepare(state, row_desc()));
RETURN_IF_ERROR((*_vconjunct_ctx_ptr)->prepare(state, _row_descriptor));
}
RETURN_IF_ERROR(Expr::prepare(_conjunct_ctxs, state, row_desc()));
RETURN_IF_ERROR(Expr::prepare(_conjunct_ctxs, state, _row_descriptor));

// TODO(zc):
// AddExprCtxsToFree(_conjunct_ctxs);
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/exec_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ class ExecNode {

int id() const { return _id; }
TPlanNodeType::type type() const { return _type; }
const RowDescriptor& row_desc() const { return _row_descriptor; }
virtual const RowDescriptor& row_desc() const { return _row_descriptor; }
int64_t rows_returned() const { return _num_rows_returned; }
int64_t limit() const { return _limit; }
bool reached_limit() const { return _limit != -1 && _num_rows_returned >= _limit; }
Expand Down
7 changes: 7 additions & 0 deletions be/src/vec/columns/column_nullable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -482,4 +482,11 @@ ColumnPtr make_nullable(const ColumnPtr& column, bool is_nullable) {
return ColumnNullable::create(column, ColumnUInt8::create(column->size(), is_nullable ? 1 : 0));
}

ColumnPtr remove_nullable(const ColumnPtr& column) {
if (is_column_nullable(*column)) {
return reinterpret_cast<const ColumnNullable*>(column.get())->get_nested_column_ptr();
}
return column;
}

} // namespace doris::vectorized
1 change: 1 addition & 0 deletions be/src/vec/columns/column_nullable.h
Original file line number Diff line number Diff line change
Expand Up @@ -301,5 +301,6 @@ class ColumnNullable final : public COWHelper<IColumn, ColumnNullable> {
};

ColumnPtr make_nullable(const ColumnPtr& column, bool is_nullable = false);
ColumnPtr remove_nullable(const ColumnPtr& column);

} // namespace doris::vectorized
4 changes: 2 additions & 2 deletions be/src/vec/core/block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ void filter_block_internal(Block* block, const IColumn::Filter& filter, uint32_t
auto count = count_bytes_in_filter(filter);
if (count == 0) {
for (size_t i = 0; i < column_to_keep; ++i) {
std::move(*block->get_by_position(i).column).mutate()->clear();
std::move(*block->get_by_position(i).column).assume_mutable()->clear();
}
} else {
if (count != block->rows()) {
Expand Down Expand Up @@ -651,7 +651,7 @@ Status Block::filter_block(Block* block, int filter_column_id, int column_to_kee
bool ret = const_column->get_bool(0);
if (!ret) {
for (size_t i = 0; i < column_to_keep; ++i) {
std::move(*block->get_by_position(i).column).mutate()->clear();
std::move(*block->get_by_position(i).column).assume_mutable()->clear();
}
}
} else {
Expand Down
239 changes: 180 additions & 59 deletions be/src/vec/exec/join/vhash_join_node.cpp

Large diffs are not rendered by default.

39 changes: 28 additions & 11 deletions be/src/vec/exec/join/vhash_join_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,15 +147,17 @@ class HashJoinNode : public ::doris::ExecNode {
HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
~HashJoinNode() override;

virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override;
virtual Status prepare(RuntimeState* state) override;
virtual Status open(RuntimeState* state) override;
virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
virtual Status get_next(RuntimeState* state, Block* block, bool* eos) override;
virtual Status close(RuntimeState* state) override;
Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override;
Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;
Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
Status get_next(RuntimeState* state, Block* block, bool* eos) override;
Status close(RuntimeState* state) override;
HashTableVariants& get_hash_table_variants() { return _hash_table_variants; }
void init_join_op();

const RowDescriptor& row_desc() const override { return _output_row_desc; }

private:
using VExprContexts = std::vector<VExprContext*>;

Expand All @@ -168,6 +170,8 @@ class HashJoinNode : public ::doris::ExecNode {
VExprContexts _build_expr_ctxs;
// other expr
std::unique_ptr<VExprContext*> _vother_join_conjunct_ptr;
// output expr
VExprContexts _output_expr_ctxs;

// mark the join column whether support null eq
std::vector<bool> _is_null_safe_eq_join;
Expand All @@ -178,6 +182,7 @@ class HashJoinNode : public ::doris::ExecNode {
std::vector<bool> _probe_not_ignore_null;

std::vector<uint16_t> _probe_column_disguise_null;
std::vector<uint16_t> _probe_column_convert_to_null;

DataTypes _right_table_data_types;
DataTypes _left_table_data_types;
Expand Down Expand Up @@ -226,6 +231,7 @@ class HashJoinNode : public ::doris::ExecNode {
bool _have_other_join_conjunct = false;

RowDescriptor _row_desc_for_other_join_conjunt;
Block _join_block;

std::vector<uint32_t> _items_counts;
std::vector<int8_t> _build_block_offsets;
Expand All @@ -235,22 +241,33 @@ class HashJoinNode : public ::doris::ExecNode {
std::vector<bool> _left_output_slot_flags;
std::vector<bool> _right_output_slot_flags;

RowDescriptor _intermediate_row_desc;
RowDescriptor _output_row_desc;

private:
void _hash_table_build_thread(RuntimeState* state, std::promise<Status>* status);

Status _hash_table_build(RuntimeState* state);

Status _process_build_block(RuntimeState* state, Block& block, uint8_t offset);

Status extract_build_join_column(Block& block, NullMap& null_map, ColumnRawPtrs& raw_ptrs,
bool& ignore_null, RuntimeProfile::Counter& expr_call_timer);
Status _extract_build_join_column(Block& block, NullMap& null_map, ColumnRawPtrs& raw_ptrs,
bool& ignore_null, RuntimeProfile::Counter& expr_call_timer);

Status extract_probe_join_column(Block& block, NullMap& null_map, ColumnRawPtrs& raw_ptrs,
bool& ignore_null, RuntimeProfile::Counter& expr_call_timer);
Status _extract_probe_join_column(Block& block, NullMap& null_map, ColumnRawPtrs& raw_ptrs,
bool& ignore_null, RuntimeProfile::Counter& expr_call_timer);

void _hash_table_init();

static const int _MAX_BUILD_BLOCK_COUNT = 128;
static constexpr auto _MAX_BUILD_BLOCK_COUNT = 128;

void _prepare_probe_block();

void _construct_mutable_join_block();

Status _build_output_block(Block* origin_block, Block* output_block);

static std::vector<uint16_t> _convert_block_to_null(Block& block);

template <class HashTableContext>
friend struct ProcessHashTableBuild;
Expand Down
3 changes: 1 addition & 2 deletions build-support/clang-format.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ ROOT=`cd "$ROOT"; pwd`

export DORIS_HOME=`cd "${ROOT}/.."; pwd`

#CLANG_FORMAT=${CLANG_FORMAT_BINARY:=$(which clang-format)}
CLANG_FORMAT=/mnt/disk1/liyifan/doris/ldb_toolchain/bin/clang-format
CLANG_FORMAT=${CLANG_FORMAT_BINARY:=$(which clang-format)}

python ${DORIS_HOME}/build-support/run_clang_format.py "--clang-format-executable" "${CLANG_FORMAT}" "-r" "--style" "file" "--inplace" "true" "--extensions" "c,h,C,H,cpp,hpp,cc,hh,c++,h++,cxx,hxx" "--exclude" "none" "be/src be/test"
79 changes: 12 additions & 67 deletions fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.IdGenerator;
import org.apache.doris.common.Pair;
import org.apache.doris.common.VecNotImplException;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.external.hudi.HudiTable;
import org.apache.doris.external.hudi.HudiUtils;
Expand Down Expand Up @@ -256,15 +255,14 @@ private static class GlobalState {
private final Map<TupleId, List<ExprId>> eqJoinConjuncts = Maps.newHashMap();

// set of conjuncts that have been assigned to some PlanNode
private Set<ExprId> assignedConjuncts =
Collections.newSetFromMap(new IdentityHashMap<ExprId, Boolean>());
private Set<ExprId> assignedConjuncts = Collections.newSetFromMap(new IdentityHashMap<ExprId, Boolean>());

private Set<TupleId> inlineViewTupleIds = Sets.newHashSet();

// map from outer-joined tuple id, ie, one that is nullable in this select block,
// to the last Join clause (represented by its rhs table ref) that outer-joined it
private final Map<TupleId, TableRef> outerJoinedTupleIds = Maps.newHashMap();

private final Set<TupleId> outerJoinedMaterializedTupleIds = Sets.newHashSet();

// Map of registered conjunct to the last full outer join (represented by its
// rhs table ref) that outer joined it.
public final Map<ExprId, TableRef> fullOuterJoinedConjuncts = Maps.newHashMap();
Expand Down Expand Up @@ -794,18 +792,12 @@ public SlotDescriptor registerColumnRef(TableName tblName, String colName) throw
String key = d.getAlias() + "." + col.getName();
SlotDescriptor result = slotRefMap.get(key);
if (result != null) {
// this is a trick to set slot as nullable when slot is on inline view
// When analyze InlineViewRef, we first generate sMap and baseTblSmap and then analyze join.
// We have already registered column ref at that time, but we did not know
// whether inline view is outer joined. So we have to check it and set slot as nullable here.
if (isOuterJoined(d.getId())) {
result.setIsNullable(true);
}
result.setMultiRef(true);
return result;
}
result = globalState.descTbl.addSlotDescriptor(d);
result.setColumn(col);
// TODO: need to remove this outer join'
result.setIsNullable(col.isAllowNull() || isOuterJoined(d.getId()));

slotRefMap.put(key, result);
Expand Down Expand Up @@ -905,6 +897,10 @@ public SlotDescriptor copySlotDescriptor(SlotDescriptor srcSlotDesc, TupleDescri
return result;
}

public void registerInlineViewTupleId(TupleId tupleId) {
globalState.inlineViewTupleIds.add(tupleId);
}

/**
* Register conjuncts that are outer joined by a full outer join. For a given
* predicate, we record the last full outer join that outer-joined any of its
Expand Down Expand Up @@ -956,57 +952,6 @@ public void registerOuterJoinedTids(List<TupleId> tids, TableRef rhsRef) {
}
}

public void registerOuterJoinedMaterilizeTids(List<TupleId> tids) {
globalState.outerJoinedMaterializedTupleIds.addAll(tids);
}

/**
* The main function of this method is to set the column property on the nullable side of the outer join
* to nullable in the case of vectorization.
* For example:
* Query: select * from t1 left join t2 on t1.k1=t2.k1
* Origin: t2.k1 not null
* Result: t2.k1 is nullable
*
* @throws VecNotImplException In some cases, it is not possible to directly modify the column property to nullable.
* It will report an error and fall back from vectorized mode to non-vectorized mode for execution.
* If the nullside column of the outer join is a column that must return non-null like count(*)
* then there is no way to force the column to be nullable.
* At this time, vectorization cannot support this situation,
* so it is necessary to fall back to non-vectorization for processing.
* For example:
* Query: select * from t1 left join
* (select k1, count(k2) as count_k2 from t2 group by k1) tmp on t1.k1=tmp.k1
* Origin: tmp.k1 not null, tmp.count_k2 not null
* Result: throw VecNotImplException
*/
public void changeAllOuterJoinTupleToNull() throws VecNotImplException {
for (TupleId tid : globalState.outerJoinedTupleIds.keySet()) {
for (SlotDescriptor slotDescriptor : getTupleDesc(tid).getSlots()) {
changeSlotToNull(slotDescriptor);
}
}

for (TupleId tid : globalState.outerJoinedMaterializedTupleIds) {
for (SlotDescriptor slotDescriptor : getTupleDesc(tid).getSlots()) {
changeSlotToNull(slotDescriptor);
}
}
}

private void changeSlotToNull(SlotDescriptor slotDescriptor) throws VecNotImplException {
if (slotDescriptor.getSourceExprs().isEmpty()) {
slotDescriptor.setIsNullable(true);
return;
}
for (Expr sourceExpr : slotDescriptor.getSourceExprs()) {
if (!sourceExpr.isNullable()) {
throw new VecNotImplException("The slot (" + slotDescriptor.toString()
+ ") could not be changed to nullable");
}
}
}

/**
* Register the given tuple id as being the invisible side of a semi-join.
*/
Expand Down Expand Up @@ -1426,10 +1371,6 @@ public boolean isFullOuterJoined(TupleId tid) {
return globalState.fullOuterJoinedTupleIds.containsKey(tid);
}

public boolean isOuterMaterializedJoined(TupleId tid) {
return globalState.outerJoinedMaterializedTupleIds.contains(tid);
}

public boolean isFullOuterJoined(SlotId sid) {
return isFullOuterJoined(getTupleId(sid));
}
Expand Down Expand Up @@ -2243,6 +2184,10 @@ public boolean isOuterJoined(TupleId tid) {
return globalState.outerJoinedTupleIds.containsKey(tid);
}

public boolean isInlineView(TupleId tid) {
return globalState.inlineViewTupleIds.contains(tid);
}

public boolean containSubquery() {
return globalState.containsSubquery;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
package org.apache.doris.analysis;

import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.IdGenerator;
import org.apache.doris.thrift.TDescriptorTable;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -113,6 +115,21 @@ public TupleDescriptor getTupleDesc(TupleId id) {
return tupleDescs.get(id);
}

/**
* Return all tuple desc by idList.
*/
public List<TupleDescriptor> getTupleDesc(List<TupleId> idList) throws AnalysisException {
List<TupleDescriptor> result = Lists.newArrayList();
for (TupleId tupleId : idList) {
TupleDescriptor tupleDescriptor = getTupleDesc(tupleId);
if (tupleDescriptor == null) {
throw new AnalysisException("Invalid tuple id:" + tupleId.toString());
}
result.add(tupleDescriptor);
}
return result;
}

public SlotDescriptor getSlotDesc(SlotId id) {
return slotDescs.get(id);
}
Expand Down
Loading

0 comments on commit 7e3fc0d

Please sign in to comment.