Skip to content

Commit

Permalink
[Improvement](load) Do not block in group commit sink
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 committed Jun 21, 2024
1 parent d545eb3 commit a7d3bec
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 76 deletions.
73 changes: 44 additions & 29 deletions be/src/pipeline/exec/group_commit_block_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,28 @@ Status GroupCommitBlockSinkLocalState::open(RuntimeState* state) {
for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
RETURN_IF_ERROR(p._output_vexpr_ctxs[i]->clone(state, _output_vexpr_ctxs[i]));
}
_write_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
"GroupCommitBlockSinkDependency", true);

WARN_IF_ERROR(_initialize_load_queue(), "");
return Status::OK();
}

// Asks the group commit manager for a load block queue matching this sink's
// db/table/schema version; the manager hands the queue back through
// _load_block_queue.
//
// _write_dependency is forwarded to the manager together with the request.
//
// Returns:
//   - InternalError("be is stopping") when the WAL manager is not running;
//   - any error reported by get_first_block_load_queue();
//   - OK otherwise.
//
// NOTE(review): sink() invokes this on every call and there is no early-out
// for an already populated _load_block_queue -- confirm that registering the
// same load id repeatedly is intended.
Status GroupCommitBlockSinkLocalState::_initialize_load_queue() {
    auto& sink_op = _parent->cast<GroupCommitBlockSinkOperatorX>();

    // Refuse to start new group-commit work once the WAL manager has stopped.
    if (!_state->exec_env()->wal_mgr()->is_running()) {
        return Status::InternalError("be is stopping");
    }

    // Rebuild the load id from the operator's hi/lo pair.
    TUniqueId queue_load_id;
    queue_load_id.__set_hi(sink_op._load_id.hi);
    queue_load_id.__set_lo(sink_op._load_id.lo);

    RETURN_IF_ERROR(_state->exec_env()->group_commit_mgr()->get_first_block_load_queue(
            sink_op._db_id, sink_op._table_id, sink_op._base_schema_version, queue_load_id,
            _load_block_queue, _state->be_exec_version(), _state->query_mem_tracker(),
            _write_dependency));
    return Status::OK();
}

