Skip to content

Commit 3dc007b

Browse files
authored
Merge 20aa3ad into 5f731ba
2 parents 5f731ba + 20aa3ad commit 3dc007b

File tree

7 files changed

+46
-17
lines changed

7 files changed

+46
-17
lines changed

ydb/core/kqp/executer_actor/kqp_executer_impl.h

+11-6
Original file line numberDiff line numberDiff line change
@@ -684,7 +684,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
684684
if (statusCode == Ydb::StatusIds::INTERNAL_ERROR) {
685685
InternalError(issues);
686686
} else if (statusCode == Ydb::StatusIds::TIMEOUT) {
687-
TimeoutError(ev->Sender);
687+
TimeoutError(ev->Sender, issues);
688688
} else {
689689
RuntimeError(NYql::NDq::DqStatusToYdbStatus(msg.GetStatusCode()), issues);
690690
}
@@ -1706,29 +1706,34 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
17061706
ReplyErrorAndDie(status, &issues);
17071707
}
17081708

1709-
void TimeoutError(TActorId abortSender) {
1709+
void TimeoutError(TActorId abortSender, NYql::TIssues issues) {
17101710
if (AlreadyReplied) {
17111711
LOG_E("Timeout when we already replied - not good" << Endl << TBackTrace().PrintToString() << Endl);
17121712
return;
17131713
}
17141714

17151715
const auto status = NYql::NDqProto::StatusIds::TIMEOUT;
1716-
const TString message = "Request timeout exceeded";
1716+
if (issues.Empty()) {
1717+
issues.AddIssue("Request timeout exceeded");
1718+
}
17171719

1718-
TerminateComputeActors(Ydb::StatusIds::TIMEOUT, message);
1720+
TerminateComputeActors(Ydb::StatusIds::TIMEOUT, issues);
17191721

17201722
AlreadyReplied = true;
17211723

1722-
LOG_E("Abort execution: " << NYql::NDqProto::StatusIds_StatusCode_Name(status) << "," << message);
1724+
LOG_E("Abort execution: " << NYql::NDqProto::StatusIds_StatusCode_Name(status) << ", " << issues.ToOneLineString());
17231725
if (ExecuterSpan) {
17241726
ExecuterSpan.EndError(TStringBuilder() << NYql::NDqProto::StatusIds_StatusCode_Name(status));
17251727
}
17261728

17271729
ResponseEv->Record.MutableResponse()->SetStatus(Ydb::StatusIds::TIMEOUT);
1730+
for (const auto& issue : issues) {
1731+
NYql::IssueToMessage(issue, ResponseEv->Record.MutableResponse()->AddIssues());
1732+
}
17281733

17291734
// TEvAbortExecution can come from either ComputeActor or SessionActor (== Target).
17301735
if (abortSender != Target) {
1731-
auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(status, message);
1736+
auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(status, issues);
17321737
this->Send(Target, abortEv.Release());
17331738
}
17341739

ydb/core/kqp/proxy_service/kqp_proxy_service.cpp

+6-4
Original file line numberDiff line numberDiff line change
@@ -148,12 +148,14 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
148148
struct TEvOnRequestTimeout: public TEventLocal<TEvOnRequestTimeout, EEv::EvOnRequestTimeout> {
149149
ui64 RequestId;
150150
TDuration Timeout;
151+
TDuration InitialTimeout;
151152
NYql::NDqProto::StatusIds::StatusCode Status;
152153
int Round;
153154

154155
TEvOnRequestTimeout(ui64 requestId, TDuration timeout, NYql::NDqProto::StatusIds::StatusCode status, int round)
155156
: RequestId(requestId)
156157
, Timeout(timeout)
158+
, InitialTimeout(timeout)
157159
, Status(status)
158160
, Round(round)
159161
{}
@@ -1283,9 +1285,9 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
12831285

12841286
const TKqpSessionInfo* info = LocalSessions->FindPtr(reqInfo->SessionId);
12851287
if (msg->Round == 0 && info) {
1286-
TString message = TStringBuilder()
1287-
<< "request's " << (msg->Status == NYql::NDqProto::StatusIds::TIMEOUT ? "timeout" : "cancelAfter")
1288-
<< " exceeded";
1288+
TString message = msg->Status == NYql::NDqProto::StatusIds::TIMEOUT
1289+
? (TStringBuilder() << "Request's timeout " << msg->Timeout.MilliSeconds() << "ms exceeded")
1290+
: (TStringBuilder() << "Request's canceled after " << msg->Timeout.MilliSeconds() << "ms");
12891291

12901292
Send(info->WorkerId, new TEvKqp::TEvAbortExecution(msg->Status, message));
12911293

@@ -1297,7 +1299,7 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
12971299
}
12981300
} else {
12991301
TString message = TStringBuilder()
1300-
<< "Query did not complete within specified timeout, session id " << reqInfo->SessionId;
1302+
<< "Query did not complete within specified timeout " << msg->InitialTimeout.MilliSeconds() << "ms, session id " << reqInfo->SessionId;
13011303
ReplyProcessError(NYql::NDq::DqStatusToYdbStatus(msg->Status), message, requestId);
13021304
}
13031305
}

ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp

+5
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,11 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
216216
WaitFinalizationRequest = true;
217217
RunState = IsExecuting() ? ERunState::Finishing : RunState;
218218

219+
if (RunState == ERunState::Cancelling) {
220+
NYql::TIssue cancelIssue("Request was canceled by user");
221+
cancelIssue.SetCode(NYql::DEFAULT_ERROR, NYql::TSeverityIds::S_INFO);
222+
}
223+
219224
auto scriptFinalizeRequest = std::make_unique<TEvScriptFinalizeRequest>(
220225
GetFinalizationStatusFromRunState(), ExecutionId, Database, Status, GetExecStatusFromStatusCode(Status),
221226
Issues, std::move(QueryStats), std::move(QueryPlan), std::move(QueryAst), LeaseGeneration

ydb/core/kqp/session_actor/kqp_query_state.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ std::unique_ptr<TEvTxProxySchemeCache::TEvNavigateKeySet> TKqpQueryState::BuildN
138138

139139

140140
bool TKqpQueryState::SaveAndCheckCompileResult(TEvKqp::TEvCompileResponse* ev) {
141+
CompilationRunning = false;
141142
CompileResult = ev->CompileResult;
142143
YQL_ENSURE(CompileResult);
143144
MaxReadType = CompileResult->MaxReadType;

ydb/core/kqp/session_actor/kqp_query_state.h

+1
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ class TKqpQueryState : public TNonCopyable {
128128
bool KeepSession = false;
129129
TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
130130
NActors::TMonotonic StartedAt;
131+
bool CompilationRunning = false;
131132

132133
THashMap<NKikimr::TTableId, ui64> TableVersions;
133134

ydb/core/kqp/session_actor/kqp_session_actor.cpp

+13-6
Original file line numberDiff line numberDiff line change
@@ -524,6 +524,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
524524

525525
void CompileQuery() {
526526
YQL_ENSURE(QueryState);
527+
QueryState->CompilationRunning = true;
527528
auto ev = QueryState->BuildCompileRequest(CompilationCookie, GUCSettings);
528529
LOG_D("Sending CompileQuery request");
529530

@@ -1528,16 +1529,22 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
15281529
TString logMsg = TStringBuilder() << "got TEvAbortExecution in " << CurrentStateFuncName();
15291530
LOG_I(logMsg << ", status: " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.GetStatusCode()) << " send to: " << ExecuterId);
15301531

1531-
TString reason = TStringBuilder() << "Request timeout exceeded, cancelling after "
1532-
<< (AppData()->MonotonicTimeProvider->Now() - QueryState->StartedAt).MilliSeconds()
1533-
<< " milliseconds.";
1532+
auto issues = ev->Get()->GetIssues();
1533+
TStringBuilder reason = TStringBuilder() << "Cancelling after " << (AppData()->MonotonicTimeProvider->Now() - QueryState->StartedAt).MilliSeconds() << "ms";
1534+
if (QueryState->CompilationRunning) {
1535+
reason << " during compilation";
1536+
} else if (ExecuterId) {
1537+
reason << " during execution";
1538+
} else {
1539+
reason << " in " << CurrentStateFuncName();
1540+
}
1541+
issues.AddIssue(reason);
15341542

15351543
if (ExecuterId) {
1536-
auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(msg.GetStatusCode(), reason);
1544+
auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(msg.GetStatusCode(), issues);
15371545
Send(ExecuterId, abortEv.Release(), IEventHandle::FlagTrackDelivery);
15381546
} else {
1539-
const auto& issues = ev->Get()->GetIssues();
1540-
ReplyQueryError(NYql::NDq::DqStatusToYdbStatus(msg.GetStatusCode()), logMsg, MessageFromIssues(issues));
1547+
ReplyQueryError(NYql::NDq::DqStatusToYdbStatus(msg.GetStatusCode()), "", MessageFromIssues(issues));
15411548
}
15421549
}
15431550

ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h

+9-1
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
109109
public:
110110
void Bootstrap() {
111111
try {
112+
StartTime = TInstant::Now();
112113
{
113114
TStringBuilder prefixBuilder;
114115
prefixBuilder << "SelfId: " << this->SelfId() << ", TxId: " << TxId << ", task: " << Task.GetId() << ". ";
@@ -1049,8 +1050,14 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
10491050
);
10501051
}
10511052

1053+
TStringBuilder reason = TStringBuilder() << "Task execution timeout ";
1054+
if (RuntimeSettings.Timeout) {
1055+
reason << RuntimeSettings.Timeout->MilliSeconds() << "ms ";
1056+
}
1057+
reason << "exceeded, terminating after " << (TInstant::Now() - StartTime).MilliSeconds() << "ms";
1058+
10521059
State = NDqProto::COMPUTE_STATE_FAILURE;
1053-
ReportStateAndMaybeDie(NYql::NDqProto::StatusIds::TIMEOUT, {TIssue("timeout exceeded")}, true);
1060+
ReportStateAndMaybeDie(NYql::NDqProto::StatusIds::TIMEOUT, {TIssue(reason)}, true);
10541061
break;
10551062
}
10561063
case EEvWakeupTag::PeriodicStatsTag: {
@@ -1928,6 +1935,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
19281935
NWilson::TSpan ComputeActorSpan;
19291936
TDuration SourceCpuTime;
19301937
private:
1938+
TInstant StartTime;
19311939
bool Running = true;
19321940
TInstant LastSendStatsTime;
19331941
bool PassExceptions = false;

0 commit comments

Comments
 (0)