Skip to content

Commit

Permalink
[exec](pipeline) runtime filter wait time (#34872)
Browse files Browse the repository at this point in the history
  • Loading branch information
HappenLee authored May 20, 2024
1 parent c82f3ac commit 0e95a6a
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 15 deletions.
2 changes: 2 additions & 0 deletions be/src/exprs/runtime_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,8 @@ class IRuntimeFilter {

bool has_remote_target() const { return _has_remote_target; }

bool has_local_target() const { return _has_local_target; }

bool is_ready() const {
return (!_enable_pipeline_exec && _rf_state == RuntimeFilterState::READY) ||
(_enable_pipeline_exec &&
Expand Down
39 changes: 32 additions & 7 deletions be/src/pipeline/dependency.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,27 @@ void RuntimeFilterTimer::call_ready() {
_parent->set_ready();
}

// should check rf timeout in two case:
// 1. the rf is ready just remove the wait queue
// 2. if the rf have local dependency, the rf should start wait when all local dependency is ready
bool RuntimeFilterTimer::should_be_check_timeout() {
if (!_parent->ready() && !_local_runtime_filter_dependencies.empty()) {
bool all_ready = true;
for (auto& dep : _local_runtime_filter_dependencies) {
if (!dep->ready()) {
all_ready = false;
break;
}
}
if (all_ready) {
_local_runtime_filter_dependencies.clear();
_registration_time = MonotonicMillis();
}
return all_ready;
}
return true;
}

void RuntimeFilterTimerQueue::start() {
while (!_stop) {
std::unique_lock<std::mutex> lk(cv_m);
Expand All @@ -135,14 +156,18 @@ void RuntimeFilterTimerQueue::start() {
for (auto& it : _que) {
if (it.use_count() == 1) {
// `use_count == 1` means this runtime filter has been released
} else if (it->_parent->is_blocked_by(nullptr)) {
// This means runtime filter is not ready, so we call timeout or continue to poll this timer.
int64_t ms_since_registration = MonotonicMillis() - it->registration_time();
if (ms_since_registration > it->wait_time_ms()) {
it->call_timeout();
} else {
new_que.push_back(std::move(it));
} else if (it->should_be_check_timeout()) {
if (it->_parent->is_blocked_by(nullptr)) {
// This means runtime filter is not ready, so we call timeout or continue to poll this timer.
int64_t ms_since_registration = MonotonicMillis() - it->registration_time();
if (ms_since_registration > it->wait_time_ms()) {
it->call_timeout();
} else {
new_que.push_back(std::move(it));
}
}
} else {
new_que.push_back(std::move(it));
}
}
new_que.swap(_que);
Expand Down
17 changes: 12 additions & 5 deletions be/src/pipeline/dependency.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ class Dependency : public std::enable_shared_from_this<Dependency> {
BasicSharedState* shared_state() { return _shared_state; }
void set_shared_state(BasicSharedState* shared_state) { _shared_state = shared_state; }
virtual std::string debug_string(int indentation_level = 0);
bool ready() const { return _ready; }

// Start the watcher. We use it to count how long this dependency block the current pipeline task.
void start_watcher() { _watcher.start(); }
Expand Down Expand Up @@ -231,11 +232,19 @@ class RuntimeFilterTimer {
int64_t registration_time() const { return _registration_time; }
int32_t wait_time_ms() const { return _wait_time_ms; }

void set_local_runtime_filter_dependencies(
const std::vector<std::shared_ptr<RuntimeFilterDependency>>& deps) {
_local_runtime_filter_dependencies = deps;
}

bool should_be_check_timeout();

private:
friend struct RuntimeFilterTimerQueue;
std::shared_ptr<RuntimeFilterDependency> _parent = nullptr;
std::vector<std::shared_ptr<RuntimeFilterDependency>> _local_runtime_filter_dependencies;
std::mutex _lock;
const int64_t _registration_time;
int64_t _registration_time;
const int32_t _wait_time_ms;
};

Expand All @@ -258,11 +267,9 @@ struct RuntimeFilterTimerQueue {

~RuntimeFilterTimerQueue() = default;
RuntimeFilterTimerQueue() { _thread = std::thread(&RuntimeFilterTimerQueue::start, this); }
void push_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTimer> filter) { push(filter); }

void push(std::shared_ptr<pipeline::RuntimeFilterTimer> filter) {
void push_filter_timer(std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>>&& filter) {
std::unique_lock<std::mutex> lc(_que_lock);
_que.push_back(filter);
_que.insert(_que.end(), filter.begin(), filter.end());
cv.notify_all();
}

Expand Down
25 changes: 22 additions & 3 deletions be/src/vec/exec/runtime_filter_consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,36 @@ void RuntimeFilterConsumer::init_runtime_filter_dependency(
runtime_filter_dependencies,
const int id, const int node_id, const std::string& name) {
runtime_filter_dependencies.resize(_runtime_filter_descs.size());
std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>> runtime_filter_timers(
_runtime_filter_descs.size());
std::vector<std::shared_ptr<pipeline::RuntimeFilterDependency>>
local_runtime_filter_dependencies;

for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
runtime_filter_dependencies[i] = std::make_shared<pipeline::RuntimeFilterDependency>(
id, node_id, name, runtime_filter);
_runtime_filter_ctxs[i].runtime_filter_dependency = runtime_filter_dependencies[i].get();
auto filter_timer = std::make_shared<pipeline::RuntimeFilterTimer>(
runtime_filter_timers[i] = std::make_shared<pipeline::RuntimeFilterTimer>(
runtime_filter->registration_time(), runtime_filter->wait_time_ms(),
runtime_filter_dependencies[i]);
runtime_filter->set_filter_timer(filter_timer);
ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(filter_timer);
runtime_filter->set_filter_timer(runtime_filter_timers[i]);
if (runtime_filter->has_local_target()) {
local_runtime_filter_dependencies.emplace_back(runtime_filter_dependencies[i]);
}
}

// The gloabl runtime filter timer need set local runtime filter dependencies.
// start to wait before the local runtime filter ready
for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
if (!runtime_filter->has_local_target()) {
runtime_filter_timers[i]->set_local_runtime_filter_dependencies(
local_runtime_filter_dependencies);
}
}
ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(
std::move(runtime_filter_timers));
}

Status RuntimeFilterConsumer::_acquire_runtime_filter(bool pipeline_x) {
Expand Down

0 comments on commit 0e95a6a

Please sign in to comment.