Skip to content

Commit d043476

Browse files
authored
Merge 96c7e84 into 202d1ff
2 parents 202d1ff + 96c7e84 commit d043476

File tree

6 files changed

+79
-10
lines changed

6 files changed

+79
-10
lines changed

ydb/core/kqp/common/events/workload_service.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,19 +51,21 @@ struct TEvContinueRequest : public NActors::TEventLocal<TEvContinueRequest, TKqp
5151
};
5252

5353
struct TEvCleanupRequest : public NActors::TEventLocal<TEvCleanupRequest, TKqpWorkloadServiceEvents::EvCleanupRequest> {
54-
TEvCleanupRequest(const TString& database, const TString& sessionId, const TString& poolId, TDuration duration, TDuration cpuConsumed)
54+
TEvCleanupRequest(const TString& database, const TString& sessionId, const TString& poolId, TDuration duration, TDuration cpuConsumed, bool sendResponseOnNotfound = true)
5555
: Database(database)
5656
, SessionId(sessionId)
5757
, PoolId(poolId)
5858
, Duration(duration)
5959
, CpuConsumed(cpuConsumed)
60+
, SendResponseOnNotfound(sendResponseOnNotfound)
6061
{}
6162

6263
const TString Database;
6364
const TString SessionId;
6465
const TString PoolId;
6566
const TDuration Duration;
6667
const TDuration CpuConsumed;
68+
const bool SendResponseOnNotfound;
6769
};
6870

6971
struct TEvCleanupResponse : public NActors::TEventLocal<TEvCleanupResponse, TKqpWorkloadServiceEvents::EvCleanupResponse> {

ydb/core/kqp/session_actor/kqp_session_actor.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
256256
QueryState->UserToken
257257
), IEventHandle::FlagTrackDelivery);
258258

259+
QueryState->PoolHandlerActor = MakeKqpWorkloadServiceId(SelfId().NodeId());
259260
Become(&TKqpSessionActor::ExecuteState);
260261
}
261262

@@ -2421,6 +2422,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
24212422
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleNoop);
24222423
hFunc(TEvents::TEvUndelivered, HandleNoop);
24232424
hFunc(TEvTxUserProxy::TEvAllocateTxIdResult, HandleNoop);
2425+
hFunc(TEvKqpExecuter::TEvStreamData, HandleNoop);
24242426
hFunc(NWorkload::TEvContinueRequest, HandleNoop);
24252427

24262428
// always come from WorkerActor

ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
161161
}
162162

163163
SendPoolInfoUpdate(std::nullopt, std::nullopt, Subscribers);
164+
this->Send(MakeKqpWorkloadServiceId(this->SelfId().NodeId()), new TEvPrivate::TEvStopPoolHandlerResponse(Database, PoolId));
164165

165166
Counters.OnCleanup(ResetCountersOnStrop);
166167

@@ -184,16 +185,16 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
184185
}
185186

