Skip to content

Commit fa468f7

Browse files
authored
Merge 6a93b06 into 71347f6
2 parents 71347f6 + 6a93b06 commit fa468f7

File tree

6 files changed

+189
-36
lines changed

6 files changed

+189
-36
lines changed

ydb/core/kqp/common/compilation/events.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::
7676
const TMaybe<TKqpQueryId>& query, bool isQueryActionPrepare, TInstant deadline,
7777
TKqpDbCountersPtr dbCounters, const TGUCSettings::TPtr& gUCSettings, const TMaybe<TString>& applicationName,
7878
std::shared_ptr<std::atomic<bool>> intrestedInResult, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
79-
NLWTrace::TOrbit orbit = {}, TKqpTempTablesState::TConstPtr tempTablesState = nullptr)
79+
NLWTrace::TOrbit orbit = {}, TKqpTempTablesState::TConstPtr tempTablesState = nullptr, TMaybe<TQueryAst> queryAst = Nothing())
8080
: UserToken(userToken)
8181
, Uid(uid)
8282
, Query(query)
@@ -89,6 +89,7 @@ struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::
8989
, Orbit(std::move(orbit))
9090
, TempTablesState(std::move(tempTablesState))
9191
, IntrestedInResult(std::move(intrestedInResult))
92+
, QueryAst(queryAst)
9293
{
9394
}
9495

@@ -107,6 +108,8 @@ struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::
107108

108109
TKqpTempTablesState::TConstPtr TempTablesState;
109110
std::shared_ptr<std::atomic<bool>> IntrestedInResult;
111+
112+
TMaybe<TQueryAst> QueryAst;
110113
};
111114

