Skip to content
This repository has been archived by the owner on Feb 20, 2023. It is now read-only.

Commit

Permalink
Query Trace Recording (#1161)
Browse files Browse the repository at this point in the history
  • Loading branch information
rabbit721 authored Sep 26, 2020
1 parent 187e3e2 commit aa8b278
Show file tree
Hide file tree
Showing 11 changed files with 295 additions and 3 deletions.
5 changes: 4 additions & 1 deletion src/include/main/db_main.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ class DBMain {
*/
NetworkLayer(const common::ManagedPointer<common::DedicatedThreadRegistry> thread_registry,
const common::ManagedPointer<trafficcop::TrafficCop> traffic_cop, const uint16_t port,
const uint16_t connection_thread_count, const std::string socket_directory) {
const uint16_t connection_thread_count, const std::string &socket_directory) {
connection_handle_factory_ = std::make_unique<network::ConnectionHandleFactory>(traffic_cop);
command_factory_ = std::make_unique<network::PostgresCommandFactory>();
provider_ =
Expand Down Expand Up @@ -636,6 +636,7 @@ class DBMain {
bool use_metrics_ = false;
uint32_t metrics_interval_ = 10000;
bool use_metrics_thread_ = false;
bool metrics_query_trace_ = false;
bool metrics_pipeline_ = false;
bool metrics_transaction_ = false;
bool metrics_logging_ = false;
Expand Down Expand Up @@ -709,6 +710,7 @@ class DBMain {
? execution::vm::ExecutionMode::Compiled
: execution::vm::ExecutionMode::Interpret;

metrics_query_trace_ = settings_manager->GetBool(settings::Param::metrics_query_trace);
metrics_pipeline_ = settings_manager->GetBool(settings::Param::metrics_pipeline);
metrics_transaction_ = settings_manager->GetBool(settings::Param::metrics_transaction);
metrics_logging_ = settings_manager->GetBool(settings::Param::metrics_logging);
Expand All @@ -725,6 +727,7 @@ class DBMain {
*/
std::unique_ptr<metrics::MetricsManager> BootstrapMetricsManager() {
std::unique_ptr<metrics::MetricsManager> metrics_manager = std::make_unique<metrics::MetricsManager>();
if (metrics_query_trace_) metrics_manager->EnableMetric(metrics::MetricsComponent::QUERY_TRACE, 0);
if (metrics_pipeline_) metrics_manager->EnableMetric(metrics::MetricsComponent::EXECUTION_PIPELINE, 9);
if (metrics_transaction_) metrics_manager->EnableMetric(metrics::MetricsComponent::TRANSACTION, 0);
if (metrics_logging_) metrics_manager->EnableMetric(metrics::MetricsComponent::LOGGING, 0);
Expand Down
3 changes: 2 additions & 1 deletion src/include/metrics/metrics_defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ enum class MetricsComponent : uint8_t {
EXECUTION_PIPELINE,
BIND_COMMAND,
EXECUTE_COMMAND,
QUERY_TRACE,
};

constexpr uint8_t NUM_COMPONENTS = 7;
constexpr uint8_t NUM_COMPONENTS = 8;

} // namespace terrier::metrics
26 changes: 26 additions & 0 deletions src/include/metrics/metrics_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <bitset>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
Expand All @@ -19,6 +20,7 @@
#include "metrics/logging_metric.h"
#include "metrics/metrics_defs.h"
#include "metrics/pipeline_metric.h"
#include "metrics/query_trace_metric.h"
#include "metrics/transaction_metric.h"

namespace terrier::metrics {
Expand Down Expand Up @@ -175,6 +177,29 @@ class MetricsStore {
execute_command_metric_->RecordExecuteCommandData(portal_name_size, resource_metrics);
}

/**
* Record queries generated
* @param query_id id of the query
* @param query_text text of the query
* @param timestamp time of query generation
*/
void RecordQueryText(const execution::query_id_t query_id, const std::string &query_text, const uint64_t timestamp) {
TERRIER_ASSERT(ComponentEnabled(MetricsComponent::QUERY_TRACE), "QueryTraceMetric not enabled.");
TERRIER_ASSERT(query_trace_metric_ != nullptr, "QueryTraceMetric not allocated. Check MetricsStore constructor.");
query_trace_metric_->RecordQueryText(query_id, query_text, timestamp);
}

/**
* Record query execution history
* @param query_id id of the query
* @param timestamp time of the query execution
*/
void RecordQueryTrace(const execution::query_id_t query_id, const uint64_t timestamp) {
TERRIER_ASSERT(ComponentEnabled(MetricsComponent::QUERY_TRACE), "QueryTraceMetric not enabled.");
TERRIER_ASSERT(query_trace_metric_ != nullptr, "QueryTraceMetric not allocated. Check MetricsStore constructor.");
query_trace_metric_->RecordQueryTrace(query_id, timestamp);
}

/**
* @param component metrics component to test
* @return true if metrics enabled for this component, false otherwise
Expand Down Expand Up @@ -215,6 +240,7 @@ class MetricsStore {
std::array<std::unique_ptr<AbstractRawData>, NUM_COMPONENTS> GetDataToAggregate();

std::unique_ptr<LoggingMetric> logging_metric_;
std::unique_ptr<QueryTraceMetric> query_trace_metric_;
std::unique_ptr<TransactionMetric> txn_metric_;
std::unique_ptr<GarbageCollectionMetric> gc_metric_;
std::unique_ptr<ExecutionMetric> execution_metric_;
Expand Down
120 changes: 120 additions & 0 deletions src/include/metrics/query_trace_metric.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
#pragma once

#include <algorithm>
#include <chrono> //NOLINT
#include <fstream>
#include <list>
#include <string>
#include <utility>
#include <vector>

#include "catalog/catalog_defs.h"
#include "metrics/abstract_metric.h"
#include "metrics/metrics_util.h"
#include "transaction/transaction_defs.h"

namespace terrier::metrics {

/**
* Raw data object for holding stats collected at logging level
*/
class QueryTraceMetricRawData : public AbstractRawData {
public:
void Aggregate(AbstractRawData *const other) override {
auto other_db_metric = dynamic_cast<QueryTraceMetricRawData *>(other);
if (!other_db_metric->query_text_.empty()) {
query_text_.splice(query_text_.cend(), other_db_metric->query_text_);
}
if (!other_db_metric->query_trace_.empty()) {
query_trace_.splice(query_trace_.cend(), other_db_metric->query_trace_);
}
}

/**
* @return the type of the metric this object is holding the data for
*/
MetricsComponent GetMetricType() const override { return MetricsComponent::QUERY_TRACE; }

/**
* Writes the data out to ofstreams
* @param outfiles vector of ofstreams to write to that have been opened by the MetricsManager
*/
void ToCSV(std::vector<std::ofstream> *const outfiles) final {
TERRIER_ASSERT(outfiles->size() == FILES.size(), "Number of files passed to metric is wrong.");
TERRIER_ASSERT(std::count_if(outfiles->cbegin(), outfiles->cend(),
[](const std::ofstream &outfile) { return !outfile.is_open(); }) == 0,
"Not all files are open.");

auto &query_text_outfile = (*outfiles)[0];
auto &query_trace_outfile = (*outfiles)[1];

for (const auto &data : query_text_) {
query_text_outfile << data.query_id_ << ", " << data.query_text_ << ", " << data.timestamp_ << ", ";
query_text_outfile << std::endl;
}
for (const auto &data : query_trace_) {
query_trace_outfile << data.query_id_ << ", " << data.timestamp_ << ", ";
query_trace_outfile << std::endl;
}
query_text_.clear();
query_trace_.clear();
}

/**
* Files to use for writing to CSV.
*/
static constexpr std::array<std::string_view, 2> FILES = {"./query_text.csv", "./query_trace.csv"};
/**
* Columns to use for writing to CSV.
* Note: This includes the columns for the input feature, but not the output (resource counters)
*/
static constexpr std::array<std::string_view, 2> FEATURE_COLUMNS = {"query_id, query_text, timestamp",
"query_id, timestamp"};

private:
friend class QueryTraceMetric;
FRIEND_TEST(MetricsTests, QueryCSVTest);

void RecordQueryText(const execution::query_id_t query_id, const std::string &query_text, const uint64_t timestamp) {
query_text_.emplace_back(query_id, query_text, timestamp);
}

void RecordQueryTrace(const execution::query_id_t query_id, const uint64_t timestamp) {
query_trace_.emplace_back(query_id, timestamp);
}

struct QueryText {
QueryText(const execution::query_id_t query_id, std::string query_text, const uint64_t timestamp)
: query_id_(query_id), timestamp_(timestamp), query_text_(std::move(query_text)) {}
const execution::query_id_t query_id_;
const uint64_t timestamp_;
std::string query_text_;
};

struct QueryTrace {
QueryTrace(const execution::query_id_t query_id, const uint64_t timestamp)
: query_id_(query_id), timestamp_(timestamp) {}
const execution::query_id_t query_id_;
const uint64_t timestamp_;
};

std::list<QueryText> query_text_;
std::list<QueryTrace> query_trace_;
};

/**
* Metrics for the logging components of the system: currently buffer consumer (writes to disk) and the record
* serializer
*/
class QueryTraceMetric : public AbstractMetric<QueryTraceMetricRawData> {
private:
friend class MetricsStore;

void RecordQueryText(const execution::query_id_t query_id, const std::string &query_text, const uint64_t timestamp) {
GetRawData()->RecordQueryText(query_id, query_text, timestamp);
}
void RecordQueryTrace(const execution::query_id_t query_id, const uint64_t timestamp) {
GetRawData()->RecordQueryTrace(query_id, timestamp);
}
};
} // namespace terrier::metrics
10 changes: 10 additions & 0 deletions src/include/settings/settings_callbacks.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,5 +147,15 @@ class Callbacks {
*/
static void MetricsExecuteCommand(void *old_value, void *new_value, DBMain *db_main,
common::ManagedPointer<common::ActionContext> action_context);

/**
* Enable or disable metrics collection for Query Trace component
* @param old_value old settings value
* @param new_value new settings value
* @param db_main pointer to db_main
* @param action_context pointer to the action context for this settings change
*/
static void MetricsQueryTrace(void *old_value, void *new_value, DBMain *db_main,
common::ManagedPointer<common::ActionContext> action_context);
};
} // namespace terrier::settings
8 changes: 8 additions & 0 deletions src/include/settings/settings_defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,14 @@ SETTING_bool(
terrier::settings::Callbacks::MetricsGC
)

SETTING_bool(
metrics_query_trace,
"Metrics collection for Query Traces (default: false).",
false,
true,
terrier::settings::Callbacks::MetricsQueryTrace
)

SETTING_bool(
metrics_execution,
"Metrics collection for the Execution component (default: false).",
Expand Down
9 changes: 9 additions & 0 deletions src/metrics/metrics_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ void MetricsManager::ResetMetric(const MetricsComponent component) const {
metric->Swap();
break;
}
case MetricsComponent::QUERY_TRACE: {
const auto &metric = metrics_store.second->query_trace_metric_;
metric->Swap();
break;
}
}
}
}
Expand Down Expand Up @@ -146,6 +151,10 @@ void MetricsManager::ToCSV() const {
OpenFiles<ExecuteCommandMetricRawData>(&outfiles);
break;
}
case MetricsComponent::QUERY_TRACE: {
OpenFiles<QueryTraceMetricRawData>(&outfiles);
break;
}
}
aggregated_metrics_[component]->ToCSV(&outfiles);
for (auto &file : outfiles) {
Expand Down
8 changes: 8 additions & 0 deletions src/metrics/metrics_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ MetricsStore::MetricsStore(const common::ManagedPointer<metrics::MetricsManager>
pipeline_metric_ = std::make_unique<PipelineMetric>();
bind_command_metric_ = std::make_unique<BindCommandMetric>();
execute_command_metric_ = std::make_unique<ExecuteCommandMetric>();
query_trace_metric_ = std::make_unique<QueryTraceMetric>();
}

std::array<std::unique_ptr<AbstractRawData>, NUM_COMPONENTS> MetricsStore::GetDataToAggregate() {
Expand Down Expand Up @@ -77,6 +78,13 @@ std::array<std::unique_ptr<AbstractRawData>, NUM_COMPONENTS> MetricsStore::GetDa
result[component] = execute_command_metric_->Swap();
break;
}
case MetricsComponent::QUERY_TRACE: {
TERRIER_ASSERT(
query_trace_metric_ != nullptr,
"QueryTraceMetric cannot be a nullptr. Check the MetricsStore constructor that it was allocated.");
result[component] = query_trace_metric_->Swap();
break;
}
}
}
}
Expand Down
11 changes: 11 additions & 0 deletions src/settings/settings_callbacks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,4 +137,15 @@ void Callbacks::MetricsExecuteCommand(void *const old_value, void *const new_val
action_context->SetState(common::ActionState::SUCCESS);
}

