Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions ydb/core/kqp/common/simple/query_id.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <google/protobuf/util/message_differencer.h>

#include <util/generic/yexception.h>
#include <util/string/escape.h>

#include <memory>

Expand Down Expand Up @@ -74,4 +75,25 @@ bool TKqpQueryId::operator==(const TKqpQueryId& other) const {
return true;
}

TString TKqpQueryId::SerializeToString() const {
TStringBuilder result = TStringBuilder() << "{"
<< "Cluster: " << Cluster << ", "
<< "Database: " << Database << ", "
<< "UserSid: " << UserSid << ", "
<< "Text: " << EscapeC(Text) << ", "
<< "Settings: " << Settings.SerializeToString() << ", ";
if (QueryParameterTypes) {
result << "QueryParameterTypes: [";
for (const auto& param : *QueryParameterTypes) {
result << "name: " << param.first << ", type: " << param.second.ShortDebugString();
}
result << "], ";
} else {
result << "QueryParameterTypes: <empty>, ";
}

result << "GUCSettings: " << GUCSettings.SerializeToString() << "}";
return result;
}

} // namespace NKikimr::NKqp
2 changes: 2 additions & 0 deletions ydb/core/kqp/common/simple/query_id.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ struct TKqpQueryId {
GUCSettings.GetHash());
return THash<decltype(tuple)>()(tuple);
}

TString SerializeToString() const;
};
} // namespace NKikimr::NKqp

Expand Down
10 changes: 10 additions & 0 deletions ydb/core/kqp/common/simple/settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
#include <ydb/core/protos/kqp.pb.h>
#include <ydb/public/api/protos/ydb_query.pb.h>

#include <util/generic/string.h>
#include <util/str_stl.h>
#include <util/string/builder.h>

#include <tuple>

Expand Down Expand Up @@ -39,6 +41,14 @@ struct TKqpQuerySettings {
auto tuple = std::make_tuple(DocumentApiRestricted, IsInternalCall, QueryType, Syntax);
return THash<decltype(tuple)>()(tuple);
}

TString SerializeToString() const {
TStringBuilder result = TStringBuilder() << "{"
<< "DocumentApiRestricted: " << DocumentApiRestricted << ", "
<< "IsInternalCall: " << IsInternalCall << ", "
<< "QueryType: " << QueryType << "}";
return result;
}
};

} // namespace NKikimr::NKqp
Expand Down
22 changes: 18 additions & 4 deletions ydb/core/kqp/compile_service/kqp_compile_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ class TKqpQueryCache {
YQL_ENSURE(compileResult->PreparedQuery);

auto queryIt = QueryIndex.emplace(query, compileResult->Uid);
if (!queryIt.second) {
EraseByUid(compileResult->Uid);
}
Y_ENSURE(queryIt.second);
}

Expand Down Expand Up @@ -672,6 +675,8 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
Y_ENSURE(query.UserSid == userSid);
}

LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Try to find query by queryId, queryId: " << query.SerializeToString());

auto compileResult = QueryCache.FindByQuery(query, request.KeepInCache);
if (HasTempTablesNameClashes(compileResult, request.TempTablesState)) {
compileResult = nullptr;
Expand Down Expand Up @@ -853,7 +858,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
try {
if (compileResult->Status == Ydb::StatusIds::SUCCESS) {
if (!hasTempTablesNameClashes) {
UpdateQueryCache(compileResult, keepInCache, compileRequest.CompileSettings.IsQueryActionPrepare, isPerStatementExecution);
UpdateQueryCache(ctx, compileResult, keepInCache, compileRequest.CompileSettings.IsQueryActionPrepare, isPerStatementExecution);
}

if (ev->Get()->ReplayMessage && !QueryReplayBackend->IsNull()) {
Expand Down Expand Up @@ -935,15 +940,21 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
return compileResult->PreparedQuery->HasTempTables(tempTablesState, withSessionId);
}

void UpdateQueryCache(TKqpCompileResult::TConstPtr compileResult, bool keepInCache, bool isQueryActionPrepare, bool isPerStatementExecution) {
void UpdateQueryCache(const TActorContext& ctx, TKqpCompileResult::TConstPtr compileResult, bool keepInCache, bool isQueryActionPrepare, bool isPerStatementExecution) {
if (QueryCache.FindByUid(compileResult->Uid, false)) {
QueryCache.Replace(compileResult);
} else if (keepInCache) {
if (compileResult->Query) {
LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Insert query into compile cache, queryId: " << compileResult->Query->SerializeToString());
if (QueryCache.FindByQuery(*compileResult->Query, keepInCache)) {
LOG_ERROR_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Trying to insert query into compile cache when it is already there");
}
}
if (QueryCache.Insert(compileResult, TableServiceConfig.GetEnableAstCache(), isPerStatementExecution)) {
Counters->CompileQueryCacheEvicted->Inc();
}
if (compileResult->Query && isQueryActionPrepare) {
if (InsertPreparingQuery(compileResult, true, isPerStatementExecution)) {
if (InsertPreparingQuery(ctx, compileResult, true, isPerStatementExecution)) {
Counters->CompileQueryCacheEvicted->Inc();
};
}
Expand All @@ -954,6 +965,8 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
YQL_ENSURE(queryAst.Ast);
YQL_ENSURE(queryAst.Ast->IsOk());
YQL_ENSURE(queryAst.Ast->Root);
LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Try to find query by ast, queryId: " << compileRequest.Query.SerializeToString()
<< ", ast: " << queryAst.Ast->Root->ToString());
auto compileResult = QueryCache.FindByAst(compileRequest.Query, *queryAst.Ast, compileRequest.CompileSettings.KeepInCache);

if (HasTempTablesNameClashes(compileResult, compileRequest.TempTablesState)) {
Expand Down Expand Up @@ -1022,7 +1035,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
}

private:
bool InsertPreparingQuery(const TKqpCompileResult::TConstPtr& compileResult, bool keepInCache, bool isPerStatementExecution) {
bool InsertPreparingQuery(const TActorContext& ctx, const TKqpCompileResult::TConstPtr& compileResult, bool keepInCache, bool isPerStatementExecution) {
YQL_ENSURE(compileResult->Query);
auto query = *compileResult->Query;

Expand All @@ -1047,6 +1060,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
auto newCompileResult = TKqpCompileResult::Make(CreateGuidAsString(), compileResult->Status, compileResult->Issues, compileResult->MaxReadType, std::move(query), compileResult->Ast);
newCompileResult->AllowCache = compileResult->AllowCache;
newCompileResult->PreparedQuery = compileResult->PreparedQuery;
LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Insert preparing query with params, queryId: " << query.SerializeToString());
return QueryCache.Insert(newCompileResult, TableServiceConfig.GetEnableAstCache(), isPerStatementExecution);
}

Expand Down