Skip to content
This repository has been archived by the owner on Feb 20, 2023. It is now read-only.

Commit

Permalink
Augmenting Pilot with Interference Model and OU Feature Generation fr…
Browse files Browse the repository at this point in the history
…om the Stats (instead of Executing Queries) (#1574)
  • Loading branch information
linmagit authored May 9, 2021
1 parent abf0493 commit 91cb3ac
Show file tree
Hide file tree
Showing 46 changed files with 843 additions and 402 deletions.
2 changes: 1 addition & 1 deletion Jenkinsfile-utils.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ void stageModeling() {
// This script runs TPC-C with pipeline metrics enabled, saving to build/concurrent_runner_input/pipeline.csv.
sh script :'''
cd build
PYTHONPATH=.. python3 -m script.self_driving.forecasting.forecaster_standalone --generate_data --record_pipeline_metrics --pattern_iter=1
PYTHONPATH=.. python3 -m script.self_driving.forecasting.forecaster_standalone --generate_data --record_pipeline_metrics_with_counters --pattern_iter=1
mkdir concurrent_runner_input
mv pipeline.csv concurrent_runner_input
''', label: 'Interference model training data generation'
Expand Down
1 change: 1 addition & 0 deletions script/self_driving/forecasting/forecaster.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ def predict(self, cid: int, model: ForecastModel) -> Dict:
"""
test_seqs = self._cluster_seqs(cid, test_mode=True, with_label=False)
preds = list([model.predict(seq) for seq in test_seqs])
logging.info(f"Inference preds: {preds}")
query_preds = self._clusters[cid].segregate(preds)

return query_preds
Expand Down
4 changes: 2 additions & 2 deletions script/self_driving/forecasting/forecaster_standalone.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
action="store_true",
help="If specified, OLTP benchmark would be downloaded and built to generate the query trace data")
argp.add_argument(
"--record_pipeline_metrics",
"--record_pipeline_metrics_with_counters",
default=False,
action="store_true",
help="If specified, the database records the pipeline metrics data instead of the query trace data")
Expand Down Expand Up @@ -125,7 +125,7 @@
tpcc_weight=args.tpcc_weight,
tpcc_rates=args.tpcc_rates,
pattern_iter=args.pattern_iter,
record_pipeline_metrics=args.record_pipeline_metrics)
record_pipeline_metrics_with_counters=args.record_pipeline_metrics_with_counters)
elif args.test_file is None:
# Parse models arguments
models_kwargs = parse_model_config(args.models, args.models_config)
Expand Down
2 changes: 1 addition & 1 deletion script/self_driving/modeling/interference_model_trainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ def _train_model_with_derived_data(self, impact_data_list, model_name):
# x.append(np.concatenate((ou_model_y_pred[-1] / predicted_elapsed_us,
# predicted_resource_util,
# d.resource_util_same_core_x)))
x.append(np.concatenate((ou_model_y_pred[-1] / predicted_elapsed_us, predicted_resource_util)))
x.append(np.concatenate((ou_model_y_pred[-1] / (predicted_elapsed_us + epsilon), predicted_resource_util)))
raw_y.append(d.target_grouped_op_unit_data.y)
y.append(raw_y[-1] / (ou_model_y_pred[-1] + epsilon))
# Do not adjust memory consumption since it shouldn't change
Expand Down
7 changes: 4 additions & 3 deletions script/testing/self_driving/forecast.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def config_forecast_data(xml_config_file: str, rate_pattern: List[int]) -> None:


def gen_oltp_trace(
tpcc_weight: str, tpcc_rates: List[int], pattern_iter: int, record_pipeline_metrics: bool) -> bool:
tpcc_weight: str, tpcc_rates: List[int], pattern_iter: int, record_pipeline_metrics_with_counters: bool) -> bool:
"""
Generate the trace by running OLTPBench's TPCC benchmark on the built DBMS.
Expand All @@ -67,7 +67,7 @@ def gen_oltp_trace(
The arrival rates for each phase in a pattern.
pattern_iter : int
The number of patterns.
record_pipeline_metrics : bool
record_pipeline_metrics_with_counters : bool
Record the pipeline metrics instead of query traces
Returns
Expand All @@ -94,11 +94,12 @@ def gen_oltp_trace(
rates = tpcc_rates * pattern_iter
config_forecast_data(test_case.xml_config, rates)

if record_pipeline_metrics:
if record_pipeline_metrics_with_counters:
# Turn on pipeline metrics recording
db_server.execute("SET pipeline_metrics_enable='true'", expect_result=False)
db_server.execute("SET pipeline_metrics_sample_rate={}".format(DEFAULT_PIPELINE_METRICS_SAMPLE_RATE),
expect_result=False)
db_server.execute("SET counters_enable='true'", expect_result=False)
result_file = DEFAULT_PIPELINE_METRICS_FILE
else:
# Turn on query trace metrics tracing
Expand Down
25 changes: 12 additions & 13 deletions src/execution/compiler/compilation_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
#include "planner/plannodes/limit_plan_node.h"
#include "planner/plannodes/nested_loop_join_plan_node.h"
#include "planner/plannodes/order_by_plan_node.h"
#include "planner/plannodes/plan_meta_data.h"
#include "planner/plannodes/projection_plan_node.h"
#include "planner/plannodes/seq_scan_plan_node.h"
#include "planner/plannodes/set_op_plan_node.h"
Expand All @@ -78,9 +79,10 @@ namespace {
std::atomic<uint32_t> unique_ids{0};
} // namespace

CompilationContext::CompilationContext(ExecutableQuery *query, catalog::CatalogAccessor *accessor,
CompilationContext::CompilationContext(ExecutableQuery *query, query_id_t query_id, catalog::CatalogAccessor *accessor,
const CompilationMode mode, const exec::ExecutionSettings &settings)
: unique_id_(unique_ids++),
query_id_(query_id),
query_(query),
mode_(mode),
codegen_(query_->GetContext(), accessor),
Expand Down Expand Up @@ -118,7 +120,8 @@ ast::FunctionDecl *CompilationContext::GenerateTearDownFunction() {
return builder.Finish();
}

void CompilationContext::GeneratePlan(const planner::AbstractPlanNode &plan) {
void CompilationContext::GeneratePlan(const planner::AbstractPlanNode &plan,
common::ManagedPointer<planner::PlanMetaData> plan_meta_data) {
exec_ctx_ =
query_state_.DeclareStateEntry(GetCodeGen(), "execCtx", codegen_.PointerType(ast::BuiltinType::ExecutionContext));

Expand Down Expand Up @@ -160,7 +163,7 @@ void CompilationContext::GeneratePlan(const planner::AbstractPlanNode &plan) {
// Therefore translator extraction must happen before pipelines are generated.
selfdriving::OperatingUnitRecorder recorder(common::ManagedPointer(codegen_.GetCatalogAccessor()),
common::ManagedPointer(codegen_.GetAstContext()),
common::ManagedPointer(pipeline), query_->GetQueryText());
common::ManagedPointer(pipeline), plan_meta_data);
auto features = recorder.RecordTranslators(pipeline->GetTranslators());
codegen_.GetPipelineOperatingUnits()->RecordOperatingUnit(pipeline->GetPipelineId(), std::move(features));

Expand Down Expand Up @@ -188,23 +191,19 @@ void CompilationContext::GeneratePlan(const planner::AbstractPlanNode &plan) {
}

// static
std::unique_ptr<ExecutableQuery> CompilationContext::Compile(const planner::AbstractPlanNode &plan,
const exec::ExecutionSettings &exec_settings,
catalog::CatalogAccessor *accessor,
const CompilationMode mode,
std::optional<execution::query_id_t> override_qid,
common::ManagedPointer<const std::string> query_text) {
std::unique_ptr<ExecutableQuery> CompilationContext::Compile(
const planner::AbstractPlanNode &plan, const exec::ExecutionSettings &exec_settings,
catalog::CatalogAccessor *accessor, const CompilationMode mode, std::optional<execution::query_id_t> override_qid,
common::ManagedPointer<planner::PlanMetaData> plan_meta_data) {
// The query we're generating code for.
auto query = std::make_unique<ExecutableQuery>(plan, exec_settings);
if (override_qid.has_value()) {
query->SetQueryId(override_qid.value());
}
// TODO(Lin): Hacking... remove this after getting the counters in
query->SetQueryText(query_text);

// Generate the plan for the query
CompilationContext ctx(query.get(), accessor, mode, exec_settings);
ctx.GeneratePlan(plan);
CompilationContext ctx(query.get(), query->GetQueryId(), accessor, mode, exec_settings);
ctx.GeneratePlan(plan, plan_meta_data);

// Done
return query;
Expand Down
4 changes: 2 additions & 2 deletions src/execution/compiler/executable_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

#include "common/error/error_code.h"
#include "common/error/exception.h"
#include "execution/ast/ast_dump.h"
#include "execution/ast/context.h"
#include "execution/compiler/compiler.h"
#include "execution/exec/execution_context.h"
Expand Down Expand Up @@ -71,7 +70,8 @@ std::string GetFileName(const std::string &path) {
}
} // namespace

std::atomic<query_id_t> ExecutableQuery::query_identifier{0};
// We use 0 to represent NULL_QUERY_ID so the query id starts from 1.
std::atomic<query_id_t> ExecutableQuery::query_identifier{1};

void ExecutableQuery::SetPipelineOperatingUnits(std::unique_ptr<selfdriving::PipelineOperatingUnits> &&units) {
pipeline_operating_units_ = std::move(units);
Expand Down
26 changes: 15 additions & 11 deletions src/include/execution/compiler/compilation_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class AbstractExpression;

namespace noisepage::planner {
class AbstractPlanNode;
class PlanMetaData;
} // namespace noisepage::planner

namespace noisepage::execution::compiler {
Expand Down Expand Up @@ -49,14 +50,13 @@ class CompilationContext {
* @param accessor The catalog accessor to use for compilation.
* @param mode The compilation mode.
* @param override_qid Optional indicating how to override the plan's query id
* @param query_text The SQL query string (temporary)
* @param plan_meta_data Query plan meta data (stores cardinality information)
*/
static std::unique_ptr<ExecutableQuery> Compile(const planner::AbstractPlanNode &plan,
const exec::ExecutionSettings &exec_settings,
catalog::CatalogAccessor *accessor,
CompilationMode mode = CompilationMode::Interleaved,
std::optional<execution::query_id_t> override_qid = std::nullopt,
common::ManagedPointer<const std::string> query_text = nullptr);
static std::unique_ptr<ExecutableQuery> Compile(
const planner::AbstractPlanNode &plan, const exec::ExecutionSettings &exec_settings,
catalog::CatalogAccessor *accessor, CompilationMode mode = CompilationMode::Interleaved,
std::optional<execution::query_id_t> override_qid = std::nullopt,
common::ManagedPointer<planner::PlanMetaData> plan_meta_data = nullptr);

/**
* Register a pipeline in this context.
Expand Down Expand Up @@ -127,15 +127,16 @@ class CompilationContext {
bool IsPipelineMetricsEnabled() const { return pipeline_metrics_enabled_; }

/** @return Query Id associated with the query */
query_id_t GetQueryId() const { return query_id_t{unique_id_}; }
query_id_t GetQueryId() const { return query_id_; }

private:
// Private to force use of static Compile() function.
explicit CompilationContext(ExecutableQuery *query, catalog::CatalogAccessor *accessor, CompilationMode mode,
const exec::ExecutionSettings &exec_settings);
explicit CompilationContext(ExecutableQuery *query, query_id_t query_id_, catalog::CatalogAccessor *accessor,
CompilationMode mode, const exec::ExecutionSettings &exec_settings);

// Given a plan node, compile it into a compiled query object.
void GeneratePlan(const planner::AbstractPlanNode &plan);
void GeneratePlan(const planner::AbstractPlanNode &plan,
common::ManagedPointer<planner::PlanMetaData> plan_meta_data);

// Generate the query initialization function.
ast::FunctionDecl *GenerateInitFunction();
Expand All @@ -150,6 +151,9 @@ class CompilationContext {
// Unique ID used as a prefix for all generated functions to ensure uniqueness.
uint32_t unique_id_;

// Query ID generated from ExecutableQuery or overridden specifically
query_id_t query_id_;

// The compiled query object we'll update.
ExecutableQuery *query_;

Expand Down
7 changes: 0 additions & 7 deletions src/include/execution/compiler/executable_query.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,6 @@ class ExecutableQuery {
/** @return The Query Identifier */
query_id_t GetQueryId() { return query_id_; }

/** @param query_text The SQL string for this query */
void SetQueryText(common::ManagedPointer<const std::string> query_text) { query_text_ = query_text; }

/** @return The SQL query string */
common::ManagedPointer<const std::string> GetQueryText() { return query_text_; }

private:
// The plan.
const planner::AbstractPlanNode &plan_;
Expand Down Expand Up @@ -202,7 +196,6 @@ class ExecutableQuery {
std::string query_name_;
query_id_t query_id_;
static std::atomic<query_id_t> query_identifier;
common::ManagedPointer<const std::string> query_text_;

// MiniRunners needs to set query_identifier and pipeline_operating_units_.
friend class noisepage::runner::ExecutionRunners;
Expand Down
10 changes: 10 additions & 0 deletions src/include/execution/exec/execution_settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ namespace noisepage::tpch {
class Workload;
} // namespace noisepage::tpch

namespace noisepage::selfdriving {
class PilotUtil;
} // namespace noisepage::selfdriving

namespace noisepage::task {
class TaskDML;
} // namespace noisepage::task

namespace noisepage::execution::exec {
/**
* ExecutionSettings stores settings that are passed down from the upper layers.
Expand Down Expand Up @@ -94,5 +102,7 @@ class EXPORT ExecutionSettings {
friend class noisepage::optimizer::IdxJoinTest_FooOnlyScan_Test;
friend class noisepage::optimizer::IdxJoinTest_BarOnlyScan_Test;
friend class noisepage::optimizer::IdxJoinTest_IndexToIndexJoin_Test;
friend class noisepage::task::TaskDML;
friend class noisepage::selfdriving::PilotUtil;
};
} // namespace noisepage::execution::exec
2 changes: 2 additions & 0 deletions src/include/execution/exec_defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
namespace noisepage::execution {

constexpr uint32_t NULL_PIPELINE_ID = 0;
constexpr uint32_t NULL_QUERY_ID = 0;

STRONG_TYPEDEF_HEADER(query_id_t, uint32_t);
STRONG_TYPEDEF_HEADER(pipeline_id_t, uint32_t);
STRONG_TYPEDEF_HEADER(feature_id_t, uint32_t);
STRONG_TYPEDEF_HEADER(translator_id_t, uint32_t);

constexpr pipeline_id_t INVALID_PIPELINE_ID = pipeline_id_t(NULL_PIPELINE_ID);
constexpr query_id_t INVALID_QUERY_ID = query_id_t(NULL_QUERY_ID);

} // namespace noisepage::execution
17 changes: 10 additions & 7 deletions src/include/main/db_main.h
Original file line number Diff line number Diff line change
Expand Up @@ -540,11 +540,12 @@ class DBMain {
query_exec_util ? util::QueryExecUtil::ConstructThreadLocal(common::ManagedPointer(query_exec_util))
: nullptr;
pilot = std::make_unique<selfdriving::Pilot>(
model_save_path_, forecast_model_save_path_, common::ManagedPointer(catalog_layer->GetCatalog()),
common::ManagedPointer(metrics_thread), common::ManagedPointer(model_server_manager),
common::ManagedPointer(settings_manager), common::ManagedPointer(stats_storage),
common::ManagedPointer(txn_layer->GetTransactionManager()), std::move(util),
common::ManagedPointer(task_manager), workload_forecast_interval_, sequence_length_, horizon_length_);
ou_model_save_path_, interference_model_save_path_, forecast_model_save_path_,
common::ManagedPointer(catalog_layer->GetCatalog()), common::ManagedPointer(metrics_thread),
common::ManagedPointer(model_server_manager), common::ManagedPointer(settings_manager),
common::ManagedPointer(stats_storage), common::ManagedPointer(txn_layer->GetTransactionManager()),
std::move(util), common::ManagedPointer(task_manager), workload_forecast_interval_, sequence_length_,
horizon_length_);
pilot_thread = std::make_unique<selfdriving::PilotThread>(
common::ManagedPointer(pilot), std::chrono::microseconds{pilot_interval_},
std::chrono::microseconds{forecast_train_interval_}, pilot_planning_);
Expand Down Expand Up @@ -939,7 +940,8 @@ class DBMain {
uint64_t forecast_sample_limit_ = 5;

std::string wal_file_path_ = "wal.log";
std::string model_save_path_;
std::string ou_model_save_path_;
std::string interference_model_save_path_;
std::string forecast_model_save_path_;
std::string bytecode_handlers_path_ = "./bytecode_handlers_ir.bc";
std::string network_identity_ = "primary";
Expand Down Expand Up @@ -1036,7 +1038,8 @@ class DBMain {
workload_forecast_interval_ = settings_manager->GetInt64(settings::Param::workload_forecast_interval);
sequence_length_ = settings_manager->GetInt64(settings::Param::sequence_length);
horizon_length_ = settings_manager->GetInt64(settings::Param::horizon_length);
model_save_path_ = settings_manager->GetString(settings::Param::model_save_path);
ou_model_save_path_ = settings_manager->GetString(settings::Param::ou_model_save_path);
interference_model_save_path_ = settings_manager->GetString(settings::Param::interference_model_save_path);
forecast_model_save_path_ = settings_manager->GetString(settings::Param::forecast_model_save_path);
task_pool_size_ = settings_manager->GetInt(settings::Param::task_pool_size);

Expand Down
4 changes: 1 addition & 3 deletions src/include/metrics/metrics_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,7 @@ class MetricsManager {
/**
* @return the MetricsManager's aggregated metrics. Currently used in tests
*/
const std::array<std::unique_ptr<AbstractRawData>, NUM_COMPONENTS> &AggregatedMetrics() const {
return aggregated_metrics_;
}
std::array<std::unique_ptr<AbstractRawData>, NUM_COMPONENTS> &AggregatedMetrics() { return aggregated_metrics_; }

/**
* @param component to be tested
Expand Down
9 changes: 0 additions & 9 deletions src/include/network/postgres/statement.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,6 @@ class Statement {
optimize_result_ = std::move(optimize_result);
}

/**
* @param physical_plan physical plan to take ownership of
*/
void SetPhysicalPlan(std::unique_ptr<planner::AbstractPlanNode> &&physical_plan) {
if (!optimize_result_) {
optimize_result_ = std::make_unique<optimizer::OptimizeResult>();
}
optimize_result_->SetPlanNode(std::move(physical_plan));
}
/**
* @param executable_query executable query to take ownership of
*/
Expand Down
Loading

0 comments on commit 91cb3ac

Please sign in to comment.