[Pipeline](exec) Support shared scan in colo agg (apache#18457)
HappenLee authored Apr 13, 2023
1 parent 9955815 commit 40a3529
Showing 11 changed files with 208 additions and 32 deletions.
1 change: 1 addition & 0 deletions be/src/vec/exec/scan/new_olap_scan_node.cpp
@@ -34,6 +34,7 @@ NewOlapScanNode::NewOlapScanNode(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs)
: VScanNode(pool, tnode, descs), _olap_scan_node(tnode.olap_scan_node) {
_output_tuple_id = tnode.olap_scan_node.tuple_id;
_col_distribute_ids = tnode.olap_scan_node.distribute_column_ids;
if (_olap_scan_node.__isset.sort_info && _olap_scan_node.__isset.sort_limit) {
_limit_per_scanner = _olap_scan_node.sort_limit;
}
129 changes: 117 additions & 12 deletions be/src/vec/exec/scan/pip_scanner_context.h
@@ -29,9 +29,11 @@ class PipScannerContext : public vectorized::ScannerContext {
const TupleDescriptor* input_tuple_desc,
const TupleDescriptor* output_tuple_desc,
const std::list<vectorized::VScanner*>& scanners, int64_t limit,
int64_t max_bytes_in_blocks_queue)
int64_t max_bytes_in_blocks_queue, const std::vector<int>& col_distribute_ids)
: vectorized::ScannerContext(state, parent, input_tuple_desc, output_tuple_desc,
scanners, limit, max_bytes_in_blocks_queue) {}
scanners, limit, max_bytes_in_blocks_queue),
_col_distribute_ids(col_distribute_ids),
_need_colocate_distribute(!_col_distribute_ids.empty()) {}

Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block, bool* eos,
int id, bool wait = false) override {
@@ -67,19 +69,54 @@ class PipScannerContext : public vectorized::ScannerContext {
const int queue_size = _queue_mutexs.size();
const int block_size = blocks.size();
int64_t local_bytes = 0;
for (const auto& block : blocks) {
local_bytes += block->allocated_bytes();
}

for (int i = 0; i < queue_size && i < block_size; ++i) {
int queue = _next_queue_to_feed;
{
std::lock_guard<std::mutex> l(*_queue_mutexs[queue]);
for (int j = i; j < block_size; j += queue_size) {
_blocks_queues[queue].emplace_back(std::move(blocks[j]));
if (_need_colocate_distribute) {
std::vector<uint64_t> hash_vals;
for (const auto& block : blocks) {
// vectorized calculate hash
int rows = block->rows();
const auto element_size = _max_queue_size;
hash_vals.resize(rows);
std::fill(hash_vals.begin(), hash_vals.end(), 0);
auto* __restrict hashes = hash_vals.data();

for (int j = 0; j < _col_distribute_ids.size(); ++j) {
block->get_by_position(_col_distribute_ids[j])
.column->update_crcs_with_value(
hash_vals, _output_tuple_desc->slots()[_col_distribute_ids[j]]
->type()
.type);
}
for (int i = 0; i < rows; i++) {
hashes[i] = hashes[i] % element_size;
}

std::vector<int> channel2rows[element_size];
for (int i = 0; i < rows; i++) {
channel2rows[hashes[i]].emplace_back(i);
}

for (int i = 0; i < element_size; ++i) {
if (!channel2rows[i].empty()) {
_add_rows_colocate_blocks(block.get(), i, channel2rows[i]);
}
}
}
_next_queue_to_feed = queue + 1 < queue_size ? queue + 1 : 0;
} else {
for (const auto& block : blocks) {
local_bytes += block->allocated_bytes();
}

for (int i = 0; i < queue_size && i < block_size; ++i) {
int queue = _next_queue_to_feed;
{
std::lock_guard<std::mutex> l(*_queue_mutexs[queue]);
for (int j = i; j < block_size; j += queue_size) {
_blocks_queues[queue].emplace_back(std::move(blocks[j]));
}
}
_next_queue_to_feed = queue + 1 < queue_size ? queue + 1 : 0;
}
}
_current_used_bytes += local_bytes;
}
@@ -95,18 +132,86 @@
_queue_mutexs.emplace_back(new std::mutex);
_blocks_queues.emplace_back(std::list<vectorized::BlockUPtr>());
}
if (_need_colocate_distribute) {
int real_block_size =
limit == -1 ? _batch_size : std::min(static_cast<int64_t>(_batch_size), limit);
int64_t free_blocks_memory_usage = 0;
for (int i = 0; i < _max_queue_size; ++i) {
auto block = std::make_unique<vectorized::Block>(_output_tuple_desc->slots(),
real_block_size,
true /*ignore invalid slots*/);
free_blocks_memory_usage += block->allocated_bytes();
_colocate_mutable_blocks.emplace_back(new vectorized::MutableBlock(block.get()));
_colocate_blocks.emplace_back(std::move(block));
_colocate_block_mutexs.emplace_back(new std::mutex);
}
_free_blocks_memory_usage->add(free_blocks_memory_usage);
}
}

bool has_enough_space_in_blocks_queue() const override {
return _current_used_bytes < _max_bytes_in_queue / 2 * _max_queue_size;
}

virtual void _dispose_coloate_blocks_not_in_queue() override {
if (_need_colocate_distribute) {
for (int i = 0; i < _max_queue_size; ++i) {
std::scoped_lock s(*_colocate_block_mutexs[i], *_queue_mutexs[i]);
if (_colocate_blocks[i] && !_colocate_blocks[i]->empty()) {
_current_used_bytes += _colocate_blocks[i]->allocated_bytes();
_blocks_queues[i].emplace_back(std::move(_colocate_blocks[i]));
_colocate_mutable_blocks[i]->clear();
}
}
}
}

private:
int _max_queue_size = 1;
int _next_queue_to_feed = 0;
std::vector<std::unique_ptr<std::mutex>> _queue_mutexs;
std::vector<std::list<vectorized::BlockUPtr>> _blocks_queues;
std::atomic_int64_t _current_used_bytes = 0;

const std::vector<int>& _col_distribute_ids;
const bool _need_colocate_distribute;
std::vector<vectorized::BlockUPtr> _colocate_blocks;
std::vector<std::unique_ptr<vectorized::MutableBlock>> _colocate_mutable_blocks;
std::vector<std::unique_ptr<std::mutex>> _colocate_block_mutexs;

void _add_rows_colocate_blocks(vectorized::Block* block, int loc,
const std::vector<int>& rows) {
int row_wait_add = rows.size();
const int batch_size = _batch_size;
const int* begin = &rows[0];
std::lock_guard<std::mutex> l(*_colocate_block_mutexs[loc]);

while (row_wait_add > 0) {
int row_add = 0;
int max_add = batch_size - _colocate_mutable_blocks[loc]->rows();
if (row_wait_add >= max_add) {
row_add = max_add;
} else {
row_add = row_wait_add;
}

_colocate_mutable_blocks[loc]->add_rows(block, begin, begin + row_add);
row_wait_add -= row_add;
begin += row_add;

if (row_add == max_add) {
_current_used_bytes += _colocate_blocks[loc]->allocated_bytes();
{
std::lock_guard<std::mutex> queue_l(*_queue_mutexs[loc]);
_blocks_queues[loc].emplace_back(std::move(_colocate_blocks[loc]));
}
bool get_block_not_empty = true;
_colocate_blocks[loc] = get_free_block(&get_block_not_empty, get_block_not_empty);
_colocate_mutable_blocks[loc]->set_muatable_columns(
_colocate_blocks[loc]->mutate_columns());
}
}
}
};
} // namespace pipeline
} // namespace doris
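
Editor's note: the colocate branch added to append_blocks_to_queue works per block: compute a CRC hash of the distribution columns for every row, take it modulo the number of instance queues (_max_queue_size), bucket the row indices per instance, and append each bucket into that instance's colocate block. Below is a minimal, self-contained sketch of the same bucketing step, with a hypothetical Row type and std::hash standing in for the column-wise update_crcs_with_value used in the committed code.

#include <cstdint>
#include <functional>
#include <iostream>
#include <string>
#include <vector>

// Simplified stand-ins for illustration only; the real implementation hashes
// whole columns at once (update_crcs_with_value) instead of row by row.
struct Row {
    int64_t key;         // distribution (GROUP BY) column
    std::string payload;
};

// Bucket row indices by hash(distribution key) % instance_count, mirroring the
// channel2rows step in PipScannerContext::append_blocks_to_queue.
std::vector<std::vector<int>> bucket_rows_by_key(const std::vector<Row>& block,
                                                 int instance_count) {
    std::vector<std::vector<int>> channel2rows(instance_count);
    for (int i = 0; i < static_cast<int>(block.size()); ++i) {
        uint64_t h = std::hash<int64_t>{}(block[i].key); // CRC in the real code
        channel2rows[h % instance_count].push_back(i);
    }
    return channel2rows;
}

int main() {
    std::vector<Row> block = {{1, "a"}, {2, "b"}, {3, "c"}, {1, "d"}};
    auto channels = bucket_rows_by_key(block, 2);
    for (int c = 0; c < 2; ++c) {
        std::cout << "instance " << c << ":";
        for (int row : channels[c]) std::cout << ' ' << row;
        std::cout << '\n';
    }
    // Rows with the same key always land in the same instance, so the
    // aggregation above the shared scan sees complete key partitions.
}

In the committed code the buckets feed _colocate_mutable_blocks via _add_rows_colocate_blocks, which pushes a block to its instance's queue whenever it reaches _batch_size and grabs a fresh free block; any partially filled blocks are flushed by _dispose_coloate_blocks_not_in_queue() once the last scanner finishes.
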
14 changes: 9 additions & 5 deletions be/src/vec/exec/scan/scanner_context.cpp
@@ -114,14 +114,17 @@ Status ScannerContext::init() {
return Status::OK();
}

vectorized::BlockUPtr ScannerContext::get_free_block(bool* has_free_block) {
vectorized::BlockUPtr ScannerContext::get_free_block(bool* has_free_block,
bool get_block_not_empty) {
{
std::lock_guard l(_free_blocks_lock);
if (!_free_blocks.empty()) {
auto block = std::move(_free_blocks.back());
_free_blocks.pop_back();
_free_blocks_memory_usage->add(-block->allocated_bytes());
return block;
if (!get_block_not_empty || _free_blocks.back()->mem_reuse()) {
auto block = std::move(_free_blocks.back());
_free_blocks.pop_back();
_free_blocks_memory_usage->add(-block->allocated_bytes());
return block;
}
}
}
*has_free_block = false;
@@ -319,6 +322,7 @@ void ScannerContext::push_back_scanner_and_reschedule(VScanner* scanner) {
// same scanner.
if (scanner->need_to_close() && scanner->set_counted_down() &&
(--_num_unfinished_scanners) == 0) {
_dispose_coloate_blocks_not_in_queue();
_is_finished = true;
_blocks_queue_added_cv.notify_one();
}
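
Editor's note on the get_free_block change: when _add_rows_colocate_blocks swaps a full colocate block into the queue it requests a replacement with get_block_not_empty = true, and a pooled block is only reused if mem_reuse() reports that its columns are still attached. A simplified sketch of that pooled-block pattern, with hypothetical names rather than Doris' real classes:

#include <memory>
#include <mutex>
#include <vector>

// Hypothetical, minimal stand-in for the pooled-block idea.
struct Block {
    size_t allocated_bytes() const { return bytes; }
    bool mem_reuse() const { return reusable; } // columns still attached?
    size_t bytes = 0;
    bool reusable = true;
};

class BlockPool {
public:
    // Returns a pooled block, or nullptr when the pool is empty or the caller
    // requires a reusable block and the cached one does not qualify.
    std::unique_ptr<Block> get_free_block(bool require_reusable) {
        std::lock_guard<std::mutex> l(_lock);
        if (_free_blocks.empty()) return nullptr;
        if (require_reusable && !_free_blocks.back()->mem_reuse()) return nullptr;
        auto block = std::move(_free_blocks.back());
        _free_blocks.pop_back();
        _pooled_bytes -= block->allocated_bytes();
        return block;
    }

    void return_free_block(std::unique_ptr<Block> block) {
        std::lock_guard<std::mutex> l(_lock);
        _pooled_bytes += block->allocated_bytes();
        _free_blocks.emplace_back(std::move(block));
    }

private:
    std::mutex _lock;
    std::vector<std::unique_ptr<Block>> _free_blocks;
    size_t _pooled_bytes = 0; // the real code tracks this in a profile counter
};

Here a nullptr return simply signals the caller to allocate a fresh block; the committed ScannerContext additionally accounts the pooled bytes against _free_blocks_memory_usage.
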
4 changes: 3 additions & 1 deletion be/src/vec/exec/scan/scanner_context.h
@@ -55,7 +55,7 @@ class ScannerContext {
virtual ~ScannerContext() = default;
Status init();

vectorized::BlockUPtr get_free_block(bool* has_free_block);
vectorized::BlockUPtr get_free_block(bool* has_free_block, bool get_not_empty_block = false);
void return_free_block(std::unique_ptr<vectorized::Block> block);

// Append blocks from scanners to the blocks queue.
@@ -140,6 +140,8 @@ class ScannerContext {
Status _close_and_clear_scanners(VScanNode* node, RuntimeState* state);

protected:
virtual void _dispose_coloate_blocks_not_in_queue() {}

RuntimeState* _state;
VScanNode* _parent;

6 changes: 3 additions & 3 deletions be/src/vec/exec/scan/vscan_node.cpp
@@ -261,9 +261,9 @@ Status VScanNode::_init_profile() {

Status VScanNode::_start_scanners(const std::list<VScanner*>& scanners) {
if (_is_pipeline_scan) {
_scanner_ctx.reset(new pipeline::PipScannerContext(_state, this, _input_tuple_desc,
_output_tuple_desc, scanners, limit(),
_state->query_options().mem_limit / 20));
_scanner_ctx.reset(new pipeline::PipScannerContext(
_state, this, _input_tuple_desc, _output_tuple_desc, scanners, limit(),
_state->query_options().mem_limit / 20, _col_distribute_ids));
} else {
_scanner_ctx.reset(new ScannerContext(_state, this, _input_tuple_desc, _output_tuple_desc,
scanners, limit(),
1 change: 1 addition & 0 deletions be/src/vec/exec/scan/vscan_node.h
@@ -312,6 +312,7 @@ class VScanNode : public ExecNode {
RuntimeProfile::HighWaterMarkCounter* _free_blocks_memory_usage;

std::unordered_map<std::string, int> _colname_to_slot_id;
std::vector<int> _col_distribute_ids;

private:
// Register and get all runtime filters at Init phase.
@@ -932,9 +932,11 @@ private PlanFragment createAggregationFragment(
return createPhase2DistinctAggregationFragment(node, childFragment, fragments);
} else {
if (canColocateAgg(node.getAggInfo(), childFragment.getDataPartition())
&& childFragment.getPlanRoot().shouldColoAgg()) {
|| childFragment.getPlanRoot().shouldColoAgg(node.getAggInfo())) {
childFragment.getPlanRoot().setShouldColoScan();
childFragment.addPlanRoot(node);
childFragment.setHasColocatePlanNode(true);
// pipeline here should use shared scan to improve performance
childFragment.setHasColocatePlanNode(!ConnectContext.get().getSessionVariable().enablePipelineEngine());
return childFragment;
} else {
return createMergeAggregationFragment(node, childFragment);
@@ -17,6 +17,7 @@

package org.apache.doris.planner;

import org.apache.doris.analysis.AggregateInfo;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.BaseTableRef;
import org.apache.doris.analysis.BinaryPredicate;
@@ -185,15 +186,28 @@ public class OlapScanNode extends ScanNode {
private Map<SlotRef, Expr> pointQueryEqualPredicats;
private DescriptorTable descTable;

private Set<Integer> distributionColumnIds;

private boolean shouldColoScan = false;

// Constructs node to scan given data files of table 'tbl'.
public OlapScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName) {
super(id, desc, planNodeName, StatisticalType.OLAP_SCAN_NODE);
olapTable = (OlapTable) desc.getTable();
distributionColumnIds = Sets.newTreeSet();

Set<String> distColumnName = olapTable != null
? olapTable.getDistributionColumnNames() : Sets.newTreeSet();
int columnId = 0;
// use for Nereids to generate uniqueId set for inverted index to avoid scan unnecessary big size column
for (SlotDescriptor slotDescriptor : desc.getSlots()) {
if (slotDescriptor.getColumn() != null) {
outputColumnUniqueIds.add(slotDescriptor.getColumn().getUniqueId());
if (distColumnName.contains(slotDescriptor.getColumn().getName().toLowerCase())) {
distributionColumnIds.add(columnId);
}
}
columnId++;
}
}

@@ -1166,17 +1180,48 @@ public int getNumInstances() {
}

@Override
public boolean shouldColoAgg() {
// In pipeline exec engine, the instance num is parallel instance. we should disable colo agg
// in parallelInstance >= tablet_num * 2 to use more thread to speed up the query
if (ConnectContext.get().getSessionVariable().enablePipelineEngine()) {
int parallelInstance = ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
return parallelInstance < result.size() * 2;
public boolean shouldColoAgg(AggregateInfo aggregateInfo) {
distributionColumnIds.clear();
if (ConnectContext.get().getSessionVariable().enablePipelineEngine()
&& ConnectContext.get().getSessionVariable().enableColocateScan()) {
List<Expr> aggPartitionExprs = aggregateInfo.getInputPartitionExprs();
List<SlotDescriptor> slots = desc.getSlots();
for (Expr aggExpr : aggPartitionExprs) {
if (aggExpr instanceof SlotRef) {
SlotDescriptor slotDesc = ((SlotRef) aggExpr).getDesc();
int columnId = 0;
for (SlotDescriptor slotDescriptor : slots) {
if (slotDescriptor.equals(slotDesc)) {
if (slotDescriptor.getType().isFixedLengthType()
|| slotDescriptor.getType().isStringType()) {
distributionColumnIds.add(columnId);
} else {
return false;
}
}
columnId++;
}
}
}

for (int i = 0; i < slots.size(); i++) {
if (!distributionColumnIds.contains(i) && (!slots.get(i).getType().isFixedLengthType()
|| slots.get(i).getType().isStringType())) {
return false;
}
}

return !distributionColumnIds.isEmpty();
} else {
return true;
return false;
}
}

@Override
public void setShouldColoScan() {
shouldColoScan = true;
}

@Override
protected void toThrift(TPlanNode msg) {
List<String> keyColumnNames = new ArrayList<String>();
@@ -1233,6 +1278,10 @@ protected void toThrift(TPlanNode msg) {
if (outputColumnUniqueIds != null) {
msg.olap_scan_node.setOutputColumnUniqueIds(outputColumnUniqueIds);
}

if (shouldColoScan) {
msg.olap_scan_node.setDistributeColumnIds(new ArrayList<>(distributionColumnIds));
}
}

// export some tablets
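
Editor's note: the shouldColoAgg check above gates the pipeline shared-scan path. Each GROUP BY partition expression must be a plain slot reference of a fixed-length or string type (those slots become the scan's distribute_column_ids), every other scanned slot must be a fixed-length, non-string type, at least one distribution column must be found, and the enablePipelineEngine and enableColocateScan session variables must both be on. Below is a rough re-expression of that predicate in C++ over a simplified slot model (hypothetical types, not the FE classes; the session-variable gate is omitted).

#include <set>
#include <vector>

// Simplified slot model for illustration; these are not Doris' real FE classes.
struct Slot {
    bool is_fixed_length;
    bool is_string;
    bool is_group_by_key; // appears as a SlotRef in the agg's input partition exprs
};

// Mirrors the shape of OlapScanNode.shouldColoAgg: collect the GROUP BY slots
// as local distribution columns and reject the plan if any slot's type rules
// shared-scan colocation out.  The chosen column ids are returned via
// `distribution_ids`.
bool should_colo_agg(const std::vector<Slot>& slots, std::set<int>* distribution_ids) {
    distribution_ids->clear();
    for (int i = 0; i < static_cast<int>(slots.size()); ++i) {
        if (!slots[i].is_group_by_key) continue;
        if (slots[i].is_fixed_length || slots[i].is_string) {
            distribution_ids->insert(i);
        } else {
            return false; // cannot hash this key type per row on the BE
        }
    }
    for (int i = 0; i < static_cast<int>(slots.size()); ++i) {
        if (distribution_ids->count(i)) continue;
        if (!slots[i].is_fixed_length || slots[i].is_string) {
            return false; // non-key slots must be plain fixed-length types
        }
    }
    return !distribution_ids->empty();
}

When the check passes, createAggregationFragment keeps the aggregation in the scan fragment and, under the pipeline engine, leaves hasColocatePlanNode false so the shared scan's hash distribution does the work; toThrift then ships the chosen ids to the BE via setDistributeColumnIds, where NewOlapScanNode picks them up as _col_distribute_ids.
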
@@ -20,6 +20,7 @@

package org.apache.doris.planner;

import org.apache.doris.analysis.AggregateInfo;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.BitmapFilterPredicate;
import org.apache.doris.analysis.CompoundPredicate;
@@ -835,10 +836,12 @@ public int getNumInstances() {
return numInstances;
}

public boolean shouldColoAgg() {
return true;
public boolean shouldColoAgg(AggregateInfo aggregateInfo) {
return false;
}

public void setShouldColoScan() {}

public void setNumInstances(int numInstances) {
this.numInstances = numInstances;
}