From 91cb3acea275894d60b308c93df654c2fdf44f5b Mon Sep 17 00:00:00 2001 From: Lin Ma Date: Sat, 8 May 2021 21:40:44 -0400 Subject: [PATCH] Augmenting Pilot with Interference Model and OU Feature Generation from the Stats (instead of Executing Queries) (#1574) --- Jenkinsfile-utils.groovy | 2 +- script/self_driving/forecasting/forecaster.py | 1 + .../forecasting/forecaster_standalone.py | 4 +- .../modeling/interference_model_trainer.py | 2 +- script/testing/self_driving/forecast.py | 7 +- .../compiler/compilation_context.cpp | 25 +- src/execution/compiler/executable_query.cpp | 4 +- .../execution/compiler/compilation_context.h | 26 +- .../execution/compiler/executable_query.h | 7 - .../execution/exec/execution_settings.h | 10 + src/include/execution/exec_defs.h | 2 + src/include/main/db_main.h | 17 +- src/include/metrics/metrics_manager.h | 4 +- src/include/network/postgres/statement.h | 9 - src/include/optimizer/group.h | 54 +++- src/include/optimizer/index_util.h | 6 + src/include/optimizer/statistics/histogram.h | 2 +- .../optimizer/statistics/stats_calculator.h | 24 +- .../optimizer/statistics/top_k_elements.h | 2 +- .../planner/plannodes/plan_meta_data.h | 32 +- .../forecasting/workload_forecast.h | 4 +- .../self_driving/modeling/operating_unit.h | 9 + .../modeling/operating_unit_recorder.h | 13 +- .../planning/mcts/monte_carlo_tree_search.h | 7 +- .../self_driving/planning/mcts/tree_node.h | 7 +- src/include/self_driving/planning/pilot.h | 30 +- .../self_driving/planning/pilot_util.h | 70 ++++- src/include/settings/settings_defs.h | 24 +- src/include/task/task.h | 8 +- src/include/util/query_exec_util.h | 22 +- src/optimizer/optimizer.cpp | 3 +- src/optimizer/statistics/selectivity_util.cpp | 3 + src/optimizer/statistics/stats_calculator.cpp | 78 +++-- .../forecasting/workload_forecast.cpp | 4 +- .../modeling/operating_unit_recorder.cpp | 213 +++++++++----- .../planning/mcts/monte_carlo_tree_search.cpp | 8 +- src/self_driving/planning/mcts/tree_node.cpp | 26 +- 
src/self_driving/planning/pilot.cpp | 75 ++--- src/self_driving/planning/pilot_util.cpp | 278 +++++++++++++----- src/task/task.cpp | 15 +- src/traffic_cop/traffic_cop.cpp | 3 +- src/util/query_exec_util.cpp | 52 ++-- .../index_nested_loops_join_test.cpp | 22 +- .../self_driving/query_trace_logging_test.cpp | 7 +- test/sql/analyze_test.cpp | 20 +- test/task/task_manager_test.cpp | 4 +- 46 files changed, 843 insertions(+), 402 deletions(-) diff --git a/Jenkinsfile-utils.groovy b/Jenkinsfile-utils.groovy index cac7b0feb9..dd2c33afa7 100644 --- a/Jenkinsfile-utils.groovy +++ b/Jenkinsfile-utils.groovy @@ -235,7 +235,7 @@ void stageModeling() { // This script runs TPC-C with pipeline metrics enabled, saving to build/concurrent_runner_input/pipeline.csv. sh script :''' cd build - PYTHONPATH=.. python3 -m script.self_driving.forecasting.forecaster_standalone --generate_data --record_pipeline_metrics --pattern_iter=1 + PYTHONPATH=.. python3 -m script.self_driving.forecasting.forecaster_standalone --generate_data --record_pipeline_metrics_with_counters --pattern_iter=1 mkdir concurrent_runner_input mv pipeline.csv concurrent_runner_input ''', label: 'Interference model training data generation' diff --git a/script/self_driving/forecasting/forecaster.py b/script/self_driving/forecasting/forecaster.py index 381f633489..068b6ab4c4 100644 --- a/script/self_driving/forecasting/forecaster.py +++ b/script/self_driving/forecasting/forecaster.py @@ -202,6 +202,7 @@ def predict(self, cid: int, model: ForecastModel) -> Dict: """ test_seqs = self._cluster_seqs(cid, test_mode=True, with_label=False) preds = list([model.predict(seq) for seq in test_seqs]) + logging.info(f"Inference preds: {preds}") query_preds = self._clusters[cid].segregate(preds) return query_preds diff --git a/script/self_driving/forecasting/forecaster_standalone.py b/script/self_driving/forecasting/forecaster_standalone.py index 8cf7dbed5e..ca6d7ecee2 100755 --- 
a/script/self_driving/forecasting/forecaster_standalone.py +++ b/script/self_driving/forecasting/forecaster_standalone.py @@ -52,7 +52,7 @@ action="store_true", help="If specified, OLTP benchmark would be downloaded and built to generate the query trace data") argp.add_argument( - "--record_pipeline_metrics", + "--record_pipeline_metrics_with_counters", default=False, action="store_true", help="If specified, the database records the pipeline metrics data instead of the query trace data") @@ -125,7 +125,7 @@ tpcc_weight=args.tpcc_weight, tpcc_rates=args.tpcc_rates, pattern_iter=args.pattern_iter, - record_pipeline_metrics=args.record_pipeline_metrics) + record_pipeline_metrics_with_counters=args.record_pipeline_metrics_with_counters) elif args.test_file is None: # Parse models arguments models_kwargs = parse_model_config(args.models, args.models_config) diff --git a/script/self_driving/modeling/interference_model_trainer.py b/script/self_driving/modeling/interference_model_trainer.py index 66e01ea1bb..c7c9eeab77 100644 --- a/script/self_driving/modeling/interference_model_trainer.py +++ b/script/self_driving/modeling/interference_model_trainer.py @@ -181,7 +181,7 @@ def _train_model_with_derived_data(self, impact_data_list, model_name): # x.append(np.concatenate((ou_model_y_pred[-1] / predicted_elapsed_us, # predicted_resource_util, # d.resource_util_same_core_x))) - x.append(np.concatenate((ou_model_y_pred[-1] / predicted_elapsed_us, predicted_resource_util))) + x.append(np.concatenate((ou_model_y_pred[-1] / (predicted_elapsed_us + epsilon), predicted_resource_util))) raw_y.append(d.target_grouped_op_unit_data.y) y.append(raw_y[-1] / (ou_model_y_pred[-1] + epsilon)) # Do not adjust memory consumption since it shouldn't change diff --git a/script/testing/self_driving/forecast.py b/script/testing/self_driving/forecast.py index a03e443bb0..8aa239722d 100644 --- a/script/testing/self_driving/forecast.py +++ b/script/testing/self_driving/forecast.py @@ -55,7 +55,7 @@ 
def config_forecast_data(xml_config_file: str, rate_pattern: List[int]) -> None: def gen_oltp_trace( - tpcc_weight: str, tpcc_rates: List[int], pattern_iter: int, record_pipeline_metrics: bool) -> bool: + tpcc_weight: str, tpcc_rates: List[int], pattern_iter: int, record_pipeline_metrics_with_counters: bool) -> bool: """ Generate the trace by running OLTPBench's TPCC benchmark on the built DBMS. @@ -67,7 +67,7 @@ def gen_oltp_trace( The arrival rates for each phase in a pattern. pattern_iter : int The number of patterns. - record_pipeline_metrics : bool + record_pipeline_metrics_with_counters : bool Record the pipeline metrics instead of query traces Returns @@ -94,11 +94,12 @@ def gen_oltp_trace( rates = tpcc_rates * pattern_iter config_forecast_data(test_case.xml_config, rates) - if record_pipeline_metrics: + if record_pipeline_metrics_with_counters: # Turn on pipeline metrics recording db_server.execute("SET pipeline_metrics_enable='true'", expect_result=False) db_server.execute("SET pipeline_metrics_sample_rate={}".format(DEFAULT_PIPELINE_METRICS_SAMPLE_RATE), expect_result=False) + db_server.execute("SET counters_enable='true'", expect_result=False) result_file = DEFAULT_PIPELINE_METRICS_FILE else: # Turn on query trace metrics tracing diff --git a/src/execution/compiler/compilation_context.cpp b/src/execution/compiler/compilation_context.cpp index 7a30e15de8..d2ae44f5bf 100644 --- a/src/execution/compiler/compilation_context.cpp +++ b/src/execution/compiler/compilation_context.cpp @@ -64,6 +64,7 @@ #include "planner/plannodes/limit_plan_node.h" #include "planner/plannodes/nested_loop_join_plan_node.h" #include "planner/plannodes/order_by_plan_node.h" +#include "planner/plannodes/plan_meta_data.h" #include "planner/plannodes/projection_plan_node.h" #include "planner/plannodes/seq_scan_plan_node.h" #include "planner/plannodes/set_op_plan_node.h" @@ -78,9 +79,10 @@ namespace { std::atomic unique_ids{0}; } // namespace 
-CompilationContext::CompilationContext(ExecutableQuery *query, catalog::CatalogAccessor *accessor, +CompilationContext::CompilationContext(ExecutableQuery *query, query_id_t query_id, catalog::CatalogAccessor *accessor, const CompilationMode mode, const exec::ExecutionSettings &settings) : unique_id_(unique_ids++), + query_id_(query_id), query_(query), mode_(mode), codegen_(query_->GetContext(), accessor), @@ -118,7 +120,8 @@ ast::FunctionDecl *CompilationContext::GenerateTearDownFunction() { return builder.Finish(); } -void CompilationContext::GeneratePlan(const planner::AbstractPlanNode &plan) { +void CompilationContext::GeneratePlan(const planner::AbstractPlanNode &plan, + common::ManagedPointer plan_meta_data) { exec_ctx_ = query_state_.DeclareStateEntry(GetCodeGen(), "execCtx", codegen_.PointerType(ast::BuiltinType::ExecutionContext)); @@ -160,7 +163,7 @@ void CompilationContext::GeneratePlan(const planner::AbstractPlanNode &plan) { // Therefore translator extraction must happen before pipelines are generated. 
selfdriving::OperatingUnitRecorder recorder(common::ManagedPointer(codegen_.GetCatalogAccessor()), common::ManagedPointer(codegen_.GetAstContext()), - common::ManagedPointer(pipeline), query_->GetQueryText()); + common::ManagedPointer(pipeline), plan_meta_data); auto features = recorder.RecordTranslators(pipeline->GetTranslators()); codegen_.GetPipelineOperatingUnits()->RecordOperatingUnit(pipeline->GetPipelineId(), std::move(features)); @@ -188,23 +191,19 @@ void CompilationContext::GeneratePlan(const planner::AbstractPlanNode &plan) { } // static -std::unique_ptr CompilationContext::Compile(const planner::AbstractPlanNode &plan, - const exec::ExecutionSettings &exec_settings, - catalog::CatalogAccessor *accessor, - const CompilationMode mode, - std::optional override_qid, - common::ManagedPointer query_text) { +std::unique_ptr CompilationContext::Compile( + const planner::AbstractPlanNode &plan, const exec::ExecutionSettings &exec_settings, + catalog::CatalogAccessor *accessor, const CompilationMode mode, std::optional override_qid, + common::ManagedPointer plan_meta_data) { // The query we're generating code for. auto query = std::make_unique(plan, exec_settings); if (override_qid.has_value()) { query->SetQueryId(override_qid.value()); } - // TODO(Lin): Hacking... 
remove this after getting the counters in - query->SetQueryText(query_text); // Generate the plan for the query - CompilationContext ctx(query.get(), accessor, mode, exec_settings); - ctx.GeneratePlan(plan); + CompilationContext ctx(query.get(), query->GetQueryId(), accessor, mode, exec_settings); + ctx.GeneratePlan(plan, plan_meta_data); // Done return query; diff --git a/src/execution/compiler/executable_query.cpp b/src/execution/compiler/executable_query.cpp index 75c103504e..967d7c03a4 100644 --- a/src/execution/compiler/executable_query.cpp +++ b/src/execution/compiler/executable_query.cpp @@ -4,7 +4,6 @@ #include "common/error/error_code.h" #include "common/error/exception.h" -#include "execution/ast/ast_dump.h" #include "execution/ast/context.h" #include "execution/compiler/compiler.h" #include "execution/exec/execution_context.h" @@ -71,7 +70,8 @@ std::string GetFileName(const std::string &path) { } } // namespace -std::atomic ExecutableQuery::query_identifier{0}; +// We use 0 to represent NULL_QUERY_ID so the query id starts from 1. +std::atomic ExecutableQuery::query_identifier{1}; void ExecutableQuery::SetPipelineOperatingUnits(std::unique_ptr &&units) { pipeline_operating_units_ = std::move(units); diff --git a/src/include/execution/compiler/compilation_context.h b/src/include/execution/compiler/compilation_context.h index 97f94145ce..2e00a70f67 100644 --- a/src/include/execution/compiler/compilation_context.h +++ b/src/include/execution/compiler/compilation_context.h @@ -16,6 +16,7 @@ class AbstractExpression; namespace noisepage::planner { class AbstractPlanNode; +class PlanMetaData; } // namespace noisepage::planner namespace noisepage::execution::compiler { @@ -49,14 +50,13 @@ class CompilationContext { * @param accessor The catalog accessor to use for compilation. * @param mode The compilation mode. 
* @param override_qid Optional indicating how to override the plan's query id - * @param query_text The SQL query string (temporary) + * @param plan_meta_data Query plan meta data (stores cardinality information) */ - static std::unique_ptr Compile(const planner::AbstractPlanNode &plan, - const exec::ExecutionSettings &exec_settings, - catalog::CatalogAccessor *accessor, - CompilationMode mode = CompilationMode::Interleaved, - std::optional override_qid = std::nullopt, - common::ManagedPointer query_text = nullptr); + static std::unique_ptr Compile( + const planner::AbstractPlanNode &plan, const exec::ExecutionSettings &exec_settings, + catalog::CatalogAccessor *accessor, CompilationMode mode = CompilationMode::Interleaved, + std::optional override_qid = std::nullopt, + common::ManagedPointer plan_meta_data = nullptr); /** * Register a pipeline in this context. @@ -127,15 +127,16 @@ class CompilationContext { bool IsPipelineMetricsEnabled() const { return pipeline_metrics_enabled_; } /** @return Query Id associated with the query */ - query_id_t GetQueryId() const { return query_id_t{unique_id_}; } + query_id_t GetQueryId() const { return query_id_; } private: // Private to force use of static Compile() function. - explicit CompilationContext(ExecutableQuery *query, catalog::CatalogAccessor *accessor, CompilationMode mode, - const exec::ExecutionSettings &exec_settings); + explicit CompilationContext(ExecutableQuery *query, query_id_t query_id_, catalog::CatalogAccessor *accessor, + CompilationMode mode, const exec::ExecutionSettings &exec_settings); // Given a plan node, compile it into a compiled query object. - void GeneratePlan(const planner::AbstractPlanNode &plan); + void GeneratePlan(const planner::AbstractPlanNode &plan, + common::ManagedPointer plan_meta_data); // Generate the query initialization function. 
ast::FunctionDecl *GenerateInitFunction(); @@ -150,6 +151,9 @@ class CompilationContext { // Unique ID used as a prefix for all generated functions to ensure uniqueness. uint32_t unique_id_; + // Query ID generated from ExecutableQuery or overridden specifically + query_id_t query_id_; + // The compiled query object we'll update. ExecutableQuery *query_; diff --git a/src/include/execution/compiler/executable_query.h b/src/include/execution/compiler/executable_query.h index c3f5ecda99..d4346e6448 100644 --- a/src/include/execution/compiler/executable_query.h +++ b/src/include/execution/compiler/executable_query.h @@ -157,12 +157,6 @@ class ExecutableQuery { /** @return The Query Identifier */ query_id_t GetQueryId() { return query_id_; } - /** @param query_text The SQL string for this query */ - void SetQueryText(common::ManagedPointer query_text) { query_text_ = query_text; } - - /** @return The SQL query string */ - common::ManagedPointer GetQueryText() { return query_text_; } - private: // The plan. const planner::AbstractPlanNode &plan_; @@ -202,7 +196,6 @@ class ExecutableQuery { std::string query_name_; query_id_t query_id_; static std::atomic query_identifier; - common::ManagedPointer query_text_; // MiniRunners needs to set query_identifier and pipeline_operating_units_. friend class noisepage::runner::ExecutionRunners; diff --git a/src/include/execution/exec/execution_settings.h b/src/include/execution/exec/execution_settings.h index 81dc464cc9..c48b73e7e7 100644 --- a/src/include/execution/exec/execution_settings.h +++ b/src/include/execution/exec/execution_settings.h @@ -29,6 +29,14 @@ namespace noisepage::tpch { class Workload; } // namespace noisepage::tpch +namespace noisepage::selfdriving { +class PilotUtil; +} // namespace noisepage::selfdriving + +namespace noisepage::task { +class TaskDML; +} // namespace noisepage::task + namespace noisepage::execution::exec { /** * ExecutionSettings stores settings that are passed down from the upper layers. 
@@ -94,5 +102,7 @@ class EXPORT ExecutionSettings { friend class noisepage::optimizer::IdxJoinTest_FooOnlyScan_Test; friend class noisepage::optimizer::IdxJoinTest_BarOnlyScan_Test; friend class noisepage::optimizer::IdxJoinTest_IndexToIndexJoin_Test; + friend class noisepage::task::TaskDML; + friend class noisepage::selfdriving::PilotUtil; }; } // namespace noisepage::execution::exec diff --git a/src/include/execution/exec_defs.h b/src/include/execution/exec_defs.h index 6d3f5a81f1..0852d48372 100644 --- a/src/include/execution/exec_defs.h +++ b/src/include/execution/exec_defs.h @@ -5,6 +5,7 @@ namespace noisepage::execution { constexpr uint32_t NULL_PIPELINE_ID = 0; +constexpr uint32_t NULL_QUERY_ID = 0; STRONG_TYPEDEF_HEADER(query_id_t, uint32_t); STRONG_TYPEDEF_HEADER(pipeline_id_t, uint32_t); @@ -12,5 +13,6 @@ STRONG_TYPEDEF_HEADER(feature_id_t, uint32_t); STRONG_TYPEDEF_HEADER(translator_id_t, uint32_t); constexpr pipeline_id_t INVALID_PIPELINE_ID = pipeline_id_t(NULL_PIPELINE_ID); +constexpr query_id_t INVALID_QUERY_ID = query_id_t(NULL_QUERY_ID); } // namespace noisepage::execution diff --git a/src/include/main/db_main.h b/src/include/main/db_main.h index c0e8bc6dc8..8a8e8de8a6 100644 --- a/src/include/main/db_main.h +++ b/src/include/main/db_main.h @@ -540,11 +540,12 @@ class DBMain { query_exec_util ? 
util::QueryExecUtil::ConstructThreadLocal(common::ManagedPointer(query_exec_util)) : nullptr; pilot = std::make_unique( - model_save_path_, forecast_model_save_path_, common::ManagedPointer(catalog_layer->GetCatalog()), - common::ManagedPointer(metrics_thread), common::ManagedPointer(model_server_manager), - common::ManagedPointer(settings_manager), common::ManagedPointer(stats_storage), - common::ManagedPointer(txn_layer->GetTransactionManager()), std::move(util), - common::ManagedPointer(task_manager), workload_forecast_interval_, sequence_length_, horizon_length_); + ou_model_save_path_, interference_model_save_path_, forecast_model_save_path_, + common::ManagedPointer(catalog_layer->GetCatalog()), common::ManagedPointer(metrics_thread), + common::ManagedPointer(model_server_manager), common::ManagedPointer(settings_manager), + common::ManagedPointer(stats_storage), common::ManagedPointer(txn_layer->GetTransactionManager()), + std::move(util), common::ManagedPointer(task_manager), workload_forecast_interval_, sequence_length_, + horizon_length_); pilot_thread = std::make_unique( common::ManagedPointer(pilot), std::chrono::microseconds{pilot_interval_}, std::chrono::microseconds{forecast_train_interval_}, pilot_planning_); @@ -939,7 +940,8 @@ class DBMain { uint64_t forecast_sample_limit_ = 5; std::string wal_file_path_ = "wal.log"; - std::string model_save_path_; + std::string ou_model_save_path_; + std::string interference_model_save_path_; std::string forecast_model_save_path_; std::string bytecode_handlers_path_ = "./bytecode_handlers_ir.bc"; std::string network_identity_ = "primary"; @@ -1036,7 +1038,8 @@ class DBMain { workload_forecast_interval_ = settings_manager->GetInt64(settings::Param::workload_forecast_interval); sequence_length_ = settings_manager->GetInt64(settings::Param::sequence_length); horizon_length_ = settings_manager->GetInt64(settings::Param::horizon_length); - model_save_path_ = 
settings_manager->GetString(settings::Param::model_save_path); + ou_model_save_path_ = settings_manager->GetString(settings::Param::ou_model_save_path); + interference_model_save_path_ = settings_manager->GetString(settings::Param::interference_model_save_path); forecast_model_save_path_ = settings_manager->GetString(settings::Param::forecast_model_save_path); task_pool_size_ = settings_manager->GetInt(settings::Param::task_pool_size); diff --git a/src/include/metrics/metrics_manager.h b/src/include/metrics/metrics_manager.h index 126bb6a76f..08529aae4f 100644 --- a/src/include/metrics/metrics_manager.h +++ b/src/include/metrics/metrics_manager.h @@ -51,9 +51,7 @@ class MetricsManager { /** * @return the MetricsManager's aggregated metrics. Currently used in tests */ - const std::array, NUM_COMPONENTS> &AggregatedMetrics() const { - return aggregated_metrics_; - } + std::array, NUM_COMPONENTS> &AggregatedMetrics() { return aggregated_metrics_; } /** * @param component to be tested diff --git a/src/include/network/postgres/statement.h b/src/include/network/postgres/statement.h index c5f86ccfa2..bf3d913dee 100644 --- a/src/include/network/postgres/statement.h +++ b/src/include/network/postgres/statement.h @@ -102,15 +102,6 @@ class Statement { optimize_result_ = std::move(optimize_result); } - /** - * @param physical_plan physical plan to take ownership of - */ - void SetPhysicalPlan(std::unique_ptr &&physical_plan) { - if (!optimize_result_) { - optimize_result_ = std::make_unique(); - } - optimize_result_->SetPlanNode(std::move(physical_plan)); - } /** * @param executable_query executable query to take ownership of */ diff --git a/src/include/optimizer/group.h b/src/include/optimizer/group.h index aea2c836ca..79a006f693 100644 --- a/src/include/optimizer/group.h +++ b/src/include/optimizer/group.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -26,6 +27,9 @@ class GroupExpression; */ class Group { public: + /** Value for uninitialized stats 
(cardinality and number of rows in table) */ + static constexpr size_t UNINITIALIZED_NUM_ROWS = std::numeric_limits::max(); + /** * Constructor for a group * @param id ID of the Group @@ -121,13 +125,45 @@ class Group { * Sets Number of rows * @param num_rows Number of rows */ - void SetNumRows(int num_rows) { num_rows_ = num_rows; } + void SetNumRows(size_t num_rows) { num_rows_ = num_rows; } /** * Gets the estimated cardinality in # rows * @returns # rows estimated */ - int GetNumRows() { return num_rows_; } + size_t GetNumRows() const { return num_rows_; } + + /** + * Set the number of rows in the base table to scan + * @param table_num_rows Number of rows + */ + void SetTableNumRows(size_t table_num_rows) { table_num_rows_ = table_num_rows; } + + /** + * Gets the number of rows in the base table to scan + * @returns number of rows in table + */ + size_t GetTableNumRows() const { return table_num_rows_; } + + /** + * Add the selectivity of a filter column (multiply selectivities for the same column, assuming conjunction AND) + * @param column_id column ID + * @param selectivity estimated selectivity + */ + void AddFilterColumnSelectivity(catalog::col_oid_t column_id, double selectivity) { + if (filter_column_selectivities_.find(column_id) == filter_column_selectivities_.end()) + filter_column_selectivities_[column_id] = selectivity; + else + filter_column_selectivities_[column_id] *= selectivity; + } + + /** + * Get the selectivities for filter columns (selectivities multiplied for the same column, assuming conjunction AND) + * @returns estimated selectivities + */ + const std::unordered_map &GetFilterColumnSelectivities() const { + return filter_column_selectivities_; + } /** * Checks if num rows is initialized @@ -194,17 +230,25 @@ class Group { */ std::vector enforced_exprs_; - static constexpr int UNINITIALIZED_NUM_ROWS = -1; - /** * Number of rows */ - int num_rows_ = UNINITIALIZED_NUM_ROWS; + size_t num_rows_ = UNINITIALIZED_NUM_ROWS; + + /** + * Number of 
rows in the base table (for LogicalGet) + */ + size_t table_num_rows_ = UNINITIALIZED_NUM_ROWS; /** * Cost Lower Bound */ double cost_lower_bound_ = -1; + + /** + * Map from a column ID in the filter to the selectivity on that column + */ + std::unordered_map filter_column_selectivities_; }; } // namespace noisepage::optimizer diff --git a/src/include/optimizer/index_util.h b/src/include/optimizer/index_util.h index ddeaa0c20d..382e2fe9a1 100644 --- a/src/include/optimizer/index_util.h +++ b/src/include/optimizer/index_util.h @@ -19,6 +19,10 @@ namespace noisepage::parser { class AbstractExpression; } // namespace noisepage::parser +namespace noisepage::selfdriving { +class OperatingUnitRecorder; +} + namespace noisepage::optimizer { /** @@ -85,6 +89,8 @@ class IndexUtil { std::unordered_map> *bounds); private: + friend class selfdriving::OperatingUnitRecorder; + /** * Check whether predicate can take part in index computation * @param schema Index Schema diff --git a/src/include/optimizer/statistics/histogram.h b/src/include/optimizer/statistics/histogram.h index 16edd9fb52..210bd2b010 100644 --- a/src/include/optimizer/statistics/histogram.h +++ b/src/include/optimizer/statistics/histogram.h @@ -33,7 +33,7 @@ namespace noisepage::optimizer { */ template class Histogram { - static constexpr uint8_t DEFAULT_MAX_BINS = 64; + static constexpr uint8_t DEFAULT_MAX_BINS = 255; public: /** diff --git a/src/include/optimizer/statistics/stats_calculator.h b/src/include/optimizer/statistics/stats_calculator.h index 305f84e688..5930907edc 100644 --- a/src/include/optimizer/statistics/stats_calculator.h +++ b/src/include/optimizer/statistics/stats_calculator.h @@ -62,24 +62,44 @@ class StatsCalculator : public OperatorVisitor { */ void Visit(const LogicalLimit *op) override; + /** + * Visit a LogicalInsert + * @param op Operator being visited + */ + void Visit(const LogicalInsert *op) override; + + /** + * Visit a LogicalUpdate + * @param op Operator being visited + */ + void 
Visit(const LogicalUpdate *op) override; + + /** + * Visit a LogicalDelete + * @param op Operator being visited + */ + void Visit(const LogicalDelete *op) override; + private: /** * Return estimated cardinality for a filter + * @param group The Group to estimated the cardinality for * @param num_rows Number of rows of base table * @param predicate_stats The stats for columns in the expression * @param predicates conjunction predicates * @returns Estimated cardinality */ - size_t EstimateCardinalityForFilter(size_t num_rows, const TableStats &predicate_stats, + size_t EstimateCardinalityForFilter(Group *group, size_t num_rows, const TableStats &predicate_stats, const std::vector &predicates); /** * Calculates selectivity for predicate + * @param group The Group to calculate selectivity for * @param predicate_table_stats Table Statistics * @param expr Predicate * @returns selectivity estimate */ - double CalculateSelectivityForPredicate(const TableStats &predicate_table_stats, + double CalculateSelectivityForPredicate(Group *group, const TableStats &predicate_table_stats, common::ManagedPointer expr); /** diff --git a/src/include/optimizer/statistics/top_k_elements.h b/src/include/optimizer/statistics/top_k_elements.h index c33c45c7ec..3ac948b739 100644 --- a/src/include/optimizer/statistics/top_k_elements.h +++ b/src/include/optimizer/statistics/top_k_elements.h @@ -35,7 +35,7 @@ class TopKElements { */ using KeyCountPair = std::pair; static constexpr size_t DEFAULT_K = 16; - static constexpr uint64_t DEFAULT_WIDTH = 64; + static constexpr uint64_t DEFAULT_WIDTH = 1024; public: /** diff --git a/src/include/planner/plannodes/plan_meta_data.h b/src/include/planner/plannodes/plan_meta_data.h index b35564f5f0..456a3c4c97 100644 --- a/src/include/planner/plannodes/plan_meta_data.h +++ b/src/include/planner/plannodes/plan_meta_data.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include "planner/plannodes/plan_node_defs.h" @@ -21,16 +22,41 @@ class PlanMetaData { /** * 
Construct a PlanNodeMetaData with a cardinality value * @param cardinality output cardinality of the plan node + * @param table_num_rows number of rows in the base table (for sequential or index scans) + * @param filter_column_selectivities maps from column id to the selectivity on that column (multiplied + * with duplicates) */ - explicit PlanNodeMetaData(int cardinality) : cardinality_(cardinality) {} + explicit PlanNodeMetaData(size_t cardinality, size_t table_num_rows, + std::unordered_map filter_column_selectivities) + : cardinality_(cardinality), + table_num_rows_(table_num_rows), + filter_column_selectivities_(std::move(filter_column_selectivities)) {} /** * @return the output cardinality */ - int GetCardinality() { return cardinality_; } + size_t GetCardinality() const { return cardinality_; } + + /** + * @return the number of rows in the table to scan + */ + size_t GetTableNumRows() const { return table_num_rows_; } + + /** + * @return the number of rows in the table to scan + */ + double GetFilterColumnSelectivity(catalog::col_oid_t col_oid) const { + if (filter_column_selectivities_.find(col_oid) == filter_column_selectivities_.end()) + // Does not filter on this column + return 1; + + return filter_column_selectivities_.at(col_oid); + } private: - int cardinality_ = -1; + size_t cardinality_; + size_t table_num_rows_; + std::unordered_map filter_column_selectivities_; }; /** diff --git a/src/include/self_driving/forecasting/workload_forecast.h b/src/include/self_driving/forecasting/workload_forecast.h index d8655d0a2c..565b87cfe9 100644 --- a/src/include/self_driving/forecasting/workload_forecast.h +++ b/src/include/self_driving/forecasting/workload_forecast.h @@ -64,9 +64,11 @@ class WorkloadForecast { /** * Constructor for WorkloadForecast from on-disk inference results * @param inference Workload inference + * @param forecast_interval Interval used to partition the queries into segments * @param num_sample Number of samples for query parameters */ - 
explicit WorkloadForecast(const WorkloadForecastPrediction &inference, uint64_t num_sample); + explicit WorkloadForecast(const WorkloadForecastPrediction &inference, uint64_t forecast_interval, + uint64_t num_sample); /** * Get number of forecasted segments diff --git a/src/include/self_driving/modeling/operating_unit.h b/src/include/self_driving/modeling/operating_unit.h index a16f72ffaa..f0f825be10 100644 --- a/src/include/self_driving/modeling/operating_unit.h +++ b/src/include/self_driving/modeling/operating_unit.h @@ -406,6 +406,15 @@ class PipelineOperatingUnits { return itr->second; } + /** + * Gets the map from pipeline id to features + * @return the pipeline feature map + */ + const std::unordered_map &GetPipelineFeatureMap() + const { + return units_; + } + /** * Checks whether a certain pipeline exists * @param pipeline Pipeline Identifier diff --git a/src/include/self_driving/modeling/operating_unit_recorder.h b/src/include/self_driving/modeling/operating_unit_recorder.h index b1082de6cf..5fcdb01391 100644 --- a/src/include/self_driving/modeling/operating_unit_recorder.h +++ b/src/include/self_driving/modeling/operating_unit_recorder.h @@ -36,6 +36,7 @@ namespace noisepage::planner { class AbstractPlanNode; class AbstractJoinPlanNode; class AbstractScanPlanNode; +class PlanMetaData; } // namespace noisepage::planner namespace noisepage::selfdriving { @@ -51,13 +52,13 @@ class OperatingUnitRecorder : planner::PlanVisitor { * @param accessor CatalogAccessor * @param ast_ctx AstContext * @param pipeline Current pipeline, used to figure out if a given translator is Build or Probe. 
- * @param query_text The SQL query string + * @param plan_meta_data The plan meta data that stores the stats */ explicit OperatingUnitRecorder(common::ManagedPointer accessor, common::ManagedPointer ast_ctx, common::ManagedPointer pipeline, - common::ManagedPointer query_text) - : accessor_(accessor), ast_ctx_(ast_ctx), current_pipeline_(pipeline) {} + common::ManagedPointer plan_meta_data) + : accessor_(accessor), ast_ctx_(ast_ctx), current_pipeline_(pipeline), plan_meta_data_(plan_meta_data) {} /** * Extracts features from OperatorTranslators @@ -114,9 +115,10 @@ class OperatingUnitRecorder : planner::PlanVisitor { * that requires modifying any system index. * * @param index_oids Index OIDs to record operations for. + * @param table_oid Table OID that the indexes belong to. */ template - void RecordIndexOperations(const std::vector &index_oids); + void RecordIndexOperations(const std::vector &index_oids, catalog::table_oid_t table_oid); template void RecordAggregateTranslator(common::ManagedPointer translator, const planner::AggregatePlanNode *plan); @@ -237,6 +239,9 @@ class OperatingUnitRecorder : planner::PlanVisitor { /** Pipeline, used to figure out if current translator is Build or Probe. 
*/ common::ManagedPointer current_pipeline_; + + /** Query plan meta data */ + common::ManagedPointer plan_meta_data_; }; } // namespace noisepage::selfdriving diff --git a/src/include/self_driving/planning/mcts/monte_carlo_tree_search.h b/src/include/self_driving/planning/mcts/monte_carlo_tree_search.h index 4dc61a0de3..2c5cbba6ce 100644 --- a/src/include/self_driving/planning/mcts/monte_carlo_tree_search.h +++ b/src/include/self_driving/planning/mcts/monte_carlo_tree_search.h @@ -35,11 +35,12 @@ class MonteCarloTreeSearch { /** * Returns query string of the best action to take at the root of the current tree * @param simulation_number number of simulations to run - * @param best_action_seq - * @return query string of the best first action as well as the associated database oid + * @param best_action_seq storing output: query string of the best first action as well as the associated database oid + * @param memory_constraint maximum allowed memory in bytes */ void BestAction(uint64_t simulation_number, - std::vector> *best_action_seq); + std::vector> *best_action_seq, + uint64_t memory_constraint); private: const common::ManagedPointer pilot_; diff --git a/src/include/self_driving/planning/mcts/tree_node.h b/src/include/self_driving/planning/mcts/tree_node.h index 3fc242078a..deb9d72f71 100644 --- a/src/include/self_driving/planning/mcts/tree_node.h +++ b/src/include/self_driving/planning/mcts/tree_node.h @@ -31,9 +31,10 @@ class TreeNode { * @param current_segment_cost cost of executing current segment with actions applied on path from root to current * node * @param later_segments_cost cost of later segments when actions applied on path from root to current node + * @param memory memory consumption at the current node in bytes */ TreeNode(common::ManagedPointer parent, action_id_t current_action, double current_segment_cost, - double later_segments_cost); + double later_segments_cost, uint64_t memory); /** * @return action id at node with least cost @@ -62,11 
+63,12 @@ class TreeNode { * @param tree_end_segment_index end_segment_index of the search tree * @param action_map action map of the search tree * @param candidate_actions candidate actions of the search tree + * @param memory_constraint maximum allowed memory in bytes */ void ChildrenRollout(common::ManagedPointer pilot, common::ManagedPointer forecast, uint64_t tree_start_segment_index, uint64_t tree_end_segment_index, const std::map> &action_map, - const std::unordered_set &candidate_actions); + const std::unordered_set &candidate_actions, uint64_t memory_constraint); /** * Update the visits number and cost of the node and its ancestors in tree due to expansion of its children, @@ -142,6 +144,7 @@ class TreeNode { uint64_t number_of_visits_; // number of leaf in subtree rooted at node std::vector> children_; double cost_; + uint64_t memory_; }; } // namespace pilot diff --git a/src/include/self_driving/planning/pilot.h b/src/include/self_driving/planning/pilot.h index ea05c8fbc8..a21cd15cf3 100644 --- a/src/include/self_driving/planning/pilot.h +++ b/src/include/self_driving/planning/pilot.h @@ -74,6 +74,14 @@ class Pilot { /** The default timeout for pilot futures. Inferences take milliseconds, but CI is flaky. */ static constexpr std::chrono::seconds FUTURE_TIMEOUT{10}; + /** + * Whether to use "what-if" API during the action search. + * If true, the pilot only create the entries in the catalog for the indexes during the search. And the pilot uses + * the stats to generate OU features. + * If false, the pilot populate the candidate indexes during the search and execute queries to get OU features. 
+ */ + static constexpr bool WHAT_IF = true; + /** Describes how the workload forecast should be initialized */ enum class WorkloadForecastInitMode : uint8_t { /** @@ -97,7 +105,8 @@ class Pilot { /** * Constructor for Pilot - * @param model_save_path model save path + * @param ou_model_save_path OU model save path + * @param interference_model_save_path interference model save path * @param forecast_model_save_path forecast model save path * @param catalog catalog * @param metrics_thread metrics thread for metrics manager @@ -111,7 +120,7 @@ class Pilot { * @param sequence_length Length of a planning sequence * @param horizon_length Length of the planning horizon */ - Pilot(std::string model_save_path, std::string forecast_model_save_path, + Pilot(std::string ou_model_save_path, std::string interference_model_save_path, std::string forecast_model_save_path, common::ManagedPointer catalog, common::ManagedPointer metrics_thread, common::ManagedPointer model_server_manager, common::ManagedPointer settings_manager, @@ -124,7 +133,7 @@ class Pilot { * Get model save path * @return save path of the mini model */ - const std::string &GetModelSavePath() { return model_save_path_; } + const std::string &GetOUModelSavePath() { return ou_model_save_path_; } /** * Get pointer to model server manager @@ -200,14 +209,16 @@ class Pilot { /** * Execute, collect pipeline metrics, and get ou prediction for each pipeline under different query parameters for * queries between start and end segment indices (both inclusive) in workload forecast. 
- * @param pipeline_to_prediction to be populated, map from a pipeline in forecasted queries to the list of ou - * prediction for different parameters, each ou prediction is a 2D double array * @param start_segment_index start segment index in forecast to be considered * @param end_segment_index end segment index in forecast to be considered + * @param query_info > + * @param segment_to_offset start index of ou records belonging to a segment in input to the interference model + * @param interference_result_matrix stores the final results of the interference model */ - void ExecuteForecast(std::map, - std::vector>>> *pipeline_to_prediction, - uint64_t start_segment_index, uint64_t end_segment_index); + void ExecuteForecast(uint64_t start_segment_index, uint64_t end_segment_index, + std::map> *query_info, + std::map *segment_to_offset, + std::vector> *interference_result_matrix); /** * Computes the valid range of data to be pulling from the internal tables. @@ -217,7 +228,8 @@ class Pilot { */ std::pair ComputeTimestampDataRange(uint64_t now, bool train); - std::string model_save_path_; + std::string ou_model_save_path_; + std::string interference_model_save_path_; std::string forecast_model_save_path_; common::ManagedPointer catalog_; common::ManagedPointer metrics_thread_; diff --git a/src/include/self_driving/planning/pilot_util.h b/src/include/self_driving/planning/pilot_util.h index 08cf49c5e4..405b1071a5 100644 --- a/src/include/self_driving/planning/pilot_util.h +++ b/src/include/self_driving/planning/pilot_util.h @@ -51,14 +51,16 @@ class PilotUtil { * @param end_segment_index end index of segments of interest (inclusive) * @param pipeline_qids vector of real pipeline qids to be populated (necessary to restore the qids to the original * forecasted qids due to the auto-incremental nature of the qids in pipeline metrics) - * @returns const pointer to the collected pipeline data + * @param execute_query whether to execute the queries to get the correct features 
+ * @returns unique pointer to the collected pipeline data */ - static const std::list &CollectPipelineFeatures( + static std::unique_ptr CollectPipelineFeatures( common::ManagedPointer pilot, common::ManagedPointer forecast, - uint64_t start_segment_index, uint64_t end_segment_index, std::vector *pipeline_qids); + uint64_t start_segment_index, uint64_t end_segment_index, std::vector *pipeline_qids, + bool execute_query); /** - * Perform inference through model server manager with collected pipeline metrics + * Perform inference on OU models through model server manager with collected pipeline metrics * To recover the result for each pipeline, also maintain a multimap pipeline_to_ou_position * @param model_save_path model save path * @param model_server_manager model server manager @@ -67,20 +69,44 @@ class PilotUtil { * @param pipeline_data collected pipeline metrics after executing the forecasted queries * @param pipeline_to_prediction list of tuples of query id, pipeline id and result of prediction */ - static void InferenceWithFeatures(const std::string &model_save_path, - common::ManagedPointer model_server_manager, - const std::vector &pipeline_qids, - const std::list &pipeline_data, - std::map, - std::vector>>> *pipeline_to_prediction); + static void OUModelInference(const std::string &model_save_path, + common::ManagedPointer model_server_manager, + const std::vector &pipeline_qids, + const std::list &pipeline_data, + std::map, + std::vector>>> *pipeline_to_prediction); + + /** + * Perform inference on the interference model through model server manager + * @param interference_model_save_path Model save path + * @param model_server_manager Model server manager + * @param pipeline_to_prediction List of tuples of query id, pipeline id and result of prediction + * @param forecast The predicted workload + * @param start_segment_index The start segment in the workload forecast to do inference + * @param end_segment_index The end segment in the workload forecast 
to do inference + * @param query_info Query id, + * @param segment_to_offset The start index of ou records belonging to a segment in input to the interference model + * @param interference_result_matrix Stores the inference results as return values + */ + static void InterferenceModelInference( + const std::string &interference_model_save_path, + common::ManagedPointer model_server_manager, + const std::map, + std::vector>>> &pipeline_to_prediction, + common::ManagedPointer forecast, uint64_t start_segment_index, + uint64_t end_segment_index, std::map> *query_info, + std::map *segment_to_offset, std::vector> *interference_result_matrix); /** * Apply an action supplied through its query string to the database specified * @param pilot pointer to the pilot * @param sql_query query of the action to be executed * @param db_oid oid of the database where this action should be applied + * @param what_if whether this is a "what-if" API call (e.g., only create the index entry in the catalog without + * populating it) */ - static void ApplyAction(common::ManagedPointer pilot, const std::string &sql_query, catalog::db_oid_t db_oid); + static void ApplyAction(common::ManagedPointer pilot, const std::string &sql_query, catalog::db_oid_t db_oid, + bool what_if); /** * Retrieve all query plans associated with queries in the interval of forecasted segments @@ -107,6 +133,26 @@ class PilotUtil { uint64_t start_segment_index, uint64_t end_segment_index); private: + /** + * Add features to existing features + * @param feature The original feature to add in-place + * @param delta_feature The amount to add + * @param normalization Divide delta_feature by this value + */ + static void SumFeatureInPlace(std::vector *feature, const std::vector &delta_feature, + double normalization); + + /** + * Populate interference with first 9 dimension as feature vector normalized by the last dimension (ELAPSED_US); + * next 9 dimension as sum of ou features in current segment normalized by interval of 
segment; + * last 9 dimension as all zeros. + * @param feature The ou feature vector of the current pipeline + * @param normalized_feat_sum Sum of ou features in the current segment, normalized by the segment interval + * @return The constructed interference feature vector + */ + static std::vector GetInterferenceFeature(const std::vector &feature, + const std::vector &normalized_feat_sum); + /** * Group pipeline features by ou for block inference * To recover the result for each pipeline, also maintain a multimap pipeline_to_ou_position @@ -124,6 +170,8 @@ class PilotUtil { const std::vector &pipeline_qids, const std::list &pipeline_data, std::unordered_map>> *ou_to_features); + + static const uint64_t INTERFERENCE_DIMENSION{27}; }; } // namespace noisepage::selfdriving diff --git a/src/include/settings/settings_defs.h b/src/include/settings/settings_defs.h index 44d78d0898..7a82eee54a 100644 --- a/src/include/settings/settings_defs.h +++ b/src/include/settings/settings_defs.h @@ -255,6 +255,16 @@ SETTING_int64( noisepage::settings::Callbacks::NoOp ) +SETTING_int64( + pilot_memory_constraint, + "Maximum amount of memory allowed for the pilot to plan. (default : 1000000000, unit: byte)", + 1000000000, + 0, + 100000000000, + true, + noisepage::settings::Callbacks::NoOp +) + SETTING_bool( metrics, "Metrics sub-system for various components (default: true).", @@ -492,9 +502,17 @@ SETTING_string( // Save path of the model relative to the build path (model saved at ${BUILD_ABS_PATH} + SAVE_PATH) SETTING_string( - model_save_path, - "Save path of the model relative to the build path (default: ../script/model/terrier_model_server_trained/mini_model_test.pickle)", - "../script/model/terrier_model_server_trained/mini_model_test.pickle", + ou_model_save_path, + "Save path of the OU model relative to the build path (default: ou_model_map.pickle)", + "ou_model_map.pickle", + false, + noisepage::settings::Callbacks::NoOp +) + +SETTING_string( + interference_model_save_path, + "Save path of the interference model relative to the build path (default: interference_direct_model.pickle)", + "interference_direct_model.pickle", false, 
noisepage::settings::Callbacks::NoOp ) diff --git a/src/include/task/task.h b/src/include/task/task.h index b5556d2e68..dea3d1b3fb 100644 --- a/src/include/task/task.h +++ b/src/include/task/task.h @@ -114,6 +114,8 @@ class TaskDML : public Task { * @param param_types Types of the query parameters if any * @param tuple_fn Function for processing rows * @param metrics_manager Metrics Manager to be used + * @param settings ExecutionSettings of this query (note: clang-tidy complains that ExecutionSettings is + * trivially-copyable so we can't use std::move()) * @param force_abort Whether to forcefully abort the transaction * @param skip_query_cache Whether to skip retrieving pre-optimized and saving optimized plans * @param override_qid Describes whether to override the qid with a value @@ -122,8 +124,8 @@ class TaskDML : public Task { TaskDML(catalog::db_oid_t db_oid, std::string query_text, std::unique_ptr cost_model, std::vector> &¶ms, std::vector &¶m_types, util::TupleFunction tuple_fn, common::ManagedPointer metrics_manager, - bool force_abort, bool skip_query_cache, std::optional override_qid, - common::ManagedPointer> sync) + execution::exec::ExecutionSettings settings, bool force_abort, bool skip_query_cache, + std::optional override_qid, common::ManagedPointer> sync) : db_oid_(db_oid), query_text_(std::move(query_text)), cost_model_(std::move(cost_model)), @@ -131,6 +133,7 @@ class TaskDML : public Task { param_types_(param_types), tuple_fn_(std::move(tuple_fn)), metrics_manager_(metrics_manager), + settings_(settings), force_abort_(force_abort), skip_query_cache_(skip_query_cache), override_qid_(override_qid), @@ -175,6 +178,7 @@ class TaskDML : public Task { std::vector param_types_; util::TupleFunction tuple_fn_; common::ManagedPointer metrics_manager_; + execution::exec::ExecutionSettings settings_; bool force_abort_; bool skip_query_cache_; std::optional override_qid_; diff --git a/src/include/util/query_exec_util.h b/src/include/util/query_exec_util.h 
index 3c912a3590..3e35b9e88c 100644 --- a/src/include/util/query_exec_util.h +++ b/src/include/util/query_exec_util.h @@ -8,6 +8,7 @@ #include #include "catalog/catalog_defs.h" +#include "execution/compiler/executable_query.h" #include "execution/exec/execution_settings.h" #include "execution/exec_defs.h" #include "type/type_id.h" @@ -21,10 +22,6 @@ namespace noisepage::parser { class ConstantValueExpression; } // namespace noisepage::parser -namespace noisepage::execution::compiler { -class ExecutableQuery; -} // namespace noisepage::execution::compiler - namespace noisepage::execution::sql { struct Val; } // namespace noisepage::execution::sql @@ -127,9 +124,11 @@ class QueryExecUtil { /** * Execute a standalone DDL * @param query DDL query to execute + * @param what_if whether this is a "what-if" API call (e.g., only create the index entry in the catalog without + * populating it) * @return true if success */ - bool ExecuteDDL(const std::string &query); + bool ExecuteDDL(const std::string &query, bool what_if); /** * Execute a standalone DML statement @@ -180,15 +179,24 @@ class QueryExecUtil { common::ManagedPointer metrics, const execution::exec::ExecutionSettings &exec_settings); + /** + * Get ExecutableQuery + * @param statement Previously compiled query statement (serves as identifier) + * @return compiled query statement or nullptr if haven't compiled + */ + common::ManagedPointer GetExecutableQuery(const std::string &statement) { + return common::ManagedPointer(exec_queries_[statement]); + } + /** * Plans a query * @param query Statement to plan * @param params Placeholder parameters for query plan * @param param_types Types of query parameters * @param cost Cost model to use - * @return pair of resultant statement and plan node + * @return the result Statement including the optimized plan */ - std::pair, std::unique_ptr> PlanStatement( + std::unique_ptr PlanStatement( const std::string &query, common::ManagedPointer> params, common::ManagedPointer> 
param_types, std::unique_ptr cost); diff --git a/src/optimizer/optimizer.cpp b/src/optimizer/optimizer.cpp index 0c310df284..8c06c03c9c 100644 --- a/src/optimizer/optimizer.cpp +++ b/src/optimizer/optimizer.cpp @@ -111,7 +111,8 @@ std::unique_ptr Optimizer::ChooseBestPlan( // Derive root plan auto *op = new OperatorNode(gexpr->Contents(), {}, txn); - planner::PlanMetaData::PlanNodeMetaData plan_node_meta_data(context_->GetMemo().GetGroupByID(id)->GetNumRows()); + planner::PlanMetaData::PlanNodeMetaData plan_node_meta_data(group->GetNumRows(), group->GetTableNumRows(), + group->GetFilterColumnSelectivities()); auto plan = generator->ConvertOpNode(txn, accessor, op, required_props, required_cols, output_cols, std::move(children_plans), std::move(children_expr_map), plan_node_meta_data); OPTIMIZER_LOG_TRACE("Finish Choosing best plan for group " + std::to_string(id.UnderlyingValue())); diff --git a/src/optimizer/statistics/selectivity_util.cpp b/src/optimizer/statistics/selectivity_util.cpp index dd348abde2..35c7faa3f6 100644 --- a/src/optimizer/statistics/selectivity_util.cpp +++ b/src/optimizer/statistics/selectivity_util.cpp @@ -135,6 +135,9 @@ double SelectivityUtil::Equal(common::ManagedPointer> column_stat // Find frequency of the value if present in the top K elements. 
auto value_frequency_estimate = top_k->EstimateItemCount(value); + // If all values are distinct, then there can be at most one value equal to the specified value + if (column_stats->GetDistinctValues() == numrows) value_frequency_estimate = std::min(value_frequency_estimate, 1lu); + double res = value_frequency_estimate / static_cast(numrows); NOISEPAGE_ASSERT(res >= 0 && res <= 1, "Selectivity of operator must be within valid range"); diff --git a/src/optimizer/statistics/stats_calculator.cpp b/src/optimizer/statistics/stats_calculator.cpp index 9a8da44b34..7ee68ceb78 100644 --- a/src/optimizer/statistics/stats_calculator.cpp +++ b/src/optimizer/statistics/stats_calculator.cpp @@ -1,20 +1,14 @@ #include "optimizer/statistics/stats_calculator.h" -#include -#include #include -#include -#include #include #include -#include "catalog/catalog_accessor.h" #include "optimizer/logical_operators.h" #include "optimizer/memo.h" #include "optimizer/optimizer_context.h" #include "optimizer/physical_operators.h" #include "optimizer/statistics/selectivity_util.h" -#include "optimizer/statistics/stats_storage.h" #include "optimizer/statistics/table_stats.h" #include "optimizer/statistics/value_condition.h" #include "parser/expression/column_value_expression.h" @@ -38,16 +32,18 @@ void StatsCalculator::Visit(const LogicalGet *op) { auto *root_group = context_->GetMemo().GetGroupByID(gexpr_->GetGroupID()); // Compute selectivity at the first time - if (root_group->GetNumRows() == -1) { + if (root_group->GetNumRows() == Group::UNINITIALIZED_NUM_ROWS) { const auto latched_table_stats_reference = context_->GetStatsStorage()->GetTableStats( op->GetDatabaseOid(), op->GetTableOid(), context_->GetCatalogAccessor()); NOISEPAGE_ASSERT(latched_table_stats_reference.table_stats_.GetColumnCount() != 0, "Should have table stats for all tables"); // Use predicates to estimate cardinality. 
- auto est = EstimateCardinalityForFilter(latched_table_stats_reference.table_stats_.GetNumRows(), - latched_table_stats_reference.table_stats_, op->GetPredicates()); - root_group->SetNumRows(static_cast(est)); + size_t table_num_rows = latched_table_stats_reference.table_stats_.GetNumRows(); + root_group->SetTableNumRows(table_num_rows); + auto est = EstimateCardinalityForFilter(root_group, table_num_rows, latched_table_stats_reference.table_stats_, + op->GetPredicates()); + root_group->SetNumRows(static_cast(est)); } } @@ -65,7 +61,7 @@ void StatsCalculator::Visit(const LogicalInnerJoin *op) { auto *root_group = context_->GetMemo().GetGroupByID(gexpr_->GetGroupID()); // Calculate output num rows first - if (root_group->GetNumRows() == -1) { + if (root_group->GetNumRows() == Group::UNINITIALIZED_NUM_ROWS) { size_t curr_rows = left_child_group->GetNumRows() * right_child_group->GetNumRows(); for (const auto &annotated_expr : op->GetJoinPredicates()) { // See if there are join conditions @@ -79,10 +75,10 @@ void StatsCalculator::Visit(const LogicalInnerJoin *op) { * i.e. if predicate 1 matches two rows and predicate 2 matches the same two rows, then predicate 2 will have * no affect on the total row count but we will unnecessary lower the total row count. 
*/ - curr_rows /= std::max(std::max(left_child_group->GetNumRows(), right_child_group->GetNumRows()), 1); + curr_rows /= std::max(std::max(left_child_group->GetNumRows(), right_child_group->GetNumRows()), 1UL); } } - root_group->SetNumRows(static_cast(curr_rows)); + root_group->SetNumRows(static_cast(curr_rows)); } // TODO(boweic): calculate stats based on predicates other than join conditions @@ -96,7 +92,7 @@ void StatsCalculator::Visit(const LogicalSemiJoin *op) { auto *root_group = context_->GetMemo().GetGroupByID(gexpr_->GetGroupID()); // Calculate output num rows first - if (root_group->GetNumRows() == -1) { + if (root_group->GetNumRows() == Group::UNINITIALIZED_NUM_ROWS) { size_t curr_rows = left_child_group->GetNumRows() * right_child_group->GetNumRows(); for (const auto &annotated_expr : op->GetJoinPredicates()) { // See if there are join conditions @@ -110,10 +106,10 @@ void StatsCalculator::Visit(const LogicalSemiJoin *op) { * i.e. if predicate 1 matches two rows and predicate 2 matches the same two rows, then predicate 2 will have * no affect on the total row count but we will unnecessary lower the total row count. 
*/ - curr_rows /= std::max(std::max(left_child_group->GetNumRows(), right_child_group->GetNumRows()), 1); + curr_rows /= std::max(std::max(left_child_group->GetNumRows(), right_child_group->GetNumRows()), 1UL); } } - root_group->SetNumRows(static_cast(curr_rows)); + root_group->SetNumRows(static_cast(curr_rows)); } } @@ -131,15 +127,47 @@ void StatsCalculator::Visit(const LogicalLimit *op) { NOISEPAGE_ASSERT(gexpr_->GetChildrenGroupsSize() == 1, "Limit must have 1 child"); auto *child_group = context_->GetMemo().GetGroupByID(gexpr_->GetChildGroupId(0)); auto *group = context_->GetMemo().GetGroupByID(gexpr_->GetGroupID()); - group->SetNumRows(std::min(static_cast(op->GetLimit()), child_group->GetNumRows())); + group->SetNumRows(std::min(static_cast(op->GetLimit()), child_group->GetNumRows())); } -size_t StatsCalculator::EstimateCardinalityForFilter(size_t num_rows, const TableStats &predicate_stats, +void StatsCalculator::Visit(const LogicalInsert *op) { + NOISEPAGE_ASSERT(gexpr_->GetChildrenGroupsSize() == 0, "Insert should not have children"); + auto *root_group = context_->GetMemo().GetGroupByID(gexpr_->GetGroupID()); + + // Get the number of rows to insert from the operator + if (root_group->GetNumRows() == Group::UNINITIALIZED_NUM_ROWS) { + root_group->SetNumRows(op->GetValues()->size()); + } +} + +void StatsCalculator::Visit(const LogicalUpdate *op) { + NOISEPAGE_ASSERT(gexpr_->GetChildrenGroupsSize() == 1, "Update must have one child"); + auto *child_group = context_->GetMemo().GetGroupByID(gexpr_->GetChildGroupId(0)); + auto *root_group = context_->GetMemo().GetGroupByID(gexpr_->GetGroupID()); + + // Pass in num rows from the child + if (root_group->GetNumRows() == Group::UNINITIALIZED_NUM_ROWS) { + root_group->SetNumRows(child_group->GetNumRows()); + } +} + +void StatsCalculator::Visit(const LogicalDelete *op) { + NOISEPAGE_ASSERT(gexpr_->GetChildrenGroupsSize() == 1, "Delete must have one children"); + auto *child_group = 
context_->GetMemo().GetGroupByID(gexpr_->GetChildGroupId(0)); + auto *root_group = context_->GetMemo().GetGroupByID(gexpr_->GetGroupID()); + + // Pass in num rows from the child + if (root_group->GetNumRows() == Group::UNINITIALIZED_NUM_ROWS) { + root_group->SetNumRows(child_group->GetNumRows()); + } +} + +size_t StatsCalculator::EstimateCardinalityForFilter(Group *group, size_t num_rows, const TableStats &predicate_stats, const std::vector &predicates) { double selectivity = 1.F; for (const auto &annotated_expr : predicates) { // Loop over conjunction exprs - selectivity *= CalculateSelectivityForPredicate(predicate_stats, annotated_expr.GetExpr()); + selectivity *= CalculateSelectivityForPredicate(group, predicate_stats, annotated_expr.GetExpr()); } // Update selectivity @@ -148,7 +176,7 @@ size_t StatsCalculator::EstimateCardinalityForFilter(size_t num_rows, const Tabl // Calculate the selectivity given the predicate and the stats of columns in the // predicate -double StatsCalculator::CalculateSelectivityForPredicate(const TableStats &predicate_table_stats, +double StatsCalculator::CalculateSelectivityForPredicate(Group *group, const TableStats &predicate_table_stats, common::ManagedPointer expr) { double selectivity = 1.F; if (predicate_table_stats.GetColumnCount() == 0) { @@ -156,7 +184,7 @@ double StatsCalculator::CalculateSelectivityForPredicate(const TableStats &predi } if (expr->GetExpressionType() == parser::ExpressionType::OPERATOR_NOT) { - selectivity = 1 - CalculateSelectivityForPredicate(predicate_table_stats, expr->GetChild(0)); + selectivity = 1 - CalculateSelectivityForPredicate(group, predicate_table_stats, expr->GetChild(0)); } else if (expr->GetExpressionType() == parser::ExpressionType::VALUE_CONSTANT) { NOISEPAGE_ASSERT(expr->GetChildrenSize() == 0, "CVE should have no child."); auto cve = expr.CastManagedPointerTo(); @@ -168,7 +196,7 @@ double StatsCalculator::CalculateSelectivityForPredicate(const TableStats &predi } } else if 
(expr->GetExpressionType() == parser::ExpressionType::OPERATOR_CAST) { NOISEPAGE_ASSERT(expr->GetChildrenSize() == 1, "Cast should have a single child."); - selectivity = CalculateSelectivityForPredicate(predicate_table_stats, expr->GetChild(0)); + selectivity = CalculateSelectivityForPredicate(group, predicate_table_stats, expr->GetChild(0)); } else if (expr->GetChildrenSize() == 1 && expr->GetChild(0)->GetExpressionType() == parser::ExpressionType::COLUMN_VALUE) { auto child_expr = expr->GetChild(0); @@ -177,6 +205,7 @@ double StatsCalculator::CalculateSelectivityForPredicate(const TableStats &predi auto expr_type = expr->GetExpressionType(); ValueCondition condition(col_oid, col_name, expr_type, nullptr); selectivity = SelectivityUtil::ComputeSelectivity(predicate_table_stats, condition); + group->AddFilterColumnSelectivity(col_oid, selectivity); } else if ((expr->GetChild(0)->GetExpressionType() == parser::ExpressionType::COLUMN_VALUE && (expr->GetChild(1)->GetExpressionType() == parser::ExpressionType::VALUE_CONSTANT || expr->GetChild(1)->GetExpressionType() == parser::ExpressionType::VALUE_PARAMETER)) || @@ -215,10 +244,11 @@ double StatsCalculator::CalculateSelectivityForPredicate(const TableStats &predi ValueCondition condition(col_oid, col_name, expr_type, std::move(value)); selectivity = SelectivityUtil::ComputeSelectivity(predicate_table_stats, condition); + group->AddFilterColumnSelectivity(col_oid, selectivity); } else if (expr->GetExpressionType() == parser::ExpressionType::CONJUNCTION_AND || expr->GetExpressionType() == parser::ExpressionType::CONJUNCTION_OR) { - double left_selectivity = CalculateSelectivityForPredicate(predicate_table_stats, expr->GetChild(0)); - double right_selectivity = CalculateSelectivityForPredicate(predicate_table_stats, expr->GetChild(1)); + double left_selectivity = CalculateSelectivityForPredicate(group, predicate_table_stats, expr->GetChild(0)); + double right_selectivity = CalculateSelectivityForPredicate(group, 
predicate_table_stats, expr->GetChild(1)); if (expr->GetExpressionType() == parser::ExpressionType::CONJUNCTION_AND) { selectivity = left_selectivity * right_selectivity; } else { diff --git a/src/self_driving/forecasting/workload_forecast.cpp b/src/self_driving/forecasting/workload_forecast.cpp index 766fabdc86..111c128486 100644 --- a/src/self_driving/forecasting/workload_forecast.cpp +++ b/src/self_driving/forecasting/workload_forecast.cpp @@ -44,8 +44,10 @@ WorkloadForecast::WorkloadForecast(const WorkloadForecastPrediction &inference, InitFromInference(inference); } -WorkloadForecast::WorkloadForecast(const WorkloadForecastPrediction &inference, uint64_t num_sample) { +WorkloadForecast::WorkloadForecast(const WorkloadForecastPrediction &inference, uint64_t forecast_interval, + uint64_t num_sample) { num_sample_ = num_sample; + forecast_interval_ = forecast_interval; LoadQueryText(); LoadQueryTrace(); InitFromInference(inference); diff --git a/src/self_driving/modeling/operating_unit_recorder.cpp b/src/self_driving/modeling/operating_unit_recorder.cpp index 804a60a553..27e5ebd678 100644 --- a/src/self_driving/modeling/operating_unit_recorder.cpp +++ b/src/self_driving/modeling/operating_unit_recorder.cpp @@ -5,7 +5,6 @@ #include "catalog/catalog_accessor.h" #include "execution/ast/ast.h" #include "execution/ast/context.h" -#include "execution/ast/type.h" #include "execution/compiler/operator/hash_aggregation_translator.h" #include "execution/compiler/operator/hash_join_translator.h" #include "execution/compiler/operator/operator_translator.h" @@ -13,6 +12,7 @@ #include "execution/compiler/operator/static_aggregation_translator.h" #include "execution/sql/aggregators.h" #include "execution/sql/hash_table_entry.h" +#include "optimizer/index_util.h" #include "parser/expression/constant_value_expression.h" #include "parser/expression/function_expression.h" #include "parser/expression_defs.h" @@ -42,6 +42,7 @@ #include "planner/plannodes/limit_plan_node.h" #include 
"planner/plannodes/nested_loop_join_plan_node.h" #include "planner/plannodes/order_by_plan_node.h" +#include "planner/plannodes/plan_meta_data.h" #include "planner/plannodes/plan_visitor.h" #include "planner/plannodes/projection_plan_node.h" #include "planner/plannodes/seq_scan_plan_node.h" @@ -50,12 +51,14 @@ #include "self_driving/modeling/operating_unit_util.h" #include "storage/block_layout.h" #include "storage/index/index.h" +#include "storage/sql_table.h" #include "type/type_id.h" namespace noisepage::selfdriving { template -void OperatingUnitRecorder::RecordIndexOperations(const std::vector &index_oids) { +void OperatingUnitRecorder::RecordIndexOperations(const std::vector &index_oids, + catalog::table_oid_t table_oid) { selfdriving::ExecutionOperatingUnitType type; if (std::is_same::value) { type = selfdriving::ExecutionOperatingUnitType::INDEX_INSERT; @@ -63,8 +66,8 @@ void OperatingUnitRecorder::RecordIndexOperations(const std::vector::value) { // UPDATE is done as a DELETE followed by INSERT - RecordIndexOperations(index_oids); - RecordIndexOperations(index_oids); + RecordIndexOperations(index_oids, table_oid); + RecordIndexOperations(index_oids, table_oid); return; } else { NOISEPAGE_ASSERT(false, "Recording index operations for non-modiying plan node"); @@ -73,12 +76,12 @@ void OperatingUnitRecorder::RecordIndexOperations(const std::vectorGetIndexSchema(oid); - // Currenty favor using the index size extracted from the index as opposed to table size - // estimate. This will probably be replaced with stats in the future. - auto index = accessor_->GetIndex(oid); + // TODO(lin): Use the table size instead of the index size as the estimate (since there may be "what-if" indexes + // that we don't populate). We probably need to use the stats if the pilot is not running on the primary. 
+ auto table = accessor_->GetTable(table_oid); std::vector keys; - size_t num_rows = index->GetSize(); + size_t num_rows = table->GetNumTuple(); size_t num_keys = index_schema.GetColumns().size(); size_t key_size = ComputeKeySize(common::ManagedPointer(&index_schema), false, keys, &num_keys); @@ -202,72 +205,146 @@ size_t OperatingUnitRecorder::ComputeKeySize(catalog::index_oid_t idx_oid, void OperatingUnitRecorder::AggregateFeatures(selfdriving::ExecutionOperatingUnitType type, size_t key_size, size_t num_keys, const planner::AbstractPlanNode *plan, size_t scaling_factor, double mem_factor) { - // TODO(wz2): Populate actual num_rows/cardinality after #759 - size_t num_rows = 1; - size_t cardinality = 1; + size_t num_rows; + // TODO(lin): Some times cardinality represents the number of distinct values for some OUs (e.g., SORT_BUILD), + // but we don't have a good way to estimate that right now. So in those cases we just copy num_rows + size_t cardinality; size_t num_loops = 0; size_t num_concurrent = 0; // the number of concurrently executing threads (issue #1241) - if (type == ExecutionOperatingUnitType::OUTPUT) { - if (accessor_->GetDatabaseOid("tpch_runner_db") != catalog::INVALID_DATABASE_OID) { - // Unfortunately we don't know what kind of output callback that we're going to call at runtime, so we just - // special case this when we execute the plans directly from the TPCH runner and use the NoOpResultConsumer - cardinality = 0; - } else { - // Uses the network result consumer - cardinality = 1; - } - auto child_translator = current_translator_->GetChildTranslator(); - if (child_translator != nullptr) { - if (child_translator->Op()->GetPlanNodeType() == planner::PlanNodeType::PROJECTION) { - auto output = child_translator->Op()->GetOutputSchema()->GetColumn(0).GetExpr(); - if (output && output->GetExpressionType() == parser::ExpressionType::FUNCTION) { - auto f_expr = output.CastManagedPointerTo(); - if (f_expr->GetFuncName() == "nprunnersemitint" || 
f_expr->GetFuncName() == "nprunnersemitreal") { - auto child = f_expr->GetChild(0); - NOISEPAGE_ASSERT(child, "NpRunnersEmit should have children"); - NOISEPAGE_ASSERT(child->GetExpressionType() == parser::ExpressionType::VALUE_CONSTANT, - "Child should be constants"); - - auto cve = child.CastManagedPointerTo(); - num_rows = cve->GetInteger().val_; + + // Dummy default values in case we don't have stats + size_t current_plan_cardinality = 1; + size_t table_num_rows = 1; + if (plan_meta_data_ != nullptr) { + auto &plan_node_meta_data = plan_meta_data_->GetPlanNodeMetaData(plan->GetPlanNodeId()); + current_plan_cardinality = plan_node_meta_data.GetCardinality(); + table_num_rows = plan_node_meta_data.GetTableNumRows(); + } + + switch (type) { + case ExecutionOperatingUnitType::OUTPUT: { + num_rows = current_plan_cardinality; + if (accessor_->GetDatabaseOid("tpch_runner_db") != catalog::INVALID_DATABASE_OID) { + // Unfortunately we don't know what kind of output callback that we're going to call at runtime, so we just + // special case this when we execute the plans directly from the TPCH runner and use the NoOpResultConsumer + cardinality = 0; + } else { + // Uses the network result consumer + cardinality = 1; + } + auto child_translator = current_translator_->GetChildTranslator(); + if (child_translator != nullptr) { + if (child_translator->Op()->GetPlanNodeType() == planner::PlanNodeType::PROJECTION) { + auto output = child_translator->Op()->GetOutputSchema()->GetColumn(0).GetExpr(); + if (output && output->GetExpressionType() == parser::ExpressionType::FUNCTION) { + auto f_expr = output.CastManagedPointerTo(); + if (f_expr->GetFuncName() == "nprunnersemitint" || f_expr->GetFuncName() == "nprunnersemitreal") { + auto child = f_expr->GetChild(0); + NOISEPAGE_ASSERT(child, "NpRunnersEmit should have children"); + NOISEPAGE_ASSERT(child->GetExpressionType() == parser::ExpressionType::VALUE_CONSTANT, + "Child should be constants"); + + auto cve = 
child.CastManagedPointerTo(); + num_rows = cve->GetInteger().val_; + } } } } - } - } else if (type > ExecutionOperatingUnitType::PLAN_OPS_DELIMITER) { - // If feature is OUTPUT or computation, then cardinality = num_rows - cardinality = num_rows; - } else if (type == ExecutionOperatingUnitType::HASHJOIN_PROBE) { - NOISEPAGE_ASSERT(plan->GetPlanNodeType() == planner::PlanNodeType::HASHJOIN, "HashJoin plan expected"); - UNUSED_ATTRIBUTE auto *c_plan = plan->GetChild(1); - num_rows = 1; // extract from c_plan num_rows (# row to probe) - cardinality = 1; // extract from plan num_rows (# matched rows) - } else if (type == ExecutionOperatingUnitType::IDX_SCAN) { - // For IDX_SCAN, the feature is as follows: - // - num_rows is the size of the index - // - cardinality is the scan size - if (plan->GetPlanNodeType() == planner::PlanNodeType::INDEXSCAN) { - num_rows = reinterpret_cast(plan)->GetIndexSize(); - } else { - NOISEPAGE_ASSERT(plan->GetPlanNodeType() == planner::PlanNodeType::INDEXNLJOIN, "Expected IdxJoin"); - num_rows = reinterpret_cast(plan)->GetIndexSize(); + } break; + case ExecutionOperatingUnitType::HASHJOIN_PROBE: { + NOISEPAGE_ASSERT(plan->GetPlanNodeType() == planner::PlanNodeType::HASHJOIN, "HashJoin plan expected"); + cardinality = current_plan_cardinality; // extract from plan num_rows (# matched rows) + + if (plan_meta_data_ != nullptr) { + auto *c_plan = plan->GetChild(1); + // extract from c_plan num_rows (# row to probe) + num_rows = plan_meta_data_->GetPlanNodeMetaData(c_plan->GetPlanNodeId()).GetCardinality(); + } else { + num_rows = 1; + } + } break; + case ExecutionOperatingUnitType::IDX_SCAN: { + // For IDX_SCAN, the feature is as follows: + // - num_rows is the size of the index + // - cardinality is the scan size + catalog::table_oid_t table_oid; + catalog::index_oid_t index_oid; + if (plan->GetPlanNodeType() == planner::PlanNodeType::INDEXSCAN) { + auto index_scan_plan = reinterpret_cast(plan); + index_oid = index_scan_plan->GetIndexOid(); + 
table_oid = index_scan_plan->GetTableOid(); + if (table_num_rows == 0) + // When we didn't run Analyze + num_rows = index_scan_plan->GetIndexSize(); + else + num_rows = table_num_rows; + + std::vector mapped_cols; + std::unordered_map lookup; + + UNUSED_ATTRIBUTE bool status = optimizer::IndexUtil::ConvertIndexKeyOidToColOid( + accessor_.Get(), table_oid, accessor_->GetIndexSchema(index_oid), &lookup, &mapped_cols); + NOISEPAGE_ASSERT(status, "Failed to get index key oids in operating unit recorder"); + + if (plan_meta_data_ != nullptr) { + cardinality = table_num_rows; // extract from plan num_rows (this is the scan size) + auto &plan_node_meta_data = plan_meta_data_->GetPlanNodeMetaData(plan->GetPlanNodeId()); + double selectivity = 1; + for (auto col_id : mapped_cols) { + selectivity *= plan_node_meta_data.GetFilterColumnSelectivity(col_id); + } + cardinality = cardinality * selectivity; + } else { + cardinality = 1; + } + } else { + NOISEPAGE_ASSERT(plan->GetPlanNodeType() == planner::PlanNodeType::INDEXNLJOIN, "Expected IdxJoin"); + auto index_join_plan = reinterpret_cast(plan); - UNUSED_ATTRIBUTE auto *c_plan = plan->GetChild(0); - num_loops = 0; // extract from c_plan num_rows - } + if (plan_meta_data_ != nullptr) { + auto *c_plan = plan->GetChild(0); + // extract from c_plan num_row + num_loops = plan_meta_data_->GetPlanNodeMetaData(c_plan->GetPlanNodeId()).GetCardinality(); + } - cardinality = 1; // extract from plan num_rows (this is the scan size) - } else if (type == ExecutionOperatingUnitType::CREATE_INDEX) { - // We extract the num_rows and cardinality from the table name if possible - // This is a special case for mini-runners - std::string idx_name = reinterpret_cast(plan)->GetIndexName(); - auto mrpos = idx_name.find("minirunners__"); - if (mrpos != std::string::npos) { - num_rows = atoi(idx_name.c_str() + mrpos + sizeof("minirunners__") - 1); - cardinality = num_rows; - } + // FIXME(lin): Right now we do not populate the cardinality or selectivity 
stats for the inner index scan of + // INDEXNLJOIN. We directly get the size from the index and assume the inner index scan only returns 1 tuple. + num_rows = index_join_plan->GetIndexSize(); + cardinality = 1; + } + } break; + case ExecutionOperatingUnitType::SEQ_SCAN: { + num_rows = table_num_rows; + cardinality = table_num_rows; + } break; + case ExecutionOperatingUnitType::SORT_TOPK_BUILD: { + num_rows = current_plan_cardinality; + // TODO(lin): This should be the limit size for the OrderByPlanNode, which is the parent plan for the plan of + // the SORT_TOPK_BUILD OU. We need to refactor the interface to pass in this information correctly. + cardinality = 1; + } break; + case ExecutionOperatingUnitType::CREATE_INDEX: { + num_rows = table_num_rows; + cardinality = table_num_rows; + // We extract the num_rows and cardinality from the table name if possible + // This is a special case for mini-runners + std::string idx_name = reinterpret_cast(plan)->GetIndexName(); + auto mrpos = idx_name.find("minirunners__"); + if (mrpos != std::string::npos) { + num_rows = atoi(idx_name.c_str() + mrpos + sizeof("minirunners__") - 1); + cardinality = num_rows; + } + } break; + default: + num_rows = current_plan_cardinality; + cardinality = current_plan_cardinality; } + // Setting the cardinality to at least to one in case losing accuracy during casting. We don't model the case when + // the cardinality is 0 either. 
+ num_rows = std::max(num_rows, 1lu); + cardinality = std::max(cardinality, 1lu); + num_rows *= scaling_factor; cardinality *= scaling_factor; @@ -548,7 +625,7 @@ void OperatingUnitRecorder::Visit(const planner::InsertPlanNode *plan) { } if (!plan->GetIndexOids().empty()) { - RecordIndexOperations(plan->GetIndexOids()); + RecordIndexOperations(plan->GetIndexOids(), plan->GetTableOid()); } } @@ -579,7 +656,7 @@ void OperatingUnitRecorder::Visit(const planner::UpdatePlanNode *plan) { AggregateFeatures(selfdriving::ExecutionOperatingUnitType::INSERT, key_size, num_cols, plan, 1, 1); AggregateFeatures(selfdriving::ExecutionOperatingUnitType::DELETE, key_size, num_cols, plan, 1, 1); - RecordIndexOperations(plan->GetIndexOids()); + RecordIndexOperations(plan->GetIndexOids(), plan->GetTableOid()); } } @@ -593,7 +670,7 @@ void OperatingUnitRecorder::Visit(const planner::DeletePlanNode *plan) { AggregateFeatures(plan_feature_type_, key_size, num_cols, plan, 1, 1); if (!plan->GetIndexOids().empty()) { - RecordIndexOperations(plan->GetIndexOids()); + RecordIndexOperations(plan->GetIndexOids(), plan->GetTableOid()); } } diff --git a/src/self_driving/planning/mcts/monte_carlo_tree_search.cpp b/src/self_driving/planning/mcts/monte_carlo_tree_search.cpp index dce9449590..15f2b9d0ef 100644 --- a/src/self_driving/planning/mcts/monte_carlo_tree_search.cpp +++ b/src/self_driving/planning/mcts/monte_carlo_tree_search.cpp @@ -36,18 +36,20 @@ MonteCarloTreeSearch::MonteCarloTreeSearch(common::ManagedPointer pilot, // create root_ auto later_cost = PilotUtil::ComputeCost(pilot, forecast, 0, end_segment_index); // root correspond to no action applied to any segment - root_ = std::make_unique(nullptr, static_cast(NULL_ACTION), 0, later_cost); + root_ = std::make_unique(nullptr, static_cast(NULL_ACTION), 0, later_cost, 0); } void MonteCarloTreeSearch::BestAction(uint64_t simulation_number, - std::vector> *best_action_seq) { + std::vector> *best_action_seq, + uint64_t memory_constraint) { 
for (uint64_t i = 0; i < simulation_number; i++) { std::unordered_set candidate_actions; for (auto action_id : candidate_actions_) candidate_actions.insert(action_id); auto vertex = TreeNode::Selection(common::ManagedPointer(root_), pilot_, action_map_, &candidate_actions, end_segment_index_); - vertex->ChildrenRollout(pilot_, forecast_, 0, end_segment_index_, action_map_, candidate_actions); + vertex->ChildrenRollout(pilot_, forecast_, 0, end_segment_index_, action_map_, candidate_actions, + memory_constraint); vertex->BackPropogate(pilot_, action_map_, use_min_cost_); } // return the best action at root diff --git a/src/self_driving/planning/mcts/tree_node.cpp b/src/self_driving/planning/mcts/tree_node.cpp index d756ac7c5c..a357a0c1ed 100644 --- a/src/self_driving/planning/mcts/tree_node.cpp +++ b/src/self_driving/planning/mcts/tree_node.cpp @@ -15,18 +15,22 @@ namespace noisepage::selfdriving::pilot { TreeNode::TreeNode(common::ManagedPointer parent, action_id_t current_action, double current_segment_cost, - double later_segments_cost) + double later_segments_cost, uint64_t memory) : is_leaf_{true}, depth_(parent == nullptr ? 0 : parent->depth_ + 1), current_action_(current_action), ancestor_cost_(current_segment_cost + (parent == nullptr ? 
0 : parent->ancestor_cost_)), parent_(parent), - number_of_visits_{1} { + number_of_visits_{1}, + memory_(memory) { if (parent != nullptr) parent->is_leaf_ = false; cost_ = ancestor_cost_ + later_segments_cost; SELFDRIVING_LOG_INFO( "Creating Tree Node: Depth {} Action {} Cost {} Current_Segment_Cost {} Later_Segment_Cost {} Ancestor_Cost {}", depth_, current_action_, cost_, current_segment_cost, later_segments_cost, ancestor_cost_); + + // TODO(lin): check the memory constraint + (void)memory_; } common::ManagedPointer TreeNode::BestSubtree() { @@ -100,7 +104,8 @@ common::ManagedPointer TreeNode::Selection( for (auto enabled_action : action_map.at(action)->GetEnabledActions()) { candidate_actions->insert(enabled_action); } - PilotUtil::ApplyAction(pilot, action_map.at(action)->GetSQLCommand(), action_map.at(action)->GetDatabaseOid()); + PilotUtil::ApplyAction(pilot, action_map.at(action)->GetSQLCommand(), action_map.at(action)->GetDatabaseOid(), + Pilot::WHAT_IF); } return curr; } @@ -109,7 +114,7 @@ void TreeNode::ChildrenRollout(common::ManagedPointer pilot, common::ManagedPointer forecast, uint64_t tree_start_segment_index, uint64_t tree_end_segment_index, const std::map> &action_map, - const std::unordered_set &candidate_actions) { + const std::unordered_set &candidate_actions, uint64_t memory_constraint) { auto start_segment_index = tree_start_segment_index + depth_; auto end_segment_index = tree_end_segment_index; NOISEPAGE_ASSERT(start_segment_index <= end_segment_index, @@ -120,21 +125,22 @@ void TreeNode::ChildrenRollout(common::ManagedPointer pilot, if (!action_map.at(action_id)->IsValid() || action_map.at(action_id)->GetSQLCommand() == "set compiled_query_execution = 'true';") continue; - PilotUtil::ApplyAction(pilot, action_map.at(action_id)->GetSQLCommand(), - action_map.at(action_id)->GetDatabaseOid()); + PilotUtil::ApplyAction(pilot, action_map.at(action_id)->GetSQLCommand(), action_map.at(action_id)->GetDatabaseOid(), + Pilot::WHAT_IF); double 
child_segment_cost = PilotUtil::ComputeCost(pilot, forecast, start_segment_index, start_segment_index); double later_segments_cost = 0; if (start_segment_index != end_segment_index) later_segments_cost = PilotUtil::ComputeCost(pilot, forecast, start_segment_index + 1, end_segment_index); - children_.push_back( - std::make_unique(common::ManagedPointer(this), action_id, child_segment_cost, later_segments_cost)); + // TODO(lin): store the current memory consumption up to this node instead of 0 + children_.push_back(std::make_unique(common::ManagedPointer(this), action_id, child_segment_cost, + later_segments_cost, 0)); // apply one reverse action to undo the above auto rev_actions = action_map.at(action_id)->GetReverseActions(); PilotUtil::ApplyAction(pilot, action_map.at(rev_actions[0])->GetSQLCommand(), - action_map.at(rev_actions[0])->GetDatabaseOid()); + action_map.at(rev_actions[0])->GetDatabaseOid(), Pilot::WHAT_IF); } } @@ -149,7 +155,7 @@ void TreeNode::BackPropogate(common::ManagedPointer pilot, while (curr != nullptr && curr->parent_ != nullptr) { auto rev_action = action_map.at(curr->current_action_)->GetReverseActions()[0]; PilotUtil::ApplyAction(pilot, action_map.at(rev_action)->GetSQLCommand(), - action_map.at(rev_action)->GetDatabaseOid()); + action_map.at(rev_action)->GetDatabaseOid(), Pilot::WHAT_IF); if (use_min_cost) { curr->cost_ = std::min(curr->cost_, expanded_cost); } else { diff --git a/src/self_driving/planning/pilot.cpp b/src/self_driving/planning/pilot.cpp index 38d184908b..dde0668fd3 100644 --- a/src/self_driving/planning/pilot.cpp +++ b/src/self_driving/planning/pilot.cpp @@ -33,8 +33,8 @@ namespace noisepage::selfdriving { -Pilot::Pilot(std::string model_save_path, std::string forecast_model_save_path, - common::ManagedPointer catalog, +Pilot::Pilot(std::string ou_model_save_path, std::string interference_model_save_path, + std::string forecast_model_save_path, common::ManagedPointer catalog, common::ManagedPointer metrics_thread, 
common::ManagedPointer model_server_manager, common::ManagedPointer settings_manager, @@ -43,7 +43,8 @@ Pilot::Pilot(std::string model_save_path, std::string forecast_model_save_path, std::unique_ptr query_exec_util, common::ManagedPointer task_manager, uint64_t workload_forecast_interval, uint64_t sequence_length, uint64_t horizon_length) - : model_save_path_(std::move(model_save_path)), + : ou_model_save_path_(std::move(ou_model_save_path)), + interference_model_save_path_(std::move(interference_model_save_path)), forecast_model_save_path_(std::move(forecast_model_save_path)), catalog_(catalog), metrics_thread_(metrics_thread), @@ -372,7 +373,7 @@ void Pilot::LoadWorkloadForecast(WorkloadForecastInitMode mode) { // Construct the WorkloadForecast froM a mix of on-disk and inference information auto sample = settings_manager_->GetInt(settings::Param::forecast_sample_limit); - forecast_ = std::make_unique(result.first, sample); + forecast_ = std::make_unique(result.first, workload_forecast_interval_, sample); } else { NOISEPAGE_ASSERT(mode == WorkloadForecastInitMode::DISK_ONLY, "Expected the mode to be directly from disk"); @@ -411,69 +412,45 @@ void Pilot::ActionSearch(std::vectorGetInt64(settings::Param::pilot_memory_constraint)); for (uint64_t i = 0; i < best_action_seq->size(); i++) { SELFDRIVING_LOG_INFO(fmt::format("Action Selected: Time Interval: {}; Action Command: {} Applied to Database {}", i, best_action_seq->at(i).first, static_cast(best_action_seq->at(i).second))); } PilotUtil::ApplyAction(common::ManagedPointer(this), best_action_seq->begin()->first, - best_action_seq->begin()->second); + best_action_seq->begin()->second, false); } -void Pilot::ExecuteForecast(std::map, - std::vector>>> *pipeline_to_prediction, - uint64_t start_segment_index, uint64_t end_segment_index) { +void Pilot::ExecuteForecast(uint64_t start_segment_index, uint64_t end_segment_index, + std::map> *query_info, + std::map *segment_to_offset, + std::vector> 
*interference_result_matrix) { NOISEPAGE_ASSERT(forecast_ != nullptr, "Need forecast_ initialized."); // first we make sure the pipeline metrics flag as well as the counters is enabled. Also set the sample rate to be 0 // so that every query execution is being recorded - // record previous parameters to be restored at the end of this function - const bool old_metrics_enable = settings_manager_->GetBool(settings::Param::pipeline_metrics_enable); - const bool old_counters_enable = settings_manager_->GetBool(settings::Param::counters_enable); - const auto old_sample_rate = settings_manager_->GetInt64(settings::Param::pipeline_metrics_sample_rate); - - auto action_context = std::make_unique(common::action_id_t(1)); - if (!old_metrics_enable) { - settings_manager_->SetBool(settings::Param::pipeline_metrics_enable, true, common::ManagedPointer(action_context), - EmptySetterCallback); - } - - action_context = std::make_unique(common::action_id_t(2)); - if (!old_counters_enable) { - settings_manager_->SetBool(settings::Param::counters_enable, true, common::ManagedPointer(action_context), - EmptySetterCallback); - } - - action_context = std::make_unique(common::action_id_t(3)); - settings_manager_->SetInt(settings::Param::pipeline_metrics_sample_rate, 100, common::ManagedPointer(action_context), - EmptySetterCallback); - std::vector pipeline_qids; // Collect pipeline metrics of forecasted queries within the interval of segments auto pipeline_data = PilotUtil::CollectPipelineFeatures(common::ManagedPointer(this), common::ManagedPointer(forecast_), start_segment_index, - end_segment_index, &pipeline_qids); - // Then we perform inference through model server to get ou prediction results for all pipelines - PilotUtil::InferenceWithFeatures(model_save_path_, model_server_manager_, pipeline_qids, pipeline_data, - pipeline_to_prediction); - - // restore the old parameters - action_context = std::make_unique(common::action_id_t(4)); - if (!old_metrics_enable) { - 
settings_manager_->SetBool(settings::Param::pipeline_metrics_enable, false, common::ManagedPointer(action_context), - EmptySetterCallback); - } + end_segment_index, &pipeline_qids, !WHAT_IF); - action_context = std::make_unique(common::action_id_t(5)); - if (!old_counters_enable) { - settings_manager_->SetBool(settings::Param::counters_enable, false, common::ManagedPointer(action_context), - EmptySetterCallback); - } + // pipeline_to_prediction maps each pipeline to a vector of ou inference results for all ous of this pipeline + // (where each entry corresponds to a different query param) + // Each element of the outermost vector is a vector of ou prediction (each being a double vector) for one set of + // parameters + std::map, std::vector>>> + pipeline_to_prediction; + + // Then we perform inference through model server to get ou prediction results for all pipelines + PilotUtil::OUModelInference(ou_model_save_path_, model_server_manager_, pipeline_qids, pipeline_data->pipeline_data_, + &pipeline_to_prediction); - action_context = std::make_unique(common::action_id_t(6)); - settings_manager_->SetInt(settings::Param::pipeline_metrics_sample_rate, old_sample_rate, - common::ManagedPointer(action_context), EmptySetterCallback); + PilotUtil::InterferenceModelInference(interference_model_save_path_, model_server_manager_, pipeline_to_prediction, + common::ManagedPointer(forecast_), start_segment_index, end_segment_index, + query_info, segment_to_offset, interference_result_matrix); } } // namespace noisepage::selfdriving diff --git a/src/self_driving/planning/pilot_util.cpp b/src/self_driving/planning/pilot_util.cpp index 148c904c65..8ca15a36c0 100644 --- a/src/self_driving/planning/pilot_util.cpp +++ b/src/self_driving/planning/pilot_util.cpp @@ -34,8 +34,10 @@ namespace noisepage::selfdriving { -void PilotUtil::ApplyAction(common::ManagedPointer pilot, const std::string &sql_query, - catalog::db_oid_t db_oid) { +void PilotUtil::ApplyAction(common::ManagedPointer 
pilot, const std::string &sql_query, catalog::db_oid_t db_oid, + bool what_if) { + SELFDRIVING_LOG_INFO("Applying action: {}", sql_query); + auto txn_manager = pilot->txn_manager_; auto catalog = pilot->catalog_; util::QueryExecUtil util(txn_manager, catalog, pilot->settings_manager_, pilot->stats_storage_, @@ -52,7 +54,7 @@ void PilotUtil::ApplyAction(common::ManagedPointer pilot, const std::stri } if (is_query_ddl) { - util.ExecuteDDL(sql_query); + util.ExecuteDDL(sql_query, what_if); } else { // Parameters are also specified in the query string, hence we have no parameters nor parameter types here execution::exec::ExecutionSettings settings{}; @@ -88,62 +90,73 @@ void PilotUtil::GetQueryPlans(common::ManagedPointer pilot, common::Manag auto param_types = common::ManagedPointer(forecast->GetParamtypesByQid(qid)); auto result = query_exec_util->PlanStatement(query_text, params, param_types, std::make_unique()); - plan_vecs->emplace_back(std::move(result.second)); + plan_vecs->emplace_back(std::move(result->OptimizeResult()->TakePlanNodeOwnership())); query_exec_util->UseTransaction(db_oid, nullptr); } } +void PilotUtil::SumFeatureInPlace(std::vector *feature, const std::vector &delta_feature, + double normalization) { + for (size_t i = 0; i < feature->size(); i++) { + (*feature)[i] += delta_feature[i] / normalization; + } +} + +std::vector PilotUtil::GetInterferenceFeature(const std::vector &feature, + const std::vector &normalized_feat_sum) { + std::vector interference_feat; + for (size_t i = 0; i < feature.size(); i++) { + // normalize the output of ou_model by the elapsed time + interference_feat.emplace_back(feature[i] / (feature[feature.size() - 1] + 1e-2)); + } + + // append with the normalized feature for this segment + interference_feat.insert(interference_feat.end(), normalized_feat_sum.begin(), normalized_feat_sum.end()); + NOISEPAGE_ASSERT(interference_feat.size() == 18, "expect 18 nonzero elements in interference feature"); + 
interference_feat.resize(INTERFERENCE_DIMENSION, 0.0); + return interference_feat; +} + double PilotUtil::ComputeCost(common::ManagedPointer pilot, common::ManagedPointer forecast, uint64_t start_segment_index, uint64_t end_segment_index) { // Compute cost as total latency of queries based on their num of exec - // pipeline_to_prediction maps each pipeline to a vector of ou inference results for all ous of this pipeline - // (where each entry corresponds to a different query param) - // Each element of the outermost vector is a vector of ou prediction (each being a double vector) for one set of - // parameters - std::map, std::vector>>> - pipeline_to_prediction; - pilot->ExecuteForecast(&pipeline_to_prediction, start_segment_index, end_segment_index); - - std::vector> query_cost; - execution::query_id_t prev_qid = pipeline_to_prediction.begin()->first.first; - query_cost.emplace_back(prev_qid, 0); + // query id, + std::map> query_info; + // This is to record the start index of ou records belonging to a segment in input to the interference model + std::map segment_to_offset; + std::vector> interference_result_matrix; - for (auto const &pipeline_to_pred : pipeline_to_prediction) { - double pipeline_sum = 0; - for (auto const &pipeline_res : pipeline_to_pred.second) { - for (auto ou_res : pipeline_res) { - // sum up the latency of ous - pipeline_sum += ou_res[ou_res.size() - 1]; - } - } - // record average cost of this pipeline among the same queries with diff param - if (prev_qid == pipeline_to_pred.first.first) { - query_cost.back().second += pipeline_sum / pipeline_to_pred.second.size(); - } else { - query_cost.emplace_back(pipeline_to_pred.first.first, pipeline_sum / pipeline_to_pred.second.size()); - prev_qid = pipeline_to_pred.first.first; - } - } - double total_cost = 0; - double num_queries = 0; - for (auto qcost : query_cost) { - for (auto i = start_segment_index; i <= end_segment_index; i++) { - // It is possible that within the forecast, we don't actually 
have the qid. - // (i.e., query executed count is 0 within this particular segment). - const auto &seg_map = forecast->GetSegmentByIndex(i).GetIdToNumexec(); - if (seg_map.find(qcost.first) != seg_map.end()) { - total_cost += seg_map.at(qcost.first) * qcost.second; - num_queries += seg_map.at(qcost.first); + pilot->ExecuteForecast(start_segment_index, end_segment_index, &query_info, &segment_to_offset, + &interference_result_matrix); + + double total_cost = 0.0; + + for (auto seg_idx = start_segment_index; seg_idx <= end_segment_index; seg_idx++) { + std::vector> query_cost; + + auto query_ou_offset = segment_to_offset[seg_idx]; + + // separately get the cost of each query, averaged over diff set of params, for this segment + // iterate through the sorted list of qids for this segment + for (auto id_to_num_exec : forecast->GetSegmentByIndex(seg_idx).GetIdToNumexec()) { + double curr_query_cost = 0.0; + for (auto ou_idx = query_ou_offset; ou_idx < query_ou_offset + query_info[id_to_num_exec.first].second; + ou_idx++) { + curr_query_cost += + interference_result_matrix.at(ou_idx).back() / static_cast(query_info[id_to_num_exec.first].first); } + total_cost += static_cast(id_to_num_exec.second) * curr_query_cost; + query_ou_offset += query_info[id_to_num_exec.first].second; } } - NOISEPAGE_ASSERT(num_queries > 0, "expect more then one query"); + return total_cost; } -const std::list &PilotUtil::CollectPipelineFeatures( +std::unique_ptr PilotUtil::CollectPipelineFeatures( common::ManagedPointer pilot, common::ManagedPointer forecast, - uint64_t start_segment_index, uint64_t end_segment_index, std::vector *pipeline_qids) { + uint64_t start_segment_index, uint64_t end_segment_index, std::vector *pipeline_qids, + const bool execute_query) { std::unordered_set qids; for (auto i = start_segment_index; i <= end_segment_index; i++) { for (auto &it : forecast->GetSegmentByIndex(i).GetIdToNumexec()) { @@ -152,50 +165,101 @@ const std::list &PilotUtil::Collec } auto 
metrics_manager = pilot->metrics_thread_->GetMetricsManager(); + std::unique_ptr aggregated_data = nullptr; + bool old_metrics_enable = false; + uint8_t old_sample_rate = 0; + if (!execute_query) { + aggregated_data = std::make_unique(); + } else { + // record previous parameters to be restored at the end of this function + old_metrics_enable = pilot->settings_manager_->GetBool(settings::Param::pipeline_metrics_enable); + old_sample_rate = pilot->settings_manager_->GetInt64(settings::Param::pipeline_metrics_sample_rate); + if (!old_metrics_enable) metrics_manager->EnableMetric(metrics::MetricsComponent::EXECUTION_PIPELINE); + metrics_manager->SetMetricSampleRate(metrics::MetricsComponent::EXECUTION_PIPELINE, 100); + } + for (const auto &qid : qids) { catalog::db_oid_t db_oid = static_cast(forecast->GetDboidByQid(qid)); auto query_text = forecast->GetQuerytextByQid(qid); - // If this copying gets expensive, we might have to tweak the Task::TaskDML - // constructor to allow specifying pointer params or a custom planning task. 
- std::vector param_types(*forecast->GetParamtypesByQid(qid)); - std::vector> params(*forecast->GetQueryparamsByQid(qid)); + std::vector *param_types = forecast->GetParamtypesByQid(qid); + std::vector> *params = forecast->GetQueryparamsByQid(qid); pipeline_qids->push_back(qid); - // Forcefully reoptimize all the queries and set the query identifier to use - common::Future sync; - pilot->task_manager_->AddTask( - std::make_unique(db_oid, query_text, std::make_unique(), - std::move(params), std::move(param_types), nullptr, metrics_manager, true, true, - std::make_optional(qid), common::ManagedPointer(&sync))); - auto future_result = sync.WaitFor(Pilot::FUTURE_TIMEOUT); - if (!future_result.has_value()) { - throw PILOT_EXCEPTION("Future timed out.", common::ErrorCode::ERRCODE_IO_ERROR); + if (execute_query) { + // Execute the queries to get features with counters + common::Future sync; + execution::exec::ExecutionSettings settings{}; + settings.is_counters_enabled_ = true; + settings.is_pipeline_metrics_enabled_ = true; + // Forcefully reoptimize all the queries and set the query identifier to use + // If copying params and param_types gets expensive, we might have to tweak the Task::TaskDML + // constructor to allow specifying pointer params or a custom planning task. + pilot->task_manager_->AddTask(std::make_unique( + db_oid, query_text, std::make_unique(), + std::vector>(*params), std::vector(*param_types), + nullptr, metrics_manager, settings, true, true, std::make_optional(qid), + common::ManagedPointer(&sync))); + auto future_result = sync.WaitFor(Pilot::FUTURE_TIMEOUT); + if (!future_result.has_value()) { + throw PILOT_EXCEPTION("Future timed out.", common::ErrorCode::ERRCODE_IO_ERROR); + } + } else { + // Just compile the queries (generate the bytecodes) to get features with statistics + auto &query_util = pilot->query_exec_util_; + query_util->BeginTransaction(db_oid); + // TODO(lin): Do we need to pass in any settings? 
+ execution::exec::ExecutionSettings settings{}; + for (auto ¶m : *params) { + query_util->CompileQuery(query_text, common::ManagedPointer(¶m), common::ManagedPointer(param_types), + std::make_unique(), std::nullopt, settings); + auto executable_query = query_util->GetExecutableQuery(query_text); + + auto ous = executable_query->GetPipelineOperatingUnits(); + + // used just a placeholder + const auto &resource_metrics = common::thread_context.resource_tracker_.GetMetrics(); + + for (auto &iter : ous->GetPipelineFeatureMap()) { + // TODO(lin): Get the execution mode from settings manager when we can support changing it... + aggregated_data->RecordPipelineData( + qid, iter.first, 0, std::vector(iter.second), resource_metrics); + } + query_util->ClearPlan(query_text); + } + + query_util->EndTransaction(false); } } - // retrieve the features - metrics_manager->Aggregate(); + if (execute_query) { + // retrieve the features + metrics_manager->Aggregate(); - auto aggregated_data = reinterpret_cast( - metrics_manager->AggregatedMetrics() - .at(static_cast(metrics::MetricsComponent::EXECUTION_PIPELINE)) - .get()); + aggregated_data.reset(reinterpret_cast( + metrics_manager->AggregatedMetrics() + .at(static_cast(metrics::MetricsComponent::EXECUTION_PIPELINE)) + .release())); + + // restore the old parameters + metrics_manager->SetMetricSampleRate(metrics::MetricsComponent::EXECUTION_PIPELINE, old_sample_rate); + if (!old_metrics_enable) metrics_manager->DisableMetric(metrics::MetricsComponent::EXECUTION_PIPELINE); + } SELFDRIVING_LOG_DEBUG("Printing qid and pipeline id to sanity check pipeline metrics recorded"); for (auto it = aggregated_data->pipeline_data_.begin(); it != aggregated_data->pipeline_data_.end(); it++) { SELFDRIVING_LOG_DEBUG(fmt::format("qid: {}; pipeline_id: {}", static_cast(it->query_id_), static_cast(it->pipeline_id_))); } - return aggregated_data->pipeline_data_; + return aggregated_data; } -void PilotUtil::InferenceWithFeatures(const std::string 
&model_save_path, - common::ManagedPointer model_server_manager, - const std::vector &pipeline_qids, - const std::list &pipeline_data, - std::map, - std::vector>>> *pipeline_to_prediction) { +void PilotUtil::OUModelInference(const std::string &model_save_path, + common::ManagedPointer model_server_manager, + const std::vector &pipeline_qids, + const std::list &pipeline_data, + std::map, + std::vector>>> *pipeline_to_prediction) { std::unordered_map>> ou_to_features; std::list>>> @@ -231,6 +295,80 @@ void PilotUtil::InferenceWithFeatures(const std::string &model_save_path, } } +void PilotUtil::InterferenceModelInference( + const std::string &interference_model_save_path, + common::ManagedPointer model_server_manager, + const std::map, + std::vector>>> &pipeline_to_prediction, + common::ManagedPointer forecast, uint64_t start_segment_index, + uint64_t end_segment_index, std::map> *query_info, + std::map *segment_to_offset, std::vector> *interference_result_matrix) { + std::vector>> query_feat_sum; + execution::query_id_t curr_qid = execution::INVALID_QUERY_ID; + + auto feat_dim = pipeline_to_prediction.begin()->second.back().back().size(); + + // Compute the sum of ous for a query, averaged over diff set of params + for (auto const &pipeline_to_pred : pipeline_to_prediction) { + std::vector pipeline_sum(feat_dim, 0.0); + auto num_ou_for_ppl = 0; + + if (curr_qid != pipeline_to_pred.first.first) { + curr_qid = pipeline_to_pred.first.first; + query_feat_sum.emplace_back(curr_qid, std::vector(feat_dim, 0.0)); + query_info->emplace(curr_qid, std::make_pair(pipeline_to_pred.second.size(), 0)); + } + + for (auto const &pipeline_res : pipeline_to_pred.second) { + for (const auto &ou_res : pipeline_res) { + // sum up the ou prediction results of all ous in a pipeline + SumFeatureInPlace(&pipeline_sum, ou_res, 1); + } + num_ou_for_ppl += pipeline_res.size(); + } + // record average feat sum of this pipeline among the same queries with diff param + 
SumFeatureInPlace(&query_feat_sum.back().second, pipeline_sum, pipeline_to_pred.second.size()); + query_info->at(curr_qid).second += num_ou_for_ppl; + } + + // Populate interference_features matrix: + // Compute sum of all ous in a segment, normalized by its interval + std::vector> interference_features; + + for (auto i = start_segment_index; i <= end_segment_index; i++) { + std::vector normalized_feat_sum(feat_dim, 0.0); + auto id_to_num_exec = forecast->GetSegmentByIndex(i).GetIdToNumexec(); + + segment_to_offset->emplace(i, interference_features.size()); + + for (auto const &id_to_query_sum : query_feat_sum) { + if (id_to_num_exec.find(id_to_query_sum.first) != id_to_num_exec.end()) { + // account for number of exec of this query + // and normalize the ou_sum in an interval by the length of this interval + SumFeatureInPlace(&normalized_feat_sum, id_to_query_sum.second, + forecast->forecast_interval_ / id_to_num_exec[id_to_query_sum.first]); + } + } + // curr_feat_sum now holds the sum of ous for queries contained in this segment (averaged over diff set of param) + // multiplied by number of execution of the query containing it and normalized by its interval + for (auto const &pipeline_to_pred : pipeline_to_prediction) { + if (id_to_num_exec.find(pipeline_to_pred.first.first) == id_to_num_exec.end()) { + continue; + } + for (auto const &pipeline_res : pipeline_to_pred.second) { + for (const auto &ou_res : pipeline_res) { + interference_features.emplace_back(GetInterferenceFeature(ou_res, normalized_feat_sum)); + } + } + } + } + + auto interference_result = + model_server_manager->InferInterferenceModel(interference_model_save_path, interference_features); + NOISEPAGE_ASSERT(interference_result.second, "Inference through interference model has error"); + *interference_result_matrix = interference_result.first; +} + void PilotUtil::GroupFeaturesByOU( std::list>>> *pipeline_to_ou_position, diff --git a/src/task/task.cpp b/src/task/task.cpp index 
eb2dc24a83..bf726940ff 100644 --- a/src/task/task.cpp +++ b/src/task/task.cpp @@ -9,7 +9,7 @@ namespace noisepage::task { void TaskDDL::Execute(common::ManagedPointer query_exec_util, common::ManagedPointer task_manager) { query_exec_util->BeginTransaction(db_oid_); - bool status = query_exec_util->ExecuteDDL(query_text_); + bool status = query_exec_util->ExecuteDDL(query_text_, false); query_exec_util->EndTransaction(status); if (sync_) { @@ -31,25 +31,22 @@ void TaskDML::Execute(common::ManagedPointer query_exec_uti query_exec_util->ClearPlan(query_text_); } - // TODO(wz2): https://github.com/cmu-db/noisepage/issues/1352 - // This works for now. Fixing the above issue will make it work beter. - execution::exec::ExecutionSettings settings{}; if (params_.empty()) { - result = query_exec_util->ExecuteDML(query_text_, nullptr, nullptr, tuple_fn_, nullptr, - std::make_unique(), override_qid_, settings); + result = query_exec_util->ExecuteDML(query_text_, nullptr, nullptr, tuple_fn_, metrics_manager_, + std::make_unique(), override_qid_, settings_); } else { std::vector &params_0 = params_[0]; result = query_exec_util->CompileQuery(query_text_, common::ManagedPointer(&params_0), common::ManagedPointer(&param_types_), std::move(cost_model_), override_qid_, - settings); + settings_); // Execute with specified parameters only if compilation succeeded if (result) { for (auto &param_vec : params_) { if (!result) break; - result &= query_exec_util->ExecuteQuery(query_text_, tuple_fn_, common::ManagedPointer(&param_vec), nullptr, - settings); + result &= query_exec_util->ExecuteQuery(query_text_, tuple_fn_, common::ManagedPointer(&param_vec), + metrics_manager_, settings_); } } diff --git a/src/traffic_cop/traffic_cop.cpp b/src/traffic_cop/traffic_cop.cpp index 675297171b..3668e48d27 100644 --- a/src/traffic_cop/traffic_cop.cpp +++ b/src/traffic_cop/traffic_cop.cpp @@ -449,8 +449,7 @@ TrafficCopResult TrafficCop::CodegenPhysicalPlan( auto exec_query =
execution::compiler::CompilationContext::Compile( *physical_plan, exec_settings, connection_ctx->Accessor().Get(), - execution::compiler::CompilationMode::Interleaved, std::nullopt, - common::ManagedPointer(&portal->GetStatement()->GetQueryText())); + execution::compiler::CompilationMode::Interleaved, std::nullopt, portal->OptimizeResult()->GetPlanMetaData()); // TODO(Matt): handle code generation failing diff --git a/src/util/query_exec_util.cpp b/src/util/query_exec_util.cpp index 7ced22aa13..ffc082f3b8 100644 --- a/src/util/query_exec_util.cpp +++ b/src/util/query_exec_util.cpp @@ -92,7 +92,7 @@ void QueryExecUtil::EndTransaction(bool commit) { own_txn_ = false; } -std::pair, std::unique_ptr> QueryExecUtil::PlanStatement( +std::unique_ptr QueryExecUtil::PlanStatement( const std::string &query, common::ManagedPointer> params, common::ManagedPointer> param_types, std::unique_ptr cost) { NOISEPAGE_ASSERT(txn_ != nullptr, "Transaction must have been started"); @@ -113,13 +113,13 @@ std::pair, std::unique_ptrGetQueryType() == network::QueryType::QUERY_SET) { - return std::make_pair(std::move(statement), nullptr); + return statement; } try { @@ -133,27 +133,24 @@ std::pair, std::unique_ptrParseResult(), - db_oid_, stats_, std::move(cost), optimizer_timeout_, params) - ->TakePlanNodeOwnership(); - return std::make_pair(std::move(statement), std::move(out_plan)); + statement->SetOptimizeResult(trafficcop::TrafficCopUtil::Optimize(txn, common::ManagedPointer(accessor), + statement->ParseResult(), db_oid_, stats_, + std::move(cost), optimizer_timeout_, params)); + return statement; } -bool QueryExecUtil::ExecuteDDL(const std::string &query) { +bool QueryExecUtil::ExecuteDDL(const std::string &query, bool what_if) { NOISEPAGE_ASSERT(txn_ != nullptr, "Requires BeginTransaction() or UseTransaction()"); ResetError(); auto txn = common::ManagedPointer(txn_); auto accessor = catalog_->GetAccessor(txn, db_oid_, DISABLED); - auto result = PlanStatement(query, nullptr, nullptr, 
std::make_unique()); - const std::unique_ptr &statement = result.first; - const std::unique_ptr &out_plan = result.second; - if (statement == nullptr || out_plan == nullptr) { + auto statement = PlanStatement(query, nullptr, nullptr, std::make_unique()); + if (statement == nullptr) { return false; } - NOISEPAGE_ASSERT(!network::NetworkUtil::DMLQueryType(statement->GetQueryType()), "ExecuteDDL expects DDL statement"); // Handle SET queries @@ -163,33 +160,37 @@ bool QueryExecUtil::ExecuteDDL(const std::string &query) { settings_->SetParameter(set_stmt->GetParameterName(), set_stmt->GetValues()); status = true; } else { + if (statement->OptimizeResult() == nullptr) { + return false; + } + auto out_plan = statement->OptimizeResult()->GetPlanNode(); switch (statement->GetQueryType()) { case network::QueryType::QUERY_CREATE_TABLE: status = execution::sql::DDLExecutors::CreateTableExecutor( common::ManagedPointer( - reinterpret_cast(out_plan.get())), + reinterpret_cast(out_plan.Get())), common::ManagedPointer(accessor), db_oid_); break; case network::QueryType::QUERY_DROP_INDEX: // Drop index does not need execution of compiled query status = - execution::sql::DDLExecutors::DropIndexExecutor(common::ManagedPointer(out_plan) - .CastManagedPointerTo(), + execution::sql::DDLExecutors::DropIndexExecutor(out_plan.CastManagedPointerTo(), common::ManagedPointer(accessor)); break; case network::QueryType::QUERY_CREATE_INDEX: status = execution::sql::DDLExecutors::CreateIndexExecutor( - common::ManagedPointer(out_plan) - .CastManagedPointerTo(), + out_plan.CastManagedPointerTo(), common::ManagedPointer(accessor)); - if (status) { + + if (status && !what_if) { // This is unfortunate but this is because we can't re-parse the query once the CreateIndexExecutor // has run. We can't compile the query before the CreateIndexExecutor because codegen would have // no idea which index to insert into. 
execution::exec::ExecutionSettings settings{}; common::ManagedPointer schema = out_plan->GetOutputSchema(); auto exec_query = execution::compiler::CompilationContext::Compile( - *out_plan, settings, accessor.get(), execution::compiler::CompilationMode::OneShot, std::nullopt); + *out_plan, settings, accessor.get(), execution::compiler::CompilationMode::OneShot, std::nullopt, + statement->OptimizeResult()->GetPlanMetaData()); schemas_[query] = schema->Copy(); exec_queries_[query] = std::move(exec_query); ExecuteQuery(query, nullptr, nullptr, nullptr, settings); @@ -225,16 +226,17 @@ bool QueryExecUtil::CompileQuery(const std::string &statement, auto txn = common::ManagedPointer(txn_); auto accessor = catalog_->GetAccessor(txn, db_oid_, DISABLED); auto result = PlanStatement(statement, params, param_types, std::move(cost)); - if (!result.first || !result.second) { + if (!result || !result->OptimizeResult()) { return false; } - const std::unique_ptr &out_plan = result.second; - NOISEPAGE_ASSERT(network::NetworkUtil::DMLQueryType(result.first->GetQueryType()), "ExecuteDML expects DML"); + const common::ManagedPointer out_plan = result->OptimizeResult()->GetPlanNode(); + NOISEPAGE_ASSERT(network::NetworkUtil::DMLQueryType(result->GetQueryType()), "ExecuteDML expects DML"); common::ManagedPointer schema = out_plan->GetOutputSchema(); auto exec_query = execution::compiler::CompilationContext::Compile( - *out_plan, exec_settings, accessor.get(), execution::compiler::CompilationMode::OneShot, override_qid); + *out_plan, exec_settings, accessor.get(), execution::compiler::CompilationMode::OneShot, override_qid, + result->OptimizeResult()->GetPlanMetaData()); schemas_[statement] = schema->Copy(); exec_queries_[statement] = std::move(exec_query); return true; diff --git a/test/optimizer/index_nested_loops_join_test.cpp b/test/optimizer/index_nested_loops_join_test.cpp index 480baf2670..8090ac8ca8 100644 --- a/test/optimizer/index_nested_loops_join_test.cpp +++ 
b/test/optimizer/index_nested_loops_join_test.cpp @@ -36,11 +36,11 @@ namespace noisepage::optimizer { struct IdxJoinTest : public TerrierTest { const uint64_t optimizer_timeout_ = 1000000; - void CompileAndRun(std::unique_ptr *plan, network::Statement *stmt) { + void CompileAndRun(std::unique_ptr *optimize_result, network::Statement *stmt) { network::WriteQueue queue; auto pwriter = network::PostgresPacketWriter(common::ManagedPointer(&queue)); auto portal = network::Portal(common::ManagedPointer(stmt)); - stmt->SetPhysicalPlan(std::move(*plan)); + stmt->SetOptimizeResult(std::move(*optimize_result)); auto result = tcop_->CodegenPhysicalPlan(common::ManagedPointer(&context_), common::ManagedPointer(&pwriter), common::ManagedPointer(&portal)); NOISEPAGE_ASSERT(result.type_ == trafficcop::ResultType::COMPLETE, "Codegen should have succeeded"); @@ -49,9 +49,9 @@ struct IdxJoinTest : public TerrierTest { NOISEPAGE_ASSERT(result.type_ == trafficcop::ResultType::COMPLETE, "Execute should have succeeded"); } - void ExecuteCreate(std::unique_ptr *plan, network::QueryType qtype) { + void ExecuteCreate(std::unique_ptr *optimize_result, network::QueryType qtype) { auto result = - tcop_->ExecuteCreateStatement(common::ManagedPointer(&context_), common::ManagedPointer(*plan), qtype); + tcop_->ExecuteCreateStatement(common::ManagedPointer(&context_), (*optimize_result)->GetPlanNode(), qtype); NOISEPAGE_ASSERT(result.type_ == trafficcop::ResultType::COMPLETE, "Execute should have succeeded"); } @@ -64,17 +64,15 @@ struct IdxJoinTest : public TerrierTest { common::ManagedPointer(&params)); NOISEPAGE_ASSERT(result.type_ == trafficcop::ResultType::COMPLETE, "Bind should have succeeded"); - auto plan = - tcop_ - ->OptimizeBoundQuery(common::ManagedPointer(&context_), stmt.ParseResult(), common::ManagedPointer(&params)) - ->TakePlanNodeOwnership(); + auto optimize_result = tcop_->OptimizeBoundQuery(common::ManagedPointer(&context_), stmt.ParseResult(), + common::ManagedPointer(&params)); if
(qtype >= network::QueryType::QUERY_CREATE_TABLE && qtype != network::QueryType::QUERY_CREATE_INDEX) { - ExecuteCreate(&plan, qtype); + ExecuteCreate(&optimize_result, qtype); } else if (qtype == network::QueryType::QUERY_CREATE_INDEX) { - ExecuteCreate(&plan, qtype); - CompileAndRun(&plan, &stmt); + ExecuteCreate(&optimize_result, qtype); + CompileAndRun(&optimize_result, &stmt); } else { - CompileAndRun(&plan, &stmt); + CompileAndRun(&optimize_result, &stmt); } tcop_->EndTransaction(common::ManagedPointer(&context_), network::QueryType::QUERY_COMMIT); diff --git a/test/self_driving/query_trace_logging_test.cpp b/test/self_driving/query_trace_logging_test.cpp index 49437f10ff..600d40e38b 100644 --- a/test/self_driving/query_trace_logging_test.cpp +++ b/test/self_driving/query_trace_logging_test.cpp @@ -166,10 +166,11 @@ TEST_F(QueryTraceLogging, BasicLogging) { std::vector param_types; common::Future sync; + execution::exec::ExecutionSettings settings{}; task_manager->AddTask(std::make_unique( catalog::INVALID_DATABASE_OID, "SELECT * FROM noisepage_forecast_frequencies", std::make_unique(), std::move(params), std::move(param_types), freq_check, nullptr, - false, true, std::nullopt, common::ManagedPointer(&sync))); + settings, false, true, std::nullopt, common::ManagedPointer(&sync))); auto sync_result = sync.DangerousWait(); bool result = sync_result.second; @@ -210,8 +211,8 @@ TEST_F(QueryTraceLogging, BasicLogging) { auto type_json = std::string(reinterpret_cast(values[3])->StringView()); std::vector type_strs; - for (const auto &val : parameters[qid]) { - auto tstr = type::TypeUtil::TypeIdToString(val.GetReturnValueType()); + for (const auto &param_val : parameters[qid]) { + auto tstr = type::TypeUtil::TypeIdToString(param_val.GetReturnValueType()); type_strs.push_back(tstr); } diff --git a/test/sql/analyze_test.cpp b/test/sql/analyze_test.cpp index 94dc68e977..0a0d5f2b55 100644 --- a/test/sql/analyze_test.cpp +++ b/test/sql/analyze_test.cpp @@ -101,8 +101,8 @@
class PgStatisticOutputChecker : public execution::compiler::test::OutputChecker TEST_F(AnalyzeTest, SingleColumnTest) { auto table_name = "empty_nullable_table"; auto table_oid = accessor_->GetTableOid(table_name); - optimizer::TopKElements top_k(16, 64); - optimizer::Histogram histogram(64); + optimizer::TopKElements top_k(16, 1024); + optimizer::Histogram histogram(255); // Row is initially empty for column PgStatisticOutputChecker empty_checker(table_oid.UnderlyingValue(), 1, 0, 0, 0, true, true); @@ -187,14 +187,14 @@ TEST_F(AnalyzeTest, MultiColumnTest) { auto table_oid = accessor_->GetTableOid(table_name); auto num_cols = accessor_->GetSchema(table_oid).GetColumns().size(); - optimizer::TopKElements string_top_k(16, 64); - optimizer::Histogram string_histogram(64); - optimizer::TopKElements date_top_k(16, 64); - optimizer::Histogram date_histogram(64); - optimizer::TopKElements bool_top_k(16, 64); - optimizer::Histogram bool_histogram(64); - optimizer::TopKElements int_top_k(16, 64); - optimizer::Histogram int_histogram(64); + optimizer::TopKElements string_top_k(16, 1024); + optimizer::Histogram string_histogram(255); + optimizer::TopKElements date_top_k(16, 1024); + optimizer::Histogram date_histogram(255); + optimizer::TopKElements bool_top_k(16, 1024); + optimizer::Histogram bool_histogram(255); + optimizer::TopKElements int_top_k(16, 1024); + optimizer::Histogram int_histogram(255); // Row is initially empty for columns for (int64_t i = 0; static_cast(i) < num_cols; i++) { diff --git a/test/task/task_manager_test.cpp b/test/task/task_manager_test.cpp index 7b4ce99f0e..e53cada488 100644 --- a/test/task/task_manager_test.cpp +++ b/test/task/task_manager_test.cpp @@ -136,8 +136,8 @@ TEST_F(TaskManagerTests, Index) { std::string query = "SELECT * FROM t WHERE a = 1"; auto result = query_exec_util->PlanStatement(query, nullptr, nullptr, std::make_unique()); - EXPECT_TRUE(result.second != nullptr); - EXPECT_EQ(result.second->GetPlanNodeType(), 
planner::PlanNodeType::INDEXSCAN); + EXPECT_TRUE(result != nullptr); + EXPECT_EQ(result->PhysicalPlan()->GetPlanNodeType(), planner::PlanNodeType::INDEXSCAN); query_exec_util->EndTransaction(true); }