Skip to content

Commit

Permalink
Support local rf for pipeline (#7698)
Browse files Browse the repository at this point in the history
ref #6518, ref #7275
  • Loading branch information
SeaRise authored Jul 6, 2023
1 parent 53c5beb commit b2791df
Show file tree
Hide file tree
Showing 25 changed files with 421 additions and 90 deletions.
2 changes: 1 addition & 1 deletion dbms/src/DataStreams/RuntimeFilter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,4 +213,4 @@ DM::RSOperatorPtr RuntimeFilter::parseToRSOperator(DM::ColumnDefines & columns_t
}
}

} // namespace DB
} // namespace DB
5 changes: 2 additions & 3 deletions dbms/src/DataStreams/RuntimeFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

namespace DB
{

enum class RuntimeFilterStatus
{
NOT_READY,
Expand All @@ -33,7 +32,7 @@ enum class RuntimeFilterStatus
class RuntimeFilter
{
public:
RuntimeFilter(tipb::RuntimeFilter & rf_pb)
explicit RuntimeFilter(tipb::RuntimeFilter & rf_pb)
: id(rf_pb.id())
, rf_type(rf_pb.rf_type())
{
Expand Down Expand Up @@ -102,4 +101,4 @@ class RuntimeFilter
std::condition_variable inner_cv;
};

} // namespace DB
} // namespace DB
71 changes: 59 additions & 12 deletions dbms/src/Debug/MockStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <Flash/Coprocessor/InterpreterUtils.h>
#include <Flash/Coprocessor/TiDBTableScan.h>
#include <Interpreters/Context.h>
#include <Operators/FilterTransformOp.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTSelectQuery.h>
Expand Down Expand Up @@ -176,7 +177,13 @@ std::tuple<StorageDeltaMergePtr, Names, SelectQueryInfo> MockStorage::prepareFor
return {storage, column_names, query_info};
}

BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge(Context & context, Int64 table_id, const FilterConditions * filter_conditions, bool keep_order, std::vector<int> runtime_filter_ids, int rf_max_wait_time_ms)
BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge(
Context & context,
Int64 table_id,
const FilterConditions * filter_conditions,
bool keep_order,
std::vector<int> runtime_filter_ids,
int rf_max_wait_time_ms)
{
QueryProcessingStage::Enum stage;
auto [storage, column_names, query_info] = prepareForRead(context, table_id, keep_order);
Expand Down Expand Up @@ -217,25 +224,65 @@ BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge(Context & context, Int6
}
}


void MockStorage::buildExecFromDeltaMerge(
PipelineExecutorStatus & exec_status_,
PipelineExecGroupBuilder & group_builder,
Context & context,
Int64 table_id,
size_t concurrency,
bool keep_order)
bool keep_order,
const FilterConditions * filter_conditions,
std::vector<int> runtime_filter_ids,
int rf_max_wait_time_ms)
{
auto [storage, column_names, query_info] = prepareForRead(context, table_id, keep_order);
// Currently don't support test for late materialization
storage->read(
exec_status_,
group_builder,
column_names,
query_info,
context,
context.getSettingsRef().max_block_size,
concurrency);
if (filter_conditions && filter_conditions->hasValue())
{
auto analyzer = std::make_unique<DAGExpressionAnalyzer>(names_and_types_map_for_delta_merge[table_id], context);
const google::protobuf::RepeatedPtrField<tipb::Expr> pushed_down_filters{};
query_info.dag_query = std::make_unique<DAGQueryInfo>(
filter_conditions->conditions,
pushed_down_filters, // Not care now
mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]),
runtime_filter_ids,
rf_max_wait_time_ms,
context.getTimezoneInfo());
// Not using `auto [before_where, filter_column_name, project_after_where]` just to make the compiler happy.
auto build_ret = ::DB::buildPushDownFilter(filter_conditions->conditions, *analyzer);
storage->read(
exec_status_,
group_builder,
column_names,
query_info,
context,
context.getSettingsRef().max_block_size,
concurrency);
auto log = Logger::get("test for late materialization");
auto input_header = group_builder.getCurrentHeader();
group_builder.transform([&](auto & builder) {
builder.appendTransformOp(std::make_unique<FilterTransformOp>(exec_status_, log->identifier(), input_header, std::get<0>(build_ret), std::get<1>(build_ret)));
});
executeExpression(exec_status_, group_builder, std::get<2>(build_ret), log);
}
else
{
const google::protobuf::RepeatedPtrField<tipb::Expr> pushed_down_filters{};
query_info.dag_query = std::make_unique<DAGQueryInfo>(
google::protobuf::RepeatedPtrField<tipb::Expr>(),
pushed_down_filters, // Not care now
mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]),
runtime_filter_ids,
rf_max_wait_time_ms,
context.getTimezoneInfo());
storage->read(
exec_status_,
group_builder,
column_names,
query_info,
context,
context.getSettingsRef().max_block_size,
concurrency);
}
}

