Skip to content

Commit

Permalink
[Bug](pipeline) Support the auto partition in pipeline load (apache#2…
Browse files Browse the repository at this point in the history
  • Loading branch information
HappenLee authored Oct 10, 2023
1 parent 6ad2272 commit 880d0d7
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 19 deletions.
3 changes: 0 additions & 3 deletions be/src/pipeline/pipeline_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -296,9 +296,6 @@ Status PipelineTask::execute(bool* eos) {
if (_block->rows() != 0 || *eos) {
SCOPED_TIMER(_sink_timer);
auto status = _sink->sink(_state, block, _data_state);
if (status.is<ErrorCode::NEED_SEND_AGAIN>()) {
status = _sink->sink(_state, block, _data_state);
}
if (UNLIKELY(!status.ok() || block->rows() == 0)) {
if (_fragment_context->is_group_commit()) {
auto* future_block = dynamic_cast<vectorized::FutureBlock*>(block);
Expand Down
4 changes: 1 addition & 3 deletions be/src/runtime/plan_fragment_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -340,9 +340,7 @@ Status PlanFragmentExecutor::open_vectorized_internal() {
// TODO: refactor this once the sink path is made asynchronous
if (st.is<NEED_SEND_AGAIN>()) { // created partition, do it again.
st = _sink->send(runtime_state(), block.get());
if (st.is<NEED_SEND_AGAIN>()) {
LOG(WARNING) << "have to create partition again...";
}
DCHECK(!st.is<NEED_SEND_AGAIN>());
}
if (UNLIKELY(!st.ok() || block->rows() == 0)) {
// Used for group commit insert
Expand Down
12 changes: 8 additions & 4 deletions be/src/vec/sink/async_writer_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,15 @@ class AsyncWriterSink : public DataSink {

Status close(RuntimeState* state, Status exec_status) override {
// If init failed, _writer may be nullptr, so we need to check it here.
if (_writer && _writer->need_normal_close()) {
if (exec_status.ok() && !state->is_cancelled()) {
RETURN_IF_ERROR(_writer->commit_trans());
if (_writer) {
if (_writer->need_normal_close()) {
if (exec_status.ok() && !state->is_cancelled()) {
RETURN_IF_ERROR(_writer->commit_trans());
}
RETURN_IF_ERROR(_writer->close(exec_status));
} else {
RETURN_IF_ERROR(_writer->get_writer_status());
}
RETURN_IF_ERROR(_writer->close(exec_status));
}
return DataSink::close(state, exec_status);
}
Expand Down
20 changes: 14 additions & 6 deletions be/src/vec/sink/writer/async_result_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ Status AsyncResultWriter::sink(Block* block, bool eos) {
return status;
}

std::unique_ptr<Block> AsyncResultWriter::get_block_from_queue() {
std::unique_ptr<Block> AsyncResultWriter::_get_block_from_queue() {
std::lock_guard l(_m);
DCHECK(!_data_queue.empty());
auto block = std::move(_data_queue.front());
Expand All @@ -78,6 +78,11 @@ std::unique_ptr<Block> AsyncResultWriter::get_block_from_queue() {
return block;
}

void AsyncResultWriter::_return_block_to_queue(std::unique_ptr<Block> add_block) {
std::lock_guard l(_m);
_data_queue.emplace_back(std::move(add_block));
}

void AsyncResultWriter::start_writer(RuntimeState* state, RuntimeProfile* profile) {
static_cast<void>(ExecEnv::GetInstance()->fragment_mgr()->get_thread_pool()->submit_func(
[this, state, profile]() { this->process_block(state, profile); }));
Expand All @@ -102,25 +107,28 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi
break;
}

auto block = get_block_from_queue();
auto block = _get_block_from_queue();
auto status = write(block);
_return_free_block(std::move(block));

if (!status.ok()) {
if (status.is<ErrorCode::NEED_SEND_AGAIN>()) {
_return_block_to_queue(std::move(block));
continue;
} else if (UNLIKELY(!status.ok())) {
std::unique_lock l(_m);
_writer_status = status;
if (_dependency && _is_finished()) {
_dependency->set_ready_for_write();
}
break;
}

_return_free_block(std::move(block));
}
}

// If we are not in a transaction, or the writer status is an error, or a
// forced close is requested, we can perform the close in the async IO thread.
if (!_writer_status.ok() || !in_transaction()) {
static_cast<void>(close(_writer_status));
_writer_status = close(_writer_status);
_need_normal_close = false;
}
_writer_thread_closed = true;
Expand Down
8 changes: 6 additions & 2 deletions be/src/vec/sink/writer/async_result_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,11 @@ class AsyncResultWriter : public ResultWriter {
// sink the block data to the data queue
Status sink(Block* block, bool eos);

std::unique_ptr<Block> get_block_from_queue();

// Add the IO thread task process block() to thread pool to dispose the IO
void start_writer(RuntimeState* state, RuntimeProfile* profile);

Status get_writer_status() { return _writer_status; }

protected:
Status _projection_block(Block& input_block, Block* output_block);
const VExprContextSPtrs& _vec_output_expr_ctxs;
Expand All @@ -102,6 +102,10 @@ class AsyncResultWriter : public ResultWriter {
private:
[[nodiscard]] bool _data_queue_is_available() const { return _data_queue.size() < QUEUE_SIZE; }
[[nodiscard]] bool _is_finished() const { return !_writer_status.ok() || _eos; }

std::unique_ptr<Block> _get_block_from_queue();
void _return_block_to_queue(std::unique_ptr<Block>);

static constexpr auto QUEUE_SIZE = 3;
std::mutex _m;
std::condition_variable _cv;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner) {
this.enableShareHashTableForBroadcastJoin = context.getSessionVariable().enableShareHashTableForBroadcastJoin;
// Enable the pipeline engine for both query and load (the load restriction was lifted to support auto partition)
this.enablePipelineEngine = context.getSessionVariable().getEnablePipelineEngine()
&& (fragments.size() > 0 && fragments.get(0).getSink() instanceof ResultSink);
&& (fragments.size() > 0);
this.enablePipelineXEngine = context.getSessionVariable().getEnablePipelineXEngine()
&& (fragments.size() > 0 && fragments.get(0).getSink() instanceof ResultSink);

Expand Down

0 comments on commit 880d0d7

Please sign in to comment.