Skip to content

Commit

Permalink
[fix](multi-table-load) fix multi table load can not finish (apache#2…
Browse files Browse the repository at this point in the history
  • Loading branch information
sollhui authored Jan 17, 2024
1 parent 5f70c0b commit 140bfd1
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 20 deletions.
32 changes: 17 additions & 15 deletions be/src/io/fs/multi_table_pipe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> para
_unplanned_pipes.size(), _planned_pipes.size(), params.size());
_unplanned_pipes.clear();

_inflight_plan_cnt += params.size();
_inflight_cnt += params.size();
for (auto& plan : params) {
if (!plan.__isset.table_name ||
_planned_pipes.find(plan.table_name) == _planned_pipes.end()) {
Expand Down Expand Up @@ -263,20 +263,9 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> para
_status = *status;
}

--_inflight_plan_cnt;
if (_inflight_plan_cnt == 0 && is_consume_finished()) {
_ctx->number_total_rows = _number_total_rows;
_ctx->number_loaded_rows = _number_loaded_rows;
_ctx->number_filtered_rows = _number_filtered_rows;
_ctx->number_unselected_rows = _number_unselected_rows;
_ctx->commit_infos = _tablet_commit_infos;
LOG(INFO) << "all plan for multi-table load complete. number_total_rows="
<< _ctx->number_total_rows
<< " number_loaded_rows=" << _ctx->number_loaded_rows
<< " number_filtered_rows=" << _ctx->number_filtered_rows
<< " number_unselected_rows=" << _ctx->number_unselected_rows;
_ctx->promise.set_value(
_status); // when all done, finish the routine load task
auto inflight_cnt = _inflight_cnt.fetch_sub(1);
if (inflight_cnt == 1 && is_consume_finished()) {
_handle_consumer_finished();
}
}));
}
Expand All @@ -303,6 +292,19 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> para

#endif

void MultiTablePipe::_handle_consumer_finished() {
_ctx->number_total_rows = _number_total_rows;
_ctx->number_loaded_rows = _number_loaded_rows;
_ctx->number_filtered_rows = _number_filtered_rows;
_ctx->number_unselected_rows = _number_unselected_rows;
_ctx->commit_infos = _tablet_commit_infos;
LOG(INFO) << "all plan for multi-table load complete. number_total_rows="
<< _ctx->number_total_rows << " number_loaded_rows=" << _ctx->number_loaded_rows
<< " number_filtered_rows=" << _ctx->number_filtered_rows
<< " number_unselected_rows=" << _ctx->number_unselected_rows;
_ctx->promise.set_value(_status); // when all done, finish the routine load task
}

Status MultiTablePipe::put_pipe(const TUniqueId& pipe_id,
std::shared_ptr<io::StreamLoadPipe> pipe) {
std::lock_guard<std::mutex> l(_pipe_map_lock);
Expand Down
19 changes: 15 additions & 4 deletions be/src/io/fs/multi_table_pipe.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,13 @@ class MultiTablePipe : public KafkaConsumerPipe {
// request and execute plans for unplanned pipes
Status request_and_exec_plans();

void set_consume_finished() { _consume_finished.store(true, std::memory_order_release); }
void handle_consume_finished() {
_set_consume_finished();
auto inflight_cnt = _inflight_cnt.fetch_sub(1);
if (inflight_cnt == 1) {
_handle_consumer_finished();
}
}

bool is_consume_finished() { return _consume_finished.load(std::memory_order_acquire); }

Expand All @@ -71,25 +77,30 @@ class MultiTablePipe : public KafkaConsumerPipe {
template <typename ExecParam>
Status exec_plans(ExecEnv* exec_env, std::vector<ExecParam> params);

void _set_consume_finished() { _consume_finished.store(true, std::memory_order_release); }

void _handle_consumer_finished();

private:
std::unordered_map<std::string /*table*/, KafkaConsumerPipePtr> _planned_pipes;
std::unordered_map<std::string /*table*/, KafkaConsumerPipePtr> _unplanned_pipes;
std::atomic<uint64_t> _unplanned_row_cnt {0}; // trigger plan request when exceed threshold
std::atomic<uint64_t> _inflight_plan_cnt {0}; // how many plan fragment are executing?
// inflight count, when it is zero, means consume and all plans is finished
std::atomic<uint64_t> _inflight_cnt {1};
std::atomic<bool> _consume_finished {false};
// note: Use raw pointer here to avoid cycle reference with StreamLoadContext.
// Life cycle of MultiTablePipe is under control of StreamLoadContext, which means StreamLoadContext is created
// before NultiTablePipe and released after it. It is safe to use raw pointer here.
StreamLoadContext* _ctx = nullptr;
Status _status; // save the first error status of all executing plan fragment
#ifndef BE_TEST

std::mutex _tablet_commit_infos_lock;
std::vector<TTabletCommitInfo> _tablet_commit_infos; // collect from each plan fragment
std::atomic<int64_t> _number_total_rows {0};
std::atomic<int64_t> _number_loaded_rows {0};
std::atomic<int64_t> _number_filtered_rows {0};
std::atomic<int64_t> _number_unselected_rows {0};
#endif

std::mutex _pipe_map_lock;
std::unordered_map<TUniqueId /*instance id*/, std::shared_ptr<io::StreamLoadPipe>> _pipe_map;

Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/routine_load/routine_load_task_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ void RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx,
HANDLE_ERROR(multi_table_pipe->request_and_exec_plans(),
"multi tables task executes plan error");
// need memory order
multi_table_pipe->set_consume_finished();
multi_table_pipe->handle_consume_finished();
HANDLE_ERROR(kafka_pipe->finish(), "finish multi table task failed");
}

Expand Down

0 comments on commit 140bfd1

Please sign in to comment.