From b2791dfb4285053bf2c16d71d70624361c4a6350 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Thu, 6 Jul 2023 12:59:15 +0800 Subject: [PATCH] Support local rf for pipeline (#7698) ref pingcap/tiflash#6518, ref pingcap/tiflash#7275 --- dbms/src/DataStreams/RuntimeFilter.cpp | 2 +- dbms/src/DataStreams/RuntimeFilter.h | 5 +- dbms/src/Debug/MockStorage.cpp | 71 ++++++++-- dbms/src/Debug/MockStorage.h | 5 +- dbms/src/Flash/Coprocessor/RuntimeFilterMgr.h | 4 +- .../Flash/Executor/PipelineExecutorStatus.cpp | 20 +-- .../Flash/Executor/PipelineExecutorStatus.h | 16 +-- .../tests/gtest_pipeline_executor_status.cpp | 12 +- .../Flash/Pipeline/Schedule/Events/Event.cpp | 16 +-- .../Schedule/Events/PlainPipelineEvent.cpp | 4 +- .../Pipeline/Schedule/Tasks/RFWaitTask.h | 122 ++++++++++++++++++ dbms/src/Flash/Planner/PhysicalPlan.cpp | 7 +- .../Planner/Plans/PhysicalMockTableScan.cpp | 46 ++++--- .../Planner/Plans/PhysicalMockTableScan.h | 4 - dbms/src/Flash/tests/gtest_compute_server.cpp | 2 +- .../Flash/tests/gtest_executors_with_dm.cpp | 15 +++ .../tests/gtest_runtime_filter_executor.cpp | 20 ++- .../src/Operators/SharedAggregateRestorer.cpp | 4 +- dbms/src/Operators/UnorderedSourceOp.cpp | 30 +++++ dbms/src/Operators/UnorderedSourceOp.h | 24 +++- .../Storages/DeltaMerge/DeltaMergeStore.cpp | 6 +- .../src/Storages/DeltaMerge/DeltaMergeStore.h | 2 + dbms/src/Storages/StorageDeltaMerge.cpp | 6 +- dbms/src/Storages/StorageDeltaMerge.h | 2 +- tests/fullstack-test/mpp/runtime_filter.test | 66 ++++++++++ 25 files changed, 421 insertions(+), 90 deletions(-) create mode 100644 dbms/src/Flash/Pipeline/Schedule/Tasks/RFWaitTask.h create mode 100644 tests/fullstack-test/mpp/runtime_filter.test diff --git a/dbms/src/DataStreams/RuntimeFilter.cpp b/dbms/src/DataStreams/RuntimeFilter.cpp index a69ef20845f..97c746da25e 100644 --- a/dbms/src/DataStreams/RuntimeFilter.cpp +++ b/dbms/src/DataStreams/RuntimeFilter.cpp @@ -213,4 +213,4 @@ DM::RSOperatorPtr RuntimeFilter::parseToRSOperator(DM::ColumnDefines & columns_t } } -} // namespace DB \ No newline at end of file +} // namespace DB diff --git a/dbms/src/DataStreams/RuntimeFilter.h b/dbms/src/DataStreams/RuntimeFilter.h index a33a187dcd5..7a9e71ab2e5 100644 --- a/dbms/src/DataStreams/RuntimeFilter.h +++ b/dbms/src/DataStreams/RuntimeFilter.h @@ -22,7 +22,6 @@ namespace DB { - enum class RuntimeFilterStatus { NOT_READY, @@ -33,7 +32,7 @@ enum class RuntimeFilterStatus class RuntimeFilter { public: - RuntimeFilter(tipb::RuntimeFilter & rf_pb) + explicit RuntimeFilter(tipb::RuntimeFilter & rf_pb) : id(rf_pb.id()) , rf_type(rf_pb.rf_type()) { @@ -102,4 +101,4 @@ class RuntimeFilter std::condition_variable inner_cv; }; -} // namespace DB \ No newline at end of file +} // namespace DB diff --git a/dbms/src/Debug/MockStorage.cpp b/dbms/src/Debug/MockStorage.cpp index 85ffcfa2069..7f4bcb49bbd 100644 --- a/dbms/src/Debug/MockStorage.cpp +++ b/dbms/src/Debug/MockStorage.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -176,7 +177,13 @@ std::tuple MockStorage::prepareFor return {storage, column_names, query_info}; } -BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge(Context & context, Int64 table_id, const FilterConditions * filter_conditions, bool keep_order, std::vector runtime_filter_ids, int rf_max_wait_time_ms) +BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge( + Context & context, + Int64 table_id, + const FilterConditions * filter_conditions, + bool keep_order, + std::vector runtime_filter_ids, + int rf_max_wait_time_ms) { QueryProcessingStage::Enum stage; auto [storage, column_names, query_info] = prepareForRead(context, table_id, keep_order); @@ -217,25 +224,65 @@ BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge(Context & context, Int6 } } - void MockStorage::buildExecFromDeltaMerge( PipelineExecutorStatus & exec_status_, PipelineExecGroupBuilder & group_builder, Context & context, Int64 table_id, size_t concurrency, - bool keep_order) + bool keep_order, + const FilterConditions * filter_conditions, + std::vector runtime_filter_ids, + int rf_max_wait_time_ms) { auto [storage, column_names, query_info] = prepareForRead(context, table_id, keep_order); - // Currently don't support test for late materialization - storage->read( - exec_status_, - group_builder, - column_names, - query_info, - context, - context.getSettingsRef().max_block_size, - concurrency); + if (filter_conditions && filter_conditions->hasValue()) + { + auto analyzer = std::make_unique(names_and_types_map_for_delta_merge[table_id], context); + const google::protobuf::RepeatedPtrField pushed_down_filters{}; + query_info.dag_query = std::make_unique( + filter_conditions->conditions, + pushed_down_filters, // Not care now + mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]), + runtime_filter_ids, + rf_max_wait_time_ms, + context.getTimezoneInfo()); + // Not using `auto [before_where, filter_column_name, project_after_where]` just to make the compiler happy. + auto build_ret = ::DB::buildPushDownFilter(filter_conditions->conditions, *analyzer); + storage->read( + exec_status_, + group_builder, + column_names, + query_info, + context, + context.getSettingsRef().max_block_size, + concurrency); + auto log = Logger::get("test for late materialization"); + auto input_header = group_builder.getCurrentHeader(); + group_builder.transform([&](auto & builder) { + builder.appendTransformOp(std::make_unique(exec_status_, log->identifier(), input_header, std::get<0>(build_ret), std::get<1>(build_ret))); + }); + executeExpression(exec_status_, group_builder, std::get<2>(build_ret), log); + } + else + { + const google::protobuf::RepeatedPtrField pushed_down_filters{}; + query_info.dag_query = std::make_unique( + google::protobuf::RepeatedPtrField(), + pushed_down_filters, // Not care now + mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]), + runtime_filter_ids, + rf_max_wait_time_ms, + context.getTimezoneInfo()); + storage->read( + exec_status_, + group_builder, + column_names, + query_info, + context, + context.getSettingsRef().max_block_size, + concurrency); + } } void MockStorage::addTableInfoForDeltaMerge(const String & name, const MockColumnInfoVec & columns) diff --git a/dbms/src/Debug/MockStorage.h b/dbms/src/Debug/MockStorage.h index 9e402ed5df3..4df602eb6d6 100644 --- a/dbms/src/Debug/MockStorage.h +++ b/dbms/src/Debug/MockStorage.h @@ -111,7 +111,10 @@ class MockStorage Context & context, Int64 table_id, size_t concurrency = 1, - bool keep_order = false); + bool keep_order = false, + const FilterConditions * filter_conditions = nullptr, + std::vector runtime_filter_ids = std::vector(), + int rf_max_wait_time_ms = 0); bool tableExistsForDeltaMerge(Int64 table_id); diff --git a/dbms/src/Flash/Coprocessor/RuntimeFilterMgr.h b/dbms/src/Flash/Coprocessor/RuntimeFilterMgr.h index ac0fc5f890c..97552dd28d7 100644 --- a/dbms/src/Flash/Coprocessor/RuntimeFilterMgr.h +++ b/dbms/src/Flash/Coprocessor/RuntimeFilterMgr.h @@ -30,9 +30,9 @@ extern RuntimeFilteList dummy_runtime_filter_list; class RuntimeFilterMgr { public: - RuntimeFilterMgr(){}; + RuntimeFilterMgr() = default; - ~RuntimeFilterMgr(){}; + ~RuntimeFilterMgr() = default; RuntimeFilteList getLocalRuntimeFilterByIds(const std::vector & ids); diff --git a/dbms/src/Flash/Executor/PipelineExecutorStatus.cpp b/dbms/src/Flash/Executor/PipelineExecutorStatus.cpp index f41035e73c8..eb95db11cdf 100644 --- a/dbms/src/Flash/Executor/PipelineExecutorStatus.cpp +++ b/dbms/src/Flash/Executor/PipelineExecutorStatus.cpp @@ -86,7 +86,7 @@ void PipelineExecutorStatus::wait() { std::unique_lock lock(mu); RUNTIME_ASSERT(isWaitMode()); - cv.wait(lock, [&] { return 0 == active_event_count; }); + cv.wait(lock, [&] { return 0 == active_ref_count; }); } LOG_DEBUG(log, "query finished and wait done"); } @@ -115,25 +115,25 @@ void PipelineExecutorStatus::consume(ResultHandler & result_handler) // If result_handler throws an error, here should notify the query to terminate, and wait for the end of the query. onErrorOccurred(std::current_exception()); } - // In order to ensure that `onEventFinish` has finished calling at this point - // and avoid referencing the already destructed `mu` in `onEventFinish`. + // In order to ensure that `decActiveRefCount` has finished calling at this point + // and avoid referencing the already destructed `mu` in `decActiveRefCount`. std::unique_lock lock(mu); - cv.wait(lock, [&] { return 0 == active_event_count; }); + cv.wait(lock, [&] { return 0 == active_ref_count; }); LOG_DEBUG(log, "query finished and consume done"); } -void PipelineExecutorStatus::onEventSchedule() +void PipelineExecutorStatus::incActiveRefCount() { std::lock_guard lock(mu); - ++active_event_count; + ++active_ref_count; } -void PipelineExecutorStatus::onEventFinish() +void PipelineExecutorStatus::decActiveRefCount() { std::lock_guard lock(mu); - RUNTIME_ASSERT(active_event_count > 0); - --active_event_count; - if (0 == active_event_count) + RUNTIME_ASSERT(active_ref_count > 0); + --active_ref_count; + if (0 == active_ref_count) { // It is not expected for a query to be finished more than one time. RUNTIME_ASSERT(!is_finished); diff --git a/dbms/src/Flash/Executor/PipelineExecutorStatus.h b/dbms/src/Flash/Executor/PipelineExecutorStatus.h index d2a3726e4f6..3ad28c08380 100644 --- a/dbms/src/Flash/Executor/PipelineExecutorStatus.h +++ b/dbms/src/Flash/Executor/PipelineExecutorStatus.h @@ -44,9 +44,9 @@ class PipelineExecutorStatus : private boost::noncopyable std::exception_ptr getExceptionPtr(); String getExceptionMsg(); - void onEventSchedule(); + void incActiveRefCount(); - void onEventFinish(); + void decActiveRefCount(); void onErrorOccurred(const String & err_msg); void onErrorOccurred(const std::exception_ptr & exception_ptr_); @@ -60,7 +60,7 @@ class PipelineExecutorStatus : private boost::noncopyable { std::unique_lock lock(mu); RUNTIME_ASSERT(isWaitMode()); - is_timeout = !cv.wait_for(lock, timeout_duration, [&] { return 0 == active_event_count; }); + is_timeout = !cv.wait_for(lock, timeout_duration, [&] { return 0 == active_ref_count; }); } if (is_timeout) { @@ -113,10 +113,10 @@ class PipelineExecutorStatus : private boost::noncopyable } else { - // In order to ensure that `onEventFinish` has finished calling at this point - // and avoid referencing the already destructed `mu` in `onEventFinish`. + // In order to ensure that `decActiveRefCount` has finished calling at this point + // and avoid referencing the already destructed `mu` in `decActiveRefCount`. std::unique_lock lock(mu); - cv.wait(lock, [&] { return 0 == active_event_count; }); + cv.wait(lock, [&] { return 0 == active_ref_count; }); } LOG_DEBUG(log, "query finished and consume done"); } @@ -154,13 +154,13 @@ class PipelineExecutorStatus : private boost::noncopyable std::mutex mu; std::condition_variable cv; std::exception_ptr exception_ptr; - UInt32 active_event_count{0}; + UInt32 active_ref_count{0}; std::atomic_bool is_cancelled{false}; bool is_finished{false}; - // `result_queue.finish` can only be called in `onEventFinish` because `result_queue.pop` cannot end until events end. + // `result_queue.finish` can only be called in `decActiveRefCount` because `result_queue.pop` cannot end until events end. std::optional result_queue; QueryProfileInfo query_profile_info; diff --git a/dbms/src/Flash/Executor/tests/gtest_pipeline_executor_status.cpp b/dbms/src/Flash/Executor/tests/gtest_pipeline_executor_status.cpp index 3b62f80bc5b..80e695fc3cf 100644 --- a/dbms/src/Flash/Executor/tests/gtest_pipeline_executor_status.cpp +++ b/dbms/src/Flash/Executor/tests/gtest_pipeline_executor_status.cpp @@ -30,7 +30,7 @@ try PipelineExecutorStatus status; try { - status.onEventSchedule(); + status.incActiveRefCount(); std::chrono::milliseconds timeout(10); status.waitFor(timeout); GTEST_FAIL(); @@ -51,7 +51,7 @@ try status.toConsumeMode(1); try { - status.onEventSchedule(); + status.incActiveRefCount(); std::chrono::milliseconds timeout(10); ResultHandler result_handler{[](const Block &) { }}; @@ -71,9 +71,9 @@ TEST_F(PipelineExecutorStatusTestRunner, run) try { PipelineExecutorStatus status; - status.onEventSchedule(); + status.incActiveRefCount(); auto thread_manager = newThreadManager(); - thread_manager->schedule(false, "run", [&status]() mutable { status.onEventFinish(); }); + thread_manager->schedule(false, "run", [&status]() mutable { status.decActiveRefCount(); }); status.wait(); auto exception_ptr = status.getExceptionPtr(); auto err_msg = status.getExceptionMsg(); @@ -88,11 +88,11 @@ try auto test = [](std::string && err_msg) { auto expect_err_msg = err_msg; PipelineExecutorStatus status; - status.onEventSchedule(); + status.incActiveRefCount(); auto thread_manager = newThreadManager(); thread_manager->schedule(false, "err", [&status, &err_msg]() mutable { status.onErrorOccurred(err_msg); - status.onEventFinish(); + status.decActiveRefCount(); }); status.wait(); status.onErrorOccurred("unexpect exception"); diff --git a/dbms/src/Flash/Pipeline/Schedule/Events/Event.cpp b/dbms/src/Flash/Pipeline/Schedule/Events/Event.cpp index fa0ac58a4d0..e8e1dc88810 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Events/Event.cpp +++ b/dbms/src/Flash/Pipeline/Schedule/Events/Event.cpp @@ -85,13 +85,13 @@ bool Event::prepare() assertStatus(EventStatus::INIT); if (is_source) { - // For source event, `exec_status.onEventSchedule()` needs to be called before schedule. + // For source event, `exec_status.incActiveRefCount()` needs to be called before schedule. // Suppose there are two source events, A and B, a possible sequence of calls is: // `A.prepareForSource --> B.prepareForSource --> A.schedule --> A.finish --> B.schedule --> B.finish`. - // if `exec_status.onEventSchedule()` be called in schedule just like non-source event, + // if `exec_status.incActiveRefCount()` be called in schedule just like non-source event, // `exec_status.wait` and `result_queue.pop` may return early. switchStatus(EventStatus::INIT, EventStatus::SCHEDULED); - exec_status.onEventSchedule(); + exec_status.incActiveRefCount(); return true; } else @@ -120,9 +120,9 @@ void Event::schedule() } else { - // for is_source == true, `exec_status.onEventSchedule()` has been called in `prepare`. + // for is_source == true, `exec_status.incActiveRefCount()` has been called in `prepare`. switchStatus(EventStatus::INIT, EventStatus::SCHEDULED); - exec_status.onEventSchedule(); + exec_status.incActiveRefCount(); } MemoryTrackerSetter setter{true, mem_tracker.get()}; try @@ -199,10 +199,10 @@ void Event::finish() // because of `exec_status.isCancelled()` will be destructured before the end of `exec_status.wait`. outputs.clear(); // In order to ensure that `exec_status.wait()` doesn't finish when there is an active event, - // we have to call `exec_status.onEventFinish()` here, - // since `exec_status.onEventSchedule()` will have been called by outputs. + // we have to call `exec_status.decActiveRefCount()` here, + // since `exec_status.incActiveRefCount()` will have been called by outputs. // The call order will be `eventA++ ───► eventB++ ───► eventA-- ───► eventB-- ───► exec_status.await finished`. - exec_status.onEventFinish(); + exec_status.decActiveRefCount(); } UInt64 Event::getScheduleDuration() const diff --git a/dbms/src/Flash/Pipeline/Schedule/Events/PlainPipelineEvent.cpp b/dbms/src/Flash/Pipeline/Schedule/Events/PlainPipelineEvent.cpp index 90cf2f8ceb7..29f512eefaa 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Events/PlainPipelineEvent.cpp +++ b/dbms/src/Flash/Pipeline/Schedule/Events/PlainPipelineEvent.cpp @@ -31,8 +31,8 @@ void PlainPipelineEvent::finishImpl() { if (auto complete_event = pipeline->complete(exec_status); complete_event) insertEvent(complete_event); - // Plan nodes in pipeline hold resources like hash table for join, when destruction they will operate memory tracker in MPP task. But MPP task may get destructed once `exec_status.onEventFinish()` is called. - // So pipeline needs to be released before `exec_status.onEventFinish()` is called. + // Plan nodes in pipeline hold resources like hash table for join, when destruction they will operate memory tracker in MPP task. But MPP task may get destructed once `exec_status.decActiveRefCount()` is called. + // So pipeline needs to be released before `exec_status.decActiveRefCount()` is called. pipeline.reset(); } diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/RFWaitTask.h b/dbms/src/Flash/Pipeline/Schedule/Tasks/RFWaitTask.h new file mode 100644 index 00000000000..fbe2d80ac4a --- /dev/null +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/RFWaitTask.h @@ -0,0 +1,122 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include + +namespace DB +{ +/// Polling in the wait reactor to check whether the runtime filters are ready. +/// Once the maximum check time is reached or all runtime filters are ready, +/// the segment pool will be submitted to the segment read task scheduler for execution. +class RFWaitTask : public Task +{ +public: + RFWaitTask( + const String & req_id, + PipelineExecutorStatus & exec_status_, + const DM::SegmentReadTaskPoolPtr & task_pool_, + int max_wait_time_ms, + RuntimeFilteList && waiting_rf_list_, + RuntimeFilteList && ready_rf_list_) + : Task(nullptr, req_id) // memory tracker is useless for for the task that only executes wait. + , exec_status(exec_status_) + , task_pool(task_pool_) + , max_wait_time_ns(max_wait_time_ms < 0 ? 0 : 1000000UL * max_wait_time_ms) + , waiting_rf_list(std::move(waiting_rf_list_)) + , ready_rf_list(std::move(ready_rf_list_)) + { + exec_status.incActiveRefCount(); + } + + ~RFWaitTask() override + { + // In order to ensure that `PipelineExecutorStatus` will not be destructed before `RFWaitTask` is destructed. + exec_status.decActiveRefCount(); + } + + static void filterAndMoveReadyRfs(RuntimeFilteList & waiting_rf_list, RuntimeFilteList & ready_rf_list) + { + for (auto it = waiting_rf_list.begin(); it != waiting_rf_list.end();) + { + if ((*it)->isReady()) + { + ready_rf_list.push_back(std::move((*it))); + it = waiting_rf_list.erase(it); + } + else if ((*it)->isFailed()) + { + it = waiting_rf_list.erase(it); + } + else + { + ++it; + } + } + } + + static void submitReadyRfsAndSegmentTaskPool(const RuntimeFilteList & ready_rf_list, const DM::SegmentReadTaskPoolPtr & task_pool) + { + for (const RuntimeFilterPtr & rf : ready_rf_list) + { + auto rs_operator = rf->parseToRSOperator(task_pool->getColumnToRead()); + task_pool->appendRSOperator(rs_operator); + } + DM::SegmentReadTaskScheduler::instance().add(task_pool); + } + +private: + ExecTaskStatus executeImpl() override + { + return ExecTaskStatus::WAITING; + } + + ExecTaskStatus awaitImpl() override + { + if unlikely (exec_status.isCancelled()) + return ExecTaskStatus::CANCELLED; + try + { + filterAndMoveReadyRfs(waiting_rf_list, ready_rf_list); + if (waiting_rf_list.empty() || stopwatch.elapsed() >= max_wait_time_ns) + { + submitReadyRfsAndSegmentTaskPool(ready_rf_list, task_pool); + return ExecTaskStatus::FINISHED; + } + return ExecTaskStatus::WAITING; + } + catch (...) + { + exec_status.onErrorOccurred(std::current_exception()); + return ExecTaskStatus::ERROR; + } + } + +private: + PipelineExecutorStatus & exec_status; + + DM::SegmentReadTaskPoolPtr task_pool; + + UInt64 max_wait_time_ns; + RuntimeFilteList waiting_rf_list; + RuntimeFilteList ready_rf_list; + + Stopwatch stopwatch{CLOCK_MONOTONIC_COARSE}; +}; +} // namespace DB diff --git a/dbms/src/Flash/Planner/PhysicalPlan.cpp b/dbms/src/Flash/Planner/PhysicalPlan.cpp index d360283ce61..0ea73dfc55d 100644 --- a/dbms/src/Flash/Planner/PhysicalPlan.cpp +++ b/dbms/src/Flash/Planner/PhysicalPlan.cpp @@ -50,13 +50,10 @@ bool pushDownSelection(Context & context, const PhysicalPlanNodePtr & plan, cons auto physical_table_scan = std::static_pointer_cast(plan); return physical_table_scan->setFilterConditions(executor_id, selection); } - if (unlikely(plan->tp() == PlanType::MockTableScan && context.isExecutorTest() && !context.getSettingsRef().enable_pipeline)) + if (unlikely(plan->tp() == PlanType::MockTableScan && context.isExecutorTest())) { auto physical_mock_table_scan = std::static_pointer_cast(plan); - if (context.mockStorage()->useDeltaMerge() && context.mockStorage()->tableExistsForDeltaMerge(physical_mock_table_scan->getLogicalTableID())) - { - return physical_mock_table_scan->setFilterConditions(context, executor_id, selection); - } + return physical_mock_table_scan->setFilterConditions(context, executor_id, selection); } return false; } diff --git a/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp b/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp index b54f35acf8c..fe66bc27377 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp @@ -24,6 +24,7 @@ #include #include #include +#include #include namespace DB @@ -132,7 +133,19 @@ void PhysicalMockTableScan::buildPipelineExecGroupImpl( context, table_id, context.getMaxStreams(), - keep_order); + keep_order, + &filter_conditions, + runtime_filter_ids, + rf_max_wait_time_ms); + for (size_t i = 0; i < group_builder.concurrency(); ++i) + { + if (auto * source_op = dynamic_cast(group_builder.getCurBuilder(i).source_op.get())) + { + auto runtime_filter_list = context.getDAGContext()->runtime_filter_mgr.getLocalRuntimeFilterByIds(runtime_filter_ids); + // todo config max wait time + source_op->setRuntimeFilterInfo(runtime_filter_list, rf_max_wait_time_ms); + } + } } else { @@ -157,22 +170,28 @@ const Block & PhysicalMockTableScan::getSampleBlock() const return sample_block; } -void PhysicalMockTableScan::updateStreams(Context & context) -{ - mock_streams.clear(); - RUNTIME_CHECK(context.mockStorage()->tableExistsForDeltaMerge(table_id)); - mock_streams.emplace_back(context.mockStorage()->getStreamFromDeltaMerge(context, table_id, &filter_conditions, false, runtime_filter_ids, 10000)); -} - bool PhysicalMockTableScan::setFilterConditions(Context & context, const String & filter_executor_id, const tipb::Selection & selection) { if (unlikely(hasFilterConditions())) { return false; } - filter_conditions = FilterConditions::filterConditionsFrom(filter_executor_id, selection); - updateStreams(context); - return true; + if (context.mockStorage()->useDeltaMerge() && context.mockStorage()->tableExistsForDeltaMerge(table_id)) + { + filter_conditions = FilterConditions::filterConditionsFrom(filter_executor_id, selection); + + // For the pipeline model, because pipeline_exec has not been generated yet, there is no need to update it at this time. + // The update here is only for the stream model. + mock_streams.clear(); + RUNTIME_CHECK(context.mockStorage()->tableExistsForDeltaMerge(table_id)); + mock_streams.emplace_back(context.mockStorage()->getStreamFromDeltaMerge(context, table_id, &filter_conditions, false, runtime_filter_ids, 10000)); + + return true; + } + else + { + return false; + } } bool PhysicalMockTableScan::hasFilterConditions() const @@ -186,11 +205,6 @@ const String & PhysicalMockTableScan::getFilterConditionsId() const return filter_conditions.executor_id; } -Int64 PhysicalMockTableScan::getLogicalTableID() const -{ - return table_id; -} - void PhysicalMockTableScan::buildRuntimeFilterInLocalStream(Context & context) { for (const auto & local_stream : mock_streams) diff --git a/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.h b/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.h index 676e8614e0b..33c9af5964d 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.h +++ b/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.h @@ -59,10 +59,6 @@ class PhysicalMockTableScan : public PhysicalLeaf const String & getFilterConditionsId() const; - Int64 getLogicalTableID() const; - - void updateStreams(Context & context); - private: void buildBlockInputStreamImpl(DAGPipeline & pipeline, Context & /*context*/, size_t /*max_streams*/) override; diff --git a/dbms/src/Flash/tests/gtest_compute_server.cpp b/dbms/src/Flash/tests/gtest_compute_server.cpp index 7e06b32ed2a..05c5c421aca 100644 --- a/dbms/src/Flash/tests/gtest_compute_server.cpp +++ b/dbms/src/Flash/tests/gtest_compute_server.cpp @@ -788,7 +788,7 @@ try "random_pipeline_model_task_run_failpoint-0.8", "random_pipeline_model_task_construct_failpoint-1.0", "random_pipeline_model_event_schedule_failpoint-1.0", - // Because the mock table scan will always output data, there will be no event triggering onEventFinish, so the query will not terminate. + // Because the mock table scan will always output data, there will be no event triggering decActiveRefCount, so the query will not terminate. // "random_pipeline_model_event_finish_failpoint-0.99", "random_pipeline_model_operator_run_failpoint-0.8", "random_pipeline_model_cancel_failpoint-0.8", diff --git a/dbms/src/Flash/tests/gtest_executors_with_dm.cpp b/dbms/src/Flash/tests/gtest_executors_with_dm.cpp index 36ee82786f0..e95dfd79190 100644 --- a/dbms/src/Flash/tests/gtest_executors_with_dm.cpp +++ b/dbms/src/Flash/tests/gtest_executors_with_dm.cpp @@ -100,11 +100,22 @@ class ExecutorsWithDMTestRunner : public DB::tests::ExecutorTest ColumnWithNullableString col_string{{}, "pingcap", "PingCAP", {}, "PINGCAP", "PingCAP", {}, "Shanghai", "Shanghai"}; }; +#define WRAP_FOR_DM_TEST_BEGIN \ + enablePlanner(true); \ + std::vector pipeline_bools{false, true}; \ + for (auto enable_pipeline : pipeline_bools) \ + { \ + enablePipeline(enable_pipeline); + +#define WRAP_FOR_DM_TEST_END \ + } + TEST_F(ExecutorsWithDMTestRunner, Basic) try { std::vector keep_order_opt{false, true}; + WRAP_FOR_DM_TEST_BEGIN for (auto keep_order : keep_order_opt) { auto request = context @@ -191,8 +202,12 @@ try {{toNullableVec("col0", {0, 1, 2, 3})}, {toNullableVec("col1", {"col1-0", "col1-1", "col1-2", {}})}}); } + WRAP_FOR_DM_TEST_END } CATCH +#undef WRAP_FOR_DM_TEST_BEGIN +#undef WRAP_FOR_DM_TEST_END + } // namespace tests } // namespace DB diff --git a/dbms/src/Flash/tests/gtest_runtime_filter_executor.cpp b/dbms/src/Flash/tests/gtest_runtime_filter_executor.cpp index 62e963609b3..e66129d82ae 100644 --- a/dbms/src/Flash/tests/gtest_runtime_filter_executor.cpp +++ b/dbms/src/Flash/tests/gtest_runtime_filter_executor.cpp @@ -31,6 +31,16 @@ class RuntimeFilterExecutorTestRunner : public DB::tests::ExecutorTest static constexpr size_t concurrency = 10; }; +#define WRAP_FOR_RF_TEST_BEGIN \ + enablePlanner(true); \ + std::vector pipeline_bools{false, true}; \ + for (auto enable_pipeline : pipeline_bools) \ + { \ + enablePipeline(enable_pipeline); + +#define WRAP_FOR_RF_TEST_END \ + } + TEST_F(RuntimeFilterExecutorTestRunner, RuntimeFilterTest) try { @@ -49,13 +59,15 @@ try {{"k1", TiDB::TP::TypeLong}, {"k2", TiDB::TP::TypeLong}}, {toNullableVec("k1", {2, 2, 3, 4}), toNullableVec("k2", {2, 2, 3, 4})}); + + WRAP_FOR_RF_TEST_BEGIN { // without runtime filter, table_scan_0 return 3 rows auto request = context .scan("test_db", "left_table") .join(context.receive("right_exchange_table"), tipb::JoinType::TypeInnerJoin, {col("k1")}) .build(context); - Expect expect{{"table_scan_0", {3, 1}}, {"exchange_receiver_1", {4, concurrency}}, {"Join_2", {3, concurrency}}}; + Expect expect{{"table_scan_0", {3, enable_pipeline ? concurrency : 1}}, {"exchange_receiver_1", {4, concurrency}}, {"Join_2", {3, concurrency}}}; testForExecutionSummary(request, expect); } @@ -66,11 +78,15 @@ try .scan("test_db", "left_table", std::vector{1}) .join(context.receive("right_exchange_table"), tipb::JoinType::TypeInnerJoin, {col("k1")}, rf) .build(context); - Expect expect{{"table_scan_0", {2, 1}}, {"exchange_receiver_1", {4, concurrency}}, {"Join_2", {3, concurrency}}}; + Expect expect{{"table_scan_0", {2, enable_pipeline ? concurrency : 1}}, {"exchange_receiver_1", {4, concurrency}}, {"Join_2", {3, concurrency}}}; testForExecutionSummary(request, expect); } + WRAP_FOR_RF_TEST_END } CATCH +#undef WRAP_FOR_RF_TEST_BEGIN +#undef WRAP_FOR_RF_TEST_END + } // namespace tests } // namespace DB diff --git a/dbms/src/Operators/SharedAggregateRestorer.cpp b/dbms/src/Operators/SharedAggregateRestorer.cpp index 9b9430e490e..4f197d05dc0 100644 --- a/dbms/src/Operators/SharedAggregateRestorer.cpp +++ b/dbms/src/Operators/SharedAggregateRestorer.cpp @@ -32,7 +32,7 @@ SharedSpilledBucketDataLoader::SharedSpilledBucketDataLoader( bucket_inputs.emplace_back(bucket_stream); RUNTIME_CHECK(!bucket_inputs.empty()); - exec_status.onEventSchedule(); + exec_status.incActiveRefCount(); } SharedSpilledBucketDataLoader::~SharedSpilledBucketDataLoader() @@ -40,7 +40,7 @@ SharedSpilledBucketDataLoader::~SharedSpilledBucketDataLoader() bucket_data_queue = {}; bucket_inputs.clear(); // In order to ensure that `PipelineExecutorStatus` will not be destructed before `SharedSpilledBucketDataLoader` is destructed. - exec_status.onEventFinish(); + exec_status.decActiveRefCount(); } bool SharedSpilledBucketDataLoader::switchStatus(SharedLoaderStatus from, SharedLoaderStatus to) diff --git a/dbms/src/Operators/UnorderedSourceOp.cpp b/dbms/src/Operators/UnorderedSourceOp.cpp index c28066b04b9..c6b0f8e540f 100644 --- a/dbms/src/Operators/UnorderedSourceOp.cpp +++ b/dbms/src/Operators/UnorderedSourceOp.cpp @@ -12,8 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include +#include #include +#include + namespace DB { OperatorStatus UnorderedSourceOp::readImpl(Block & block) @@ -54,4 +58,30 @@ OperatorStatus UnorderedSourceOp::awaitImpl() } } } + +void UnorderedSourceOp::operatePrefixImpl() +{ + std::call_once(task_pool->addToSchedulerFlag(), [&]() { + if (waiting_rf_list.empty()) + { + DM::SegmentReadTaskScheduler::instance().add(task_pool); + } + else + { + // Check if the RuntimeFilters is ready immediately. + RuntimeFilteList ready_rf_list; + RFWaitTask::filterAndMoveReadyRfs(waiting_rf_list, ready_rf_list); + + if (max_wait_time_ms <= 0 || waiting_rf_list.empty()) + { + RFWaitTask::submitReadyRfsAndSegmentTaskPool(ready_rf_list, task_pool); + } + else + { + // Poll and check if the RuntimeFilters is ready in the WaitReactor. + TaskScheduler::instance->submitToWaitReactor(std::make_unique(log->identifier(), exec_status, task_pool, max_wait_time_ms, std::move(waiting_rf_list), std::move(ready_rf_list))); + } + } + }); +} } // namespace DB diff --git a/dbms/src/Operators/UnorderedSourceOp.h b/dbms/src/Operators/UnorderedSourceOp.h index eb734072ae4..ea5eb0793ae 100644 --- a/dbms/src/Operators/UnorderedSourceOp.h +++ b/dbms/src/Operators/UnorderedSourceOp.h @@ -16,6 +16,7 @@ #include #include +#include #include #include #include @@ -33,10 +34,14 @@ class UnorderedSourceOp : public SourceOp const DM::SegmentReadTaskPoolPtr & task_pool_, const DM::ColumnDefines & columns_to_read_, int extra_table_id_index_, - const String & req_id) + const String & req_id, + const RuntimeFilteList & runtime_filter_list_ = std::vector{}, + int max_wait_time_ms_ = 0) : SourceOp(exec_status_, req_id) , task_pool(task_pool_) , ref_no(0) + , waiting_rf_list(runtime_filter_list_) + , max_wait_time_ms(max_wait_time_ms_) { setHeader(AddExtraTableIDColumnTransformAction::buildHeader(columns_to_read_, extra_table_id_index_)); ref_no = task_pool->increaseUnorderedInputStreamRefCount(); @@ -56,12 +61,19 @@ class UnorderedSourceOp : public SourceOp IOProfileInfoPtr getIOProfileInfo() const override { return IOProfileInfo::createForLocal(profile_info_ptr); } -protected: - void operatePrefixImpl() override + // only for unit test + // The logic order of unit test is error, it will build source_op firstly and register rf secondly. + // It causes source_op could not get RF list in constructor. + // So, for unit test, it should call this function separated. + void setRuntimeFilterInfo(const RuntimeFilteList & runtime_filter_list_, int max_wait_time_ms_) { - std::call_once(task_pool->addToSchedulerFlag(), [&]() { DM::SegmentReadTaskScheduler::instance().add(task_pool); }); + waiting_rf_list = runtime_filter_list_; + max_wait_time_ms = max_wait_time_ms_; } +protected: + void operatePrefixImpl() override; + OperatorStatus readImpl(Block & block) override; OperatorStatus awaitImpl() override; @@ -69,6 +81,10 @@ class UnorderedSourceOp : public SourceOp DM::SegmentReadTaskPoolPtr task_pool; int64_t ref_no; + // runtime filter + RuntimeFilteList waiting_rf_list; + int max_wait_time_ms; + bool done = false; Block t_block; }; diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 0bd90fa4f02..cd23af47bdf 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -1076,6 +1076,8 @@ void DeltaMergeStore::read( size_t num_streams, UInt64 max_version, const PushDownFilterPtr & filter, + const RuntimeFilteList & runtime_filter_list, + const int rf_max_wait_time_ms, const String & tracing_id, bool keep_order, bool is_fast_scan, @@ -1136,7 +1138,9 @@ void DeltaMergeStore::read( read_task_pool, columns_to_read, extra_table_id_index, - log_tracing_id)); + log_tracing_id, + runtime_filter_list, + rf_max_wait_time_ms)); } } else diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index fc273fa344a..c4f0f9fd064 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -382,6 +382,8 @@ class DeltaMergeStore : private boost::noncopyable size_t num_streams, UInt64 max_version, const PushDownFilterPtr & filter, + const RuntimeFilteList & runtime_filter_list, + const int rf_max_wait_time_ms, const String & tracing_id, bool keep_order, bool is_fast_scan = false, diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index a73676d8e52..29aaa15e062 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -946,7 +946,7 @@ BlockInputStreams StorageDeltaMerge::read( return streams; } -RuntimeFilteList StorageDeltaMerge::parseRuntimeFilterList(const SelectQueryInfo & query_info, const Context & db_context) +RuntimeFilteList StorageDeltaMerge::parseRuntimeFilterList(const SelectQueryInfo & query_info, const Context & db_context) const { if (db_context.getDAGContext() == nullptr || query_info.dag_query == nullptr) { @@ -989,6 +989,8 @@ void StorageDeltaMerge::read( auto filter = parsePushDownFilter(query_info, columns_to_read, context, tracing_logger); + auto runtime_filter_list = parseRuntimeFilterList(query_info, context); + const auto & scan_context = mvcc_query_info.scan_context; store->read( @@ -1001,6 +1003,8 @@ void StorageDeltaMerge::read( num_streams, /*max_version=*/mvcc_query_info.read_tso, filter, + runtime_filter_list, + query_info.dag_query == nullptr ? 0 : query_info.dag_query->rf_max_wait_time_ms, query_info.req_id, query_info.keep_order, /* is_fast_scan */ query_info.is_fast_scan, diff --git a/dbms/src/Storages/StorageDeltaMerge.h b/dbms/src/Storages/StorageDeltaMerge.h index f3b95e5ff5e..2f165b4116f 100644 --- a/dbms/src/Storages/StorageDeltaMerge.h +++ b/dbms/src/Storages/StorageDeltaMerge.h @@ -259,7 +259,7 @@ class StorageDeltaMerge const String & req_id, const LoggerPtr & tracing_logger); - RuntimeFilteList parseRuntimeFilterList(const SelectQueryInfo & query_info, const Context & db_context); + RuntimeFilteList parseRuntimeFilterList(const SelectQueryInfo & query_info, const Context & db_context) const; #ifndef DBMS_PUBLIC_GTEST private: diff --git a/tests/fullstack-test/mpp/runtime_filter.test b/tests/fullstack-test/mpp/runtime_filter.test new file mode 100644 index 00000000000..da64e46d838 --- /dev/null +++ b/tests/fullstack-test/mpp/runtime_filter.test @@ -0,0 +1,66 @@ +# Copyright 2023 PingCAP, Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +mysql> drop table if exists test.t1; +mysql> create table test.t1(k1 int, t1_tinyint tinyint, t1_smallint smallint, t1_mediumint mediumint, t1_bigint bigint, t1_int int, t1_bit bit, t1_bool bool, t1_float float, t1_double DOUBLE, t1_decimal decimal(10, 2)); +mysql> alter table test.t1 set tiflash replica 1; +mysql> insert into test.t1 values (1,1,1,1,1111111,1, 1, 1, 1.0, 1.00, 1.0); +mysql> insert into test.t1 values (1,1,1,1,1111111,1, 1, 1, 1.0, 1.00, 1.0); +mysql> insert into test.t1 values (2,2,2,2,2222222,2, 0, 2, 2.0, 2.00, 2.0); +mysql> insert into test.t1 values (3,2,2,2,2222222,2, 0, 2, 2.0, 2.00, 2.0); +mysql> insert into test.t1 values (4,null,null,null,null,null, null, null, null, null, null); + +mysql> drop table if exists test.t2; +mysql> create table test.t2(k1 int, t2_tinyint tinyint, t2_smallint smallint, t2_mediumint mediumint, t2_bigint bigint, t2_int int, t2_bit bit, t2_bool bool, t2_float float, t2_double DOUBLE, t2_decimal decimal(10, 2)); +mysql> alter table test.t2 set tiflash replica 1; +mysql> insert into test.t2 values (1,1,1,1,1111111,1, 1, 1, 1.0, 1.00, 1.0); +mysql> insert into test.t2 values (1,1,1,1,1111111,1, 1, 1, 1.0, 1.00, 1.0); +mysql> insert into test.t2 values (2,3,3,3,3333333,3, 1, 3, 3.0, 3.00, 3.0); +mysql> insert into test.t2 values (3,3,3,3,3333333,3, 1, 3, 3.0, 3.00, 3.0); +mysql> insert into test.t2 values (4,null,null,null,null,null, null, null, null, null, null); + +func> wait_table test t1 +func> wait_table test t2 + +# inner join +mysql> set @@tidb_isolation_read_engines='tiflash'; set tidb_enforce_mpp = 1; set tidb_runtime_filter_mode="LOCAL"; select t1_tinyint, t2_tinyint from test.t1, test.t2 where t1.t1_tinyint=t2.t2_tinyint; ++------------+------------+ +| t1_tinyint | t2_tinyint | ++------------+------------+ +| 1 | 1 | +| 1 | 1 | +| 1 | 1 | +| 1 | 1 | ++------------+------------+ +mysql> set @@tidb_isolation_read_engines='tiflash'; set tidb_enforce_mpp = 1; set tidb_runtime_filter_mode="LOCAL"; select /*+ hash_join_build(test.t1) */ t1_tinyint, t2_tinyint from test.t1, test.t2 where t1.t1_tinyint=t2.t2_tinyint; ++------------+------------+ +| t1_tinyint | t2_tinyint | ++------------+------------+ +| 1 | 1 | +| 1 | 1 | +| 1 | 1 | +| 1 | 1 | ++------------+------------+ + +# semi join +mysql> set @@tidb_isolation_read_engines='tiflash'; set tidb_enforce_mpp = 1; set tidb_runtime_filter_mode="LOCAL"; select t1_tinyint from test.t1 where t1.t1_tinyint in (select t2_tinyint from test.t2 where t1_bigint=t2_bigint); ++------------+ +| t1_tinyint | ++------------+ +| 1 | +| 1 | ++------------+ + +mysql> drop table test.t1; +mysql> drop table test.t2;