112115
struct TEvCompileResponse: public TEventLocal<TEvCompileResponse, TKqpEvents::EvCompileResponse> {

ydb/core/kqp/compile_service/kqp_compile_service.cpp

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -759,13 +759,17 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
759759
NWilson::TSpan compileServiceSpan(TWilsonKqp::CompileService, ev->Get() ? std::move(ev->TraceId) : NWilson::TTraceId(), "CompileService");
760760

761761
TKqpCompileSettings compileSettings(true, request.IsQueryActionPrepare, false, request.Deadline, TableServiceConfig.GetEnableAstCache() ? ECompileActorAction::PARSE : ECompileActorAction::COMPILE);
762-
TKqpCompileRequest compileRequest(ev->Sender, request.Uid, compileResult ? *compileResult->Query : *request.Query,
762+
TKqpCompileRequest compileRequest(ev->Sender, request.Uid, request.Query ? *request.Query : *compileResult->Query,
763763
compileSettings, request.UserToken, dbCounters, request.GUCSettings, request.ApplicationName,
764764
ev->Cookie, std::move(ev->Get()->IntrestedInResult),
765765
ev->Get()->UserRequestContext,
766766
ev->Get() ? std::move(ev->Get()->Orbit) : NLWTrace::TOrbit(),
767767
std::move(compileServiceSpan), std::move(ev->Get()->TempTablesState));
768768

769+
if (TableServiceConfig.GetEnableAstCache() && request.QueryAst) {
770+
return CompileByAst(*request.QueryAst, compileRequest, ctx);
771+
}
772+
769773
if (!RequestsQueue.Enqueue(std::move(compileRequest))) {
770774
Counters->ReportCompileRequestRejected(dbCounters);
771775

@@ -1138,7 +1142,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
11381142
<< ", message: " << e.what());
11391143
}
11401144

1141-
void Reply(const TActorId& sender, const TVector<TQueryAst> astStatements, const TKqpQueryId query,
1145+
void Reply(const TActorId& sender, const TVector<TQueryAst>& astStatements, const TKqpQueryId query,
11421146
const TActorContext& ctx, ui64 cookie, NLWTrace::TOrbit orbit, NWilson::TSpan span)
11431147
{
11441148
LWTRACK(KqpCompileServiceReply,
@@ -1157,7 +1161,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
11571161
ctx.Send(sender, responseEv.Release(), 0, cookie);
11581162
}
11591163

1160-
void ReplyQueryStatements(const TActorId& sender, const TVector<TQueryAst> astStatements,
1164+
void ReplyQueryStatements(const TActorId& sender, const TVector<TQueryAst>& astStatements,
11611165
const TKqpQueryId query, const TActorContext& ctx, ui64 cookie, NLWTrace::TOrbit orbit, NWilson::TSpan span)
11621166
{
11631167
LWTRACK(KqpCompileServiceReplyStatements, orbit);

ydb/core/kqp/session_actor/kqp_query_state.cpp

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,34 @@ using namespace NSchemeCache;
1414
#define LOG_D(msg) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg)
1515
#define LOG_T(msg) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg)
1616

17+
18+
TKqpQueryState::TQueryTxId::TQueryTxId(const TQueryTxId& other) {
19+
YQL_ENSURE(!IsValueSet);
20+
Id = other.Id;
21+
IsValueSet = true;
22+
}
23+
24+
TKqpQueryState::TQueryTxId& TKqpQueryState::TQueryTxId::operator=(const TQueryTxId& id) {
25+
YQL_ENSURE(!IsValueSet);
26+
Id = id.Id;
27+
IsValueSet = true;
28+
return *this;
29+
}
30+
31+
void TKqpQueryState::TQueryTxId::SetValue(const TTxId& id) {
32+
YQL_ENSURE(!IsValueSet);
33+
Id = id.Id;
34+
IsValueSet = true;
35+
}
36+
37+
TTxId TKqpQueryState::TQueryTxId::GetValue() {
38+
return Id;
39+
}
40+
41+
void TKqpQueryState::TQueryTxId::Reset() {
42+
Id = TTxId();
43+
}
44+
1745
bool TKqpQueryState::EnsureTableVersions(const TEvTxProxySchemeCache::TEvNavigateKeySetResult& response) {
1846
Y_ENSURE(response.Request);
1947
const auto& navigate = *response.Request;
@@ -244,8 +272,15 @@ std::unique_ptr<TEvKqp::TEvRecompileRequest> TKqpQueryState::BuildReCompileReque
244272
compileDeadline = Min(compileDeadline, QueryDeadlines.CancelAt);
245273
}
246274

247-
return std::make_unique<TEvKqp::TEvRecompileRequest>(UserToken, CompileResult->Uid, CompileResult->Query, isQueryActionPrepare,
248-
compileDeadline, DbCounters, gUCSettingsPtr, ApplicationName, std::move(cookie), UserRequestContext, std::move(Orbit), TempTablesState);
275+
TMaybe<TQueryAst> statementAst;
276+
if (!Statements.empty()) {
277+
YQL_ENSURE(CurrentStatementId < Statements.size());
278+
statementAst = Statements[CurrentStatementId];
279+
}
280+
281+
return std::make_unique<TEvKqp::TEvRecompileRequest>(UserToken, CompileResult->Uid, query, isQueryActionPrepare,
282+
compileDeadline, DbCounters, gUCSettingsPtr, ApplicationName, std::move(cookie), UserRequestContext, std::move(Orbit), TempTablesState,
283+
statementAst);
249284
}
250285

251286
std::unique_ptr<TEvKqp::TEvCompileRequest> TKqpQueryState::BuildSplitRequest(std::shared_ptr<std::atomic<bool>> cookie, const TGUCSettings::TPtr& gUCSettingsPtr) {
@@ -295,7 +330,6 @@ bool TKqpQueryState::PrepareNextStatementPart() {
295330
QueryData = {};
296331
PreparedQuery = {};
297332
CompileResult = {};
298-
TxCtx = {};
299333
CurrentTx = 0;
300334
TableVersions = {};
301335
MaxReadType = ETableReadType::Other;

ydb/core/kqp/session_actor/kqp_query_state.h

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,22 @@ namespace NKikimr::NKqp {
3333
// common case).
3434
class TKqpQueryState : public TNonCopyable {
3535
public:
36+
class TQueryTxId {
37+
public:
38+
TQueryTxId() = default;
39+
TQueryTxId(const TQueryTxId& other);
40+
TQueryTxId& operator=(const TQueryTxId& id);
41+
42+
void SetValue(const TTxId& id);
43+
TTxId GetValue();
44+
45+
void Reset();
46+
47+
private:
48+
TTxId Id;
49+
bool IsValueSet = false;
50+
};
51+
3652
TKqpQueryState(TEvKqp::TEvQueryRequest::TPtr& ev, ui64 queryId, const TString& database, const TMaybe<TString>& applicationName,
3753
const TString& cluster, TKqpDbCountersPtr dbCounters, bool longSession, const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
3854
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, const TString& sessionId, TMonotonic startedAt)
@@ -110,7 +126,7 @@ class TKqpQueryState : public TNonCopyable {
110126
NWilson::TSpan KqpSessionSpan;
111127
ETableReadType MaxReadType = ETableReadType::Other;
112128

113-
TTxId TxId; // User tx
129+
TQueryTxId TxId; // User tx
114130
bool Commit = false;
115131
bool Commited = false;
116132

@@ -132,6 +148,7 @@ class TKqpQueryState : public TNonCopyable {
132148
NYql::TIssues Issues;
133149

134150
TVector<TQueryAst> Statements;
151+
TMaybe<TQueryTxId> ImpliedTxId = {}; // Implied tx
135152
ui32 CurrentStatementId = 0;
136153
ui32 StatementResultIndex = 0;
137154
ui32 StatementResultSize = 0;
@@ -417,15 +434,12 @@ class TKqpQueryState : public TNonCopyable {
417434
}
418435

419436
void PrepareCurrentStatement() {
420-
QueryData = {};
437+
QueryData = std::make_shared<TQueryData>(TxCtx->TxAlloc);
421438
PreparedQuery = {};
422439
CompileResult = {};
423-
TxCtx = {};
424440
CurrentTx = 0;
425441
TableVersions = {};
426442
MaxReadType = ETableReadType::Other;
427-
Commit = false;
428-
Commited = false;
429443
TopicOperations = {};
430444
ReplayMessage = {};
431445
}
@@ -444,7 +458,6 @@ class TKqpQueryState : public TNonCopyable {
444458
TxCtx->EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_UNDEFINED;
445459
break;
446460
default:
447-
Commit = true;
448461
TxCtx->EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE;
449462
}
450463
}

ydb/core/kqp/session_actor/kqp_session_actor.cpp

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
284284
}
285285
QueryState->TxCtx = std::move(txCtx);
286286
QueryState->QueryData = std::make_shared<TQueryData>(QueryState->TxCtx->TxAlloc);
287-
QueryState->TxId = txId;
287+
QueryState->TxId.SetValue(txId);
288288
if (!CheckTransactionLocks(/*tx*/ nullptr)) {
289289
return;
290290
}
@@ -540,6 +540,10 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
540540

541541
void Handle(TEvKqp::TEvParseResponse::TPtr& ev) {
542542
QueryState->SaveAndCheckParseResult(std::move(*ev->Get()));
543+
Ydb::Table::TransactionSettings settings;
544+
settings.mutable_serializable_read_write();
545+
BeginTx(settings);
546+
QueryState->ImpliedTxId = QueryState->TxId;
543547
CompileStatement();
544548
}
545549

@@ -680,7 +684,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
680684
}
681685

682686
void BeginTx(const Ydb::Table::TransactionSettings& settings) {
683-
QueryState->TxId = UlidGen.Next();
687+
QueryState->TxId.SetValue(UlidGen.Next());
684688
QueryState->TxCtx = MakeIntrusive<TKqpTransactionContext>(false, AppData()->FunctionRegistry,
685689
AppData()->TimeProvider, AppData()->RandomProvider, Config->EnableKqpImmediateEffects);
686690

@@ -704,7 +708,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
704708
QueryState->TxCtx->SetIsolationLevel(settings);
705709
QueryState->TxCtx->OnBeginQuery();
706710

707-
if (!Transactions.CreateNew(QueryState->TxId, QueryState->TxCtx)) {
711+
if (!Transactions.CreateNew(QueryState->TxId.GetValue(), QueryState->TxCtx)) {
708712
std::vector<TIssue> issues{
709713
YqlIssue({}, TIssuesIds::KIKIMR_TOO_MANY_TRANSACTIONS)};
710714
ythrow TRequestFail(Ydb::StatusIds::BAD_SESSION,
@@ -717,14 +721,14 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
717721
Counters->ReportBeginTransaction(Settings.DbCounters, Transactions.EvictedTx, Transactions.Size(), Transactions.ToBeAbortedSize());
718722
}
719723

720-
static const Ydb::Table::TransactionControl& GetImpliedTxControl() {
721-
auto create = []() -> Ydb::Table::TransactionControl {
722-
Ydb::Table::TransactionControl control;
724+
Ydb::Table::TransactionControl GetImpliedTxControl() {
725+
Ydb::Table::TransactionControl control;
726+
control.set_commit_tx(QueryState->ProcessingLastStatement());
727+
if (QueryState->ImpliedTxId) {
728+
control.set_tx_id(QueryState->ImpliedTxId->GetValue().GetHumanStr());
729+
} else {
723730
control.mutable_begin_tx()->mutable_serializable_read_write();
724-
control.set_commit_tx(true);
725-
return control;
726-
};
727-
static const Ydb::Table::TransactionControl control = create();
731+
}
728732
return control;
729733
}
730734

@@ -744,14 +748,16 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
744748
}
745749
QueryState->TxCtx = txCtx;
746750
QueryState->QueryData = std::make_shared<TQueryData>(QueryState->TxCtx->TxAlloc);
747-
QueryState->TxId = txId;
751+
if (QueryState->TxId.GetValue() != txId) {
752+
QueryState->TxId.SetValue(txId);
753+
}
748754
break;
749755
}
750756
case Ydb::Table::TransactionControl::kBeginTx: {
751757
BeginTx(txControl.begin_tx());
752758
break;
753-
}
754-
case Ydb::Table::TransactionControl::TX_SELECTOR_NOT_SET:
759+
}
760+
case Ydb::Table::TransactionControl::TX_SELECTOR_NOT_SET:
755761
ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST)
756762
<< "wrong TxControl: tx_selector must be set";
757763
break;
@@ -1539,7 +1545,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
15391545

15401546
void FillTxInfo(NKikimrKqp::TQueryResponse* response) {
15411547
YQL_ENSURE(QueryState);
1542-
response->MutableTxMeta()->set_id(QueryState->TxId.GetHumanStr());
1548+
response->MutableTxMeta()->set_id(QueryState->TxId.GetValue().GetHumanStr());
15431549

15441550
if (QueryState->TxCtx) {
15451551
auto txInfo = QueryState->TxCtx->GetInfo();
@@ -1612,8 +1618,8 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
16121618

16131619
if (QueryState->Commit) {
16141620
ResetTxState();
1615-
Transactions.ReleaseTransaction(QueryState->TxId);
1616-
QueryState->TxId = TTxId();
1621+
Transactions.ReleaseTransaction(QueryState->TxId.GetValue());
1622+
QueryState->TxId.Reset();
16171623
}
16181624

16191625
FillTxInfo(response);
@@ -1958,7 +1964,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
19581964
auto& txCtx = QueryState->TxCtx;
19591965
if (txCtx->IsInvalidated()) {
19601966
Transactions.AddToBeAborted(txCtx);
1961-
Transactions.ReleaseTransaction(QueryState->TxId);
1967+
Transactions.ReleaseTransaction(QueryState->TxId.GetValue());
19621968
}
19631969
DiscardPersistentSnapshot(txCtx->SnapshotHandle);
19641970
}

0 commit comments

Comments
 (0)