Skip to content

Commit

Permalink
[bugfix](core) using weak ptr in data stream receiver to avoid runtim…
Browse files Browse the repository at this point in the history
…e state is deconstructed (apache#29410)
  • Loading branch information
yiguolei authored Jan 10, 2024
1 parent 791b6a9 commit 4c02797
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 2 deletions.
9 changes: 9 additions & 0 deletions be/src/vec/runtime/vdata_stream_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,15 @@ Status VDataStreamMgr::transmit_block(const PTransmitDataParams* request,
return Status::EndOfFile("data stream receiver closed");
}

// Lock the fragment context to ensure the runtime state and other objects are not
// deconstructed
auto ctx_lock = recvr->task_exec_ctx();
if (ctx_lock == nullptr) {
// Do not return internal error, because when query finished, the downstream node
// may finish before upstream node. And the object maybe deconstructed. If return error
// then the upstream node may report error status to FE, the query is failed.
return Status::EndOfFile("data stream receiver is deconstructed");
}
// request can only be used before calling recvr's add_batch or when request
// is the last for the sender, because request maybe released after it's batch
// is consumed by ExchangeNode.
Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/runtime/vdata_stream_recvr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,8 @@ VDataStreamRecvr::VDataStreamRecvr(
const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders,
bool is_merging, RuntimeProfile* profile,
std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr)
: _mgr(stream_mgr),
: HasTaskExecutionCtx(state),
_mgr(stream_mgr),
#ifdef USE_MEM_TRACKER
_query_mem_tracker(state->query_mem_tracker()),
_query_id(state->query_id()),
Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/runtime/vdata_stream_recvr.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include "common/status.h"
#include "runtime/descriptors.h"
#include "runtime/query_statistics.h"
#include "runtime/task_execution_context.h"
#include "util/runtime_profile.h"
#include "util/stopwatch.hpp"
#include "vec/columns/column.h"
Expand Down Expand Up @@ -70,7 +71,7 @@ class VSortedRunMerger;

class VDataStreamRecvr;

class VDataStreamRecvr {
class VDataStreamRecvr : public HasTaskExecutionCtx {
public:
class SenderQueue;
VDataStreamRecvr(VDataStreamMgr* stream_mgr, RuntimeState* state, const RowDescriptor& row_desc,
Expand Down
2 changes: 2 additions & 0 deletions be/test/vec/runtime/vdata_stream_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ TEST_F(VDataStreamTest, BasicTest) {

doris::RuntimeState runtime_stat(doris::TUniqueId(), doris::TQueryOptions(),
doris::TQueryGlobals(), nullptr);
std::shared_ptr<TaskExecutionContext> task_ctx_lock = std::make_shared<TaskExecutionContext>();
runtime_stat.set_task_execution_context(task_ctx_lock);
runtime_stat.init_mem_trackers();
runtime_stat.set_desc_tbl(desc_tbl);
runtime_stat.set_be_number(1);
Expand Down

0 comments on commit 4c02797

Please sign in to comment.