Skip to content

Commit 4a8e559

Browse files
authored
Merge 4c1281e into 2dd1cac
2 parents 2dd1cac + 4c1281e commit 4a8e559

File tree

6 files changed

+135
-104
lines changed

6 files changed

+135
-104
lines changed

ydb/core/kqp/gateway/kqp_gateway.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,8 @@ class IKqpGateway : public NYql::IKikimrGateway {
201201
using NYql::IKikimrGateway::ExecuteLiteral;
202202
virtual NThreading::TFuture<TExecPhysicalResult> ExecuteLiteral(TExecPhysicalRequest&& request,
203203
TQueryData::TPtr params, ui32 txIndex) = 0;
204+
virtual TExecPhysicalResult ExecuteLiteralInstant1(TExecPhysicalRequest&& request,
205+
TQueryData::TPtr params, ui32 txIndex) = 0;
204206

205207
/* Scripting */
206208
virtual NThreading::TFuture<TQueryResult> ExplainDataQueryAst(const TString& cluster, const TString& query) = 0;

ydb/core/kqp/gateway/kqp_ic_gateway.cpp

Lines changed: 105 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,79 @@ struct TAppConfigResult : public IKqpGateway::TGenericResult {
7777
std::shared_ptr<const NKikimrConfig::TAppConfig> Config;
7878
};
7979

80+
bool ContainOnlyLiteralStages(NKikimr::NKqp::IKqpGateway::TExecPhysicalRequest& request) {
81+
for (const auto& tx : request.Transactions) {
82+
if (tx.Body->GetType() != NKqpProto::TKqpPhyTx::TYPE_COMPUTE) {
83+
return false;
84+
}
85+
86+
for (const auto& stage : tx.Body->GetStages()) {
87+
if (stage.InputsSize() != 0) {
88+
return false;
89+
}
90+
}
91+
}
92+
93+
return true;
94+
}
95+
96+
void PrepareLiteralRequest(IKqpGateway::TExecPhysicalRequest& literalRequest, NKqpProto::TKqpPhyQuery& phyQuery, const TString& program, const NKikimrMiniKQL::TType& resultType) {
97+
literalRequest.NeedTxId = false;
98+
literalRequest.MaxAffectedShards = 0;
99+
literalRequest.TotalReadSizeLimitBytes = 0;
100+
literalRequest.MkqlMemoryLimit = 100_MB;
101+
102+
auto& transaction = *phyQuery.AddTransactions();
103+
transaction.SetType(NKqpProto::TKqpPhyTx::TYPE_COMPUTE);
104+
105+
auto& stage = *transaction.AddStages();
106+
auto& stageProgram = *stage.MutableProgram();
107+
stageProgram.SetRuntimeVersion(NYql::NDqProto::RUNTIME_VERSION_YQL_1_0);
108+
stageProgram.SetRaw(program);
109+
stage.SetOutputsCount(1);
110+
111+
auto& taskResult = *transaction.AddResults();
112+
*taskResult.MutableItemType() = resultType;
113+
auto& taskConnection = *taskResult.MutableConnection();
114+
taskConnection.SetStageIndex(0);
115+
}
116+
117+
void FillLiteralResult(const IKqpGateway::TExecPhysicalResult& result, IKqpGateway::TExecuteLiteralResult& literalResult) {
118+
if (result.Success()) {
119+
YQL_ENSURE(result.Results.size() == 1);
120+
literalResult.SetSuccess();
121+
literalResult.Result = result.Results[0];
122+
} else {
123+
literalResult.SetStatus(result.Status());
124+
literalResult.AddIssues(result.Issues());
125+
}
126+
}
127+
128+
void FillPhysicalResult(std::unique_ptr<TEvKqpExecuter::TEvTxResponse>& ev, IKqpGateway::TExecPhysicalResult& result, TQueryData::TPtr params, ui32 txIndex) {
129+
auto& response = *ev->Record.MutableResponse();
130+
if (response.GetStatus() == Ydb::StatusIds::SUCCESS) {
131+
result.SetSuccess();
132+
result.ExecuterResult.Swap(response.MutableResult());
133+
{
134+
auto g = params->TypeEnv().BindAllocator();
135+
136+
auto& txResults = ev->GetTxResults();
137+
result.Results.reserve(txResults.size());
138+
for(auto& tx : txResults) {
139+
result.Results.emplace_back(tx.GetMkql());
140+
}
141+
params->AddTxHolders(std::move(ev->GetTxHolders()));
142+
143+
if (!txResults.empty()) {
144+
params->AddTxResults(txIndex, std::move(txResults));
145+
}
146+
}
147+
} else {
148+
for (auto& issue : response.GetIssues()) {
149+
result.AddIssue(NYql::IssueFromMessage(issue));
150+
}
151+
}
152+
}
80153

81154
template<typename TRequest, typename TResponse, typename TResult>
82155
class TProxyRequestHandler: public TRequestHandlerBase<
@@ -595,32 +668,8 @@ class TKqpExecLiteralRequestHandler: public TActorBootstrapped<TKqpExecLiteralRe
595668
}
596669