void Callbacks::MetricsQueryTrace(void *const old_value, void *const new_value, DBMain *const db_main,
common::ManagedPointer<common::ActionContext> action_context) {
action_context->SetState(common::ActionState::IN_PROGRESS);
bool new_status = *static_cast<bool *>(new_value);
if (new_status)
db_main->GetMetricsManager()->EnableMetric(metrics::MetricsComponent::QUERY_TRACE, 0);
else
db_main->GetMetricsManager()->DisableMetric(metrics::MetricsComponent::QUERY_TRACE);
action_context->SetState(common::ActionState::SUCCESS);
}

} // namespace terrier::settings
19 changes: 19 additions & 0 deletions src/traffic_cop/traffic_cop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@
#include "catalog/catalog_accessor.h"
#include "common/error/error_data.h"
#include "common/error/exception.h"
#include "common/thread_context.h"
#include "execution/compiler/compilation_context.h"
#include "execution/compiler/executable_query.h"
#include "execution/exec/execution_context.h"
#include "execution/exec/execution_settings.h"
#include "execution/exec/output.h"
#include "execution/sql/ddl_executors.h"
#include "execution/vm/module.h"
#include "metrics/metrics_store.h"
#include "network/connection_context.h"
#include "network/postgres/portal.h"
#include "network/postgres/postgres_packet_writer.h"
Expand Down Expand Up @@ -364,6 +366,15 @@ TrafficCopResult TrafficCop::CodegenPhysicalPlan(
common::ManagedPointer<const std::string>(&portal->GetStatement()->GetQueryText()));

