Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Pipeline](exec) Support shared scan in colo agg #18457

Merged
merged 1 commit into from
Apr 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/vec/exec/scan/new_olap_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ NewOlapScanNode::NewOlapScanNode(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs)
: VScanNode(pool, tnode, descs), _olap_scan_node(tnode.olap_scan_node) {
_output_tuple_id = tnode.olap_scan_node.tuple_id;
_col_distribute_ids = tnode.olap_scan_node.distribute_column_ids;
if (_olap_scan_node.__isset.sort_info && _olap_scan_node.__isset.sort_limit) {
_limit_per_scanner = _olap_scan_node.sort_limit;
}
Expand Down
129 changes: 117 additions & 12 deletions be/src/vec/exec/scan/pip_scanner_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ class PipScannerContext : public vectorized::ScannerContext {
const TupleDescriptor* input_tuple_desc,
const TupleDescriptor* output_tuple_desc,
const std::list<vectorized::VScanner*>& scanners, int64_t limit,
int64_t max_bytes_in_blocks_queue)
int64_t max_bytes_in_blocks_queue, const std::vector<int>& col_distribute_ids)
: vectorized::ScannerContext(state, parent, input_tuple_desc, output_tuple_desc,
scanners, limit, max_bytes_in_blocks_queue) {}
scanners, limit, max_bytes_in_blocks_queue),
_col_distribute_ids(col_distribute_ids),
_need_colocate_distribute(!_col_distribute_ids.empty()) {}

Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block, bool* eos,
int id, bool wait = false) override {
Expand Down Expand Up @@ -67,19 +69,54 @@ class PipScannerContext : public vectorized::ScannerContext {
const int queue_size = _queue_mutexs.size();
const int block_size = blocks.size();
int64_t local_bytes = 0;
for (const auto& block : blocks) {
local_bytes += block->allocated_bytes();
}

for (int i = 0; i < queue_size && i < block_size; ++i) {
int queue = _next_queue_to_feed;
{
std::lock_guard<std::mutex> l(*_queue_mutexs[queue]);
for (int j = i; j < block_size; j += queue_size) {
_blocks_queues[queue].emplace_back(std::move(blocks[j]));
if (_need_colocate_distribute) {
std::vector<uint64_t> hash_vals;
for (const auto& block : blocks) {
// vectorized calculate hash
int rows = block->rows();
const auto element_size = _max_queue_size;
hash_vals.resize(rows);
std::fill(hash_vals.begin(), hash_vals.end(), 0);
auto* __restrict hashes = hash_vals.data();

for (int j = 0; j < _col_distribute_ids.size(); ++j) {
block->get_by_position(_col_distribute_ids[j])
.column->update_crcs_with_value(
hash_vals, _output_tuple_desc->slots()[_col_distribute_ids[j]]
->type()
.type);
}
for (int i = 0; i < rows; i++) {
hashes[i] = hashes[i] % element_size;
}

std::vector<int> channel2rows[element_size];
for (int i = 0; i < rows; i++) {
channel2rows[hashes[i]].emplace_back(i);
}

for (int i = 0; i < element_size; ++i) {
if (!channel2rows[i].empty()) {
_add_rows_colocate_blocks(block.get(), i, channel2rows[i]);
}
}
}
_next_queue_to_feed = queue + 1 < queue_size ? queue + 1 : 0;
} else {
for (const auto& block : blocks) {
local_bytes += block->allocated_bytes();
}

for (int i = 0; i < queue_size && i < block_size; ++i) {
int queue = _next_queue_to_feed;
{
std::lock_guard<std::mutex> l(*_queue_mutexs[queue]);
for (int j = i; j < block_size; j += queue_size) {
_blocks_queues[queue].emplace_back(std::move(blocks[j]));
}
}
_next_queue_to_feed = queue + 1 < queue_size ? queue + 1 : 0;
}
}
_current_used_bytes += local_bytes;
}
Expand All @@ -95,18 +132,86 @@ class PipScannerContext : public vectorized::ScannerContext {
_queue_mutexs.emplace_back(new std::mutex);
_blocks_queues.emplace_back(std::list<vectorized::BlockUPtr>());
}
if (_need_colocate_distribute) {
int real_block_size =
limit == -1 ? _batch_size : std::min(static_cast<int64_t>(_batch_size), limit);
int64_t free_blocks_memory_usage = 0;
for (int i = 0; i < _max_queue_size; ++i) {
auto block = std::make_unique<vectorized::Block>(_output_tuple_desc->slots(),
real_block_size,
true /*ignore invalid slots*/);
free_blocks_memory_usage += block->allocated_bytes();
_colocate_mutable_blocks.emplace_back(new vectorized::MutableBlock(block.get()));
_colocate_blocks.emplace_back(std::move(block));
_colocate_block_mutexs.emplace_back(new std::mutex);
}
_free_blocks_memory_usage->add(free_blocks_memory_usage);
}
}

// Reports whether the bytes currently tracked in _current_used_bytes are still
// below the budget. The budget is half of _max_bytes_in_queue (integer division,
// applied first) multiplied by _max_queue_size.
bool has_enough_space_in_blocks_queue() const override {
    const auto byte_budget = _max_bytes_in_queue / 2 * _max_queue_size;
    return _current_used_bytes < byte_budget;
}

virtual void _dispose_coloate_blocks_not_in_queue() override {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]

Suggested change
virtual void _dispose_coloate_blocks_not_in_queue() override {
void _dispose_coloate_blocks_not_in_queue() override {

if (_need_colocate_distribute) {
for (int i = 0; i < _max_queue_size; ++i) {
std::scoped_lock s(*_colocate_block_mutexs[i], *_queue_mutexs[i]);
if (_colocate_blocks[i] && !_colocate_blocks[i]->empty()) {
_current_used_bytes += _colocate_blocks[i]->allocated_bytes();
_blocks_queues[i].emplace_back(std::move(_colocate_blocks[i]));
_colocate_mutable_blocks[i]->clear();
}
}
}
}

private:
int _max_queue_size = 1;
int _next_queue_to_feed = 0;
std::vector<std::unique_ptr<std::mutex>> _queue_mutexs;
std::vector<std::list<vectorized::BlockUPtr>> _blocks_queues;
std::atomic_int64_t _current_used_bytes = 0;

const std::vector<int>& _col_distribute_ids;
const bool _need_colocate_distribute;
std::vector<vectorized::BlockUPtr> _colocate_blocks;
std::vector<std::unique_ptr<vectorized::MutableBlock>> _colocate_mutable_blocks;
std::vector<std::unique_ptr<std::mutex>> _colocate_block_mutexs;

// Appends the rows of `block` selected by `rows` to the pending colocate block
// of queue `loc`, flushing that pending block into `_blocks_queues[loc]` every
// time it reaches `_batch_size` rows.
//
// block: source block the selected rows are taken from.
// loc:   index of the target queue / pending colocate block.
// rows:  selection handed to MutableBlock::add_rows as a [begin, end) pointer
//        range; the caller builds it from row positions of `block`.
//
// Locking: `_colocate_block_mutexs[loc]` is held for the whole call;
// `_queue_mutexs[loc]` is taken only while a full block is pushed to the queue.
void _add_rows_colocate_blocks(vectorized::Block* block, int loc,
                               const std::vector<int>& rows) {
    // Nothing to append. This also avoids forming a pointer to the first
    // element of an empty vector, which is undefined behavior with `&rows[0]`.
    if (rows.empty()) {
        return;
    }

    int row_wait_add = rows.size();
    const int batch_size = _batch_size;
    const int* begin = rows.data();
    std::lock_guard<std::mutex> l(*_colocate_block_mutexs[loc]);

    while (row_wait_add > 0) {
        // Remaining capacity of the pending block before it reaches batch_size.
        const int max_add = batch_size - _colocate_mutable_blocks[loc]->rows();
        // Take as many rows as fit, or everything that is left if it all fits.
        const int row_add = row_wait_add >= max_add ? max_add : row_wait_add;

        _colocate_mutable_blocks[loc]->add_rows(block, begin, begin + row_add);
        row_wait_add -= row_add;
        begin += row_add;

        if (row_add == max_add) {
            // The pending block is full: account for its bytes and hand it over
            // to the consumer queue of this location.
            _current_used_bytes += _colocate_blocks[loc]->allocated_bytes();
            {
                std::lock_guard<std::mutex> queue_l(*_queue_mutexs[loc]);
                _blocks_queues[loc].emplace_back(std::move(_colocate_blocks[loc]));
            }
            // Fetch a replacement block and point the mutable wrapper at its
            // columns so subsequent rows land in the new block.
            bool get_block_not_empty = true;
            _colocate_blocks[loc] = get_free_block(&get_block_not_empty, get_block_not_empty);
            _colocate_mutable_blocks[loc]->set_muatable_columns(
                    _colocate_blocks[loc]->mutate_columns());
        }
    }
}
};
} // namespace pipeline
} // namespace doris
14 changes: 9 additions & 5 deletions be/src/vec/exec/scan/scanner_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,17 @@ Status ScannerContext::init() {
return Status::OK();
}

vectorized::BlockUPtr ScannerContext::get_free_block(bool* has_free_block) {
vectorized::BlockUPtr ScannerContext::get_free_block(bool* has_free_block,
bool get_block_not_empty) {
{
std::lock_guard l(_free_blocks_lock);
if (!_free_blocks.empty()) {
auto block = std::move(_free_blocks.back());
_free_blocks.pop_back();
_free_blocks_memory_usage->add(-block->allocated_bytes());
return block;
if (!get_block_not_empty || _free_blocks.back()->mem_reuse()) {
auto block = std::move(_free_blocks.back());
_free_blocks.pop_back();
_free_blocks_memory_usage->add(-block->allocated_bytes());
return block;
}
}
}
*has_free_block = false;
Expand Down Expand Up @@ -319,6 +322,7 @@ void ScannerContext::push_back_scanner_and_reschedule(VScanner* scanner) {
// same scanner.
if (scanner->need_to_close() && scanner->set_counted_down() &&
(--_num_unfinished_scanners) == 0) {
_dispose_coloate_blocks_not_in_queue();
_is_finished = true;
_blocks_queue_added_cv.notify_one();
}
Expand Down
4 changes: 3 additions & 1 deletion be/src/vec/exec/scan/scanner_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class ScannerContext {
virtual ~ScannerContext() = default;
Status init();

vectorized::BlockUPtr get_free_block(bool* has_free_block);
vectorized::BlockUPtr get_free_block(bool* has_free_block, bool get_not_empty_block = false);
void return_free_block(std::unique_ptr<vectorized::Block> block);

// Append blocks from scanners to the blocks queue.
Expand Down Expand Up @@ -140,6 +140,8 @@ class ScannerContext {
Status _close_and_clear_scanners(VScanNode* node, RuntimeState* state);

protected:
// Hook called from push_back_scanner_and_reschedule() when the last unfinished
// scanner has been counted down, right before the context is marked finished.
// The base implementation does nothing; PipScannerContext overrides it to move
// its non-empty pending colocate blocks into the block queues.
virtual void _dispose_coloate_blocks_not_in_queue() {}

RuntimeState* _state;
VScanNode* _parent;

Expand Down
6 changes: 3 additions & 3 deletions be/src/vec/exec/scan/vscan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,9 +261,9 @@ Status VScanNode::_init_profile() {

Status VScanNode::_start_scanners(const std::list<VScanner*>& scanners) {
if (_is_pipeline_scan) {
_scanner_ctx.reset(new pipeline::PipScannerContext(_state, this, _input_tuple_desc,
_output_tuple_desc, scanners, limit(),
_state->query_options().mem_limit / 20));
_scanner_ctx.reset(new pipeline::PipScannerContext(
_state, this, _input_tuple_desc, _output_tuple_desc, scanners, limit(),
_state->query_options().mem_limit / 20, _col_distribute_ids));
} else {
_scanner_ctx.reset(new ScannerContext(_state, this, _input_tuple_desc, _output_tuple_desc,
scanners, limit(),
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exec/scan/vscan_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ class VScanNode : public ExecNode {
RuntimeProfile::HighWaterMarkCounter* _free_blocks_memory_usage;

std::unordered_map<std::string, int> _colname_to_slot_id;
std::vector<int> _col_distribute_ids;

private:
// Register and get all runtime filters at Init phase.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -932,9 +932,11 @@ private PlanFragment createAggregationFragment(
return createPhase2DistinctAggregationFragment(node, childFragment, fragments);
} else {
if (canColocateAgg(node.getAggInfo(), childFragment.getDataPartition())
&& childFragment.getPlanRoot().shouldColoAgg()) {
|| childFragment.getPlanRoot().shouldColoAgg(node.getAggInfo())) {
childFragment.getPlanRoot().setShouldColoScan();
childFragment.addPlanRoot(node);
childFragment.setHasColocatePlanNode(true);
// pipeline here should use shared scan to improve performance
childFragment.setHasColocatePlanNode(!ConnectContext.get().getSessionVariable().enablePipelineEngine());
return childFragment;
} else {
return createMergeAggregationFragment(node, childFragment);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.planner;

import org.apache.doris.analysis.AggregateInfo;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.BaseTableRef;
import org.apache.doris.analysis.BinaryPredicate;
Expand Down Expand Up @@ -185,15 +186,28 @@ public class OlapScanNode extends ScanNode {
private Map<SlotRef, Expr> pointQueryEqualPredicats;
private DescriptorTable descTable;

private Set<Integer> distributionColumnIds;

private boolean shouldColoScan = false;

// Constructs node to scan given data files of table 'tbl'.
public OlapScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName) {
    super(id, desc, planNodeName, StatisticalType.OLAP_SCAN_NODE);
    olapTable = (OlapTable) desc.getTable();
    distributionColumnIds = Sets.newTreeSet();

    // Distribution column names of the scanned table; empty when the tuple
    // descriptor carries no table.
    Set<String> distColumnName;
    if (olapTable == null) {
        distColumnName = Sets.newTreeSet();
    } else {
        distColumnName = olapTable.getDistributionColumnNames();
    }

    // Used by Nereids to build the unique-id set for the inverted index, so that
    // unnecessary big columns are not scanned. While walking the slots, also record
    // the position of every slot whose column is a distribution column.
    List<SlotDescriptor> slots = desc.getSlots();
    for (int slotPos = 0; slotPos < slots.size(); slotPos++) {
        SlotDescriptor slot = slots.get(slotPos);
        if (slot.getColumn() == null) {
            // Slots without a column still consume a position.
            continue;
        }
        outputColumnUniqueIds.add(slot.getColumn().getUniqueId());
        String lowerCaseName = slot.getColumn().getName().toLowerCase();
        if (distColumnName.contains(lowerCaseName)) {
            distributionColumnIds.add(slotPos);
        }
    }
}

Expand Down Expand Up @@ -1166,17 +1180,48 @@ public int getNumInstances() {
}

@Override
public boolean shouldColoAgg() {
// In pipeline exec engine, the instance num is parallel instance. we should disable colo agg
// in parallelInstance >= tablet_num * 2 to use more thread to speed up the query
if (ConnectContext.get().getSessionVariable().enablePipelineEngine()) {
int parallelInstance = ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
return parallelInstance < result.size() * 2;
public boolean shouldColoAgg(AggregateInfo aggregateInfo) {
distributionColumnIds.clear();
if (ConnectContext.get().getSessionVariable().enablePipelineEngine()
&& ConnectContext.get().getSessionVariable().enableColocateScan()) {
List<Expr> aggPartitionExprs = aggregateInfo.getInputPartitionExprs();
List<SlotDescriptor> slots = desc.getSlots();
for (Expr aggExpr : aggPartitionExprs) {
if (aggExpr instanceof SlotRef) {
SlotDescriptor slotDesc = ((SlotRef) aggExpr).getDesc();
int columnId = 0;
for (SlotDescriptor slotDescriptor : slots) {
if (slotDescriptor.equals(slotDesc)) {
if (slotDescriptor.getType().isFixedLengthType()
|| slotDescriptor.getType().isStringType()) {
distributionColumnIds.add(columnId);
} else {
return false;
}
}
columnId++;
}
}
}

for (int i = 0; i < slots.size(); i++) {
if (!distributionColumnIds.contains(i) && (!slots.get(i).getType().isFixedLengthType()
|| slots.get(i).getType().isStringType())) {
return false;
}
}

return !distributionColumnIds.isEmpty();
} else {
return true;
return false;
}
}

// Marks this scan node for colocate scan. Once set, toThrift() includes
// distributionColumnIds in the olap_scan_node message (setDistributeColumnIds).
@Override
public void setShouldColoScan() {
shouldColoScan = true;
}

@Override
protected void toThrift(TPlanNode msg) {
List<String> keyColumnNames = new ArrayList<String>();
Expand Down Expand Up @@ -1233,6 +1278,10 @@ protected void toThrift(TPlanNode msg) {
if (outputColumnUniqueIds != null) {
msg.olap_scan_node.setOutputColumnUniqueIds(outputColumnUniqueIds);
}

if (shouldColoScan) {
msg.olap_scan_node.setDistributeColumnIds(new ArrayList<>(distributionColumnIds));
}
}

// export some tablets
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

package org.apache.doris.planner;

import org.apache.doris.analysis.AggregateInfo;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.BitmapFilterPredicate;
import org.apache.doris.analysis.CompoundPredicate;
Expand Down Expand Up @@ -835,10 +836,12 @@ public int getNumInstances() {
return numInstances;
}

public boolean shouldColoAgg() {
return true;
public boolean shouldColoAgg(AggregateInfo aggregateInfo) {
return false;
}

// Default implementation does nothing. OlapScanNode overrides it to record that
// the distribution column ids should be sent to the backend.
public void setShouldColoScan() {}

public void setNumInstances(int numInstances) {
this.numInstances = numInstances;
}
Expand Down
Loading