Skip to content

Commit

Permalink
[#17381] YCQL: Store prepared statements metrics
Browse files Browse the repository at this point in the history
Summary:
Tracks the prepared stmts metrics. Metrics are stored in the prepared_stmts_map.
Added Gflag to enable/disable cql_stats.
Jira: DB-6568

Test Plan:
ybd --cxx-test cqlserver-test --gtest_filter TestCQLService.TestCQLUpdateStmtCounters
ybd --cxx-test cql-test --gtest_filter CqlTest.TestCQLPreparedStmtStats

Reviewers: hbhanawat, oleg, aagrawal

Reviewed By: oleg

Differential Revision: https://phorge.dev.yugabyte.com/D25612
  • Loading branch information
divchaturvedi committed Jun 16, 2023
1 parent 20a0391 commit 487d628
Show file tree
Hide file tree
Showing 10 changed files with 289 additions and 17 deletions.
55 changes: 55 additions & 0 deletions src/yb/integration-tests/cql-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
#include "yb/tserver/ts_tablet_manager.h"

#include "yb/util/backoff_waiter.h"
#include "yb/util/curl_util.h"
#include "yb/util/jsonreader.h"
#include "yb/util/random_util.h"
#include "yb/util/range.h"
#include "yb/util/status_log.h"
Expand Down Expand Up @@ -77,6 +79,7 @@ class CqlTest : public CqlTestBase<MiniCluster> {
void TestAlteredPrepareForIndexWithPaging(bool check_schema_in_paging,
bool metadata_in_exec_resp = false);
void TestPrepareWithDropTableWithPaging();
void TestCQLPreparedStmtStats();
};

TEST_F(CqlTest, ProcessorsLimit) {
Expand Down Expand Up @@ -780,6 +783,58 @@ TEST_F(CqlTest, AlteredPrepare_MetadataInExecResp) {
LOG(INFO) << "Test finished: " << CURRENT_TEST_CASE_AND_TEST_NAME_STR();
}

TEST_F(CqlTest, TestCQLPreparedStmtStats) {
auto session = ASSERT_RESULT(EstablishSession(driver_.get()));
EasyCurl curl;
faststring buf;
std::vector<Endpoint> addrs;
CHECK_OK(cql_server_->web_server()->GetBoundAddresses(&addrs));
CHECK_EQ(addrs.size(), 1);
LOG(INFO) << "Create Table";
ASSERT_OK(session.ExecuteQuery("CREATE TABLE t1 (i INT PRIMARY KEY, j INT)"));

LOG(INFO) << "Prepare";
auto sel_prepared = ASSERT_RESULT(session.Prepare("SELECT * FROM t1 WHERE i = ?"));
auto ins_prepared = ASSERT_RESULT(session.Prepare("INSERT INTO t1 (i, j) VALUES (?, ?)"));

for (int i = 0; i < 10; i++) {
ASSERT_OK(session.Execute(ins_prepared.Bind().Bind(0, i).Bind(1, i)));
}

for (int i = 0; i < 9; i += 2) {
CassandraResult res = ASSERT_RESULT(
session.ExecuteWithResult(sel_prepared.Bind().Bind(0, i)));
ASSERT_EQ(res.RenderToString(), strings::Substitute("$0,$1", i, i));
}

ASSERT_OK(curl.FetchURL(strings::Substitute("http://$0/statements", ToString(addrs[0])), &buf));
JsonReader r(buf.ToString());
ASSERT_OK(r.Init());
std::vector<const rapidjson::Value*> stmt_stats;
ASSERT_OK(r.ExtractObjectArray(r.root(), "prepared_statements", &stmt_stats));
ASSERT_EQ(2, stmt_stats.size());

const rapidjson::Value* insert_stat = stmt_stats[0];
string insert_query;
ASSERT_OK(r.ExtractString(insert_stat, "query", &insert_query));
ASSERT_EQ("INSERT INTO t1 (i, j) VALUES (?, ?)", insert_query);

int64 insert_num_calls = 0;
ASSERT_OK(r.ExtractInt64(insert_stat, "num_calls", &insert_num_calls));
ASSERT_EQ(10, insert_num_calls);

const rapidjson::Value* select_stat = stmt_stats[1];
string select_query;
ASSERT_OK(r.ExtractString(select_stat, "query", &select_query));
ASSERT_EQ("SELECT * FROM t1 WHERE i = ?", select_query);

int64 select_num_calls = 0;
ASSERT_OK(r.ExtractInt64(select_stat, "num_calls", &select_num_calls));
ASSERT_EQ(5, select_num_calls);

LOG(INFO) << "Test finished: " << CURRENT_TEST_CASE_AND_TEST_NAME_STR();
}

void CqlTest::TestAlteredPrepareWithPaging(bool check_schema_in_paging,
bool metadata_in_exec_resp) {
FLAGS_cql_check_table_schema_in_paging_state = check_schema_in_paging;
Expand Down
11 changes: 11 additions & 0 deletions src/yb/yql/cql/cqlserver/cql_processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ DECLARE_bool(use_cassandra_authentication);
DECLARE_bool(ycql_cache_login_info);
DECLARE_int32(client_read_write_timeout_ms);

DEFINE_RUNTIME_bool(ycql_enable_stat_statements, true,
"If enabled, it will track queries and dump the metrics on http://localhost:12000/statements.");
DEFINE_RUNTIME_bool(ycql_enable_tracing_flag, true,
"If enabled, setting TRACING ON in cqlsh will cause "
"the server to enable tracing for the requested RPCs and print them. Use this as a safety flag "
Expand Down Expand Up @@ -309,6 +311,14 @@ void CQLProcessor::SendResponse(const CQLResponse& response) {
cql_metrics_->time_to_queue_cql_response_->Increment(
response_done.GetDeltaSince(response_begin).ToMicroseconds());

// Query id of a prepared statement if type of request is Execute request.
const std::string prep_query_id = GetPrepQueryId();
if (FLAGS_ycql_enable_stat_statements && !prep_query_id.empty()) {
service_impl_->UpdatePrepStmtCounters(
prep_query_id,
response_done.GetDeltaSince(execute_begin_).ToSeconds()*1000.);
}

Release();
}

Expand Down Expand Up @@ -399,6 +409,7 @@ unique_ptr<CQLResponse> CQLProcessor::ProcessRequest(const PrepareRequest& req)
VLOG(1) << "PREPARE " << req.query();
const CQLMessage::QueryId query_id = CQLStatement::GetQueryId(
ql_env_.CurrentKeyspace(), req.query());
VLOG(1) << "Generated Query Id = " << query_id;
// To prevent multiple clients from preparing the same new statement in parallel and trying to
// cache the same statement (a typical "login storm" scenario), each caller will try to allocate
// the statement in the cached statement first. If it already exists, the existing one will be
Expand Down
5 changes: 5 additions & 0 deletions src/yb/yql/cql/cqlserver/cql_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ class CQLProcessor : public ql::QLProcessor {
void PrepareAndSendResponse(const std::unique_ptr<ql::CQLResponse>& response);
void SendResponse(const ql::CQLResponse& response);

ql::CQLMessage::QueryId GetPrepQueryId() const {
return request_ && request_->opcode() == ql::CQLMessage::Opcode::EXECUTE
? static_cast<const ql::ExecuteRequest&>(*request_).query_id() : "";
}

const std::unordered_map<std::string, std::vector<std::string>> kSupportedOptions = {
{ql::CQLMessage::kCQLVersionOption,
{"3.0.0" /* minimum */, "3.4.2" /* current */}},
Expand Down
58 changes: 57 additions & 1 deletion src/yb/yql/cql/cqlserver/cql_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -448,12 +448,68 @@ void CQLServiceImpl::GetPreparedStatementMetrics(
int64_t num_statements = 0;
std::lock_guard<std::mutex> guard(prepared_stmts_mutex_);
for (auto stmt : prepared_stmts_map_) {
metrics->push_back(std::make_shared<StatementMetrics>(stmt.second->text(), stmt.first));
shared_ptr<StmtCounters> stmt_counters = stmt.second->GetWritableCounters();
if (!stmt_counters) { // To ensure GetCounters does not return null.
stmt_counters = std::make_shared<StmtCounters>(stmt.second->text());
stmt.second->SetCounters(stmt_counters);
}
metrics->push_back(std::make_shared<StatementMetrics>(stmt.first, stmt_counters));
if (++num_statements >= statement_limit) {
break;
}
}
}

void CQLServiceImpl::UpdatePrepStmtCounters(const ql::CQLMessage::QueryId& query_id,
double execute_time_in_msec) {
std::lock_guard<std::mutex> guard(prepared_stmts_mutex_);
auto itr = prepared_stmts_map_.find(query_id);
if(itr == prepared_stmts_map_.end()) {
return;
}
std::shared_ptr<StmtCounters> stmt_counters = itr->second->GetWritableCounters();
if(!stmt_counters) {
stmt_counters = std::make_shared<StmtCounters>(itr->second->text());
itr->second->SetCounters(stmt_counters);
}
LOG_IF(DFATAL, stmt_counters->query.empty()) << "Unexpected empty query string in the counters";
UpdateCountersUnlocked(execute_time_in_msec, stmt_counters);
}

void CQLServiceImpl::UpdateCountersUnlocked(
double execute_time_in_msec,
std::shared_ptr<StmtCounters> stmt_counters) {
LOG_IF(DFATAL, stmt_counters == nullptr) << "Null pointer counters received";
if (stmt_counters->num_calls == 0) {
stmt_counters->num_calls = 1;
stmt_counters->total_time_in_msec = execute_time_in_msec;
stmt_counters->min_time_in_msec = execute_time_in_msec;
stmt_counters->max_time_in_msec = execute_time_in_msec;
} else {
const double old_mean = stmt_counters->total_time_in_msec/stmt_counters->num_calls;
stmt_counters->num_calls += 1;
stmt_counters->total_time_in_msec += execute_time_in_msec;
const double new_mean = stmt_counters->total_time_in_msec/stmt_counters->num_calls;

// Welford's method for accurately computing variance. See
// <http://www.johndcook.com/blog/standard_deviation/>
stmt_counters->sum_var_time_in_msec +=
(execute_time_in_msec - old_mean)*(execute_time_in_msec - new_mean);

if (stmt_counters->max_time_in_msec < execute_time_in_msec) {
stmt_counters->max_time_in_msec = execute_time_in_msec;
}
if (stmt_counters->min_time_in_msec > execute_time_in_msec) {
stmt_counters->min_time_in_msec = execute_time_in_msec;
}
}
}

shared_ptr<StmtCounters> CQLServiceImpl::GetWritableStmtCounters(const std::string& query_id) {
std::lock_guard<std::mutex> guard(prepared_stmts_mutex_);
auto itr = prepared_stmts_map_.find(query_id);
return itr == prepared_stmts_map_.end() ? nullptr : itr->second->GetWritableCounters();
}

} // namespace cqlserver
} // namespace yb
12 changes: 12 additions & 0 deletions src/yb/yql/cql/cqlserver/cql_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,13 @@ class CQLServiceImpl : public CQLServerServiceIf,
// Get the list of prepared statements and metrics in an inmemory vector.
void GetPreparedStatementMetrics(std::vector<std::shared_ptr<StatementMetrics>>* metrics);

// Update the counters for the prepared stmt. Acquires the "prepared_stmts_mutex_".
void UpdatePrepStmtCounters(const ql::CQLMessage::QueryId& query_id, double execute_time_in_msec);

// Returns the counters corresponding to the query with the given query id.
// Returns nullptr if query doesn't exist in the prepared_stmt_map_.
std::shared_ptr<StmtCounters> GetWritableStmtCounters(const std::string& query_id);

private:
constexpr static int kRpcTimeoutSec = 5;

Expand All @@ -141,6 +148,11 @@ class CQLServiceImpl : public CQLServerServiceIf,
// Delete the least recently used prepared statement from the cache to free up memory.
void CollectGarbage(size_t required) override;

// Executes the update counters for both prepared and unprepared statements.
// "prepared_stmts_mutex_" needs to be locked before this call.
void UpdateCountersUnlocked(double execute_time_in_msec,
std::shared_ptr<StmtCounters> stmt_counters);

// CQLServer of this service.
CQLServer* const server_;

Expand Down
36 changes: 36 additions & 0 deletions src/yb/yql/cql/cqlserver/cql_statement.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,41 @@ ql::CQLMessage::QueryId CQLStatement::GetQueryId(const string& keyspace, const s
return ql::CQLMessage::QueryId(to_char_ptr(md5), sizeof(md5));
}

void StmtCounters::WriteAsJson(
JsonWriter *jw, const ql::CQLMessage::QueryId& query_id) const {
jw->StartObject();
jw->String("query");
jw->String(this->query);

jw->String("query_id");
jw->String(query_id);

jw->String("num_calls");
jw->Int64(this->num_calls);

jw->String("total_time_in_msec");
jw->Double(this->total_time_in_msec);

jw->String("min_time_in_msec");
jw->Double(this->min_time_in_msec);

jw->String("max_time_in_msec");
jw->Double(this->max_time_in_msec);

jw->String("mean_time");
jw->Double(this->total_time_in_msec/this->num_calls);

// Note we are calculating the population variance here, not the
// sample variance, as we have data for the whole population, so
// Bessel's correction is not used, and we don't divide by
// this->num_calls-1.
const double stddev_time = this->num_calls == 0 ? 0. :
sqrt(this->sum_var_time_in_msec / this->num_calls);

jw->String("stddev_time");
jw->Double(stddev_time);
jw->EndObject();
}

} // namespace cqlserver
} // namespace yb
30 changes: 30 additions & 0 deletions src/yb/yql/cql/cqlserver/cql_statement.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include <list>

#include "yb/util/jsonwriter.h"
#include "yb/yql/cql/ql/statement.h"
#include "yb/yql/cql/ql/util/cql_message.h"

Expand All @@ -37,6 +38,24 @@ using CQLStatementMap = std::unordered_map<ql::CQLMessage::QueryId, std::shared_
using CQLStatementList = std::list<std::shared_ptr<CQLStatement>>;
using CQLStatementListPos = CQLStatementList::iterator;

struct StmtCounters{
explicit StmtCounters(const std::string& text) : query(text) {}

explicit StmtCounters(const std::shared_ptr<StmtCounters>& other) :
num_calls(other->num_calls), total_time_in_msec(other->total_time_in_msec),
min_time_in_msec(other->min_time_in_msec), max_time_in_msec(other->max_time_in_msec),
sum_var_time_in_msec(other->sum_var_time_in_msec), query(other->query) {}

void WriteAsJson(JsonWriter* jw, const ql::CQLMessage::QueryId& query_id) const;

int64 num_calls = 0; // Number of times executed.
double total_time_in_msec = 0.; // Total execution time, in msec.
double min_time_in_msec = 0.; // Minimum execution time in msec.
double max_time_in_msec = 0.; // Maximum execution time in msec.
double sum_var_time_in_msec = 0.; // Sum of variances in execution time in msec.
std::string query; // Stores the query text.
};

// A CQL statement that is prepared and cached.
class CQLStatement : public ql::Statement {
public:
Expand Down Expand Up @@ -64,10 +83,21 @@ class CQLStatement : public ql::Statement {
// Return the query id of a statement.
static ql::CQLMessage::QueryId GetQueryId(const std::string& keyspace, const std::string& query);

std::shared_ptr<StmtCounters> GetWritableCounters() {
return stmt_counters_;
}

void SetCounters(const std::shared_ptr<StmtCounters>& other) {
stmt_counters_ = other;
}

private:
// Position of the statement in the LRU.
mutable CQLStatementListPos pos_;

// Stores the metrics for a prepared statements.
std::shared_ptr<StmtCounters> stmt_counters_;

ScopedTrackedConsumption consumption_;
};

Expand Down
Loading

0 comments on commit 487d628

Please sign in to comment.