// TODO(Matt): handle code generation failing

const bool query_trace_metrics_enabled =
common::thread_context.metrics_store_ != nullptr &&
common::thread_context.metrics_store_->ComponentToRecord(metrics::MetricsComponent::QUERY_TRACE);
if (query_trace_metrics_enabled) {
common::thread_context.metrics_store_->RecordQueryText(
exec_query->GetQueryId(), portal->GetStatement()->GetQueryText(), metrics::MetricsUtil::Now());
}

portal->GetStatement()->SetExecutableQuery(std::move(exec_query));

return {ResultType::COMPLETE, 0u};
Expand Down Expand Up @@ -406,6 +417,14 @@ TrafficCopResult TrafficCop::RunExecutableQuery(const common::ManagedPointer<net
return {ResultType::ERROR, error};
}

const bool query_trace_metrics_enabled =
common::thread_context.metrics_store_ != nullptr &&
common::thread_context.metrics_store_->ComponentToRecord(metrics::MetricsComponent::QUERY_TRACE);

if (query_trace_metrics_enabled) {
common::thread_context.metrics_store_->RecordQueryTrace(exec_query->GetQueryId(), metrics::MetricsUtil::Now());
}

if (connection_ctx->TransactionState() == network::NetworkTransactionStateType::BLOCK) {
// Execution didn't set us to FAIL state, go ahead and return command complete
if (query_type == network::QueryType::QUERY_SELECT) {
Expand Down
Loading

0 comments on commit aa8b278

Please sign in to comment.