597670
void ProcessPureExecution(std::unique_ptr<TEvKqpExecuter::TEvTxResponse>& ev) {
598-
auto* response = ev->Record.MutableResponse();
599-
600671
TResult result;
601-
if (response->GetStatus() == Ydb::StatusIds::SUCCESS) {
602-
result.SetSuccess();
603-
result.ExecuterResult.Swap(response->MutableResult());
604-
{
605-
auto g = Parameters->TypeEnv().BindAllocator();
606-
607-
auto& txResults = ev->GetTxResults();
608-
result.Results.reserve(txResults.size());
609-
for(auto& tx : txResults) {
610-
result.Results.emplace_back(tx.GetMkql());
611-
}
612-
Parameters->AddTxHolders(std::move(ev->GetTxHolders()));
613-
614-
if (!txResults.empty()) {
615-
Parameters->AddTxResults(TxIndex, std::move(txResults));
616-
}
617-
}
618-
} else {
619-
for (auto& issue : response->GetIssues()) {
620-
result.AddIssue(NYql::IssueFromMessage(issue));
621-
}
622-
}
623-
672+
FillPhysicalResult(ev, result, Parameters, TxIndex);
624673
Promise.SetValue(std::move(result));
625674
this->PassAway();
626675
}
@@ -1785,79 +1834,60 @@ class TKikimrIcGateway : public IKqpGateway {
17851834
auto preparedQuery = std::make_unique<NKikimrKqp::TPreparedQuery>();
17861835
auto& phyQuery = *preparedQuery->MutablePhysicalQuery();
17871836
NKikimr::NKqp::IKqpGateway::TExecPhysicalRequest literalRequest(txAlloc);
1788-
1789-
literalRequest.NeedTxId = false;
1790-
literalRequest.MaxAffectedShards = 0;
1791-
literalRequest.TotalReadSizeLimitBytes = 0;
1792-
literalRequest.MkqlMemoryLimit = 100_MB;
1793-
1794-
auto& transaction = *phyQuery.AddTransactions();
1795-
transaction.SetType(NKqpProto::TKqpPhyTx::TYPE_COMPUTE);
1796-
1797-
auto& stage = *transaction.AddStages();
1798-
auto& stageProgram = *stage.MutableProgram();
1799-
stageProgram.SetRuntimeVersion(NYql::NDqProto::RUNTIME_VERSION_YQL_1_0);
1800-
stageProgram.SetRaw(program);
1801-
stage.SetOutputsCount(1);
1802-
1803-
auto& taskResult = *transaction.AddResults();
1804-
*taskResult.MutableItemType() = resultType;
1805-
auto& taskConnection = *taskResult.MutableConnection();
1806-
taskConnection.SetStageIndex(0);
1837+
PrepareLiteralRequest(literalRequest, phyQuery, program, resultType);
18071838

18081839
NKikimr::NKqp::TPreparedQueryHolder queryHolder(preparedQuery.release(), txAlloc->HolderFactory.GetFunctionRegistry());
1809-
18101840
NKikimr::NKqp::TQueryData::TPtr params = std::make_shared<NKikimr::NKqp::TQueryData>(txAlloc);
1811-
18121841
literalRequest.Transactions.emplace_back(queryHolder.GetPhyTx(0), params);
18131842

18141843
return ExecuteLiteral(std::move(literalRequest), params, 0).Apply([](const auto& future) {
18151844
const auto& result = future.GetValue();
1816-
18171845
TExecuteLiteralResult literalResult;
1818-
1819-
if (result.Success()) {
1820-
YQL_ENSURE(result.Results.size() == 1);
1821-
literalResult.SetSuccess();
1822-
literalResult.Result = result.Results[0];
1823-
} else {
1824-
literalResult.SetStatus(result.Status());
1825-
literalResult.AddIssues(result.Issues());
1826-
}
1827-
1846+
FillLiteralResult(result, literalResult);
18281847
return literalResult;
18291848
});
18301849
}
18311850

