Skip to content

Commit 3a45605

Browse files
committed
Added lease for queued requests table
1 parent 62cb0fa commit 3a45605

File tree

5 files changed

+127
-55
lines changed

5 files changed

+127
-55
lines changed

ydb/core/kqp/workload_service/kqp_workload_service_queues.cpp

Lines changed: 42 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,12 @@ class TStateBase : public IState {
4848
public:
4949
TStateBase(const TActorContext& actorContext, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, NMonitoring::TDynamicCounterPtr counters)
5050
: Counters(counters)
51-
, PoolConfig(poolConfig)
5251
, ActorContext(actorContext)
5352
, PoolId(poolId)
54-
, CancelAfter(poolConfig.QueryCancelAfter)
5553
, PoolSizeLimit(GetMaxPoolSize(poolConfig))
5654
, InFlightLimit(GetMaxInFlight(poolConfig))
55+
, PoolConfig(poolConfig)
56+
, CancelAfter(poolConfig.QueryCancelAfter)
5757
{
5858
RegisterCounters();
5959
}
@@ -69,7 +69,9 @@ class TStateBase : public IState {
6969
}
7070

7171
LOG_D("received new request, worker id: " << workerActorId << ", session id: " << sessionId);
72-
ActorContext.Schedule(CancelAfter, new TEvPrivate::TEvCancelRequest(PoolId, sessionId));
72+
if (CancelAfter) {
73+
ActorContext.Schedule(CancelAfter, new TEvPrivate::TEvCancelRequest(PoolId, sessionId));
74+
}
7375

7476
TRequest* request = &LocalSessions.insert({sessionId, TRequest(workerActorId, sessionId)}).first->second;
7577
LocalDelayedRequests->Inc();
@@ -145,7 +147,7 @@ class TStateBase : public IState {
145147
if (!request->Started && request->State != TRequest::EState::Finishing) {
146148
if (request->State == TRequest::EState::Canceling && status == Ydb::StatusIds::SUCCESS) {
147149
status = Ydb::StatusIds::CANCELLED;
148-
issues.AddIssue("Delay deadline exceeded");
150+
issues.AddIssue(TStringBuilder() << "Delay deadline exceeded in pool " << PoolId);
149151
}
150152
ReplyContinue(request, status, issues);
151153
return;
@@ -190,6 +192,21 @@ class TStateBase : public IState {
190192
return LocalInFlight;
191193
}
192194

195+
TMaybe<TInstant> GetWaitDeadline(TInstant startTime) {
196+
if (!CancelAfter) {
197+
return Nothing();
198+
}
199+
return startTime + CancelAfter;
200+
}
201+
202+
NYql::TIssue GroupIssues(const TString& message, NYql::TIssues issues) {
203+
NYql::TIssue rootIssue(message);
204+
for (const NYql::TIssue& issue : issues) {
205+
rootIssue.AddSubIssue(MakeIntrusive<NYql::TIssue>(issue));
206+
}
207+
return rootIssue;
208+
}
209+
193210
TString LogPrefix() const {
194211
return TStringBuilder() << "PoolId: " << PoolId << ", ";
195212
}
@@ -245,14 +262,15 @@ class TStateBase : public IState {
245262
protected:
246263
NMonitoring::TDynamicCounterPtr Counters;
247264

248-
const NResourcePool::TPoolSettings PoolConfig;
249265
const TActorContext ActorContext;
250266
const TString PoolId;
251-
const TDuration CancelAfter;
252267
const ui64 PoolSizeLimit;
253268
const ui64 InFlightLimit;
254269

255270
private:
271+
const NResourcePool::TPoolSettings PoolConfig;
272+
const TDuration CancelAfter;
273+
256274
ui64 LocalInFlight = 0;
257275
std::unordered_map<TString, TRequest> LocalSessions;
258276

@@ -337,9 +355,12 @@ class TFifoState : public TStateBase {
337355

338356
void RefreshState(bool refreshRequired = false) override {
339357
RefreshRequired |= refreshRequired;
340-
DoCleanupRequests();
358+
if (!PreparingFinished) {
359+
return;
360+
}
341361

342-
if (RunningOperation || !PreparingFinished) {
362+
DoCleanupRequests();
363+
if (RunningOperation) {
343364
return;
344365
}
345366

@@ -418,17 +439,17 @@ class TFifoState : public TStateBase {
418439

419440
if (ev->Get()->Status != Ydb::StatusIds::SUCCESS) {
420441
LOG_E("failed to delay request " << ev->Get()->Status << ", session id: " << ev->Get()->SessionId << ", issues: " << ev->Get()->Issues.ToOneLineString());
421-
ForUnfinished(ev->Get()->SessionId, [this, ev](TRequest* request) {
422-
ReplyContinue(request, ev->Get()->Status, ev->Get()->Issues);
442+
NYql::TIssue issue = GroupIssues("Failed to put request in queue", ev->Get()->Issues);
443+
ForUnfinished(ev->Get()->SessionId, [this, ev, issue](TRequest* request) {
444+
ReplyContinue(request, ev->Get()->Status, {issue});
423445
});
424446
RefreshRequired = true;
425447
return;
426448
}
427449

428-
LOG_D("succefully delayed request, session id: " << ev->Get()->SessionId);
429-
430450
GlobalState.DelayedRequests++;
431451
GlobalDelayedRequests->Inc();
452+
LOG_D("succefully delayed request, session id: " << ev->Get()->SessionId);
432453

433454
DoStartDelayedRequest();
434455
RefreshState();
@@ -440,9 +461,10 @@ class TFifoState : public TStateBase {
440461
const TString& sessionId = ev->Get()->SessionId;
441462
if (ev->Get()->Status != Ydb::StatusIds::SUCCESS) {
442463
LOG_E("failed start request " << ev->Get()->Status << ", session id: " << sessionId << ", issues: " << ev->Get()->Issues.ToOneLineString());
443-
ForUnfinished(sessionId, [this, ev](TRequest* request) {
464+
NYql::TIssue issue = GroupIssues("Failed to start request", ev->Get()->Issues);
465+
ForUnfinished(sessionId, [this, ev, issue](TRequest* request) {
444466
AddFinishedRequest(request->SessionId);
445-
ReplyContinue(request, ev->Get()->Status, ev->Get()->Issues);
467+
ReplyContinue(request, ev->Get()->Status, {issue});
446468
});
447469
RefreshState();
448470
return;
@@ -467,8 +489,9 @@ class TFifoState : public TStateBase {
467489
GlobalInFly->Inc();
468490
ReplyContinue(request);
469491
} else {
470-
AddFinishedRequest(request->SessionId);
471-
request->State = TRequest::EState::Canceling;
492+
// Request was dropped due to lease expiration
493+
PendingRequests.emplace_front(request->SessionId);
494+
PendingRequestsCount->Inc();
472495
}
473496
});
474497
DelayedRequests.pop_front();
@@ -533,9 +556,10 @@ class TFifoState : public TStateBase {
533556
if (!PendingRequests.empty()) {
534557
RunningOperation = true;
535558
const TString& sessionId = PopPendingRequest();
536-
ActorContext.Register(CreateDelayRequestActor(ActorContext.SelfID, PoolId, sessionId, GetRequest(sessionId)->StartTime + CancelAfter, Counters));
559+
TRequest* request = GetRequest(sessionId);
560+
ActorContext.Register(CreateDelayRequestActor(ActorContext.SelfID, PoolId, sessionId, request->StartTime, GetWaitDeadline(request->StartTime), LEASE_DURATION, Counters));
537561
DelayedRequests.emplace_back(sessionId);
538-
GetRequest(sessionId)->CleanupRequired = true;
562+
request->CleanupRequired = true;
539563
}
540564
}
541565

@@ -547,7 +571,6 @@ class TFifoState : public TStateBase {
547571
if (!FinishedRequests.empty()) {
548572
RunningOperation = true;
549573
ActorContext.Register(CreateCleanupRequestsActor(ActorContext.SelfID, PoolId, FinishedRequests, Counters));
550-
551574
FinishedRequests.clear();
552575
FinishingRequestsCount->Set(0);
553576
}

0 commit comments

Comments
 (0)