Skip to content

Commit

Permalink
Pipeline: refine log (#6921)
Browse files Browse the repository at this point in the history
ref #6518
  • Loading branch information
SeaRise authored Mar 2, 2023
1 parent 7876b80 commit 12616a3
Show file tree
Hide file tree
Showing 48 changed files with 293 additions and 181 deletions.
11 changes: 7 additions & 4 deletions dbms/src/Flash/Executor/DataStreamExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,19 @@
#include <Common/FmtUtils.h>
#include <Common/TiFlashMetrics.h>
#include <Common/getNumberOfCPUCores.h>
#include <DataStreams/BlockIO.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Executor/DataStreamExecutor.h>

namespace DB
{
DataStreamExecutor::DataStreamExecutor(Context & context_, const BlockIO & block_io)
: QueryExecutor(block_io.process_list_entry, context_)
, data_stream(block_io.in)
DataStreamExecutor::DataStreamExecutor(
const MemoryTrackerPtr & memory_tracker_,
Context & context_,
const String & req_id,
const BlockInputStreamPtr & data_stream_)
: QueryExecutor(memory_tracker_, context_, req_id)
, data_stream(data_stream_)
{
assert(data_stream);
thread_cnt_before_execute = GET_METRIC(tiflash_thread_count, type_active_threads_of_thdpool).Value();
Expand Down
8 changes: 5 additions & 3 deletions dbms/src/Flash/Executor/DataStreamExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ namespace DB
class IBlockInputStream;
using BlockInputStreamPtr = std::shared_ptr<IBlockInputStream>;

struct BlockIO;

class DataStreamExecutor : public QueryExecutor
{
public:
DataStreamExecutor(Context & context_, const BlockIO & block_io);
DataStreamExecutor(
const MemoryTrackerPtr & memory_tracker_,
Context & context_,
const String & req_id,
const BlockInputStreamPtr & data_stream_);

String toString() const override;

Expand Down
6 changes: 4 additions & 2 deletions dbms/src/Flash/Executor/PipelineExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
namespace DB
{
PipelineExecutor::PipelineExecutor(
const ProcessListEntryPtr & process_list_entry_,
const MemoryTrackerPtr & memory_tracker_,
Context & context_,
const String & req_id,
const PipelinePtr & root_pipeline_)
: QueryExecutor(process_list_entry_, context_)
: QueryExecutor(memory_tracker_, context_, req_id)
, root_pipeline(root_pipeline_)
, status(req_id)
{
assert(root_pipeline);
}
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Flash/Executor/PipelineExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ class PipelineExecutor : public QueryExecutor
{
public:
PipelineExecutor(
const ProcessListEntryPtr & process_list_entry_,
const MemoryTrackerPtr & memory_tracker_,
Context & context_,
const String & req_id,
const PipelinePtr & root_pipeline_);

String toString() const override;
Expand Down
32 changes: 21 additions & 11 deletions dbms/src/Flash/Executor/PipelineExecutorStatus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ std::exception_ptr PipelineExecutorStatus::getExceptionPtr() noexcept

String PipelineExecutorStatus::getExceptionMsg() noexcept
{
std::lock_guard lock(mu);
try
{
if (!exception_ptr)
auto cur_exception_ptr = getExceptionPtr();
if (!cur_exception_ptr)
return "";
std::rethrow_exception(exception_ptr);
std::rethrow_exception(cur_exception_ptr);
}
catch (const DB::Exception & e)
{
Expand All @@ -56,22 +56,32 @@ void PipelineExecutorStatus::onErrorOccurred(const String & err_msg) noexcept
onErrorOccurred(std::make_exception_ptr(e));
}

void PipelineExecutorStatus::onErrorOccurred(const std::exception_ptr & exception_ptr_) noexcept
/// Stores `exception_ptr_` as the recorded error if no error has been recorded yet.
/// The check and the store happen while holding `mu`.
/// @return true if this call stored the exception, false if one was already present.
bool PipelineExecutorStatus::setExceptionPtr(const std::exception_ptr & exception_ptr_) noexcept
{
    assert(exception_ptr_ != nullptr);
    std::lock_guard guard(mu);
    // Only the first reported exception is kept; later ones leave the stored value untouched.
    const bool is_first_error = (exception_ptr == nullptr);
    if (is_first_error)
        exception_ptr = exception_ptr_;
    return is_first_error;
}

/// Records `exception_ptr_` as the query error and cancels the query.
/// Only the call that actually stores the exception (i.e. `setExceptionPtr`
/// returns true) cancels and logs; later reports are ignored.
void PipelineExecutorStatus::onErrorOccurred(const std::exception_ptr & exception_ptr_) noexcept
{
    // `setExceptionPtr` already performs the "is an error recorded yet?" check under `mu`,
    // so no second lock/check is needed here. Re-checking `exception_ptr` after a successful
    // store would always see it non-null and return before `cancel()` is reached.
    if (setExceptionPtr(exception_ptr_))
    {
        cancel();
        LOG_WARNING(log, "error occured and cancel the query");
    }
}

/// Blocks until `active_event_count` reaches zero, then logs that the wait is done.
void PipelineExecutorStatus::wait() noexcept
{
    {
        // Take `mu` exactly once: it is a std::mutex, so locking it again from the
        // same thread while it is already held is undefined behavior.
        std::unique_lock lock(mu);
        cv.wait(lock, [&] { return 0 == active_event_count; });
    }
    // Log after the lock scope has ended so the mutex is not held while logging.
    LOG_DEBUG(log, "query finished and wait done");
}

void PipelineExecutorStatus::onEventSchedule() noexcept
Expand Down
16 changes: 16 additions & 0 deletions dbms/src/Flash/Executor/PipelineExecutorStatus.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include <Common/Logger.h>
#include <Flash/Executor/ExecutionResult.h>

#include <atomic>
Expand All @@ -27,6 +28,14 @@ class PipelineExecutorStatus : private boost::noncopyable
public:
static constexpr auto timeout_err_msg = "error with timeout";

/// Default constructor: initializes `log` from `Logger::get()` called without arguments.
PipelineExecutorStatus()
: log(Logger::get())
{}

/// Initializes `log` from `Logger::get(req_id)`.
/// @param req_id identifier passed through to the logger.
explicit PipelineExecutorStatus(const String & req_id)
: log(Logger::get(req_id))
{}

ExecutionResult toExecutionResult() noexcept;

std::exception_ptr getExceptionPtr() noexcept;
Expand All @@ -51,9 +60,11 @@ class PipelineExecutorStatus : private boost::noncopyable
}
if (is_timeout)
{
LOG_WARNING(log, "wait timeout");
onErrorOccurred(timeout_err_msg);
throw Exception(timeout_err_msg);
}
LOG_DEBUG(log, "query finished and wait done");
}

void cancel() noexcept;
Expand All @@ -64,6 +75,11 @@ class PipelineExecutorStatus : private boost::noncopyable
}

private:
bool setExceptionPtr(const std::exception_ptr & exception_ptr_) noexcept;

private:
LoggerPtr log;

std::mutex mu;
std::condition_variable cv;
std::exception_ptr exception_ptr;
Expand Down
15 changes: 9 additions & 6 deletions dbms/src/Flash/Executor/QueryExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include <Common/MemoryTracker.h>
#include <Flash/Executor/ExecutionResult.h>
#include <Flash/Executor/ResultHandler.h>
#include <Flash/Executor/toRU.h>
Expand All @@ -24,18 +25,19 @@

namespace DB
{
class ProcessListEntry;
using ProcessListEntryPtr = std::shared_ptr<ProcessListEntry>;

class Context;
class DAGContext;

class QueryExecutor
{
public:
QueryExecutor(const ProcessListEntryPtr & process_list_entry_, Context & context_)
: process_list_entry(process_list_entry_)
QueryExecutor(
const MemoryTrackerPtr & memory_tracker_,
Context & context_,
const String & req_id)
: memory_tracker(memory_tracker_)
, context(context_)
, log(Logger::get(req_id))
{}

virtual ~QueryExecutor() = default;
Expand All @@ -61,8 +63,9 @@ class QueryExecutor
DAGContext & dagContext() const;

protected:
ProcessListEntryPtr process_list_entry;
MemoryTrackerPtr memory_tracker;
Context & context;
LoggerPtr log;
};

using QueryExecutorPtr = std::unique_ptr<QueryExecutor>;
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Pipeline/Pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ EventPtr Pipeline::toEvent(PipelineExecutorStatus & status, Context & context, s
// ```
auto memory_tracker = current_memory_tracker ? current_memory_tracker->shared_from_this() : nullptr;

auto plain_pipeline_event = std::make_shared<PlainPipelineEvent>(status, memory_tracker, context, shared_from_this(), concurrency);
auto plain_pipeline_event = std::make_shared<PlainPipelineEvent>(status, memory_tracker, log->identifier(), context, shared_from_this(), concurrency);
for (const auto & child : children)
{
auto input = child->toEvent(status, context, concurrency, all_events);
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Flash/Pipeline/Pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ using SharedQueuePtr = std::shared_ptr<SharedQueue>;
class Pipeline : public std::enable_shared_from_this<Pipeline>
{
public:
explicit Pipeline(UInt32 id_)
Pipeline(UInt32 id_, const String & req_id)
: id(id_)
, log(Logger::get(req_id, id_))
{}

void addPlanNode(const PhysicalPlanNodePtr & plan_node);
Expand All @@ -74,6 +75,7 @@ class Pipeline : public std::enable_shared_from_this<Pipeline>

private:
const UInt32 id;
LoggerPtr log;

// data flow: plan_nodes.begin() --> plan_nodes.end()
std::deque<PhysicalPlanNodePtr> plan_nodes;
Expand Down
20 changes: 13 additions & 7 deletions dbms/src/Flash/Pipeline/PipelineBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,12 @@ using PipelineIdGeneratorPtr = std::shared_ptr<PipelineIdGenerator>;
class PipelineBuilder
{
public:
PipelineBuilder()
: id_generator(std::make_shared<PipelineIdGenerator>())
explicit PipelineBuilder(const String & req_id)
: log(Logger::get(req_id))
, id_generator(std::make_shared<PipelineIdGenerator>())
, pipeline_breaker(std::nullopt)
{
pipeline = std::make_shared<Pipeline>(id_generator->nextID());
pipeline = std::make_shared<Pipeline>(id_generator->nextID(), req_id);
}

private:
Expand All @@ -61,11 +62,15 @@ class PipelineBuilder
const PhysicalPlanNodePtr breaker_node;
};

PipelineBuilder(const PipelineIdGeneratorPtr & id_generator_, PipelineBreaker && pipeline_breaker_)
: id_generator(id_generator_)
PipelineBuilder(
const PipelineIdGeneratorPtr & id_generator_,
PipelineBreaker && pipeline_breaker_,
const String & req_id)
: log(Logger::get(req_id))
, id_generator(id_generator_)
, pipeline_breaker(std::move(pipeline_breaker_))
{
pipeline = std::make_shared<Pipeline>(id_generator->nextID());
pipeline = std::make_shared<Pipeline>(id_generator->nextID(), req_id);
}

public:
Expand All @@ -77,7 +82,7 @@ class PipelineBuilder
/// Break the current pipeline and return a new builder for the broken pipeline.
PipelineBuilder breakPipeline(const PhysicalPlanNodePtr & breaker_node)
{
    // The new builder shares this builder's id generator and receives this builder's
    // logger identifier as its `req_id`. There is a single return: an earlier
    // two-argument constructor call would make this one unreachable and drop the id.
    return PipelineBuilder(id_generator, PipelineBreaker{pipeline, breaker_node}, log->identifier());
}

PipelinePtr build()
Expand All @@ -95,6 +100,7 @@ class PipelineBuilder
}

private:
LoggerPtr log;
PipelineIdGeneratorPtr id_generator;
PipelinePtr pipeline;
std::optional<PipelineBreaker> pipeline_breaker;
Expand Down
22 changes: 15 additions & 7 deletions dbms/src/Flash/Pipeline/Schedule/Events/Event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include <Flash/Pipeline/Schedule/TaskScheduler.h>
#include <assert.h>

#include <magic_enum.hpp>

namespace DB
{
namespace FailPoints
Expand All @@ -29,10 +31,11 @@ extern const char random_pipeline_model_event_finish_failpoint[];
} // namespace FailPoints

// If any exception is thrown here, record the error message and then cancel the query.
// Expands to a catch-all handler; the expansion site must have `log` and `exec_status` in scope.
// Defined exactly once: a second `#define CATCH` reached through a trailing line
// continuation would become part of the first macro's replacement list.
#define CATCH                                                    \
    catch (...)                                                  \
    {                                                            \
        LOG_WARNING(log, "error occurred and cancel the query"); \
        exec_status.onErrorOccurred(std::current_exception());   \
    }

void Event::addInput(const EventPtr & input)
Expand Down Expand Up @@ -126,21 +129,26 @@ void Event::scheduleTasks(std::vector<TaskPtr> & tasks) noexcept
// If query has already been cancelled, we can skip scheduling tasks.
// And then tasks will be destroyed and call `onTaskFinish`.
if (likely(!exec_status.isCancelled()))
{
LOG_DEBUG(log, "{} tasks scheduled by event", tasks.size());
TaskScheduler::instance->submit(tasks);
}
}

/// Called once per finished task: decrements `unfinished_tasks` and calls `finish()`
/// when the task that just finished was the last one outstanding.
void Event::onTaskFinish() noexcept
{
    assert(status != EventStatus::FINISHED);
    // Decrement exactly once per finished task. `fetch_sub` yields the value held
    // before the subtraction (assuming `unfinished_tasks` is a std::atomic integer —
    // its declaration is not visible here), hence the extra `- 1`.
    int32_t remaining_tasks = unfinished_tasks.fetch_sub(1) - 1;
    assert(remaining_tasks >= 0);
    LOG_DEBUG(log, "one task finished, {} tasks remaining", remaining_tasks);
    if (0 == remaining_tasks)
        finish();
}

/// Moves `status` from `from` to `to` with a single compare-and-exchange and logs the transition.
/// The exchange is required to succeed: its result is checked by RUNTIME_ASSERT
/// (what that macro does on failure is defined elsewhere — not visible here).
void Event::switchStatus(EventStatus from, EventStatus to) noexcept
{
RUNTIME_ASSERT(status.compare_exchange_strong(from, to));
// Reached after the check above; `from` and `to` are logged by their enum names.
LOG_DEBUG(log, "switch status: {} --> {}", magic_enum::enum_name(from), magic_enum::enum_name(to));
}

#undef CATCH
Expand Down
9 changes: 8 additions & 1 deletion dbms/src/Flash/Pipeline/Schedule/Events/Event.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include <Common/Logger.h>
#include <Common/MemoryTracker.h>
#include <Flash/Pipeline/Schedule/Tasks/Task.h>

Expand All @@ -39,9 +40,13 @@ class PipelineExecutorStatus;
class Event : public std::enable_shared_from_this<Event>
{
public:
Event(PipelineExecutorStatus & exec_status_, MemoryTrackerPtr mem_tracker_)
/// Constructs an event bound to `exec_status_`.
/// @param exec_status_ executor status object, stored by reference in `exec_status`.
/// @param mem_tracker_ memory tracker, taken by value and moved into `mem_tracker`.
/// @param req_id identifier passed to `Logger::get` to create `log`; defaults to "".
Event(
PipelineExecutorStatus & exec_status_,
MemoryTrackerPtr mem_tracker_,
const String & req_id = "")
: exec_status(exec_status_)
, mem_tracker(std::move(mem_tracker_))
, log(Logger::get(req_id))
{}
virtual ~Event() = default;

Expand Down Expand Up @@ -79,6 +84,8 @@ class Event : public std::enable_shared_from_this<Event>

MemoryTrackerPtr mem_tracker;

LoggerPtr log;

private:
Events outputs;

Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Flash/Pipeline/Schedule/Events/PipelineEvent.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ class PipelineEvent : public Event
/// Constructs a pipeline event.
/// @param exec_status_ forwarded to the `Event` base.
/// @param mem_tracker_ taken by value and moved into the `Event` base.
/// @param req_id forwarded to the `Event` base.
/// @param context_ stored in `context`.
/// @param pipeline_ stored in `pipeline`.
PipelineEvent(
    PipelineExecutorStatus & exec_status_,
    MemoryTrackerPtr mem_tracker_,
    const String & req_id,
    Context & context_,
    const PipelinePtr & pipeline_)
    // Exactly one base initializer, and it forwards `req_id`; a second `Event(...)`
    // initializer without it is ill-formed and would leave `req_id` unused.
    : Event(exec_status_, std::move(mem_tracker_), req_id)
    , context(context_)
    , pipeline(pipeline_)
{}
Expand Down
Loading

0 comments on commit 12616a3

Please sign in to comment.