186187
void Handle(TEvPrivate::TEvResolvePoolResponse::TPtr& ev) {
187-
this->Send(MakeKqpWorkloadServiceId(this->SelfId().NodeId()), new TEvPrivate::TEvPlaceRequestIntoPoolResponse(Database, PoolId));
188-
189188
auto event = std::move(ev->Get()->Event);
189+
const TString& sessionId = event->Get()->SessionId;
190+
this->Send(MakeKqpWorkloadServiceId(this->SelfId().NodeId()), new TEvPrivate::TEvPlaceRequestIntoPoolResponse(Database, PoolId, sessionId));
191+
190192
const TActorId& workerActorId = event->Sender;
191193
if (!InFlightLimit) {
192194
this->Send(workerActorId, new TEvContinueRequest(Ydb::StatusIds::PRECONDITION_FAILED, PoolId, PoolConfig, {NYql::TIssue(TStringBuilder() << "Resource pool " << PoolId << " was disabled due to zero concurrent query limit")}));
193195
return;
194196
}
195197

196-
const TString& sessionId = event->Get()->SessionId;
197198
if (LocalSessions.contains(sessionId)) {
198199
this->Send(workerActorId, new TEvContinueRequest(Ydb::StatusIds::INTERNAL_ERROR, PoolId, PoolConfig, {NYql::TIssue(TStringBuilder() << "Got duplicate session id " << sessionId << " for pool " << PoolId)}));
199200
return;
@@ -217,7 +218,13 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
217218
const TActorId& workerActorId = ev->Sender;
218219

219220
TRequest* request = GetRequestSafe(sessionId);
220-
if (!request || request->State == TRequest::EState::Canceling) {
221+
if (!request) {
222+
if (ev->Get()->SendResponseOnNotfound) {
223+
this->Send(workerActorId, new TEvCleanupResponse(Ydb::StatusIds::SUCCESS));
224+
}
225+
return;
226+
}
227+
if (request->State == TRequest::EState::Canceling) {
221228
this->Send(workerActorId, new TEvCleanupResponse(Ydb::StatusIds::SUCCESS));
222229
return;
223230
}

ydb/core/kqp/workload_service/common/events.h

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ struct TEvPrivate {
2929
EvFinishRequestInPool,
3030
EvResignPoolHandler,
3131
EvStopPoolHandler,
32+
EvStopPoolHandlerResponse,
3233
EvCancelRequest,
3334
EvUpdatePoolSubscription,
3435

@@ -128,13 +129,15 @@ struct TEvPrivate {
128129
};
129130

130131
struct TEvPlaceRequestIntoPoolResponse : public NActors::TEventLocal<TEvPlaceRequestIntoPoolResponse, EvPlaceRequestIntoPoolResponse> {
131-
TEvPlaceRequestIntoPoolResponse(const TString& database, const TString& poolId)
132+
TEvPlaceRequestIntoPoolResponse(const TString& database, const TString& poolId, const TString& sessionId)
132133
: Database(database)
133134
, PoolId(poolId)
135+
, SessionId(sessionId)
134136
{}
135137

136138
const TString Database;
137139
const TString PoolId;
140+
const TString SessionId;
138141
};
139142

140143
struct TEvFinishRequestInPool : public NActors::TEventLocal<TEvFinishRequestInPool, EvFinishRequestInPool> {
@@ -173,6 +176,16 @@ struct TEvPrivate {
173176
const bool ResetCounters;
174177
};
175178

179+
struct TEvStopPoolHandlerResponse : public NActors::TEventLocal<TEvStopPoolHandlerResponse, EvStopPoolHandlerResponse> {
180+
TEvStopPoolHandlerResponse(const TString& database, const TString& poolId)
181+
: Database(database)
182+
, PoolId(poolId)
183+
{}
184+
185+
const TString Database;
186+
const TString PoolId;
187+
};
188+
176189
struct TEvCancelRequest : public NActors::TEventLocal<TEvCancelRequest, EvCancelRequest> {
177190
explicit TEvCancelRequest(const TString& sessionId)
178191
: SessionId(sessionId)

ydb/core/kqp/workload_service/kqp_workload_service.cpp

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,12 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
168168

169169
void Handle(TEvCleanupRequest::TPtr& ev) {
170170
const TString& database = ev->Get()->Database;
171+
const TString& sessionId = ev->Get()->SessionId;
172+
if (GetOrCreateDatabaseState(database)->PendingSessionIds.contains(sessionId)) {
173+
PendingCancelRequests[sessionId].emplace_back(std::move(ev));
174+
return;
175+
}
176+
171177
const TString& poolId = ev->Get()->PoolId;
172178
auto poolState = GetPoolState(database, poolId);
173179
if (!poolState) {
@@ -176,7 +182,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
176182
}
177183

178184
LOG_D("Finished request with worker actor " << ev->Sender << ", Database: " << database << ", PoolId: " << poolId << ", SessionId: " << ev->Get()->SessionId);
179-
Send(ev->Forward(poolState->PoolHandler));
185+
poolState->DoCleanupRequest(std::move(ev));
180186
}
181187

182188
void Handle(TEvents::TEvWakeup::TPtr& ev) {
@@ -220,6 +226,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
220226
hFunc(TEvPrivate::TEvTablesCreationFinished, Handle);
221227
hFunc(TEvPrivate::TEvCpuLoadResponse, Handle);
222228
hFunc(TEvPrivate::TEvResignPoolHandler, Handle);
229+
hFunc(TEvPrivate::TEvStopPoolHandlerResponse, Handle);
223230
)
224231

225232
private:
@@ -265,9 +272,22 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
265272
void Handle(TEvPrivate::TEvPlaceRequestIntoPoolResponse::TPtr& ev) {
266273
const TString& database = ev->Get()->Database;
267274
const TString& poolId = ev->Get()->PoolId;
268-
LOG_T("Request placed into pool, Database: " << database << ", PoolId: " << poolId);
275+
const TString& sessionId = ev->Get()->SessionId;
276+
LOG_T("Request placed into pool, Database: " << database << ", PoolId: " << poolId << ", SessionId: " << sessionId);
269277

270-
if (auto poolState = GetPoolState(database, poolId)) {
278+
GetOrCreateDatabaseState(database)->PendingSessionIds.erase(sessionId);
279+
280+
auto poolState = GetPoolState(database, poolId);
281+
for (auto& event : PendingCancelRequests[sessionId]) {
282+
if (poolState) {
283+
poolState->DoCleanupRequest(std::move(event));
284+
} else {
285+
ReplyCleanupError(ev->Sender, Ydb::StatusIds::NOT_FOUND, TStringBuilder() << "Pool " << poolId << " not found");
286+
}
287+
}
288+
PendingCancelRequests.erase(sessionId);
289+
290+
if (poolState) {
271291
poolState->PlaceRequestRunning = false;
272292
poolState->UpdateHandler();
273293
poolState->StartPlaceRequest();
@@ -388,6 +408,17 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
388408
}
389409
}
390410

411+
void Handle(TEvPrivate::TEvStopPoolHandlerResponse::TPtr& ev) {
412+
const TString& database = ev->Get()->Database;
413+
const TString& poolId = ev->Get()->PoolId;
414+
LOG_T("Got stop pool handler response, Database: " << database << ", PoolId: " << poolId);
415+
416+
Counters.ActivePools->Dec();
417+
if (auto poolState = GetPoolState(database, poolId)) {
418+
poolState->PreviousPoolHandlers.erase(ev->Sender);
419+
}
420+
}
421+
391422
private:
392423
void InitializeWorkloadService() {
393424
if (ServiceInitialized) {
@@ -449,7 +480,6 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
449480
}
450481
for (const auto& poolKey : poolsToDelete) {
451482
PoolIdToState.erase(poolKey);
452-
Counters.ActivePools->Dec();
453483
}
454484

455485
if (!PoolIdToState.empty()) {
@@ -567,6 +597,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
567597
bool IdleChecksStarted = false;
568598
ETablesCreationStatus TablesCreationStatus = ETablesCreationStatus::Cleanup;
569599
std::unordered_set<TString> PendingHandlers;
600+
std::unordered_map<TString, std::vector<TEvCleanupRequest::TPtr>> PendingCancelRequests;
570601

571602
std::unordered_map<TString, TDatabaseState> DatabaseToState;
572603
std::unordered_map<TString, TPoolState> PoolIdToState;

ydb/core/kqp/workload_service/kqp_workload_service_impl.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ struct TDatabaseState {
1919
bool& EnabledResourcePoolsOnServerless;
2020

2121
std::vector<TEvPlaceRequestIntoPool::TPtr> PendingRequersts = {};
22+
std::unordered_set<TString> PendingSessionIds = {};
2223
std::unordered_map<TString, std::unordered_set<TActorId>> PendingSubscriptions = {};
2324
bool HasDefaultPool = false;
2425
bool Serverless = false;
@@ -38,6 +39,7 @@ struct TDatabaseState {
3839

3940
void DoPlaceRequest(TEvPlaceRequestIntoPool::TPtr ev) {
4041
TString database = ev->Get()->Database;
42+
PendingSessionIds.emplace(ev->Get()->SessionId);
4143
PendingRequersts.emplace_back(std::move(ev));
4244

4345
if (!EnabledResourcePoolsOnServerless && (TInstant::Now() - LastUpdateTime) > IDLE_DURATION) {
@@ -112,6 +114,7 @@ struct TPoolState {
112114
bool WaitingInitialization = false;
113115
bool PlaceRequestRunning = false;
114116
std::optional<TActorId> NewPoolHandler = std::nullopt;
117+
std::unordered_set<TActorId> PreviousPoolHandlers = {};
115118

116119
ui64 InFlightRequests = 0;
117120
TInstant LastUpdateTime = TInstant::Now();
@@ -122,6 +125,7 @@ struct TPoolState {
122125
}
123126

124127
ActorContext.Send(PoolHandler, new TEvPrivate::TEvStopPoolHandler(false));
128+
PreviousPoolHandlers.insert(PoolHandler);
125129
PoolHandler = *NewPoolHandler;
126130
NewPoolHandler = std::nullopt;
127131
InFlightRequests = 0;
@@ -143,6 +147,16 @@ struct TPoolState {
143147
InFlightRequests--;
144148
LastUpdateTime = TInstant::Now();
145149
}
150+
151+
void DoCleanupRequest(TEvCleanupRequest::TPtr event) {
152+
for (const auto& poolHandler : PreviousPoolHandlers) {
153+
ActorContext.Send(poolHandler, new TEvCleanupRequest(
154+
event->Get()->Database, event->Get()->SessionId, event->Get()->PoolId,
155+
event->Get()->Duration, event->Get()->CpuConsumed, false
156+
));
157+
}
158+
ActorContext.Send(event->Forward(PoolHandler));
159+
}
146160
};
147161

148162
struct TCpuQuotaManagerState {

0 commit comments

Comments
 (0)