1851+
TExecuteLiteralResult ExecuteLiteralInstant(const TString& program, const NKikimrMiniKQL::TType& resultType, NKikimr::NKqp::TTxAllocatorState::TPtr txAlloc) override {
1852+
auto preparedQuery = std::make_unique<NKikimrKqp::TPreparedQuery>();
1853+
auto& phyQuery = *preparedQuery->MutablePhysicalQuery();
1854+
NKikimr::NKqp::IKqpGateway::TExecPhysicalRequest literalRequest(txAlloc);
1855+
PrepareLiteralRequest(literalRequest, phyQuery, program, resultType);
1856+
1857+
NKikimr::NKqp::TPreparedQueryHolder queryHolder(preparedQuery.release(), txAlloc->HolderFactory.GetFunctionRegistry());
1858+
NKikimr::NKqp::TQueryData::TPtr params = std::make_shared<NKikimr::NKqp::TQueryData>(txAlloc);
1859+
literalRequest.Transactions.emplace_back(queryHolder.GetPhyTx(0), params);
1860+
1861+
auto result = ExecuteLiteralInstant1(std::move(literalRequest), params, 0);
1862+
1863+
TExecuteLiteralResult literalResult;
1864+
FillLiteralResult(result, literalResult);
1865+
return literalResult;
1866+
}
18321867

18331868
TFuture<TExecPhysicalResult> ExecuteLiteral(TExecPhysicalRequest&& request, TQueryData::TPtr params, ui32 txIndex) override {
18341869
YQL_ENSURE(!request.Transactions.empty());
18351870
YQL_ENSURE(request.DataShardLocks.empty());
18361871
YQL_ENSURE(!request.NeedTxId);
1837-
1838-
auto containOnlyLiteralStages = [](const auto& request) {
1839-
for (const auto& tx : request.Transactions) {
1840-
if (tx.Body->GetType() != NKqpProto::TKqpPhyTx::TYPE_COMPUTE) {
1841-
return false;
1842-
}
1843-
1844-
for (const auto& stage : tx.Body->GetStages()) {
1845-
if (stage.InputsSize() != 0) {
1846-
return false;
1847-
}
1848-
}
1849-
}
1850-
1851-
return true;
1852-
};
1853-
1854-
YQL_ENSURE(containOnlyLiteralStages(request));
1872+
YQL_ENSURE(ContainOnlyLiteralStages(request));
18551873
auto promise = NewPromise<TExecPhysicalResult>();
18561874
IActor* requestHandler = new TKqpExecLiteralRequestHandler(std::move(request), Counters, promise, params, txIndex);
18571875
RegisterActor(requestHandler);
18581876
return promise.GetFuture();
18591877
}
18601878

