Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion ydb/core/kqp/common/compilation/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::
const TMaybe<TKqpQueryId>& query, bool isQueryActionPrepare, TInstant deadline,
TKqpDbCountersPtr dbCounters, const TGUCSettings::TPtr& gUCSettings, const TMaybe<TString>& applicationName,
std::shared_ptr<std::atomic<bool>> intrestedInResult, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
NLWTrace::TOrbit orbit = {}, TKqpTempTablesState::TConstPtr tempTablesState = nullptr)
NLWTrace::TOrbit orbit = {}, TKqpTempTablesState::TConstPtr tempTablesState = nullptr, TMaybe<TQueryAst> queryAst = Nothing())
: UserToken(userToken)
, Uid(uid)
, Query(query)
Expand All @@ -89,6 +89,7 @@ struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::
, Orbit(std::move(orbit))
, TempTablesState(std::move(tempTablesState))
, IntrestedInResult(std::move(intrestedInResult))
, QueryAst(queryAst)
{
}

Expand All @@ -107,6 +108,8 @@ struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::

TKqpTempTablesState::TConstPtr TempTablesState;
std::shared_ptr<std::atomic<bool>> IntrestedInResult;

TMaybe<TQueryAst> QueryAst;
};

struct TEvCompileResponse: public TEventLocal<TEvCompileResponse, TKqpEvents::EvCompileResponse> {
Expand Down
10 changes: 7 additions & 3 deletions ydb/core/kqp/compile_service/kqp_compile_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -733,13 +733,17 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
NWilson::TSpan compileServiceSpan(TWilsonKqp::CompileService, ev->Get() ? std::move(ev->TraceId) : NWilson::TTraceId(), "CompileService");

TKqpCompileSettings compileSettings(true, request.IsQueryActionPrepare, false, request.Deadline, TableServiceConfig.GetEnableAstCache() ? ECompileActorAction::PARSE : ECompileActorAction::COMPILE);
TKqpCompileRequest compileRequest(ev->Sender, request.Uid, compileResult ? *compileResult->Query : *request.Query,
TKqpCompileRequest compileRequest(ev->Sender, request.Uid, request.Query ? *request.Query : *compileResult->Query,
compileSettings, request.UserToken, dbCounters, request.GUCSettings, request.ApplicationName,
ev->Cookie, std::move(ev->Get()->IntrestedInResult),
ev->Get()->UserRequestContext,
ev->Get() ? std::move(ev->Get()->Orbit) : NLWTrace::TOrbit(),
std::move(compileServiceSpan), std::move(ev->Get()->TempTablesState));

if (TableServiceConfig.GetEnableAstCache() && request.QueryAst) {
return CompileByAst(*request.QueryAst, compileRequest, ctx);
}

if (!RequestsQueue.Enqueue(std::move(compileRequest))) {
Counters->ReportCompileRequestRejected(dbCounters);

Expand Down Expand Up @@ -1107,7 +1111,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
<< ", message: " << e.what());
}

void Reply(const TActorId& sender, const TVector<TQueryAst> astStatements, const TKqpQueryId query,
void Reply(const TActorId& sender, const TVector<TQueryAst>& astStatements, const TKqpQueryId query,
const TActorContext& ctx, ui64 cookie, NLWTrace::TOrbit orbit, NWilson::TSpan span)
{
LWTRACK(KqpCompileServiceReply,
Expand All @@ -1126,7 +1130,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
ctx.Send(sender, responseEv.Release(), 0, cookie);
}

void ReplyQueryStatements(const TActorId& sender, const TVector<TQueryAst> astStatements,
void ReplyQueryStatements(const TActorId& sender, const TVector<TQueryAst>& astStatements,
const TKqpQueryId query, const TActorContext& ctx, ui64 cookie, NLWTrace::TOrbit orbit, NWilson::TSpan span)
{
LWTRACK(KqpCompileServiceReplyStatements, orbit);
Expand Down
71 changes: 44 additions & 27 deletions ydb/core/kqp/session_actor/kqp_query_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,31 @@ using namespace NSchemeCache;
#define LOG_D(msg) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg)
#define LOG_T(msg) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg)


TKqpQueryState::TQueryTxId::TQueryTxId(const TQueryTxId& other) {
YQL_ENSURE(!Id);
Id = other.Id;
}

TKqpQueryState::TQueryTxId& TKqpQueryState::TQueryTxId::operator=(const TQueryTxId& id) {
YQL_ENSURE(!Id);
Id = id.Id;
return *this;
}

void TKqpQueryState::TQueryTxId::SetValue(const TTxId& id) {
YQL_ENSURE(!Id);
Id = id.Id;
}

TTxId TKqpQueryState::TQueryTxId::GetValue() {
return Id ? *Id : TTxId();
}

void TKqpQueryState::TQueryTxId::Reset() {
Id = TTxId();
}

bool TKqpQueryState::EnsureTableVersions(const TEvTxProxySchemeCache::TEvNavigateKeySetResult& response) {
Y_ENSURE(response.Request);
const auto& navigate = *response.Request;
Expand Down Expand Up @@ -163,7 +188,7 @@ std::unique_ptr<TEvKqp::TEvCompileRequest> TKqpQueryState::BuildCompileRequest(s
settings.Syntax = GetSyntax();

bool keepInCache = false;
bool perStatementResult = !HasTxControl() && GetAction() == NKikimrKqp::QUERY_ACTION_EXECUTE;
bool perStatementResult = HasImplicitTx();
TGUCSettings gUCSettings = gUCSettingsPtr ? *gUCSettingsPtr : TGUCSettings();
switch (GetAction()) {
case NKikimrKqp::QUERY_ACTION_EXECUTE:
Expand Down Expand Up @@ -244,8 +269,15 @@ std::unique_ptr<TEvKqp::TEvRecompileRequest> TKqpQueryState::BuildReCompileReque
compileDeadline = Min(compileDeadline, QueryDeadlines.CancelAt);
}

return std::make_unique<TEvKqp::TEvRecompileRequest>(UserToken, CompileResult->Uid, CompileResult->Query, isQueryActionPrepare,
compileDeadline, DbCounters, gUCSettingsPtr, ApplicationName, std::move(cookie), UserRequestContext, std::move(Orbit), TempTablesState);
TMaybe<TQueryAst> statementAst;
if (!Statements.empty()) {
YQL_ENSURE(CurrentStatementId < Statements.size());
statementAst = Statements[CurrentStatementId];
}

return std::make_unique<TEvKqp::TEvRecompileRequest>(UserToken, CompileResult->Uid, query, isQueryActionPrepare,
compileDeadline, DbCounters, gUCSettingsPtr, ApplicationName, std::move(cookie), UserRequestContext, std::move(Orbit), TempTablesState,
statementAst);
}

std::unique_ptr<TEvKqp::TEvCompileRequest> TKqpQueryState::BuildSplitRequest(std::shared_ptr<std::atomic<bool>> cookie, const TGUCSettings::TPtr& gUCSettingsPtr) {
Expand Down Expand Up @@ -291,11 +323,14 @@ std::unique_ptr<TEvKqp::TEvCompileRequest> TKqpQueryState::BuildCompileSplittedR
false, SplittedCtx.Get(), SplittedExprs.at(NextSplittedExpr));
}

bool TKqpQueryState::ProcessingLastStatementPart() {
return SplittedExprs.empty() || (NextSplittedExpr + 1 >= static_cast<int>(SplittedExprs.size()));
}

bool TKqpQueryState::PrepareNextStatementPart() {
QueryData = {};
PreparedQuery = {};
CompileResult = {};
TxCtx = {};
CurrentTx = 0;
TableVersions = {};
MaxReadType = ETableReadType::Other;
Expand All @@ -304,16 +339,15 @@ bool TKqpQueryState::PrepareNextStatementPart() {
TopicOperations = {};
ReplayMessage = {};

++NextSplittedExpr;

if (NextSplittedExpr >= static_cast<int>(SplittedExprs.size()) || SplittedExprs.empty()) {
if (ProcessingLastStatementPart()) {
SplittedWorld.Reset();
SplittedExprs.clear();
SplittedCtx.Reset();
NextSplittedExpr = -1;
return false;
}

++NextSplittedExpr;
return true;
}

Expand Down Expand Up @@ -421,15 +455,13 @@ bool TKqpQueryState::HasErrors(const NSchemeCache::TSchemeCacheNavigate& respons
return true;
}

bool TKqpQueryState::HasImpliedTx() const {
bool TKqpQueryState::HasImplicitTx() const {
if (HasTxControl()) {
return false;
}

const NKikimrKqp::EQueryAction action = RequestEv->GetAction();
if (action != NKikimrKqp::QUERY_ACTION_EXECUTE &&
action != NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED)
{
if (action != NKikimrKqp::QUERY_ACTION_EXECUTE) {
return false;
}

Expand All @@ -441,22 +473,7 @@ bool TKqpQueryState::HasImpliedTx() const {
return false;
}

for (const auto& transactionPtr : PreparedQuery->GetTransactions()) {
switch (transactionPtr->GetType()) {
case NKqpProto::TKqpPhyTx::TYPE_GENERIC: // data transaction
return true;
case NKqpProto::TKqpPhyTx::TYPE_UNSPECIFIED:
case NKqpProto::TKqpPhyTx::TYPE_COMPUTE:
case NKqpProto::TKqpPhyTx::TYPE_DATA: // data transaction, but not in QueryService API
case NKqpProto::TKqpPhyTx::TYPE_SCAN:
case NKqpProto::TKqpPhyTx::TYPE_SCHEME:
case NKqpProto::TKqpPhyTx_EType_TKqpPhyTx_EType_INT_MIN_SENTINEL_DO_NOT_USE_:
case NKqpProto::TKqpPhyTx_EType_TKqpPhyTx_EType_INT_MAX_SENTINEL_DO_NOT_USE_:
break;
}
}

return false;
return true;
}

}
27 changes: 20 additions & 7 deletions ydb/core/kqp/session_actor/kqp_query_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,21 @@ namespace NKikimr::NKqp {
// common case).
class TKqpQueryState : public TNonCopyable {
public:
class TQueryTxId {
public:
TQueryTxId() = default;
TQueryTxId(const TQueryTxId& other);
TQueryTxId& operator=(const TQueryTxId& id);

void SetValue(const TTxId& id);
TTxId GetValue();

void Reset();

private:
TMaybe<TTxId> Id;
};

TKqpQueryState(TEvKqp::TEvQueryRequest::TPtr& ev, ui64 queryId, const TString& database, const TMaybe<TString>& applicationName,
const TString& cluster, TKqpDbCountersPtr dbCounters, bool longSession, const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, const TString& sessionId, TMonotonic startedAt)
Expand Down Expand Up @@ -110,7 +125,7 @@ class TKqpQueryState : public TNonCopyable {
NWilson::TSpan KqpSessionSpan;
ETableReadType MaxReadType = ETableReadType::Other;

TTxId TxId; // User tx
TQueryTxId TxId; // User tx
bool Commit = false;
bool Commited = false;

Expand All @@ -132,6 +147,7 @@ class TKqpQueryState : public TNonCopyable {
NYql::TIssues Issues;

TVector<TQueryAst> Statements;
TMaybe<TQueryTxId> ImplicitTxId = {}; // Implicit tx for all statements
ui32 CurrentStatementId = 0;
ui32 StatementResultIndex = 0;
ui32 StatementResultSize = 0;
Expand Down Expand Up @@ -406,7 +422,7 @@ class TKqpQueryState : public TNonCopyable {
return RequestEv->HasTxControl();
}

bool HasImpliedTx() const; // (only for QueryService API) user has not specified TxControl in the request. In this case we behave like Begin/Commit was specified.
bool HasImplicitTx() const; // (only for QueryService API) user has not specified TxControl in the request. In this case we behave like Begin/Commit was specified.

const ::Ydb::Table::TransactionControl& GetTxControl() const {
return RequestEv->GetTxControl();
Expand All @@ -417,15 +433,12 @@ class TKqpQueryState : public TNonCopyable {
}

void PrepareCurrentStatement() {
QueryData = {};
QueryData = std::make_shared<TQueryData>(TxCtx->TxAlloc);
PreparedQuery = {};
CompileResult = {};
TxCtx = {};
CurrentTx = 0;
TableVersions = {};
MaxReadType = ETableReadType::Other;
Commit = false;
Commited = false;
TopicOperations = {};
ReplayMessage = {};
}
Expand All @@ -444,7 +457,6 @@ class TKqpQueryState : public TNonCopyable {
TxCtx->EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_UNDEFINED;
break;
default:
Commit = true;
TxCtx->EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE;
}
}
Expand All @@ -469,6 +481,7 @@ class TKqpQueryState : public TNonCopyable {
std::unique_ptr<TEvKqp::TEvCompileRequest> BuildSplitRequest(std::shared_ptr<std::atomic<bool>> cookie, const TGUCSettings::TPtr& gUCSettingsPtr);
std::unique_ptr<TEvKqp::TEvCompileRequest> BuildCompileSplittedRequest(std::shared_ptr<std::atomic<bool>> cookie, const TGUCSettings::TPtr& gUCSettingsPtr);

bool ProcessingLastStatementPart();
bool PrepareNextStatementPart();

const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>& GetYdbParameters() const {
Expand Down
44 changes: 24 additions & 20 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
}
QueryState->TxCtx = std::move(txCtx);
QueryState->QueryData = std::make_shared<TQueryData>(QueryState->TxCtx->TxAlloc);
QueryState->TxId = txId;
QueryState->TxId.SetValue(txId);
if (!CheckTransactionLocks(/*tx*/ nullptr)) {
return;
}
Expand Down Expand Up @@ -680,7 +680,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
}

void BeginTx(const Ydb::Table::TransactionSettings& settings) {
QueryState->TxId = UlidGen.Next();
QueryState->TxId.SetValue(UlidGen.Next());
QueryState->TxCtx = MakeIntrusive<TKqpTransactionContext>(false, AppData()->FunctionRegistry,
AppData()->TimeProvider, AppData()->RandomProvider, Config->EnableKqpImmediateEffects);

Expand All @@ -704,7 +704,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
QueryState->TxCtx->SetIsolationLevel(settings);
QueryState->TxCtx->OnBeginQuery();

if (!Transactions.CreateNew(QueryState->TxId, QueryState->TxCtx)) {
if (!Transactions.CreateNew(QueryState->TxId.GetValue(), QueryState->TxCtx)) {
std::vector<TIssue> issues{
YqlIssue({}, TIssuesIds::KIKIMR_TOO_MANY_TRANSACTIONS)};
ythrow TRequestFail(Ydb::StatusIds::BAD_SESSION,
Expand All @@ -717,21 +717,23 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
Counters->ReportBeginTransaction(Settings.DbCounters, Transactions.EvictedTx, Transactions.Size(), Transactions.ToBeAbortedSize());
}

static const Ydb::Table::TransactionControl& GetImpliedTxControl() {
auto create = []() -> Ydb::Table::TransactionControl {
Ydb::Table::TransactionControl control;
control.mutable_begin_tx()->mutable_serializable_read_write();
control.set_commit_tx(true);
return control;
};
static const Ydb::Table::TransactionControl control = create();
Ydb::Table::TransactionControl GetTxControlWithImplicitTx() {
if (!QueryState->ImplicitTxId) {
Ydb::Table::TransactionSettings settings;
settings.mutable_serializable_read_write();
BeginTx(settings);
QueryState->ImplicitTxId = QueryState->TxId;
}
Ydb::Table::TransactionControl control;
control.set_commit_tx(QueryState->ProcessingLastStatement() && QueryState->ProcessingLastStatementPart());
control.set_tx_id(QueryState->ImplicitTxId->GetValue().GetHumanStr());
return control;
}

bool PrepareQueryTransaction() {
const bool hasTxControl = QueryState->HasTxControl();
if (hasTxControl || QueryState->HasImpliedTx()) {
const auto& txControl = hasTxControl ? QueryState->GetTxControl() : GetImpliedTxControl();
if (hasTxControl || QueryState->HasImplicitTx()) {
const auto& txControl = hasTxControl ? QueryState->GetTxControl() : GetTxControlWithImplicitTx();

QueryState->Commit = txControl.commit_tx();
switch (txControl.tx_selector_case()) {
Expand All @@ -744,14 +746,16 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
}
QueryState->TxCtx = txCtx;
QueryState->QueryData = std::make_shared<TQueryData>(QueryState->TxCtx->TxAlloc);
QueryState->TxId = txId;
if (hasTxControl) {
QueryState->TxId.SetValue(txId);
}
break;
}
case Ydb::Table::TransactionControl::kBeginTx: {
BeginTx(txControl.begin_tx());
break;
}
case Ydb::Table::TransactionControl::TX_SELECTOR_NOT_SET:
}
case Ydb::Table::TransactionControl::TX_SELECTOR_NOT_SET:
ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST)
<< "wrong TxControl: tx_selector must be set";
break;
Expand Down Expand Up @@ -1539,7 +1543,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {

void FillTxInfo(NKikimrKqp::TQueryResponse* response) {
YQL_ENSURE(QueryState);
response->MutableTxMeta()->set_id(QueryState->TxId.GetHumanStr());
response->MutableTxMeta()->set_id(QueryState->TxId.GetValue().GetHumanStr());

if (QueryState->TxCtx) {
auto txInfo = QueryState->TxCtx->GetInfo();
Expand Down Expand Up @@ -1612,8 +1616,8 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {

if (QueryState->Commit) {
ResetTxState();
Transactions.ReleaseTransaction(QueryState->TxId);
QueryState->TxId = TTxId();
Transactions.ReleaseTransaction(QueryState->TxId.GetValue());
QueryState->TxId.Reset();
}

FillTxInfo(response);
Expand Down Expand Up @@ -1958,7 +1962,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
auto& txCtx = QueryState->TxCtx;
if (txCtx->IsInvalidated()) {
Transactions.AddToBeAborted(txCtx);
Transactions.ReleaseTransaction(QueryState->TxId);
Transactions.ReleaseTransaction(QueryState->TxId.GetValue());
}
DiscardPersistentSnapshot(txCtx->SnapshotHandle);
}
Expand Down
Loading