void MockStorage::addTableInfoForDeltaMerge(const String & name, const MockColumnInfoVec & columns)
Expand Down
5 changes: 4 additions & 1 deletion dbms/src/Debug/MockStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,10 @@ class MockStorage
Context & context,
Int64 table_id,
size_t concurrency = 1,
bool keep_order = false);
bool keep_order = false,
const FilterConditions * filter_conditions = nullptr,
std::vector<int> runtime_filter_ids = std::vector<int>(),
int rf_max_wait_time_ms = 0);

bool tableExistsForDeltaMerge(Int64 table_id);

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Coprocessor/RuntimeFilterMgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ extern RuntimeFilteList dummy_runtime_filter_list;
class RuntimeFilterMgr
{
public:
RuntimeFilterMgr(){};
RuntimeFilterMgr() = default;

~RuntimeFilterMgr(){};
~RuntimeFilterMgr() = default;

RuntimeFilteList getLocalRuntimeFilterByIds(const std::vector<int> & ids);

Expand Down
20 changes: 10 additions & 10 deletions dbms/src/Flash/Executor/PipelineExecutorStatus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ void PipelineExecutorStatus::wait()
{
std::unique_lock lock(mu);
RUNTIME_ASSERT(isWaitMode());
cv.wait(lock, [&] { return 0 == active_event_count; });
cv.wait(lock, [&] { return 0 == active_ref_count; });
}
LOG_DEBUG(log, "query finished and wait done");
}
Expand Down Expand Up @@ -115,25 +115,25 @@ void PipelineExecutorStatus::consume(ResultHandler & result_handler)
// If result_handler throws an error, here should notify the query to terminate, and wait for the end of the query.
onErrorOccurred(std::current_exception());
}
// In order to ensure that `onEventFinish` has finished calling at this point
// and avoid referencing the already destructed `mu` in `onEventFinish`.
// In order to ensure that `decActiveRefCount` has finished calling at this point
// and avoid referencing the already destructed `mu` in `decActiveRefCount`.
std::unique_lock lock(mu);
cv.wait(lock, [&] { return 0 == active_event_count; });
cv.wait(lock, [&] { return 0 == active_ref_count; });
LOG_DEBUG(log, "query finished and consume done");
}

void PipelineExecutorStatus::onEventSchedule()
void PipelineExecutorStatus::incActiveRefCount()
{
std::lock_guard lock(mu);
++active_event_count;
++active_ref_count;
}

