diff --git a/dbms/src/Flash/Executor/DataStreamExecutor.cpp b/dbms/src/Flash/Executor/DataStreamExecutor.cpp index 2b299027cc1..a695579d519 100644 --- a/dbms/src/Flash/Executor/DataStreamExecutor.cpp +++ b/dbms/src/Flash/Executor/DataStreamExecutor.cpp @@ -15,16 +15,19 @@ #include #include #include -#include #include #include #include namespace DB { -DataStreamExecutor::DataStreamExecutor(Context & context_, const BlockIO & block_io) - : QueryExecutor(block_io.process_list_entry, context_) - , data_stream(block_io.in) +DataStreamExecutor::DataStreamExecutor( + const MemoryTrackerPtr & memory_tracker_, + Context & context_, + const String & req_id, + const BlockInputStreamPtr & data_stream_) + : QueryExecutor(memory_tracker_, context_, req_id) + , data_stream(data_stream_) { assert(data_stream); thread_cnt_before_execute = GET_METRIC(tiflash_thread_count, type_active_threads_of_thdpool).Value(); diff --git a/dbms/src/Flash/Executor/DataStreamExecutor.h b/dbms/src/Flash/Executor/DataStreamExecutor.h index b1f92d7ef3a..ae32aeefbb4 100644 --- a/dbms/src/Flash/Executor/DataStreamExecutor.h +++ b/dbms/src/Flash/Executor/DataStreamExecutor.h @@ -21,12 +21,14 @@ namespace DB class IBlockInputStream; using BlockInputStreamPtr = std::shared_ptr; -struct BlockIO; - class DataStreamExecutor : public QueryExecutor { public: - DataStreamExecutor(Context & context_, const BlockIO & block_io); + DataStreamExecutor( + const MemoryTrackerPtr & memory_tracker_, + Context & context_, + const String & req_id, + const BlockInputStreamPtr & data_stream_); String toString() const override; diff --git a/dbms/src/Flash/Executor/PipelineExecutor.cpp b/dbms/src/Flash/Executor/PipelineExecutor.cpp index c4b06316fea..524624dedd7 100644 --- a/dbms/src/Flash/Executor/PipelineExecutor.cpp +++ b/dbms/src/Flash/Executor/PipelineExecutor.cpp @@ -20,11 +20,13 @@ namespace DB { PipelineExecutor::PipelineExecutor( - const ProcessListEntryPtr & process_list_entry_, + const MemoryTrackerPtr & memory_tracker_, Context & context_, + const String & req_id, const PipelinePtr & root_pipeline_) - : QueryExecutor(process_list_entry_, context_) + : QueryExecutor(memory_tracker_, context_, req_id) , root_pipeline(root_pipeline_) + , status(req_id) { assert(root_pipeline); } diff --git a/dbms/src/Flash/Executor/PipelineExecutor.h b/dbms/src/Flash/Executor/PipelineExecutor.h index 07d57a83081..9ccd352db6b 100644 --- a/dbms/src/Flash/Executor/PipelineExecutor.h +++ b/dbms/src/Flash/Executor/PipelineExecutor.h @@ -49,8 +49,9 @@ class PipelineExecutor : public QueryExecutor { public: PipelineExecutor( - const ProcessListEntryPtr & process_list_entry_, + const MemoryTrackerPtr & memory_tracker_, Context & context_, + const String & req_id, const PipelinePtr & root_pipeline_); String toString() const override; diff --git a/dbms/src/Flash/Executor/PipelineExecutorStatus.cpp b/dbms/src/Flash/Executor/PipelineExecutorStatus.cpp index e95188ec8f8..471f9e96393 100644 --- a/dbms/src/Flash/Executor/PipelineExecutorStatus.cpp +++ b/dbms/src/Flash/Executor/PipelineExecutorStatus.cpp @@ -33,12 +33,12 @@ std::exception_ptr PipelineExecutorStatus::getExceptionPtr() noexcept String PipelineExecutorStatus::getExceptionMsg() noexcept { - std::lock_guard lock(mu); try { - if (!exception_ptr) + auto cur_exception_ptr = getExceptionPtr(); + if (!cur_exception_ptr) return ""; - std::rethrow_exception(exception_ptr); + std::rethrow_exception(cur_exception_ptr); } catch (const DB::Exception & e) { @@ -56,22 +56,32 @@ void PipelineExecutorStatus::onErrorOccurred(const String & err_msg) noexcept onErrorOccurred(std::make_exception_ptr(e)); } -void PipelineExecutorStatus::onErrorOccurred(const std::exception_ptr & exception_ptr_) noexcept +bool PipelineExecutorStatus::setExceptionPtr(const std::exception_ptr & exception_ptr_) noexcept { assert(exception_ptr_ != nullptr); + std::lock_guard lock(mu); + if (exception_ptr != nullptr) + return false; + exception_ptr = exception_ptr_; + return true; +} + +void PipelineExecutorStatus::onErrorOccurred(const std::exception_ptr & exception_ptr_) noexcept +{ + if (setExceptionPtr(exception_ptr_)) { - std::lock_guard lock(mu); - if (exception_ptr != nullptr) - return; - exception_ptr = exception_ptr_; + cancel(); + LOG_WARNING(log, "error occured and cancel the query"); } - cancel(); } void PipelineExecutorStatus::wait() noexcept { - std::unique_lock lock(mu); - cv.wait(lock, [&] { return 0 == active_event_count; }); + { + std::unique_lock lock(mu); + cv.wait(lock, [&] { return 0 == active_event_count; }); + } + LOG_DEBUG(log, "query finished and wait done"); } void PipelineExecutorStatus::onEventSchedule() noexcept diff --git a/dbms/src/Flash/Executor/PipelineExecutorStatus.h b/dbms/src/Flash/Executor/PipelineExecutorStatus.h index 79f68be686c..0892751acf4 100644 --- a/dbms/src/Flash/Executor/PipelineExecutorStatus.h +++ b/dbms/src/Flash/Executor/PipelineExecutorStatus.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include @@ -27,6 +28,14 @@ class PipelineExecutorStatus : private boost::noncopyable public: static constexpr auto timeout_err_msg = "error with timeout"; + PipelineExecutorStatus() + : log(Logger::get()) + {} + + explicit PipelineExecutorStatus(const String & req_id) + : log(Logger::get(req_id)) + {} + ExecutionResult toExecutionResult() noexcept; std::exception_ptr getExceptionPtr() noexcept; @@ -51,9 +60,11 @@ class PipelineExecutorStatus : private boost::noncopyable } if (is_timeout) { + LOG_WARNING(log, "wait timeout"); onErrorOccurred(timeout_err_msg); throw Exception(timeout_err_msg); } + LOG_DEBUG(log, "query finished and wait done"); } void cancel() noexcept; @@ -64,6 +75,11 @@ class PipelineExecutorStatus : private boost::noncopyable } private: + bool setExceptionPtr(const std::exception_ptr & exception_ptr_) noexcept; + +private: + LoggerPtr log; + std::mutex mu; std::condition_variable cv; std::exception_ptr exception_ptr; diff --git a/dbms/src/Flash/Executor/QueryExecutor.h b/dbms/src/Flash/Executor/QueryExecutor.h index f859211df97..9c90d2e968d 100644 --- a/dbms/src/Flash/Executor/QueryExecutor.h +++ b/dbms/src/Flash/Executor/QueryExecutor.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include #include @@ -24,18 +25,19 @@ namespace DB { -class ProcessListEntry; -using ProcessListEntryPtr = std::shared_ptr; - class Context; class DAGContext; class QueryExecutor { public: - QueryExecutor(const ProcessListEntryPtr & process_list_entry_, Context & context_) - : process_list_entry(process_list_entry_) + QueryExecutor( + const MemoryTrackerPtr & memory_tracker_, + Context & context_, + const String & req_id) + : memory_tracker(memory_tracker_) , context(context_) + , log(Logger::get(req_id)) {} virtual ~QueryExecutor() = default; @@ -61,8 +63,9 @@ class QueryExecutor DAGContext & dagContext() const; protected: - ProcessListEntryPtr process_list_entry; + MemoryTrackerPtr memory_tracker; Context & context; + LoggerPtr log; }; using QueryExecutorPtr = std::unique_ptr; diff --git a/dbms/src/Flash/Pipeline/Pipeline.cpp b/dbms/src/Flash/Pipeline/Pipeline.cpp index 2d8da185230..fa789746c8b 100644 --- a/dbms/src/Flash/Pipeline/Pipeline.cpp +++ b/dbms/src/Flash/Pipeline/Pipeline.cpp @@ -111,7 +111,7 @@ EventPtr Pipeline::toEvent(PipelineExecutorStatus & status, Context & context, s // ``` auto memory_tracker = current_memory_tracker ? current_memory_tracker->shared_from_this() : nullptr; - auto plain_pipeline_event = std::make_shared(status, memory_tracker, context, shared_from_this(), concurrency); + auto plain_pipeline_event = std::make_shared(status, memory_tracker, log->identifier(), context, shared_from_this(), concurrency); for (const auto & child : children) { auto input = child->toEvent(status, context, concurrency, all_events); diff --git a/dbms/src/Flash/Pipeline/Pipeline.h b/dbms/src/Flash/Pipeline/Pipeline.h index 56b58febe80..37504b29b6a 100644 --- a/dbms/src/Flash/Pipeline/Pipeline.h +++ b/dbms/src/Flash/Pipeline/Pipeline.h @@ -46,8 +46,9 @@ using SharedQueuePtr = std::shared_ptr; class Pipeline : public std::enable_shared_from_this { public: - explicit Pipeline(UInt32 id_) + Pipeline(UInt32 id_, const String & req_id) : id(id_) + , log(Logger::get(req_id, id_)) {} void addPlanNode(const PhysicalPlanNodePtr & plan_node); @@ -74,6 +75,7 @@ class Pipeline : public std::enable_shared_from_this private: const UInt32 id; + LoggerPtr log; // data flow: plan_nodes.begin() --> plan_nodes.end() std::deque plan_nodes; diff --git a/dbms/src/Flash/Pipeline/PipelineBuilder.h b/dbms/src/Flash/Pipeline/PipelineBuilder.h index 8f7b532b1f7..f41d92015a7 100644 --- a/dbms/src/Flash/Pipeline/PipelineBuilder.h +++ b/dbms/src/Flash/Pipeline/PipelineBuilder.h @@ -38,11 +38,12 @@ using PipelineIdGeneratorPtr = std::shared_ptr; class PipelineBuilder { public: - PipelineBuilder() - : id_generator(std::make_shared()) + explicit PipelineBuilder(const String & req_id) + : log(Logger::get(req_id)) + , id_generator(std::make_shared()) , pipeline_breaker(std::nullopt) { - pipeline = std::make_shared(id_generator->nextID()); + pipeline = std::make_shared(id_generator->nextID(), req_id); } private: @@ -61,11 +62,15 @@ class PipelineBuilder const PhysicalPlanNodePtr breaker_node; }; - PipelineBuilder(const PipelineIdGeneratorPtr & id_generator_, PipelineBreaker && pipeline_breaker_) - : id_generator(id_generator_) + PipelineBuilder( + const PipelineIdGeneratorPtr & id_generator_, + PipelineBreaker && pipeline_breaker_, + const String & req_id) + : log(Logger::get(req_id)) + , id_generator(id_generator_) , pipeline_breaker(std::move(pipeline_breaker_)) { - pipeline = std::make_shared(id_generator->nextID()); + pipeline = std::make_shared(id_generator->nextID(), req_id); } public: @@ -77,7 +82,7 @@ class PipelineBuilder /// Break the current pipeline and return a new builder for the broken pipeline. PipelineBuilder breakPipeline(const PhysicalPlanNodePtr & breaker_node) { - return PipelineBuilder(id_generator, PipelineBreaker{pipeline, breaker_node}); + return PipelineBuilder(id_generator, PipelineBreaker{pipeline, breaker_node}, log->identifier()); } PipelinePtr build() @@ -95,6 +100,7 @@ class PipelineBuilder } private: + LoggerPtr log; PipelineIdGeneratorPtr id_generator; PipelinePtr pipeline; std::optional pipeline_breaker; diff --git a/dbms/src/Flash/Pipeline/Schedule/Events/Event.cpp b/dbms/src/Flash/Pipeline/Schedule/Events/Event.cpp index 35d364b6f51..da3a44aedee 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Events/Event.cpp +++ b/dbms/src/Flash/Pipeline/Schedule/Events/Event.cpp @@ -20,6 +20,8 @@ #include #include +#include + namespace DB { namespace FailPoints @@ -29,10 +31,11 @@ extern const char random_pipeline_model_event_finish_failpoint[]; } // namespace FailPoints // if any exception throw here, we should record err msg and then cancel the query. -#define CATCH \ - catch (...) \ - { \ - exec_status.onErrorOccurred(std::current_exception()); \ +#define CATCH \ + catch (...) \ + { \ + LOG_WARNING(log, "error occurred and cancel the query"); \ + exec_status.onErrorOccurred(std::current_exception()); \ } void Event::addInput(const EventPtr & input) @@ -126,21 +129,26 @@ void Event::scheduleTasks(std::vector & tasks) noexcept // If query has already been cancelled, we can skip scheduling tasks. // And then tasks will be destroyed and call `onTaskFinish`. if (likely(!exec_status.isCancelled())) + { + LOG_DEBUG(log, "{} tasks scheduled by event", tasks.size()); TaskScheduler::instance->submit(tasks); + } } void Event::onTaskFinish() noexcept { assert(status != EventStatus::FINISHED); - auto cur_value = unfinished_tasks.fetch_sub(1); - assert(cur_value >= 1); - if (1 == cur_value) + int32_t remaining_tasks = unfinished_tasks.fetch_sub(1) - 1; + assert(remaining_tasks >= 0); + LOG_DEBUG(log, "one task finished, {} tasks remaining", remaining_tasks); + if (0 == remaining_tasks) finish(); } void Event::switchStatus(EventStatus from, EventStatus to) noexcept { RUNTIME_ASSERT(status.compare_exchange_strong(from, to)); + LOG_DEBUG(log, "switch status: {} --> {}", magic_enum::enum_name(from), magic_enum::enum_name(to)); } #undef CATCH diff --git a/dbms/src/Flash/Pipeline/Schedule/Events/Event.h b/dbms/src/Flash/Pipeline/Schedule/Events/Event.h index 2f2778e5b48..5bff301e6ee 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Events/Event.h +++ b/dbms/src/Flash/Pipeline/Schedule/Events/Event.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include @@ -39,9 +40,13 @@ class PipelineExecutorStatus; class Event : public std::enable_shared_from_this { public: - Event(PipelineExecutorStatus & exec_status_, MemoryTrackerPtr mem_tracker_) + Event( + PipelineExecutorStatus & exec_status_, + MemoryTrackerPtr mem_tracker_, + const String & req_id = "") : exec_status(exec_status_) , mem_tracker(std::move(mem_tracker_)) + , log(Logger::get(req_id)) {} virtual ~Event() = default; @@ -79,6 +84,8 @@ class Event : public std::enable_shared_from_this MemoryTrackerPtr mem_tracker; + LoggerPtr log; + private: Events outputs; diff --git a/dbms/src/Flash/Pipeline/Schedule/Events/PipelineEvent.h b/dbms/src/Flash/Pipeline/Schedule/Events/PipelineEvent.h index bc862370cea..d3631bbd3b5 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Events/PipelineEvent.h +++ b/dbms/src/Flash/Pipeline/Schedule/Events/PipelineEvent.h @@ -28,9 +28,10 @@ class PipelineEvent : public Event PipelineEvent( PipelineExecutorStatus & exec_status_, MemoryTrackerPtr mem_tracker_, + const String & req_id, Context & context_, const PipelinePtr & pipeline_) - : Event(exec_status_, std::move(mem_tracker_)) + : Event(exec_status_, std::move(mem_tracker_), req_id) , context(context_) , pipeline(pipeline_) {} diff --git a/dbms/src/Flash/Pipeline/Schedule/Events/PlainPipelineEvent.cpp b/dbms/src/Flash/Pipeline/Schedule/Events/PlainPipelineEvent.cpp index c724442ae61..bea061b551c 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Events/PlainPipelineEvent.cpp +++ b/dbms/src/Flash/Pipeline/Schedule/Events/PlainPipelineEvent.cpp @@ -26,7 +26,7 @@ std::vector PlainPipelineEvent::scheduleImpl() std::vector tasks; tasks.reserve(pipeline_exec_group.size()); for (auto & pipline_exec : pipeline_exec_group) - tasks.push_back(std::make_unique(mem_tracker, exec_status, shared_from_this(), std::move(pipline_exec))); + tasks.push_back(std::make_unique(mem_tracker, log->identifier(), exec_status, shared_from_this(), std::move(pipline_exec))); return tasks; } } // namespace DB diff --git a/dbms/src/Flash/Pipeline/Schedule/Events/PlainPipelineEvent.h b/dbms/src/Flash/Pipeline/Schedule/Events/PlainPipelineEvent.h index ead0802968c..e0b4bd9fe05 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Events/PlainPipelineEvent.h +++ b/dbms/src/Flash/Pipeline/Schedule/Events/PlainPipelineEvent.h @@ -24,10 +24,11 @@ class PlainPipelineEvent : public PipelineEvent PlainPipelineEvent( PipelineExecutorStatus & exec_status_, MemoryTrackerPtr mem_tracker_, + const String & req_id, Context & context_, const PipelinePtr & pipeline_, size_t concurrency_) - : PipelineEvent(exec_status_, std::move(mem_tracker_), context_, pipeline_) + : PipelineEvent(exec_status_, std::move(mem_tracker_), req_id, context_, pipeline_) , concurrency(concurrency_) {} diff --git a/dbms/src/Flash/Pipeline/Schedule/Events/tests/gtest_event.cpp b/dbms/src/Flash/Pipeline/Schedule/Events/tests/gtest_event.cpp index 1c1dd6559a4..e830a7c04e0 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Events/tests/gtest_event.cpp +++ b/dbms/src/Flash/Pipeline/Schedule/Events/tests/gtest_event.cpp @@ -31,7 +31,7 @@ class BaseTask : public EventTask PipelineExecutorStatus & exec_status_, const EventPtr & event_, std::atomic_int64_t & counter_) - : EventTask(nullptr, exec_status_, event_) + : EventTask(exec_status_, event_) , counter(counter_) {} @@ -82,7 +82,7 @@ class RunTask : public EventTask RunTask( PipelineExecutorStatus & exec_status_, const EventPtr & event_) - : EventTask(nullptr, exec_status_, event_) + : EventTask(exec_status_, event_) {} protected: @@ -129,7 +129,7 @@ class DeadLoopTask : public EventTask DeadLoopTask( PipelineExecutorStatus & exec_status_, const EventPtr & event_) - : EventTask(nullptr, exec_status_, event_) + : EventTask(exec_status_, event_) {} protected: @@ -213,7 +213,7 @@ class ThrowExceptionTask : public EventTask ThrowExceptionTask( PipelineExecutorStatus & exec_status_, const EventPtr & event_) - : EventTask(nullptr, exec_status_, event_) + : EventTask(exec_status_, event_) {} protected: diff --git a/dbms/src/Flash/Pipeline/Schedule/TaskQueues/FiFOTaskQueue.h b/dbms/src/Flash/Pipeline/Schedule/TaskQueues/FiFOTaskQueue.h index 1567421593c..64e34e1da5f 100644 --- a/dbms/src/Flash/Pipeline/Schedule/TaskQueues/FiFOTaskQueue.h +++ b/dbms/src/Flash/Pipeline/Schedule/TaskQueues/FiFOTaskQueue.h @@ -14,7 +14,6 @@ #pragma once -#include #include #include @@ -40,7 +39,5 @@ class FIFOTaskQueue : public TaskQueue std::condition_variable cv; bool is_closed = false; std::deque task_queue; - - LoggerPtr logger = Logger::get(); }; } // namespace DB diff --git a/dbms/src/Flash/Pipeline/Schedule/TaskQueues/TaskQueue.h b/dbms/src/Flash/Pipeline/Schedule/TaskQueues/TaskQueue.h index 72c8cf0281a..8b25d07e59b 100644 --- a/dbms/src/Flash/Pipeline/Schedule/TaskQueues/TaskQueue.h +++ b/dbms/src/Flash/Pipeline/Schedule/TaskQueues/TaskQueue.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include @@ -39,6 +40,9 @@ class TaskQueue virtual bool empty() noexcept = 0; virtual void close() = 0; + +protected: + LoggerPtr logger = Logger::get(); }; using TaskQueuePtr = std::unique_ptr; diff --git a/dbms/src/Flash/Pipeline/Schedule/TaskQueues/tests/gtest_fifo.cpp b/dbms/src/Flash/Pipeline/Schedule/TaskQueues/tests/gtest_fifo.cpp index 545c5e4b699..591d0f55814 100644 --- a/dbms/src/Flash/Pipeline/Schedule/TaskQueues/tests/gtest_fifo.cpp +++ b/dbms/src/Flash/Pipeline/Schedule/TaskQueues/tests/gtest_fifo.cpp @@ -27,8 +27,7 @@ class IndexTask : public Task { public: explicit IndexTask(size_t index_) - : Task(nullptr) - , index(index_) + : index(index_) {} ExecTaskStatus executeImpl() noexcept override { return ExecTaskStatus::FINISHED; } diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/EventTask.cpp b/dbms/src/Flash/Pipeline/Schedule/Tasks/EventTask.cpp index 932bd1f0ba9..82605f95402 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/EventTask.cpp +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/EventTask.cpp @@ -12,15 +12,31 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include namespace DB { +namespace FailPoints +{ +extern const char random_pipeline_model_task_run_failpoint[]; +} // namespace FailPoints + +EventTask::EventTask( + PipelineExecutorStatus & exec_status_, + const EventPtr & event_) + : exec_status(exec_status_) + , event(event_) +{ + assert(event); +} + EventTask::EventTask( MemoryTrackerPtr mem_tracker_, + const String & req_id, PipelineExecutorStatus & exec_status_, const EventPtr & event_) - : Task(std::move(mem_tracker_)) + : Task(std::move(mem_tracker_), req_id) , exec_status(exec_status_) , event(event_) { @@ -45,7 +61,7 @@ void EventTask::finalize() noexcept catch (...) { // ignore exception from finalizeImpl. - // TODO add log here. + LOG_WARNING(log, "finalizeImpl throw exception: {}", getCurrentExceptionMessage(true, true)); } } @@ -59,4 +75,33 @@ ExecTaskStatus EventTask::awaitImpl() noexcept return doTaskAction([&] { return doAwaitImpl(); }); } +ExecTaskStatus EventTask::doTaskAction(std::function && action) +{ + if (unlikely(exec_status.isCancelled())) + { + finalize(); + return ExecTaskStatus::CANCELLED; + } + try + { + auto status = action(); + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_pipeline_model_task_run_failpoint); + switch (status) + { + case FINISH_STATUS: + finalize(); + default: + return status; + } + } + catch (...) + { + finalize(); + assert(event); + LOG_WARNING(log, "error occurred and cancel the query"); + exec_status.onErrorOccurred(std::current_exception()); + return ExecTaskStatus::ERROR; + } +} + } // namespace DB diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/EventTask.h b/dbms/src/Flash/Pipeline/Schedule/Tasks/EventTask.h index 4a15c44995e..2b884831bd4 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/EventTask.h +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/EventTask.h @@ -14,7 +14,6 @@ #pragma once -#include #include #include #include @@ -22,17 +21,16 @@ namespace DB { -namespace FailPoints -{ -extern const char random_pipeline_model_task_run_failpoint[]; -} // namespace FailPoints - // The base class of event related task. class EventTask : public Task { public: + EventTask( + PipelineExecutorStatus & exec_status_, + const EventPtr & event_); EventTask( MemoryTrackerPtr mem_tracker_, + const String & req_id, PipelineExecutorStatus & exec_status_, const EventPtr & event_); @@ -50,34 +48,7 @@ class EventTask : public Task virtual void finalizeImpl(){}; private: - template - ExecTaskStatus doTaskAction(Action && action) noexcept - { - if (unlikely(exec_status.isCancelled())) - { - finalize(); - return ExecTaskStatus::CANCELLED; - } - try - { - auto status = action(); - FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_pipeline_model_task_run_failpoint); - switch (status) - { - case FINISH_STATUS: - finalize(); - default: - return status; - } - } - catch (...) - { - finalize(); - assert(event); - exec_status.onErrorOccurred(std::current_exception()); - return ExecTaskStatus::ERROR; - } - } + ExecTaskStatus doTaskAction(std::function && action); private: PipelineExecutorStatus & exec_status; diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTask.cpp b/dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTask.cpp index 12a3ee4e591..e5820b8d893 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTask.cpp +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTask.cpp @@ -21,10 +21,11 @@ namespace DB { PipelineTask::PipelineTask( MemoryTrackerPtr mem_tracker_, + const String & req_id, PipelineExecutorStatus & exec_status_, const EventPtr & event_, PipelineExecPtr && pipeline_exec_) - : EventTask(std::move(mem_tracker_), exec_status_, event_) + : EventTask(std::move(mem_tracker_), req_id, exec_status_, event_) , pipeline_exec(std::move(pipeline_exec_)) { assert(pipeline_exec); diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTask.h b/dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTask.h index 2758723e087..35425785e90 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTask.h +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTask.h @@ -24,6 +24,7 @@ class PipelineTask : public EventTask public: PipelineTask( MemoryTrackerPtr mem_tracker_, + const String & req_id, PipelineExecutorStatus & exec_status_, const EventPtr & event_, PipelineExecPtr && pipeline_exec_); diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.h b/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.h index dacae1eacdc..1d13ca6bf18 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.h +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.h @@ -15,9 +15,13 @@ #pragma once #include +#include #include +#include #include +#include + namespace DB { namespace FailPoints @@ -35,6 +39,7 @@ extern const char random_pipeline_model_task_construct_failpoint[]; */ enum class ExecTaskStatus { + INIT, WAITING, RUNNING, FINISHED, @@ -45,15 +50,23 @@ enum class ExecTaskStatus class Task { public: - explicit Task(MemoryTrackerPtr mem_tracker_) + Task() + : mem_tracker(nullptr) + , log(Logger::get()) + { + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_pipeline_model_task_construct_failpoint); + } + + Task(MemoryTrackerPtr mem_tracker_, const String & req_id) : mem_tracker(std::move(mem_tracker_)) + , log(Logger::get(req_id)) { FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_pipeline_model_task_construct_failpoint); } virtual ~Task() = default; - MemoryTrackerPtr getMemTracker() + MemoryTrackerPtr getMemTracker() const { return mem_tracker; } @@ -61,21 +74,38 @@ class Task ExecTaskStatus execute() noexcept { assert(getMemTracker().get() == current_memory_tracker); - return executeImpl(); + switchStatus(executeImpl()); + return exec_status; } - // Avoid allocating memory in `await` if possible. + ExecTaskStatus await() noexcept { assert(getMemTracker().get() == current_memory_tracker); - return awaitImpl(); + switchStatus(awaitImpl()); + return exec_status; } protected: virtual ExecTaskStatus executeImpl() noexcept = 0; + // Avoid allocating memory in `await` if possible. virtual ExecTaskStatus awaitImpl() noexcept { return ExecTaskStatus::RUNNING; } private: + void switchStatus(ExecTaskStatus to) noexcept + { + if (exec_status != to) + { + LOG_TRACE(log, "switch status: {} --> {}", magic_enum::enum_name(exec_status), magic_enum::enum_name(to)); + exec_status = to; + } + } + +protected: MemoryTrackerPtr mem_tracker; + LoggerPtr log; + +private: + ExecTaskStatus exec_status{ExecTaskStatus::INIT}; }; using TaskPtr = std::unique_ptr; diff --git a/dbms/src/Flash/Pipeline/Schedule/tests/gtest_task_scheduler.cpp b/dbms/src/Flash/Pipeline/Schedule/tests/gtest_task_scheduler.cpp index 7ee985b59a4..67cb76d472a 100644 --- a/dbms/src/Flash/Pipeline/Schedule/tests/gtest_task_scheduler.cpp +++ b/dbms/src/Flash/Pipeline/Schedule/tests/gtest_task_scheduler.cpp @@ -55,8 +55,7 @@ class SimpleTask : public Task { public: explicit SimpleTask(Waiter & waiter_) - : Task(nullptr) - , waiter(waiter_) + : waiter(waiter_) {} ~SimpleTask() @@ -81,8 +80,7 @@ class SimpleWaitingTask : public Task { public: explicit SimpleWaitingTask(Waiter & waiter_) - : Task(nullptr) - , waiter(waiter_) + : waiter(waiter_) {} ~SimpleWaitingTask() @@ -136,7 +134,7 @@ class MemoryTraceTask : public Task { public: MemoryTraceTask(MemoryTrackerPtr mem_tracker_, Waiter & waiter_) - : Task(std::move(mem_tracker_)) + : Task(std::move(mem_tracker_), "") , waiter(waiter_) {} @@ -181,11 +179,6 @@ class MemoryTraceTask : public Task class DeadLoopTask : public Task { -public: - DeadLoopTask() - : Task(nullptr) - {} - protected: ExecTaskStatus executeImpl() noexcept override { diff --git a/dbms/src/Flash/Planner/PhysicalPlan.cpp b/dbms/src/Flash/Planner/PhysicalPlan.cpp index 059a681b867..32b75f2f7a2 100644 --- a/dbms/src/Flash/Planner/PhysicalPlan.cpp +++ b/dbms/src/Flash/Planner/PhysicalPlan.cpp @@ -302,7 +302,7 @@ void PhysicalPlan::buildBlockInputStream(DAGPipeline & pipeline, Context & conte PipelinePtr PhysicalPlan::toPipeline() { assert(root_node); - PipelineBuilder builder; + PipelineBuilder builder{log->identifier()}; root_node->buildPipeline(builder); root_node.reset(); auto pipeline = builder.build(); diff --git a/dbms/src/Flash/Planner/PhysicalPlanNode.cpp b/dbms/src/Flash/Planner/PhysicalPlanNode.cpp index 413aa556fd4..cf200be1d6b 100644 --- a/dbms/src/Flash/Planner/PhysicalPlanNode.cpp +++ b/dbms/src/Flash/Planner/PhysicalPlanNode.cpp @@ -32,7 +32,7 @@ PhysicalPlanNode::PhysicalPlanNode( : executor_id(executor_id_) , type(type_) , schema(schema_) - , log(Logger::get(type_.toString(), req_id)) + , log(Logger::get(req_id, type_.toString(), executor_id_)) {} String PhysicalPlanNode::toString() diff --git a/dbms/src/Flash/Planner/Plans/PhysicalExchangeReceiver.cpp b/dbms/src/Flash/Planner/Plans/PhysicalExchangeReceiver.cpp index 7afb65a71f3..9a74346cb98 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalExchangeReceiver.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalExchangeReceiver.cpp @@ -101,10 +101,9 @@ void PhysicalExchangeReceiver::buildPipelineExec(PipelineExecGroupBuilder & grou group_builder.transform([&](auto & builder) { builder.setSourceOp(std::make_unique( group_builder.exec_status, - mpp_exchange_receiver, - /*stream_id=*/0, log->identifier(), - execId())); + mpp_exchange_receiver, + /*stream_id=*/0)); }); } diff --git a/dbms/src/Flash/Planner/Plans/PhysicalExchangeSender.cpp b/dbms/src/Flash/Planner/Plans/PhysicalExchangeSender.cpp index 8c06dd5c1ac..0e161469fba 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalExchangeSender.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalExchangeSender.cpp @@ -108,7 +108,7 @@ void PhysicalExchangeSender::buildPipelineExec(PipelineExecGroupBuilder & group_ context.getSettingsRef().batch_send_min_limit_compression, log->identifier(), /*is_async=*/true); - builder.setSinkOp(std::make_unique(group_builder.exec_status, std::move(response_writer), log->identifier())); + builder.setSinkOp(std::make_unique(group_builder.exec_status, log->identifier(), std::move(response_writer))); }); } diff --git a/dbms/src/Flash/Planner/Plans/PhysicalExpand.cpp b/dbms/src/Flash/Planner/Plans/PhysicalExpand.cpp index a710ac0fc40..fdba8dd1b8a 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalExpand.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalExpand.cpp @@ -85,7 +85,7 @@ void PhysicalExpand::expandTransform(DAGPipeline & child_pipeline) void PhysicalExpand::buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context &, size_t) { group_builder.transform([&](auto & builder) { - builder.appendTransformOp(std::make_unique(group_builder.exec_status, expand_actions, log->identifier())); + builder.appendTransformOp(std::make_unique(group_builder.exec_status, log->identifier(), expand_actions)); }); } diff --git a/dbms/src/Flash/Planner/Plans/PhysicalFilter.cpp b/dbms/src/Flash/Planner/Plans/PhysicalFilter.cpp index 27f08f0be82..a3d353a60a1 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalFilter.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalFilter.cpp @@ -61,7 +61,7 @@ void PhysicalFilter::buildPipelineExec(PipelineExecGroupBuilder & group_builder, { auto input_header = group_builder.getCurrentHeader(); group_builder.transform([&](auto & builder) { - builder.appendTransformOp(std::make_unique(group_builder.exec_status, input_header, before_filter_actions, filter_column, log->identifier())); + builder.appendTransformOp(std::make_unique(group_builder.exec_status, log->identifier(), input_header, before_filter_actions, filter_column)); }); } diff --git a/dbms/src/Flash/Planner/Plans/PhysicalGetResultSink.cpp b/dbms/src/Flash/Planner/Plans/PhysicalGetResultSink.cpp index bb3a4f4888e..e1ac74f8b32 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalGetResultSink.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalGetResultSink.cpp @@ -29,7 +29,7 @@ void PhysicalGetResultSink::buildPipelineExec(PipelineExecGroupBuilder & group_b { auto this_shared_ptr = std::static_pointer_cast(shared_from_this()); group_builder.transform([&](auto & builder) { - builder.setSinkOp(std::make_unique(group_builder.exec_status, this_shared_ptr)); + builder.setSinkOp(std::make_unique(group_builder.exec_status, log->identifier(), this_shared_ptr)); }); } } // namespace DB diff --git a/dbms/src/Flash/Planner/Plans/PhysicalLimit.cpp b/dbms/src/Flash/Planner/Plans/PhysicalLimit.cpp index 0c0edd0e975..5144ad6c11d 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalLimit.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalLimit.cpp @@ -58,7 +58,7 @@ void PhysicalLimit::buildPipelineExec(PipelineExecGroupBuilder & group_builder, auto input_header = group_builder.getCurrentHeader(); auto global_limit = std::make_shared(input_header, limit); group_builder.transform([&](auto & builder) { - builder.appendTransformOp(std::make_unique(group_builder.exec_status, global_limit, log->identifier())); + builder.appendTransformOp(std::make_unique(group_builder.exec_status, log->identifier(), global_limit)); }); } diff --git a/dbms/src/Flash/Planner/Plans/PhysicalMockExchangeReceiver.cpp b/dbms/src/Flash/Planner/Plans/PhysicalMockExchangeReceiver.cpp index ae109b35207..7d5dca38f14 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalMockExchangeReceiver.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalMockExchangeReceiver.cpp @@ -68,7 +68,7 @@ void PhysicalMockExchangeReceiver::buildPipelineExec(PipelineExecGroupBuilder & group_builder.init(mock_streams.size()); size_t i = 0; group_builder.transform([&](auto & builder) { - builder.setSourceOp(std::make_unique(group_builder.exec_status, mock_streams[i++])); + builder.setSourceOp(std::make_unique(group_builder.exec_status, log->identifier(), mock_streams[i++])); }); } diff --git a/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp b/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp index 1411dce0f70..422af263138 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp @@ -116,7 +116,7 @@ void PhysicalMockTableScan::buildPipelineExec(PipelineExecGroupBuilder & group_b group_builder.init(mock_streams.size()); size_t i = 0; group_builder.transform([&](auto & builder) { - builder.setSourceOp(std::make_unique(group_builder.exec_status, mock_streams[i++])); + builder.setSourceOp(std::make_unique(group_builder.exec_status, log->identifier(), mock_streams[i++])); }); } diff --git a/dbms/src/Flash/Planner/Plans/PhysicalProjection.cpp b/dbms/src/Flash/Planner/Plans/PhysicalProjection.cpp index dccfc7bd7b5..9eea80a077c 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalProjection.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalProjection.cpp @@ -148,7 +148,7 @@ void PhysicalProjection::buildPipelineExec(PipelineExecGroupBuilder & group_buil if (project_actions && !project_actions->getActions().empty()) { group_builder.transform([&](auto & builder) { - builder.appendTransformOp(std::make_unique(group_builder.exec_status, project_actions, log->identifier())); + builder.appendTransformOp(std::make_unique(group_builder.exec_status, log->identifier(), project_actions)); }); } } diff --git a/dbms/src/Flash/Planner/Plans/PhysicalTopN.cpp b/dbms/src/Flash/Planner/Plans/PhysicalTopN.cpp index eddcc5af425..5fe779b6e46 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalTopN.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalTopN.cpp @@ -73,11 +73,11 @@ void PhysicalTopN::buildPipelineExec(PipelineExecGroupBuilder & group_builder, C if (!before_sort_actions->getActions().empty()) { group_builder.transform([&](auto & builder) { - builder.appendTransformOp(std::make_unique(group_builder.exec_status, before_sort_actions, log->identifier())); + builder.appendTransformOp(std::make_unique(group_builder.exec_status, log->identifier(), before_sort_actions)); }); } group_builder.transform([&](auto & builder) { - builder.appendTransformOp(std::make_unique(group_builder.exec_status, order_descr, limit, context.getSettingsRef().max_block_size, log->identifier())); + builder.appendTransformOp(std::make_unique(group_builder.exec_status, log->identifier(), order_descr, limit, context.getSettingsRef().max_block_size)); }); } diff --git a/dbms/src/Flash/executeQuery.cpp b/dbms/src/Flash/executeQuery.cpp index 102d4bd3a7a..b4ac4bb6d69 100644 --- a/dbms/src/Flash/executeQuery.cpp +++ b/dbms/src/Flash/executeQuery.cpp @@ -13,6 +13,7 @@ // limitations under the License. #include +#include #include #include #include @@ -73,7 +74,7 @@ ProcessList::EntryPtr getProcessListEntry(Context & context, DAGContext & dag_co } } -BlockIO doExecuteAsBlockIO(IQuerySource & dag, Context & context, bool internal) +QueryExecutorPtr doExecuteAsBlockIO(IQuerySource & dag, Context & context, bool internal) { RUNTIME_ASSERT(context.getDAGContext()); auto & dag_context = *context.getDAGContext(); @@ -92,8 +93,13 @@ BlockIO doExecuteAsBlockIO(IQuerySource & dag, Context & context, bool internal) FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_interpreter_failpoint); auto interpreter = dag.interpreter(context, QueryProcessingStage::Complete); BlockIO res = interpreter->execute(); + MemoryTrackerPtr memory_tracker; if (likely(process_list_entry)) + { (*process_list_entry)->setQueryStreams(res); + memory_tracker = (*process_list_entry)->getMemoryTrackerPtr(); + } + /// Hold element of process list till end of query execution. res.process_list_entry = process_list_entry; @@ -102,7 +108,7 @@ BlockIO doExecuteAsBlockIO(IQuerySource & dag, Context & context, bool internal) if (likely(!internal)) logQueryPipeline(logger, res.in); - return res; + return std::make_unique(memory_tracker, context, logger->identifier(), res.in); } std::optional executeAsPipeline(Context & context, bool internal) @@ -124,13 +130,17 @@ std::optional executeAsPipeline(Context & context, bool intern logQuery(dag_context.dummy_query_string, context, logger); } + MemoryTrackerPtr memory_tracker; + if (likely(process_list_entry)) + memory_tracker = (*process_list_entry)->getMemoryTrackerPtr(); + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_interpreter_failpoint); PhysicalPlan physical_plan{context, logger->identifier()}; physical_plan.build(dag_context.dag_request); physical_plan.outputAndOptimize(); auto pipeline = physical_plan.toPipeline(); - auto executor = std::make_unique(process_list_entry, context, pipeline); + auto executor = std::make_unique(memory_tracker, context, logger->identifier(), pipeline); if (likely(!internal)) LOG_DEBUG(logger, fmt::format("Query pipeline:\n{}", executor->toString())); return {std::move(executor)}; @@ -141,12 +151,12 @@ QueryExecutorPtr executeAsBlockIO(Context & context, bool internal) if (context.getSettingsRef().enable_planner) { PlanQuerySource plan(context); - return std::make_unique(context, doExecuteAsBlockIO(plan, context, internal)); + return doExecuteAsBlockIO(plan, context, internal); } else { DAGQuerySource dag(context); - return std::make_unique(context, doExecuteAsBlockIO(dag, context, internal)); + return doExecuteAsBlockIO(dag, context, internal); } } } // namespace diff --git a/dbms/src/Operators/BlockInputStreamSourceOp.cpp b/dbms/src/Operators/BlockInputStreamSourceOp.cpp index d3dafe1dad4..8aa3719596a 100644 --- a/dbms/src/Operators/BlockInputStreamSourceOp.cpp +++ b/dbms/src/Operators/BlockInputStreamSourceOp.cpp @@ -19,25 +19,28 @@ namespace DB { BlockInputStreamSourceOp::BlockInputStreamSourceOp( PipelineExecutorStatus & exec_status_, + const String & req_id, const BlockInputStreamPtr & impl_) - : SourceOp(exec_status_) + : SourceOp(exec_status_, req_id) , impl(impl_) { - impl->readPrefix(); + assert(impl); setHeader(impl->getHeader()); } -OperatorStatus BlockInputStreamSourceOp::readImpl(Block & block) +void BlockInputStreamSourceOp::operatePrefix() { - if (unlikely(finished)) - return OperatorStatus::HAS_OUTPUT; + impl->readPrefix(); +} +void BlockInputStreamSourceOp::operateSuffix() +{ + impl->readSuffix(); +} + +OperatorStatus BlockInputStreamSourceOp::readImpl(Block & block) +{ block = impl->read(); - if (unlikely(!block)) - { - impl->readSuffix(); - finished = true; - } return OperatorStatus::HAS_OUTPUT; } } // namespace DB diff --git a/dbms/src/Operators/BlockInputStreamSourceOp.h b/dbms/src/Operators/BlockInputStreamSourceOp.h index ec64b5744ec..c37df9fce26 100644 --- a/dbms/src/Operators/BlockInputStreamSourceOp.h +++ b/dbms/src/Operators/BlockInputStreamSourceOp.h @@ -27,18 +27,23 @@ using BlockInputStreamPtr = std::shared_ptr; class BlockInputStreamSourceOp : public SourceOp { public: - BlockInputStreamSourceOp(PipelineExecutorStatus & exec_status_, const BlockInputStreamPtr & impl_); + BlockInputStreamSourceOp( + PipelineExecutorStatus & exec_status_, + const String & req_id, + const BlockInputStreamPtr & impl_); String getName() const override { return "BlockInputStreamSourceOp"; } + void operatePrefix() override; + void operateSuffix() override; + protected: OperatorStatus readImpl(Block & block) override; private: BlockInputStreamPtr impl; - bool finished = false; }; } // namespace DB diff --git a/dbms/src/Operators/ExchangeReceiverSourceOp.h b/dbms/src/Operators/ExchangeReceiverSourceOp.h index b6bfb4ad39f..de0d68b21e1 100644 --- a/dbms/src/Operators/ExchangeReceiverSourceOp.h +++ b/dbms/src/Operators/ExchangeReceiverSourceOp.h @@ -26,14 +26,12 @@ class ExchangeReceiverSourceOp : public SourceOp public: ExchangeReceiverSourceOp( PipelineExecutorStatus & exec_status_, - const std::shared_ptr & exchange_receiver_, - size_t stream_id_, const String & req_id, - const String & executor_id) - : SourceOp(exec_status_) + const std::shared_ptr & exchange_receiver_, + size_t stream_id_) + : SourceOp(exec_status_, req_id) , exchange_receiver(exchange_receiver_) , stream_id(stream_id_) - , log(Logger::get(req_id, executor_id)) { setHeader(Block(getColumnWithTypeAndName(toNamesAndTypes(exchange_receiver->getOutputSchema())))); decoder_ptr = std::make_unique(getHeader(), 8192); @@ -44,13 +42,13 @@ class ExchangeReceiverSourceOp : public SourceOp return "ExchangeReceiverSourceOp"; } + void operateSuffix() override; + protected: OperatorStatus readImpl(Block & block) override; OperatorStatus awaitImpl() override; - void operateSuffix() override; - private: Block popFromBlockQueue(); @@ -64,6 +62,5 @@ class ExchangeReceiverSourceOp : public SourceOp std::optional recv_res; size_t stream_id; - const LoggerPtr log; }; } // namespace DB diff --git a/dbms/src/Operators/ExchangeSenderSinkOp.h b/dbms/src/Operators/ExchangeSenderSinkOp.h index 037390f64e3..25bddc4c020 100644 --- a/dbms/src/Operators/ExchangeSenderSinkOp.h +++ b/dbms/src/Operators/ExchangeSenderSinkOp.h @@ -25,11 +25,10 @@ class ExchangeSenderSinkOp : public SinkOp public: ExchangeSenderSinkOp( PipelineExecutorStatus & exec_status_, - std::unique_ptr && writer, - const String & req_id) - : SinkOp(exec_status_) + const String & req_id, + std::unique_ptr && writer) + : SinkOp(exec_status_, req_id) , writer(std::move(writer)) - , log(Logger::get(req_id)) { } @@ -41,6 +40,7 @@ class ExchangeSenderSinkOp : public SinkOp void operatePrefix() override; void operateSuffix() override; +protected: OperatorStatus writeImpl(Block && block) override; OperatorStatus prepareImpl() override; @@ -49,7 +49,6 @@ class ExchangeSenderSinkOp : public SinkOp private: std::unique_ptr writer; - const LoggerPtr log; size_t total_rows = 0; }; } // namespace DB diff --git a/dbms/src/Operators/ExpressionTransformOp.h b/dbms/src/Operators/ExpressionTransformOp.h index ff53785719e..a8a485d876a 100644 --- a/dbms/src/Operators/ExpressionTransformOp.h +++ b/dbms/src/Operators/ExpressionTransformOp.h @@ -27,11 +27,10 @@ class ExpressionTransformOp : public TransformOp public: ExpressionTransformOp( PipelineExecutorStatus & exec_status_, - const ExpressionActionsPtr & expression_, - const String & req_id) - : TransformOp(exec_status_) + const String & req_id, + const ExpressionActionsPtr & expression_) + : TransformOp(exec_status_, req_id) , expression(expression_) - , log(Logger::get(req_id)) {} String getName() const override @@ -46,6 +45,5 @@ class ExpressionTransformOp : public TransformOp private: ExpressionActionsPtr expression; - const LoggerPtr log; }; } // namespace DB diff --git a/dbms/src/Operators/FilterTransformOp.h b/dbms/src/Operators/FilterTransformOp.h index 4636bacf8c5..a9216b9dc12 100644 --- a/dbms/src/Operators/FilterTransformOp.h +++ b/dbms/src/Operators/FilterTransformOp.h @@ -25,13 +25,12 @@ class FilterTransformOp : public TransformOp public: FilterTransformOp( PipelineExecutorStatus & exec_status_, + const String & req_id, const Block & input_header, const ExpressionActionsPtr & expression, - const String & filter_column_name, - const String & req_id) - : TransformOp(exec_status_) + const String & filter_column_name) + : TransformOp(exec_status_, req_id) , filter_transform_action(input_header, expression, filter_column_name) - , log(Logger::get(req_id)) {} String getName() const override @@ -47,6 +46,5 @@ class FilterTransformOp : public TransformOp private: FilterTransformAction filter_transform_action; FilterPtr filter_ignored = nullptr; - const LoggerPtr log; }; } // namespace DB diff --git a/dbms/src/Operators/GetResultSinkOp.h b/dbms/src/Operators/GetResultSinkOp.h index e9eb8d7f719..d5f4aab4dac 100644 --- a/dbms/src/Operators/GetResultSinkOp.h +++ b/dbms/src/Operators/GetResultSinkOp.h @@ -27,8 +27,9 @@ class GetResultSinkOp : public SinkOp public: GetResultSinkOp( PipelineExecutorStatus & exec_status_, + const String & req_id, const PhysicalGetResultSinkPtr & physical_sink_) - : SinkOp(exec_status_) + : SinkOp(exec_status_, req_id) , physical_sink(physical_sink_) { assert(physical_sink); diff --git a/dbms/src/Operators/LimitTransformOp.h b/dbms/src/Operators/LimitTransformOp.h index bd6ab0feeb5..f671cdf99bf 100644 --- a/dbms/src/Operators/LimitTransformOp.h +++ b/dbms/src/Operators/LimitTransformOp.h @@ -27,11 +27,10 @@ class LimitTransformOp : public TransformOp public: LimitTransformOp( PipelineExecutorStatus & exec_status_, - const GlobalLimitPtr & action_, - const String & req_id) - : TransformOp(exec_status_) + const String & req_id, + const GlobalLimitPtr & action_) + : TransformOp(exec_status_, req_id) , action(action_) - , log(Logger::get(req_id)) {} String getName() const override @@ -46,6 +45,5 @@ class LimitTransformOp : public TransformOp private: GlobalLimitPtr action; - const LoggerPtr log; }; } // namespace DB diff --git a/dbms/src/Operators/Operator.h b/dbms/src/Operators/Operator.h index 18181cc12a7..24baf7b2784 100644 --- a/dbms/src/Operators/Operator.h +++ b/dbms/src/Operators/Operator.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include @@ -48,8 +49,9 @@ class PipelineExecutorStatus; class Operator { public: - explicit Operator(PipelineExecutorStatus & exec_status_) + Operator(PipelineExecutorStatus & exec_status_, const String & req_id) : exec_status(exec_status_) + , log(Logger::get(req_id)) {} virtual ~Operator() = default; @@ -80,6 +82,7 @@ class Operator protected: PipelineExecutorStatus & exec_status; + const LoggerPtr log; Block header; }; @@ -87,8 +90,8 @@ class Operator class SourceOp : public Operator { public: - explicit SourceOp(PipelineExecutorStatus & exec_status_) - : Operator(exec_status_) + SourceOp(PipelineExecutorStatus & exec_status_, const String & req_id) + : Operator(exec_status_, req_id) {} // read will inplace the block when return status is HAS_OUTPUT; // Even after source has finished, source op still needs to return an empty block and HAS_OUTPUT, @@ -103,8 +106,8 @@ using SourceOpPtr = std::unique_ptr; class TransformOp : public Operator { public: - explicit TransformOp(PipelineExecutorStatus & exec_status_) - : Operator(exec_status_) + TransformOp(PipelineExecutorStatus & exec_status_, const String & req_id) + : Operator(exec_status_, req_id) {} // running status may return are NEED_INPUT and HAS_OUTPUT here. // tryOutput will inplace the block when return status is HAS_OUPUT; do nothing to the block when NEED_INPUT or others. @@ -134,8 +137,8 @@ using TransformOps = std::vector; class SinkOp : public Operator { public: - explicit SinkOp(PipelineExecutorStatus & exec_status_) - : Operator(exec_status_) + SinkOp(PipelineExecutorStatus & exec_status_, const String & req_id) + : Operator(exec_status_, req_id) {} OperatorStatus prepare(); virtual OperatorStatus prepareImpl() { return OperatorStatus::NEED_INPUT; } diff --git a/dbms/src/Operators/TopNTransformOp.h b/dbms/src/Operators/TopNTransformOp.h index c77ff4707eb..546d4cdfcde 100644 --- a/dbms/src/Operators/TopNTransformOp.h +++ b/dbms/src/Operators/TopNTransformOp.h @@ -26,12 +26,11 @@ class TopNTransformOp : public TransformOp public: TopNTransformOp( PipelineExecutorStatus & exec_status_, + const String & req_id_, const SortDescription & order_desc_, size_t limit_, - size_t max_block_size_, - const String & req_id_) - : TransformOp(exec_status_) - , log(Logger::get(req_id_)) + size_t max_block_size_) + : TransformOp(exec_status_, req_id_) , order_desc(order_desc_) , limit(limit_) , max_block_size(max_block_size_) @@ -51,7 +50,6 @@ class TopNTransformOp : public TransformOp private: - const LoggerPtr log; SortDescription order_desc; size_t limit; size_t max_block_size;