Skip to content

Commit 3964471

Browse files
authored
Merge 88f7ac2 into 87530be
2 parents 87530be + 88f7ac2 commit 3964471

File tree

10 files changed

+485
-148
lines changed

10 files changed

+485
-148
lines changed

ydb/core/kqp/common/compilation/events.h

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,14 +152,25 @@ struct TEvParseResponse: public TEventLocal<TEvParseResponse, TKqpEvents::EvPars
152152
};
153153

154154
struct TEvSplitResponse: public TEventLocal<TEvSplitResponse, TKqpEvents::EvSplitResponse> {
155-
TEvSplitResponse(const TKqpQueryId& query, TVector<NYql::TExprNode::TPtr> exprs, NYql::TExprNode::TPtr world, THolder<NYql::TExprContext> ctx)
156-
: Query(query)
155+
TEvSplitResponse(
156+
Ydb::StatusIds::StatusCode status,
157+
const NYql::TIssues& issues,
158+
const TKqpQueryId& query,
159+
TVector<NYql::TExprNode::TPtr> exprs,
160+
NYql::TExprNode::TPtr world,
161+
std::shared_ptr<NYql::TExprContext> ctx)
162+
: Status(status)
163+
, Issues(issues)
164+
, Query(query)
157165
, Ctx(std::move(ctx))
158166
, Exprs(std::move(exprs))
159167
, World(std::move(world)) {}
160168

169+
Ydb::StatusIds::StatusCode Status;
170+
NYql::TIssues Issues;
171+
161172
TKqpQueryId Query;
162-
THolder<NYql::TExprContext> Ctx;
173+
std::shared_ptr<NYql::TExprContext> Ctx;
163174
TVector<NYql::TExprNode::TPtr> Exprs;
164175
NYql::TExprNode::TPtr World;
165176
};

ydb/core/kqp/compile_service/kqp_compile_actor.cpp

Lines changed: 50 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
127127
STFUNC(CompileState) {
128128
try {
129129
switch (ev->GetTypeRewrite()) {
130-
HFunc(TEvKqp::TEvContinueProcess, Handle);
130+
HFunc(TEvKqp::TEvContinueProcess, HandleCompile);
131131
cFunc(TEvents::TSystem::Wakeup, HandleTimeout);
132132
default:
133133
UnexpectedEvent("CompileState", ev->GetTypeRewrite());
@@ -137,6 +137,20 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
137137
}
138138
}
139139

140+
STFUNC(SplitState) {
141+
try {
142+
switch (ev->GetTypeRewrite()) {
143+
HFunc(TEvKqp::TEvContinueProcess, HandleSplit);
144+
cFunc(TEvents::TSystem::Wakeup, HandleTimeout);
145+
default:
146+
UnexpectedEvent("SplitState", ev->GetTypeRewrite());
147+
}
148+
} catch (const yexception& e) {
149+
InternalError(e.what());
150+
}
151+
}
152+
153+
140154
private:
141155
TVector<TQueryAst> GetAstStatements(const TActorContext &ctx) {
142156
TString cluster = QueryId.Cluster;
@@ -158,9 +172,11 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
158172
ALOG_DEBUG(NKikimrServices::KQP_COMPILE_ACTOR, "Send split result"
159173
<< ", self: " << SelfId()
160174
<< ", owner: " << Owner
161-
<< (!result.Exprs.empty() ? ", split is successful" : ", split is not successful"));
175+
<< ", success: " << GetYdbStatus(result)
176+
<< ", issues: " << result.Issues().ToOneLineString());
162177

163178
auto responseEv = MakeHolder<TEvKqp::TEvSplitResponse>(
179+
GetYdbStatus(result), result.Issues(),
164180
QueryId, std::move(result.Exprs), std::move(result.World), std::move(result.Ctx));
165181
Send(Owner, responseEv.Release());
166182

@@ -174,11 +190,38 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
174190
}
175191

176192
void StartSplitting(const TActorContext &ctx) {
193+
Become(&TKqpCompileActor::SplitState);
194+
TimeoutTimerActorId = CreateLongTimer(ctx, CompilationTimeout, new IEventHandle(SelfId(), SelfId(),
195+
new TEvents::TEvWakeup()));
196+
177197
const auto prepareSettings = PrepareCompilationSettings(ctx);
178-
auto result = KqpHost->SplitQuery(QueryRef, prepareSettings);
198+
AsyncSplitResult = KqpHost->SplitQuery(QueryRef, prepareSettings);
199+
ContinueSplittig(ctx);
200+
}
179201

180-
Become(&TKqpCompileActor::CompileState);
181-
ReplySplitResult(ctx, std::move(result));
202+
void ContinueSplittig(const TActorContext &ctx) {
203+
TActorSystem* actorSystem = ctx.ExecutorThread.ActorSystem;
204+
TActorId selfId = ctx.SelfID;
205+
206+
auto callback = [actorSystem, selfId](const TFuture<bool>& future) {
207+
bool finished = future.GetValue();
208+
auto processEv = MakeHolder<TEvKqp::TEvContinueProcess>(0, finished);
209+
actorSystem->Send(selfId, processEv.Release());
210+
};
211+
212+
AsyncSplitResult->Continue().Apply(callback);
213+
}
214+
215+
void HandleSplit(TEvKqp::TEvContinueProcess::TPtr &ev, const TActorContext &ctx) {
216+
Y_ENSURE(!ev->Get()->QueryId);
217+
218+
if (!ev->Get()->Finished) {
219+
ContinueSplittig(ctx);
220+
return;
221+
}
222+
223+
auto splitResult = std::move(AsyncSplitResult->GetResult());
224+
ReplySplitResult(ctx, std::move(splitResult));
182225
}
183226

184227
void StartParsing(const TActorContext &ctx) {
@@ -460,7 +503,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
460503
KqpCompileResult->AllowCache = CanCacheQuery(KqpCompileResult->PreparedQuery->GetPhysicalQuery()) && allowCache;
461504
}
462505

463-
void Handle(TEvKqp::TEvContinueProcess::TPtr &ev, const TActorContext &ctx) {
506+
void HandleCompile(TEvKqp::TEvContinueProcess::TPtr &ev, const TActorContext &ctx) {
464507
Y_ENSURE(!ev->Get()->QueryId);
465508

466509
TYqlLogScope logScope(ctx, NKikimrServices::KQP_YQL, YqlName, UserRequestContext->TraceId);
@@ -558,6 +601,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
558601
TIntrusivePtr<IKqpGateway> Gateway;
559602
TIntrusivePtr<IKqpHost> KqpHost;
560603
TIntrusivePtr<IKqpHost::IAsyncQueryResult> AsyncCompileResult;
604+
TIntrusivePtr<IKqpHost::IAsyncSplitResult> AsyncSplitResult;
561605
std::shared_ptr<TKqpCompileResult> KqpCompileResult;
562606
std::optional<TString> ReplayMessage; // here metadata is encoded protobuf - for logs
563607
std::optional<TString> ReplayMessageUserView; // here metadata is part of json - full readable json for diagnostics

0 commit comments

Comments
 (0)