From 880d0d7e705a4c48e3a17fbdf966af7680eb1247 Mon Sep 17 00:00:00 2001
From: HappenLee
Date: Tue, 10 Oct 2023 11:51:12 +0800
Subject: [PATCH] [Bug](pipeline) Support the auto partition in pipeline load (#25176)

---
 be/src/pipeline/pipeline_task.cpp              |  3 ---
 be/src/runtime/plan_fragment_executor.cpp      |  4 +---
 be/src/vec/sink/async_writer_sink.h            | 12 +++++++----
 .../vec/sink/writer/async_result_writer.cpp    | 20 +++++++++++++------
 be/src/vec/sink/writer/async_result_writer.h   |  8 ++++++--
 .../java/org/apache/doris/qe/Coordinator.java  |  2 +-
 6 files changed, 30 insertions(+), 19 deletions(-)

diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index a0f77578e731ca..a46914fb601655 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -296,9 +296,6 @@ Status PipelineTask::execute(bool* eos) {
         if (_block->rows() != 0 || *eos) {
             SCOPED_TIMER(_sink_timer);
             auto status = _sink->sink(_state, block, _data_state);
-            if (status.is<ErrorCode::NEED_SEND_AGAIN>()) {
-                status = _sink->sink(_state, block, _data_state);
-            }
             if (UNLIKELY(!status.ok() || block->rows() == 0)) {
                 if (_fragment_context->is_group_commit()) {
                     auto* future_block = dynamic_cast<vectorized::FutureBlock*>(block);
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index ae3ad1c2d9b8c0..265fad8882d2fc 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -340,9 +340,7 @@ Status PlanFragmentExecutor::open_vectorized_internal() {
             //TODO: Asynchronisation need refactor this
             if (st.is<ErrorCode::NEED_SEND_AGAIN>()) { // created partition, do it again.
                 st = _sink->send(runtime_state(), block.get());
-                if (st.is<ErrorCode::NEED_SEND_AGAIN>()) {
-                    LOG(WARNING) << "have to create partition again...";
-                }
+                DCHECK(!st.is<ErrorCode::NEED_SEND_AGAIN>());
             }
             if (UNLIKELY(!st.ok() || block->rows() == 0)) {
                 // Used for group commit insert
diff --git a/be/src/vec/sink/async_writer_sink.h b/be/src/vec/sink/async_writer_sink.h
index b0d772a841d582..1d566036e12ab8 100644
--- a/be/src/vec/sink/async_writer_sink.h
+++ b/be/src/vec/sink/async_writer_sink.h
@@ -91,11 +91,15 @@ class AsyncWriterSink : public DataSink {
 
     Status close(RuntimeState* state, Status exec_status) override {
         // if the init failed, the _writer may be nullptr, so we need to check it here
-        if (_writer && _writer->need_normal_close()) {
-            if (exec_status.ok() && !state->is_cancelled()) {
-                RETURN_IF_ERROR(_writer->commit_trans());
+        if (_writer) {
+            if (_writer->need_normal_close()) {
+                if (exec_status.ok() && !state->is_cancelled()) {
+                    RETURN_IF_ERROR(_writer->commit_trans());
+                }
+                RETURN_IF_ERROR(_writer->close(exec_status));
+            } else {
+                RETURN_IF_ERROR(_writer->get_writer_status());
             }
-            RETURN_IF_ERROR(_writer->close(exec_status));
         }
         return DataSink::close(state, exec_status);
     }
diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp
index 0e752f63abd4d3..2a7d8d988d82ab 100644
--- a/be/src/vec/sink/writer/async_result_writer.cpp
+++ b/be/src/vec/sink/writer/async_result_writer.cpp
@@ -67,7 +67,7 @@ Status AsyncResultWriter::sink(Block* block, bool eos) {
     return status;
 }
 
-std::unique_ptr<Block> AsyncResultWriter::get_block_from_queue() {
+std::unique_ptr<Block> AsyncResultWriter::_get_block_from_queue() {
     std::lock_guard l(_m);
     DCHECK(!_data_queue.empty());
     auto block = std::move(_data_queue.front());
@@ -78,6 +78,11 @@ std::unique_ptr<Block> AsyncResultWriter::get_block_from_queue() {
     return block;
 }
 
+void AsyncResultWriter::_return_block_to_queue(std::unique_ptr<Block> add_block) {
+    std::lock_guard l(_m);
+    _data_queue.emplace_back(std::move(add_block));
+}
+
 void AsyncResultWriter::start_writer(RuntimeState* state, RuntimeProfile* profile) {
     static_cast<void>(ExecEnv::GetInstance()->fragment_mgr()->get_thread_pool()->submit_func(
             [this, state, profile]() { this->process_block(state, profile); }));
@@ -102,11 +107,12 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profile) {
                 break;
             }
 
-            auto block = get_block_from_queue();
+            auto block = _get_block_from_queue();
             auto status = write(block);
-            _return_free_block(std::move(block));
-
-            if (!status.ok()) {
+            if (status.is<ErrorCode::NEED_SEND_AGAIN>()) {
+                _return_block_to_queue(std::move(block));
+                continue;
+            } else if (UNLIKELY(!status.ok())) {
                 std::unique_lock l(_m);
                 _writer_status = status;
                 if (_dependency && _is_finished()) {
@@ -114,13 +120,15 @@
                 break;
             }
+
+            _return_free_block(std::move(block));
         }
     }
 
     // if not in transaction or status is in error or force close we can do close in
     // async IO thread
     if (!_writer_status.ok() || !in_transaction()) {
-        static_cast<void>(close(_writer_status));
+        _writer_status = close(_writer_status);
         _need_normal_close = false;
     }
     _writer_thread_closed = true;
diff --git a/be/src/vec/sink/writer/async_result_writer.h b/be/src/vec/sink/writer/async_result_writer.h
index 27b8f2de13e724..5d0cd2605271ba 100644
--- a/be/src/vec/sink/writer/async_result_writer.h
+++ b/be/src/vec/sink/writer/async_result_writer.h
@@ -86,11 +86,11 @@ class AsyncResultWriter : public ResultWriter {
     // sink the block data into the data queue
     Status sink(Block* block, bool eos);
 
-    std::unique_ptr<Block> get_block_from_queue();
-
     // Submit the IO task process_block() to the thread pool to handle the IO
     void start_writer(RuntimeState* state, RuntimeProfile* profile);
 
+    Status get_writer_status() { return _writer_status; }
+
 protected:
     Status _projection_block(Block& input_block, Block* output_block);
     const VExprContextSPtrs& _vec_output_expr_ctxs;
@@ -102,6 +102,10 @@
 private:
     [[nodiscard]] bool _data_queue_is_available() const { return _data_queue.size() < QUEUE_SIZE; }
     [[nodiscard]] bool _is_finished() const { return !_writer_status.ok() || _eos; }
+
+    std::unique_ptr<Block> _get_block_from_queue();
+    void _return_block_to_queue(std::unique_ptr<Block>);
+
     static constexpr auto QUEUE_SIZE = 3;
     std::mutex _m;
     std::condition_variable _cv;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index a8a08fb0e626ec..7cf4fdafa5399c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -321,7 +321,7 @@ public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner) {
         this.enableShareHashTableForBroadcastJoin = context.getSessionVariable().enableShareHashTableForBroadcastJoin;
         // Only enable pipeline query engine in query, not load
         this.enablePipelineEngine = context.getSessionVariable().getEnablePipelineEngine()
-                && (fragments.size() > 0 && fragments.get(0).getSink() instanceof ResultSink);
+                && (fragments.size() > 0);
         this.enablePipelineXEngine = context.getSessionVariable().getEnablePipelineXEngine()
                 && (fragments.size() > 0 && fragments.get(0).getSink() instanceof ResultSink);
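
Note: below is a minimal, self-contained sketch (not the Doris sources) of the retry loop this patch moves into AsyncResultWriter::process_block(). When the underlying write reports that the data must be sent again (in Doris, the NEED_SEND_AGAIN status returned after an automatic partition is created), the block is pushed back onto the data queue and retried on the next iteration instead of being released. MockAsyncWriter, WriteResult, and the stripped-down Block type are illustrative stand-ins, not the real Doris types.

#include <deque>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

// Stand-in for doris::Status: Ok, "send it again" (partition just created), or a hard error.
enum class WriteResult { Ok, NeedSendAgain, Error };

// Stand-in for vectorized::Block.
struct Block { std::vector<int> rows; };

class MockAsyncWriter {
public:
    void enqueue(std::unique_ptr<Block> b) {
        std::lock_guard<std::mutex> l(_m);
        _queue.emplace_back(std::move(b));
    }

    // Mirrors the IO-thread loop: retry the same block while the writer asks to
    // send it again, record the status on error, release the block on success.
    void process() {
        while (true) {
            std::unique_ptr<Block> block;
            {
                std::lock_guard<std::mutex> l(_m);
                if (_queue.empty()) break;
                block = std::move(_queue.front());
                _queue.pop_front();
            }
            WriteResult r = write(*block);
            if (r == WriteResult::NeedSendAgain) {
                // Same idea as _return_block_to_queue(): keep the block alive
                // and write it again on the next iteration.
                std::lock_guard<std::mutex> l(_m);
                _queue.emplace_back(std::move(block));
                continue;
            }
            if (r == WriteResult::Error) {
                _status = "write failed";  // remembered so the sink's close() can report it
                break;
            }
            // Ok: the block goes out of scope here, i.e. is returned to the free-block pool.
        }
    }

    const std::string& status() const { return _status; }

private:
    WriteResult write(const Block&) {
        // Pretend the first attempt hits a missing partition; the retry succeeds.
        return _retries_left-- > 0 ? WriteResult::NeedSendAgain : WriteResult::Ok;
    }

    std::mutex _m;
    std::deque<std::unique_ptr<Block>> _queue;
    std::string _status = "OK";
    int _retries_left = 1;
};

int main() {
    MockAsyncWriter w;
    w.enqueue(std::make_unique<Block>());
    w.process();  // first write returns NeedSendAgain, the block is retried, then Ok
    return w.status() == "OK" ? 0 : 1;
}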