1879+
TExecPhysicalResult ExecuteLiteralInstant1(TExecPhysicalRequest&& request, TQueryData::TPtr params, ui32 txIndex) override {
1880+
YQL_ENSURE(!request.Transactions.empty());
1881+
YQL_ENSURE(request.DataShardLocks.empty());
1882+
YQL_ENSURE(!request.NeedTxId);
1883+
YQL_ENSURE(ContainOnlyLiteralStages(request));
1884+
1885+
auto ev = ::NKikimr::NKqp::ExecuteLiteral(std::move(request), Counters, TActorId{}, MakeIntrusive<TUserRequestContext>());
1886+
TExecPhysicalResult result;
1887+
FillPhysicalResult(ev, result, params, txIndex);
1888+
return result;
1889+
}
1890+
18611891
TFuture<TQueryResult> ExecScanQueryAst(const TString& cluster, const TString& query,
18621892
TQueryData::TPtr params, const TAstQuerySettings& settings, ui64 rowsLimit) override
18631893
{

ydb/core/kqp/host/kqp_gateway_proxy.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2442,6 +2442,12 @@ class TKqpGatewayProxy : public IKikimrGateway {
24422442
return Gateway->ExecuteLiteral(program, resultType, txAlloc);
24432443
}
24442444

2445+
TExecuteLiteralResult ExecuteLiteralInstant(const TString& program,
2446+
const NKikimrMiniKQL::TType& resultType, NKikimr::NKqp::TTxAllocatorState::TPtr txAlloc) override
2447+
{
2448+
return Gateway->ExecuteLiteralInstant(program, resultType, txAlloc);
2449+
}
2450+
24452451
private:
24462452
bool IsPrepare() const {
24472453
if (!SessionCtx) {

ydb/core/kqp/provider/yql_kikimr_exec.cpp

Lines changed: 19 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -923,39 +923,30 @@ class TKiSourceCallableExecutionTransformer : public TAsyncCallbackTransformer<T
923923
if (status.Level != TStatus::Ok) {
924924
return SyncStatus(status);
925925
}
926-
auto asyncResult = Gateway->ExecuteLiteral(program, resultType, SessionCtx->Query().QueryData->GetAllocState());
927926

928-
return std::make_pair(IGraphTransformer::TStatus::Async, asyncResult.Apply(
929-
[this](const NThreading::TFuture<IKikimrGateway::TExecuteLiteralResult>& future) {
930-
return TAsyncTransformCallback(
931-
[future, this](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
927+
auto literalResult = Gateway->ExecuteLiteralInstant(program, resultType, SessionCtx->Query().QueryData->GetAllocState());
932928

933-
const auto& literalResult = future.GetValueSync();
934-
935-
if (!literalResult.Success()) {
936-
for (const auto& issue : literalResult.Issues()) {
937-
ctx.AddError(issue);
938-
}
939-
input->SetState(TExprNode::EState::Error);
940-
return IGraphTransformer::TStatus::Error;
941-
}
929+
if (!literalResult.Success()) {
930+
for (const auto& issue : literalResult.Issues()) {
931+
ctx.AddError(issue);
932+
}
933+
input->SetState(TExprNode::EState::Error);
934+
return SyncError();
935+
}
942936

943-
bool truncated = false;
944-
auto yson = this->EncodeResultToYson(literalResult.Result, truncated);
945-
if (truncated) {
946-
input->SetState(TExprNode::EState::Error);
947-
ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), "EvaluteExpr result is too big and was truncated"));
948-
return IGraphTransformer::TStatus::Error;
949-
}
937+
bool truncated = false;
938+
auto yson = EncodeResultToYson(literalResult.Result, truncated);
939+
if (truncated) {
940+
ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), "EvaluteExpr result is too big and was truncated"));
941+
input->SetState(TExprNode::EState::Error);
942+
return SyncError();
943+
}
950944

951-
output = input;
952-
input->SetState(TExprNode::EState::ExecutionComplete);
953-
input->SetResult(ctx.NewAtom(input->Pos(), yson));
954-
return IGraphTransformer::TStatus::Ok;
955-
});
956-
}));
945+
output = input;
946+
input->SetState(TExprNode::EState::ExecutionComplete);
947+
input->SetResult(ctx.NewAtom(input->Pos(), yson));
948+
return SyncOk();
957949
}
958-
959950
if (input->Content() == ConfigureName) {
960951
auto requireStatus = RequireChild(*input, 0);
961952
if (requireStatus.Level != TStatus::Ok) {

ydb/core/kqp/provider/yql_kikimr_gateway.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1126,6 +1126,8 @@ class IKikimrGateway : public TThrRefBase {
11261126

11271127
virtual NThreading::TFuture<TExecuteLiteralResult> ExecuteLiteral(const TString& program, const NKikimrMiniKQL::TType& resultType, NKikimr::NKqp::TTxAllocatorState::TPtr txAlloc) = 0;
11281128

1129+
virtual TExecuteLiteralResult ExecuteLiteralInstant(const TString& program, const NKikimrMiniKQL::TType& resultType, NKikimr::NKqp::TTxAllocatorState::TPtr txAlloc) = 0;
1130+
11291131
public:
11301132
using TCreateDirFunc = std::function<void(const TString&, const TString&, NThreading::TPromise<TGenericResult>)>;
11311133

ydb/core/kqp/ut/service/kqp_service_ut.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ Y_UNIT_TEST_SUITE(KqpService) {
6767
}
6868

6969
Y_UNIT_TEST(CloseSessionsWithLoad) {
70-
UNIT_FAIL("Fast fail to avoid 10 min time waste, https://github.com/ydb-platform/ydb/issues/5349");
70+
// UNIT_FAIL("Fast fail to avoid 10 min time waste, https://github.com/ydb-platform/ydb/issues/5349");
7171

7272
auto kikimr = std::make_shared<TKikimrRunner>();
7373
kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_EXECUTER, NLog::PRI_DEBUG);

0 commit comments

Comments
 (0)