Status GroupCommitBlockSinkLocalState::close(RuntimeState* state, Status close_status) {
if (_closed) {
return Status::OK();
Expand Down Expand Up @@ -164,37 +183,31 @@ Status GroupCommitBlockSinkLocalState::_add_blocks(RuntimeState* state,
TUniqueId load_id;
load_id.__set_hi(p._load_id.hi);
load_id.__set_lo(p._load_id.lo);
if (_load_block_queue == nullptr) {
if (_state->exec_env()->wal_mgr()->is_running()) {
RETURN_IF_ERROR(_state->exec_env()->group_commit_mgr()->get_first_block_load_queue(
p._db_id, p._table_id, p._base_schema_version, load_id, _load_block_queue,
_state->be_exec_version(), _state->query_mem_tracker()));
if (_group_commit_mode == TGroupCommitMode::ASYNC_MODE) {
size_t estimated_wal_bytes =
_calculate_estimated_wal_bytes(is_blocks_contain_all_load_data);
_group_commit_mode =
_load_block_queue->has_enough_wal_disk_space(estimated_wal_bytes)
? TGroupCommitMode::ASYNC_MODE
: TGroupCommitMode::SYNC_MODE;
if (_group_commit_mode == TGroupCommitMode::SYNC_MODE) {
LOG(INFO) << "Load id=" << print_id(_state->query_id())
<< ", use group commit label=" << _load_block_queue->label
<< " will not write wal because wal disk space usage reach max "
"limit. Detail info: "
<< _state->exec_env()->wal_mgr()->get_wal_dirs_info_string();
} else {
_estimated_wal_bytes = estimated_wal_bytes;
}
if (_state->exec_env()->wal_mgr()->is_running()) {
if (_group_commit_mode == TGroupCommitMode::ASYNC_MODE) {
size_t estimated_wal_bytes =
_calculate_estimated_wal_bytes(is_blocks_contain_all_load_data);
_group_commit_mode = _load_block_queue->has_enough_wal_disk_space(estimated_wal_bytes)
? TGroupCommitMode::ASYNC_MODE
: TGroupCommitMode::SYNC_MODE;
if (_group_commit_mode == TGroupCommitMode::SYNC_MODE) {
LOG(INFO) << "Load id=" << print_id(_state->query_id())
<< ", use group commit label=" << _load_block_queue->label
<< " will not write wal because wal disk space usage reach max "
"limit. Detail info: "
<< _state->exec_env()->wal_mgr()->get_wal_dirs_info_string();
} else {
_estimated_wal_bytes = estimated_wal_bytes;
}
if (_load_block_queue->wait_internal_group_commit_finish ||
_group_commit_mode == TGroupCommitMode::SYNC_MODE) {
_load_block_queue->append_dependency(_finish_dependency);
}
_state->set_import_label(_load_block_queue->label);
_state->set_wal_id(_load_block_queue->txn_id);
} else {
return Status::InternalError("be is stopping");
}
if (_load_block_queue->wait_internal_group_commit_finish ||
_group_commit_mode == TGroupCommitMode::SYNC_MODE) {
_load_block_queue->append_dependency(_finish_dependency);
}
_state->set_import_label(_load_block_queue->label);
_state->set_wal_id(_load_block_queue->txn_id);
} else {
return Status::InternalError("be is stopping");
}
for (auto it = _blocks.begin(); it != _blocks.end(); ++it) {
RETURN_IF_ERROR(_load_block_queue->add_block(
Expand Down Expand Up @@ -263,6 +276,8 @@ Status GroupCommitBlockSinkOperatorX::sink(RuntimeState* state, vectorized::Bloc
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)input_block->rows());
SCOPED_CONSUME_MEM_TRACKER(local_state._mem_tracker.get());
RETURN_IF_ERROR(local_state._initialize_load_queue());
DCHECK(local_state._load_block_queue);
Status status = Status::OK();

auto wind_up = [&]() -> Status {
Expand Down
5 changes: 4 additions & 1 deletion be/src/pipeline/exec/group_commit_block_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class GroupCommitBlockSinkLocalState final : public PipelineXSinkLocalState<Basi

Status close(RuntimeState* state, Status exec_status) override;
Dependency* finishdependency() override { return _finish_dependency.get(); }
std::vector<Dependency*> dependencies() const override { return {_write_dependency.get()}; }
std::string debug_string(int indentation_level) const override;

private:
Expand All @@ -54,12 +55,13 @@ class GroupCommitBlockSinkLocalState final : public PipelineXSinkLocalState<Basi
Status _add_blocks(RuntimeState* state, bool is_blocks_contain_all_load_data);
size_t _calculate_estimated_wal_bytes(bool is_blocks_contain_all_load_data);
void _remove_estimated_wal_bytes();
Status _initialize_load_queue();

vectorized::VExprContextSPtrs _output_vexpr_ctxs;

std::unique_ptr<vectorized::OlapTableBlockConvertor> _block_convertor;

std::shared_ptr<LoadBlockQueue> _load_block_queue;
std::shared_ptr<LoadBlockQueue> _load_block_queue = nullptr;
// used to calculate if meet the max filter ratio
std::vector<std::shared_ptr<vectorized::Block>> _blocks;
bool _is_block_appended = false;
Expand All @@ -73,6 +75,7 @@ class GroupCommitBlockSinkLocalState final : public PipelineXSinkLocalState<Basi
Bitmap _filter_bitmap;
int64_t _table_id;
std::shared_ptr<Dependency> _finish_dependency;
std::shared_ptr<Dependency> _write_dependency = nullptr;
};

class GroupCommitBlockSinkOperatorX final
Expand Down
85 changes: 42 additions & 43 deletions be/src/runtime/group_commit_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,46 +250,48 @@ void LoadBlockQueue::_cancel_without_lock(const Status& st) {
Status GroupCommitTable::get_first_block_load_queue(
int64_t table_id, int64_t base_schema_version, const UniqueId& load_id,
std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version,
std::shared_ptr<MemTrackerLimiter> mem_tracker) {
std::shared_ptr<MemTrackerLimiter> mem_tracker, std::shared_ptr<pipeline::Dependency> dep) {
DCHECK(table_id == _table_id);
{
std::unique_lock l(_lock);
for (int i = 0; i < 3; i++) {
bool is_schema_version_match = true;
for (const auto& [_, inner_block_queue] : _load_block_queues) {
if (!inner_block_queue->need_commit()) {
if (base_schema_version == inner_block_queue->schema_version) {
if (inner_block_queue->add_load_id(load_id).ok()) {
load_block_queue = inner_block_queue;
return Status::OK();
}
} else if (base_schema_version < inner_block_queue->schema_version) {
is_schema_version_match = false;
std::unique_lock l(_lock);
auto try_to_get_matched_queue = [&]() -> Status {
for (const auto& [_, inner_block_queue] : _load_block_queues) {
if (!inner_block_queue->need_commit()) {
if (base_schema_version == inner_block_queue->schema_version) {
if (inner_block_queue->add_load_id(load_id).ok()) {
load_block_queue = inner_block_queue;
return Status::OK();
}
} else {
return Status::DataQualityError<false>(
"schema version not match, maybe a schema change is in process. "
"Please "
"retry this load manually.");
}
}
if (!is_schema_version_match) {
return Status::DataQualityError<false>(
"schema version not match, maybe a schema change is in process. Please "
"retry this load manually.");
}
if (!_is_creating_plan_fragment) {
_is_creating_plan_fragment = true;
RETURN_IF_ERROR(_thread_pool->submit_func([this, be_exe_version, mem_tracker] {
auto st = _create_group_commit_load(be_exe_version, mem_tracker);
if (!st.ok()) {
LOG(WARNING) << "create group commit load error, st=" << st.to_string();
std::unique_lock l(_lock);
_is_creating_plan_fragment = false;
_cv.notify_all();
}
}));
}
_cv.wait_for(l, std::chrono::seconds(4));
}
return Status::InternalError<false>("can not get a block queue for table_id: " +
std::to_string(_table_id));
};

if (try_to_get_matched_queue().ok()) {
return Status::OK();
}
return Status::InternalError<false>("can not get a block queue for table_id: " +
std::to_string(_table_id));
if (!_is_creating_plan_fragment) {
_is_creating_plan_fragment = true;
dep->block();
RETURN_IF_ERROR(_thread_pool->submit_func([&, be_exe_version, mem_tracker, dep = dep] {
Defer defer {[&, dep = dep]() {
dep->set_ready();
std::unique_lock l(_lock);
_is_creating_plan_fragment = false;
}};
auto st = _create_group_commit_load(be_exe_version, mem_tracker);
if (!st.ok()) {
LOG(WARNING) << "create group commit load error, st=" << st.to_string();
}
}));
}
return try_to_get_matched_queue();
}

Status GroupCommitTable::_create_group_commit_load(int be_exe_version,
Expand Down Expand Up @@ -378,8 +380,6 @@ Status GroupCommitTable::_create_group_commit_load(int be_exe_version,
be_exe_version));
}
_load_block_queues.emplace(instance_id, load_block_queue);
_is_creating_plan_fragment = false;
_cv.notify_all();
}
}
st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, is_pipeline, result.params,
Expand Down Expand Up @@ -565,12 +565,10 @@ void GroupCommitMgr::stop() {
LOG(INFO) << "GroupCommitMgr is stopped";
}

Status GroupCommitMgr::get_first_block_load_queue(int64_t db_id, int64_t table_id,
int64_t base_schema_version,
const UniqueId& load_id,
std::shared_ptr<LoadBlockQueue>& load_block_queue,
int be_exe_version,
std::shared_ptr<MemTrackerLimiter> mem_tracker) {
Status GroupCommitMgr::get_first_block_load_queue(
int64_t db_id, int64_t table_id, int64_t base_schema_version, const UniqueId& load_id,
std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version,
std::shared_ptr<MemTrackerLimiter> mem_tracker, std::shared_ptr<pipeline::Dependency> dep) {
std::shared_ptr<GroupCommitTable> group_commit_table;
{
std::lock_guard wlock(_lock);
Expand All @@ -582,7 +580,8 @@ Status GroupCommitMgr::get_first_block_load_queue(int64_t db_id, int64_t table_i
group_commit_table = _table_map[table_id];
}
RETURN_IF_ERROR(group_commit_table->get_first_block_load_queue(
table_id, base_schema_version, load_id, load_block_queue, be_exe_version, mem_tracker));
table_id, base_schema_version, load_id, load_block_queue, be_exe_version, mem_tracker,
dep));
return Status::OK();
}

Expand Down
7 changes: 4 additions & 3 deletions be/src/runtime/group_commit_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ class GroupCommitTable {
const UniqueId& load_id,
std::shared_ptr<LoadBlockQueue>& load_block_queue,
int be_exe_version,
std::shared_ptr<MemTrackerLimiter> mem_tracker);
std::shared_ptr<MemTrackerLimiter> mem_tracker,
std::shared_ptr<pipeline::Dependency> dep);
Status get_load_block_queue(const TUniqueId& instance_id,
std::shared_ptr<LoadBlockQueue>& load_block_queue);

Expand All @@ -178,7 +179,6 @@ class GroupCommitTable {
int64_t _table_id;

std::mutex _lock;
std::condition_variable _cv;
// fragment_instance_id to load_block_queue
std::unordered_map<UniqueId, std::shared_ptr<LoadBlockQueue>> _load_block_queues;
bool _is_creating_plan_fragment = false;
Expand All @@ -198,7 +198,8 @@ class GroupCommitMgr {
const UniqueId& load_id,
std::shared_ptr<LoadBlockQueue>& load_block_queue,
int be_exe_version,
std::shared_ptr<MemTrackerLimiter> mem_tracker);
std::shared_ptr<MemTrackerLimiter> mem_tracker,
std::shared_ptr<pipeline::Dependency> dep);
std::promise<Status> debug_promise;
std::future<Status> debug_future = debug_promise.get_future();

Expand Down

0 comments on commit a7d3bec

Please sign in to comment.