@@ -169,14 +169,21 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
169169 void Handle (TEvCleanupRequest::TPtr& ev) {
170170 const TString& database = ev->Get ()->Database ;
171171 const TString& poolId = ev->Get ()->PoolId ;
172+ const TString& sessionId = ev->Get ()->SessionId ;
173+ if (GetOrCreateDatabaseState (database)->PendingSessionIds .contains (sessionId)) {
174+ LOG_D (" Finished request with worker actor " << ev->Sender << " , wait for place request, Database: " << database << " , PoolId: " << poolId << " , SessionId: " << ev->Get ()->SessionId );
175+ GetOrCreateDatabaseState (database)->PendingCancelRequests [sessionId].emplace_back (std::move (ev));
176+ return ;
177+ }
178+
172179 auto poolState = GetPoolState (database, poolId);
173180 if (!poolState) {
174181 ReplyCleanupError (ev->Sender , Ydb::StatusIds::NOT_FOUND, TStringBuilder () << " Pool " << poolId << " not found" );
175182 return ;
176183 }
177184
178185 LOG_D (" Finished request with worker actor " << ev->Sender << " , Database: " << database << " , PoolId: " << poolId << " , SessionId: " << ev->Get ()->SessionId );
179- Send (ev-> Forward ( poolState->PoolHandler ));
186+ poolState->DoCleanupRequest ( std::move (ev ));
180187 }
181188
182189 void Handle (TEvents::TEvWakeup::TPtr& ev) {
@@ -220,6 +227,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
220227 hFunc (TEvPrivate::TEvTablesCreationFinished, Handle);
221228 hFunc (TEvPrivate::TEvCpuLoadResponse, Handle);
222229 hFunc (TEvPrivate::TEvResignPoolHandler, Handle);
230+ hFunc (TEvPrivate::TEvStopPoolHandlerResponse, Handle);
223231 )
224232
225233private:
@@ -245,12 +253,16 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
245253 void Handle (TEvPrivate::TEvResolvePoolResponse::TPtr& ev) {
246254 const auto & event = ev->Get ()->Event ;
247255 const TString& database = event->Get ()->Database ;
256+ auto databaseState = GetOrCreateDatabaseState (database);
248257 if (ev->Get ()->DefaultPoolCreated ) {
249- GetOrCreateDatabaseState (database) ->HasDefaultPool = true ;
258+ databaseState ->HasDefaultPool = true ;
250259 }
251260
252261 const TString& poolId = event->Get ()->PoolId ;
253262 if (ev->Get ()->Status != Ydb::StatusIds::SUCCESS) {
263+ databaseState->RemovePendingSession (event->Get ()->SessionId , [this ](TEvCleanupRequest::TPtr event) {
264+ ReplyCleanupError (event->Sender , Ydb::StatusIds::NOT_FOUND, TStringBuilder () << " Pool " << event->Get ()->PoolId << " not found" );
265+ });
254266 ReplyContinueError (event->Sender , ev->Get ()->Status , ev->Get ()->Issues );
255267 return ;
256268 }
@@ -265,9 +277,19 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
265277 void Handle (TEvPrivate::TEvPlaceRequestIntoPoolResponse::TPtr& ev) {
266278 const TString& database = ev->Get ()->Database ;
267279 const TString& poolId = ev->Get ()->PoolId ;
268- LOG_T (" Request placed into pool, Database: " << database << " , PoolId: " << poolId);
280+ const TString& sessionId = ev->Get ()->SessionId ;
281+ LOG_T (" Request placed into pool, Database: " << database << " , PoolId: " << poolId << " , SessionId: " << sessionId);
269282
270- if (auto poolState = GetPoolState (database, poolId)) {
283+ auto poolState = GetPoolState (database, poolId);
284+ GetOrCreateDatabaseState (database)->RemovePendingSession (sessionId, [this , poolState](TEvCleanupRequest::TPtr event) {
285+ if (poolState) {
286+ poolState->DoCleanupRequest (std::move (event));
287+ } else {
288+ ReplyCleanupError (event->Sender , Ydb::StatusIds::NOT_FOUND, TStringBuilder () << " Pool " << event->Get ()->PoolId << " not found" );
289+ }
290+ });
291+
292+ if (poolState) {
271293 poolState->PlaceRequestRunning = false ;
272294 poolState->UpdateHandler ();
273295 poolState->StartPlaceRequest ();
@@ -388,6 +410,17 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
388410 }
389411 }
390412
413+ void Handle (TEvPrivate::TEvStopPoolHandlerResponse::TPtr& ev) {
414+ const TString& database = ev->Get ()->Database ;
415+ const TString& poolId = ev->Get ()->PoolId ;
416+ LOG_T (" Got stop pool handler response, Database: " << database << " , PoolId: " << poolId);
417+
418+ Counters.ActivePools ->Dec ();
419+ if (auto poolState = GetPoolState (database, poolId)) {
420+ poolState->PreviousPoolHandlers .erase (ev->Sender );
421+ }
422+ }
423+
391424private:
392425 void InitializeWorkloadService () {
393426 if (ServiceInitialized) {
@@ -441,15 +474,14 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
441474 std::vector<TString> poolsToDelete;
442475 poolsToDelete.reserve (PoolIdToState.size ());
443476 for (const auto & [poolKey, poolState] : PoolIdToState) {
444- if (!poolState.InFlightRequests && TInstant::Now () - poolState.LastUpdateTime > IDLE_DURATION) {
477+ if (!poolState.InFlightRequests && TInstant::Now () - poolState.LastUpdateTime > IDLE_DURATION && poolState. PendingRequests . empty () ) {
445478 CpuQuotaManager->CleanupHandler (poolState.PoolHandler );
446479 Send (poolState.PoolHandler , new TEvPrivate::TEvStopPoolHandler (true ));
447480 poolsToDelete.emplace_back (poolKey);
448481 }
449482 }
450483 for (const auto & poolKey : poolsToDelete) {
451484 PoolIdToState.erase (poolKey);
452- Counters.ActivePools ->Dec ();
453485 }
454486
455487 if (!PoolIdToState.empty ()) {
@@ -512,7 +544,8 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
512544 Send (replyActorId, new TEvCleanupResponse (status, {NYql::TIssue (message)}));
513545 }
514546
515- TDatabaseState* GetOrCreateDatabaseState (const TString& database) {
547+ TDatabaseState* GetOrCreateDatabaseState (TString database) {
548+ database = CanonizePath (database);
516549 auto databaseIt = DatabaseToState.find (database);
517550 if (databaseIt != DatabaseToState.end ()) {
518551 return &databaseIt->second ;
0 commit comments