[schedule](pipeline) Remove wait schedule time in pipeline query engine and change current queue to std::mutex (#24525) (#25116)

This reverts commit 591aeaa.
HappenLee authored Oct 8, 2023
1 parent 53d0b71 commit 164a008
Showing 5 changed files with 34 additions and 34 deletions.
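
The core change is in be/src/vec/exec/scan/pip_scanner_context.h: the lock-free moodycamel::ConcurrentQueue of blocks is replaced by a plain std::list guarded by one std::mutex per parallel instance, and the WaitScheduleTime counter plus its stopwatch are dropped from the pipeline task profile. The snippet below is a minimal, self-contained sketch of that per-queue locking pattern only, not the Doris code itself; Block, QueueSet, push, and try_pop are invented names for illustration.

#include <list>
#include <memory>
#include <mutex>
#include <optional>
#include <vector>

// Illustrative stand-in for vectorized::BlockUPtr.
using Block = std::unique_ptr<int>;

// Per-instance block queues, each guarded by its own mutex, matching the shape
// of _queue_mutexs / _blocks_queues in pip_scanner_context.h after this commit.
struct QueueSet {
    std::vector<std::unique_ptr<std::mutex>> mutexes;
    std::vector<std::list<Block>> queues;

    explicit QueueSet(int n) {
        for (int i = 0; i < n; ++i) {
            mutexes.emplace_back(std::make_unique<std::mutex>());
            queues.emplace_back();
        }
    }

    // Producer: append to queue `id` under that queue's own lock.
    void push(int id, Block b) {
        std::lock_guard<std::mutex> l(*mutexes[id]);
        queues[id].emplace_back(std::move(b));
    }

    // Consumer: pop the front under the same lock, or report the queue empty
    // (in the real code an empty queue is where *eos gets decided).
    std::optional<Block> try_pop(int id) {
        std::unique_lock<std::mutex> l(*mutexes[id]);
        if (queues[id].empty()) {
            return std::nullopt;
        }
        Block b = std::move(queues[id].front());
        queues[id].pop_front();
        return b;
    }
};

int main() {
    QueueSet qs(2);
    qs.push(0, std::make_unique<int>(42));
    return qs.try_pop(0).has_value() ? 0 : 1;
}

Keeping one mutex per queue means scanners feeding different instances do not contend on a single lock, which mirrors how _queue_mutexs is sized to _num_parallel_instances in the diff below.
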
2 changes: 0 additions & 2 deletions be/src/pipeline/pipeline_task.cpp
@@ -68,7 +68,6 @@ void PipelineTask::_fresh_profile_counter() {
COUNTER_SET(_schedule_counts, (int64_t)_schedule_time);
COUNTER_SET(_wait_sink_timer, (int64_t)_wait_sink_watcher.elapsed_time());
COUNTER_SET(_wait_worker_timer, (int64_t)_wait_worker_watcher.elapsed_time());
COUNTER_SET(_wait_schedule_timer, (int64_t)_wait_schedule_watcher.elapsed_time());
COUNTER_SET(_begin_execute_timer, _begin_execute_time);
COUNTER_SET(_eos_timer, _eos_time);
COUNTER_SET(_src_pending_finish_over_timer, _src_pending_finish_over_time);
@@ -99,7 +98,6 @@ void PipelineTask::_init_profile() {
_wait_bf_timer = ADD_TIMER(_task_profile, "WaitBfTime");
_wait_sink_timer = ADD_TIMER(_task_profile, "WaitSinkTime");
_wait_worker_timer = ADD_TIMER(_task_profile, "WaitWorkerTime");
_wait_schedule_timer = ADD_TIMER(_task_profile, "WaitScheduleTime");
_block_counts = ADD_COUNTER(_task_profile, "NumBlockedTimes", TUnit::UNIT);
_block_by_source_counts = ADD_COUNTER(_task_profile, "NumBlockedBySrcTimes", TUnit::UNIT);
_block_by_sink_counts = ADD_COUNTER(_task_profile, "NumBlockedBySinkTimes", TUnit::UNIT);
4 changes: 0 additions & 4 deletions be/src/pipeline/pipeline_task.h
@@ -130,8 +130,6 @@ class PipelineTask {
_wait_worker_watcher.start();
}
void pop_out_runnable_queue() { _wait_worker_watcher.stop(); }
void start_schedule_watcher() { _wait_schedule_watcher.start(); }
void stop_schedule_watcher() { _wait_schedule_watcher.stop(); }
PipelineTaskState get_state() { return _cur_state; }
void set_state(PipelineTaskState state);

@@ -310,8 +308,6 @@ class PipelineTask {
MonotonicStopWatch _wait_worker_watcher;
RuntimeProfile::Counter* _wait_worker_timer;
// TODO we should calculate the time between when really runnable and runnable
MonotonicStopWatch _wait_schedule_watcher;
RuntimeProfile::Counter* _wait_schedule_timer;
RuntimeProfile::Counter* _yield_counts;
RuntimeProfile::Counter* _core_change_times;

32 changes: 12 additions & 20 deletions be/src/pipeline/task_scheduler.cpp
@@ -84,7 +84,6 @@ void BlockedTaskScheduler::_schedule() {
_started.store(true);
std::list<PipelineTask*> local_blocked_tasks;
int empty_times = 0;
std::vector<PipelineTask*> ready_tasks;

while (!_shutdown) {
{
@@ -104,6 +103,7 @@
}
}

auto origin_local_block_tasks_size = local_blocked_tasks.size();
auto iter = local_blocked_tasks.begin();
vectorized::VecDateTimeValue now = vectorized::VecDateTimeValue::local_time();
while (iter != local_blocked_tasks.end()) {
@@ -114,68 +114,62 @@
if (task->is_pending_finish()) {
iter++;
} else {
_make_task_run(local_blocked_tasks, iter, ready_tasks,
PipelineTaskState::PENDING_FINISH);
_make_task_run(local_blocked_tasks, iter, PipelineTaskState::PENDING_FINISH);
}
} else if (task->fragment_context()->is_canceled()) {
if (task->is_pending_finish()) {
task->set_state(PipelineTaskState::PENDING_FINISH);
iter++;
} else {
_make_task_run(local_blocked_tasks, iter, ready_tasks);
_make_task_run(local_blocked_tasks, iter);
}
} else if (task->query_context()->is_timeout(now)) {
LOG(WARNING) << "Timeout, query_id=" << print_id(task->query_context()->query_id)
<< ", instance_id="
<< print_id(task->fragment_context()->get_fragment_instance_id());

task->fragment_context()->cancel(PPlanFragmentCancelReason::TIMEOUT);

if (task->is_pending_finish()) {
task->set_state(PipelineTaskState::PENDING_FINISH);
iter++;
} else {
_make_task_run(local_blocked_tasks, iter, ready_tasks);
_make_task_run(local_blocked_tasks, iter);
}
} else if (state == PipelineTaskState::BLOCKED_FOR_DEPENDENCY) {
if (task->has_dependency()) {
iter++;
} else {
_make_task_run(local_blocked_tasks, iter, ready_tasks);
_make_task_run(local_blocked_tasks, iter);
}
} else if (state == PipelineTaskState::BLOCKED_FOR_SOURCE) {
if (task->source_can_read()) {
_make_task_run(local_blocked_tasks, iter, ready_tasks);
_make_task_run(local_blocked_tasks, iter);
} else {
iter++;
}
} else if (state == PipelineTaskState::BLOCKED_FOR_RF) {
if (task->runtime_filters_are_ready_or_timeout()) {
_make_task_run(local_blocked_tasks, iter, ready_tasks);
_make_task_run(local_blocked_tasks, iter);
} else {
iter++;
}
} else if (state == PipelineTaskState::BLOCKED_FOR_SINK) {
if (task->sink_can_write()) {
_make_task_run(local_blocked_tasks, iter, ready_tasks);
_make_task_run(local_blocked_tasks, iter);
} else {
iter++;
}
} else {
// TODO: DCHECK the state
_make_task_run(local_blocked_tasks, iter, ready_tasks);
_make_task_run(local_blocked_tasks, iter);
}
}

if (ready_tasks.empty()) {
if (origin_local_block_tasks_size == 0 ||
local_blocked_tasks.size() == origin_local_block_tasks_size) {
empty_times += 1;
} else {
empty_times = 0;
for (auto& task : ready_tasks) {
task->stop_schedule_watcher();
_task_queue->push_back(task);
}
ready_tasks.clear();
}

if (empty_times != 0 && (empty_times & (EMPTY_TIMES_TO_YIELD - 1)) == 0) {
@@ -195,13 +189,11 @@

void BlockedTaskScheduler::_make_task_run(std::list<PipelineTask*>& local_tasks,
std::list<PipelineTask*>::iterator& task_itr,
std::vector<PipelineTask*>& ready_tasks,
PipelineTaskState t_state) {
auto task = *task_itr;
task->start_schedule_watcher();
task->set_state(t_state);
local_tasks.erase(task_itr++);
ready_tasks.emplace_back(task);
_task_queue->push_back(task);
}

TaskScheduler::~TaskScheduler() {
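
As context for the task_scheduler.cpp hunks above: with the ready_tasks batch gone, _make_task_run pushes an unblocked task straight onto the task queue, and the yield heuristic now asks whether the local blocked list shrank during a pass rather than whether the batch stayed empty. The sketch below is a simplified, hypothetical rendering of that loop (Task, TaskQueue, and poll_blocked are invented names, and the sleep is a stand-in for whatever back-off the real scheduler uses), not the actual Doris implementation.

#include <chrono>
#include <list>
#include <thread>

struct Task { bool ready = false; };

struct TaskQueue {
    std::list<Task*> q;
    void push_back(Task* t) { q.push_back(t); }
};

constexpr int EMPTY_TIMES_TO_YIELD = 64;  // illustrative value; the bitmask below assumes a power of two

// One polling pass over the blocked tasks: ready tasks go straight to the
// run queue (no intermediate ready_tasks vector), and "empty" passes are
// detected by comparing the list size before and after the pass.
void poll_blocked(std::list<Task*>& blocked, TaskQueue& run_queue, int& empty_times) {
    const auto origin_size = blocked.size();

    for (auto it = blocked.begin(); it != blocked.end();) {
        if ((*it)->ready) {
            run_queue.push_back(*it);  // direct push, as _make_task_run now does
            it = blocked.erase(it);
        } else {
            ++it;
        }
    }

    if (origin_size == 0 || blocked.size() == origin_size) {
        empty_times += 1;
    } else {
        empty_times = 0;
    }

    if (empty_times != 0 && (empty_times & (EMPTY_TIMES_TO_YIELD - 1)) == 0) {
        // Back off briefly so an idle poller does not spin a core.
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
}

int main() {
    std::list<Task*> blocked;
    Task t;
    t.ready = true;
    blocked.push_back(&t);
    TaskQueue run_queue;
    int empty_times = 0;
    poll_blocked(blocked, run_queue, empty_times);
    return run_queue.q.size() == 1 ? 0 : 1;
}
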
1 change: 0 additions & 1 deletion be/src/pipeline/task_scheduler.h
@@ -71,7 +71,6 @@ class BlockedTaskScheduler {
void _schedule();
void _make_task_run(std::list<PipelineTask*>& local_tasks,
std::list<PipelineTask*>::iterator& task_itr,
std::vector<PipelineTask*>& ready_tasks,
PipelineTaskState state = PipelineTaskState::RUNNABLE);
};

29 changes: 22 additions & 7 deletions be/src/vec/exec/scan/pip_scanner_context.h
@@ -52,9 +52,14 @@ class PipScannerContext : public vectorized::ScannerContext {
}

{
if (!_blocks_queues[id].try_dequeue(*block)) {
std::unique_lock<std::mutex> l(*_queue_mutexs[id]);
if (_blocks_queues[id].empty()) {
*eos = _is_finished || _should_stop;
return Status::OK();
} else {
*block = std::move(_blocks_queues[id].front());
_blocks_queues[id].pop_front();

}
}

@@ -112,8 +117,9 @@ class PipScannerContext : public vectorized::ScannerContext {
for (int i = 0; i < queue_size && i < block_size; ++i) {
int queue = _next_queue_to_feed;
{
std::lock_guard<std::mutex> l(*_queue_mutexs[queue]);
for (int j = i; j < block_size; j += queue_size) {
_blocks_queues[queue].enqueue(std::move(blocks[j]));
_blocks_queues[queue].emplace_back(std::move(blocks[j]));
}
}
_next_queue_to_feed = queue + 1 < queue_size ? queue + 1 : 0;
@@ -122,11 +128,15 @@
_current_used_bytes += local_bytes;
}

bool empty_in_queue(int id) override { return _blocks_queues[id].size_approx() == 0; }
bool empty_in_queue(int id) override {
std::unique_lock<std::mutex> l(*_queue_mutexs[id]);
return _blocks_queues[id].empty();
}

Status init() override {
for (int i = 0; i < _num_parallel_instances; ++i) {
_blocks_queues.emplace_back(moodycamel::ConcurrentQueue<vectorized::BlockUPtr>());
_queue_mutexs.emplace_back(std::make_unique<std::mutex>());
_blocks_queues.emplace_back(std::list<vectorized::BlockUPtr>());
}
RETURN_IF_ERROR(ScannerContext::init());
if (_need_colocate_distribute) {
@@ -158,9 +168,10 @@
void _dispose_coloate_blocks_not_in_queue() override {
if (_need_colocate_distribute) {
for (int i = 0; i < _num_parallel_instances; ++i) {
std::scoped_lock s(*_colocate_block_mutexs[i], *_queue_mutexs[i]);
if (_colocate_blocks[i] && !_colocate_blocks[i]->empty()) {
_current_used_bytes += _colocate_blocks[i]->allocated_bytes();
_blocks_queues[i].enqueue(std::move(_colocate_blocks[i]));
_blocks_queues[i].emplace_back(std::move(_colocate_blocks[i]));
_colocate_mutable_blocks[i]->clear();
}
}
@@ -169,7 +180,8 @@

private:
int _next_queue_to_feed = 0;
std::vector<moodycamel::ConcurrentQueue<vectorized::BlockUPtr>> _blocks_queues;
std::vector<std::unique_ptr<std::mutex>> _queue_mutexs;
std::vector<std::list<vectorized::BlockUPtr>> _blocks_queues;
std::atomic_int64_t _current_used_bytes = 0;

const std::vector<int>& _col_distribute_ids;
@@ -200,7 +212,10 @@

if (row_add == max_add) {
_current_used_bytes += _colocate_blocks[loc]->allocated_bytes();
{ _blocks_queues[loc].enqueue(std::move(_colocate_blocks[loc])); }
{
std::lock_guard<std::mutex> queue_l(*_queue_mutexs[loc]);
_blocks_queues[loc].emplace_back(std::move(_colocate_blocks[loc]));
}
bool get_block_not_empty = true;
_colocate_blocks[loc] = get_free_block(&get_block_not_empty, get_block_not_empty);
_colocate_mutable_blocks[loc]->set_muatable_columns(