Skip to content

Commit

Permalink
[Scanner](revert) revert the scanner change by apache#35604 (apache#3…
Browse files Browse the repository at this point in the history
  • Loading branch information
HappenLee authored Jun 2, 2024
1 parent 8cae084 commit ae50d7a
Showing 1 changed file with 15 additions and 14 deletions.
29 changes: 15 additions & 14 deletions be/src/vec/exec/scan/scanner_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -296,12 +296,12 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
// `_free_blocks` serve all running scanners, maybe it's too large for the remaining scanners
int free_blocks_for_each = _free_blocks.size_approx() / _num_running_scanners;
_num_running_scanners--;
std::vector<vectorized::BlockUPtr> blocks(free_blocks_for_each);
free_blocks_for_each =
_free_blocks.try_dequeue_bulk(blocks.data(), free_blocks_for_each);
for (int i = 0; i < free_blocks_for_each; ++i) {
_free_blocks_memory_usage -= blocks[i]->allocated_bytes();
_free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
vectorized::BlockUPtr removed_block;
if (_free_blocks.try_dequeue(removed_block)) {
_free_blocks_memory_usage -= block->allocated_bytes();
_free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
}
}
}
} else {
Expand Down Expand Up @@ -350,16 +350,17 @@ void ScannerContext::_try_to_scale_up() {
(_max_bytes_in_queue - _free_blocks_memory_usage) / _estimated_block_size;
num_add = std::min(num_add, most_add);
}
std::vector<std::weak_ptr<ScannerDelegate>> scale_up_scanners(num_add);
// get enough memory to launch one more scanner.
if (auto real_size = _scanners.try_dequeue_bulk(scale_up_scanners.data(), num_add);
real_size) {
for (int i = 0; i < real_size; ++i) {
submit_scan_task(std::make_shared<ScanTask>(scale_up_scanners[i]));
for (int i = 0; i < num_add; ++i) {
// get enough memory to launch one more scanner.
std::weak_ptr<ScannerDelegate> scale_up_scanner;
if (_scanners.try_dequeue(scale_up_scanner)) {
submit_scan_task(std::make_shared<ScanTask>(scale_up_scanner));
_num_running_scanners++;
_scale_up_scanners_counter->update(1);
is_scale_up = true;
} else {
break;
}
_num_running_scanners += real_size;
_scale_up_scanners_counter->update(real_size);
is_scale_up = true;
}

if (is_scale_up) {
Expand Down

0 comments on commit ae50d7a

Please sign in to comment.