void PipelineExecutorStatus::onEventFinish()
void PipelineExecutorStatus::decActiveRefCount()
{
std::lock_guard lock(mu);
RUNTIME_ASSERT(active_event_count > 0);
--active_event_count;
if (0 == active_event_count)
RUNTIME_ASSERT(active_ref_count > 0);
--active_ref_count;
if (0 == active_ref_count)
{
// It is not expected for a query to be finished more than one time.
RUNTIME_ASSERT(!is_finished);
Expand Down
16 changes: 8 additions & 8 deletions dbms/src/Flash/Executor/PipelineExecutorStatus.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ class PipelineExecutorStatus : private boost::noncopyable
std::exception_ptr getExceptionPtr();
String getExceptionMsg();

void onEventSchedule();
void incActiveRefCount();

void onEventFinish();
void decActiveRefCount();

void onErrorOccurred(const String & err_msg);
void onErrorOccurred(const std::exception_ptr & exception_ptr_);
Expand All @@ -60,7 +60,7 @@ class PipelineExecutorStatus : private boost::noncopyable
{
std::unique_lock lock(mu);
RUNTIME_ASSERT(isWaitMode());
is_timeout = !cv.wait_for(lock, timeout_duration, [&] { return 0 == active_event_count; });
is_timeout = !cv.wait_for(lock, timeout_duration, [&] { return 0 == active_ref_count; });
}
if (is_timeout)
{
Expand Down Expand Up @@ -113,10 +113,10 @@ class PipelineExecutorStatus : private boost::noncopyable
}
else
{
// In order to ensure that `onEventFinish` has finished calling at this point
// and avoid referencing the already destructed `mu` in `onEventFinish`.
// In order to ensure that `decActiveRefCount` has finished calling at this point
// and avoid referencing the already destructed `mu` in `decActiveRefCount`.
std::unique_lock lock(mu);
cv.wait(lock, [&] { return 0 == active_event_count; });
cv.wait(lock, [&] { return 0 == active_ref_count; });
}
LOG_DEBUG(log, "query finished and consume done");
}
Expand Down Expand Up @@ -154,13 +154,13 @@ class PipelineExecutorStatus : private boost::noncopyable
std::mutex mu;
std::condition_variable cv;
std::exception_ptr exception_ptr;
UInt32 active_event_count{0};
UInt32 active_ref_count{0};

std::atomic_bool is_cancelled{false};

bool is_finished{false};

// `result_queue.finish` can only be called in `onEventFinish` because `result_queue.pop` cannot end until events end.
// `result_queue.finish` can only be called in `decActiveRefCount` because `result_queue.pop` cannot end until events end.
std::optional<ResultQueuePtr> result_queue;

QueryProfileInfo query_profile_info;
Expand Down
12 changes: 6 additions & 6 deletions dbms/src/Flash/Executor/tests/gtest_pipeline_executor_status.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ try
PipelineExecutorStatus status;
try
{
status.onEventSchedule();
status.incActiveRefCount();
std::chrono::milliseconds timeout(10);
status.waitFor(timeout);
GTEST_FAIL();
Expand All @@ -51,7 +51,7 @@ try
status.toConsumeMode(1);
try
{
status.onEventSchedule();
status.incActiveRefCount();
std::chrono::milliseconds timeout(10);
ResultHandler result_handler{[](const Block &) {
}};
Expand All @@ -71,9 +71,9 @@ TEST_F(PipelineExecutorStatusTestRunner, run)
try
{
PipelineExecutorStatus status;
status.onEventSchedule();
status.incActiveRefCount();
auto thread_manager = newThreadManager();
thread_manager->schedule(false, "run", [&status]() mutable { status.onEventFinish(); });
thread_manager->schedule(false, "run", [&status]() mutable { status.decActiveRefCount(); });
status.wait();
auto exception_ptr = status.getExceptionPtr();
auto err_msg = status.getExceptionMsg();
Expand All @@ -88,11 +88,11 @@ try
auto test = [](std::string && err_msg) {
auto expect_err_msg = err_msg;
PipelineExecutorStatus status;
status.onEventSchedule();
status.incActiveRefCount();
auto thread_manager = newThreadManager();
thread_manager->schedule(false, "err", [&status, &err_msg]() mutable {
status.onErrorOccurred(err_msg);
status.onEventFinish();
status.decActiveRefCount();
});
status.wait();
status.onErrorOccurred("unexpect exception");
Expand Down
16 changes: 8 additions & 8 deletions dbms/src/Flash/Pipeline/Schedule/Events/Event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,13 @@ bool Event::prepare()
assertStatus(EventStatus::INIT);
if (is_source)
{
// For source event, `exec_status.onEventSchedule()` needs to be called before schedule.
// For source event, `exec_status.incActiveRefCount()` needs to be called before schedule.
// Suppose there are two source events, A and B, a possible sequence of calls is:
// `A.prepareForSource --> B.prepareForSource --> A.schedule --> A.finish --> B.schedule --> B.finish`.
// if `exec_status.onEventSchedule()` be called in schedule just like non-source event,
// if `exec_status.incActiveRefCount()` be called in schedule just like non-source event,
// `exec_status.wait` and `result_queue.pop` may return early.
switchStatus(EventStatus::INIT, EventStatus::SCHEDULED);
exec_status.onEventSchedule();
exec_status.incActiveRefCount();
return true;
}
else
Expand Down Expand Up @@ -120,9 +120,9 @@ void Event::schedule()
}
else
{
// for is_source == true, `exec_status.onEventSchedule()` has been called in `prepare`.
// for is_source == true, `exec_status.incActiveRefCount()` has been called in `prepare`.
switchStatus(EventStatus::INIT, EventStatus::SCHEDULED);
exec_status.onEventSchedule();
exec_status.incActiveRefCount();
}
MemoryTrackerSetter setter{true, mem_tracker.get()};
try
Expand Down Expand Up @@ -199,10 +199,10 @@ void Event::finish()
// because of `exec_status.isCancelled()` will be destructured before the end of `exec_status.wait`.
outputs.clear();
// In order to ensure that `exec_status.wait()` doesn't finish when there is an active event,
// we have to call `exec_status.onEventFinish()` here,
// since `exec_status.onEventSchedule()` will have been called by outputs.
// we have to call `exec_status.decActiveRefCount()` here,
// since `exec_status.incActiveRefCount()` will have been called by outputs.
// The call order will be `eventA++ ───► eventB++ ───► eventA-- ───► eventB-- ───► exec_status.await finished`.
exec_status.onEventFinish();
exec_status.decActiveRefCount();
}

UInt64 Event::getScheduleDuration() const
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ void PlainPipelineEvent::finishImpl()
{
if (auto complete_event = pipeline->complete(exec_status); complete_event)
insertEvent(complete_event);
// Plan nodes in pipeline hold resources like hash table for join, when destruction they will operate memory tracker in MPP task. But MPP task may get destructed once `exec_status.onEventFinish()` is called.
// So pipeline needs to be released before `exec_status.onEventFinish()` is called.
// Plan nodes in pipeline hold resources like hash table for join, when destruction they will operate memory tracker in MPP task. But MPP task may get destructed once `exec_status.decActiveRefCount()` is called.
// So pipeline needs to be released before `exec_status.decActiveRefCount()` is called.
pipeline.reset();
}

Expand Down
Loading

0 comments on commit b2791df

Please sign in to comment.