2121#include " blobs_action/transaction/tx_remove_blobs.h"
2222#include " blobs_action/transaction/tx_gc_insert_table.h"
2323#include " blobs_action/transaction/tx_gc_indexed.h"
24+ #include " blobs_reader/actor.h"
2425#include " bg_tasks/events/events.h"
2526
2627#include " data_accessor/manager.h"
@@ -579,8 +580,13 @@ class TChangesReadTask: public NOlap::NBlobOperations::NRead::ITask {
579580
580581protected:
581582 virtual void DoOnDataReady (const std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard>& resourcesGuard) override {
583+ if (!!resourcesGuard) {
584+ AFL_VERIFY (!TxEvent->IndexChanges ->ResourcesGuard );
585+ TxEvent->IndexChanges ->ResourcesGuard = resourcesGuard;
586+ } else {
587+ AFL_VERIFY (TxEvent->IndexChanges ->ResourcesGuard );
588+ }
582589 TxEvent->IndexChanges ->Blobs = ExtractBlobsData ();
583- TxEvent->IndexChanges ->ResourcesGuard = resourcesGuard;
584590 const bool isInsert = !!dynamic_pointer_cast<NOlap::TInsertColumnEngineChanges>(TxEvent->IndexChanges );
585591 std::shared_ptr<NConveyor::ITask> task = std::make_shared<TChangesTask>(std::move (TxEvent), Counters, TabletId, ParentActorId, LastCompletedTx);
586592 if (isInsert) {
@@ -615,6 +621,7 @@ class TDataAccessorsSubscriber: public NOlap::IDataAccessorRequestsSubscriber {
615621 const NActors::TActorId ShardActorId;
616622 std::shared_ptr<NOlap::TColumnEngineChanges> Changes;
617623 std::shared_ptr<NOlap::TVersionedIndex> VersionedIndex;
624+ std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard> ResourcesGuard;
618625
619626 virtual void DoOnRequestsFinishedImpl () = 0;
620627
@@ -624,6 +631,16 @@ class TDataAccessorsSubscriber: public NOlap::IDataAccessorRequestsSubscriber {
624631 }
625632
626633public:
634+ void SetResourcesGuard (const std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard>& guard) {
635+ AFL_VERIFY (!ResourcesGuard);
636+ ResourcesGuard = guard;
637+ }
638+
639+ std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard>&& ExtractResourcesGuard() {
640+ AFL_VERIFY (ResourcesGuard);
641+ return std::move (ResourcesGuard);
642+ }
643+
627644 TDataAccessorsSubscriber (const NActors::TActorId& shardActorId, const std::shared_ptr<NOlap::TColumnEngineChanges>& changes,
628645 const std::shared_ptr<NOlap::TVersionedIndex>& versionedIndex)
629646 : ShardActorId(shardActorId)
@@ -801,6 +818,30 @@ void TColumnShard::SetupCompaction(const std::set<ui64>& pathIds) {
801818 }
802819}
803820
821+ class TAccessorsMemorySubscriber : public NOlap ::NResourceBroker::NSubscribe::ITask {
822+ private:
823+ using TBase = NOlap::NResourceBroker::NSubscribe::ITask;
824+ std::shared_ptr<NOlap::TDataAccessorsRequest> Request;
825+ std::shared_ptr<TDataAccessorsSubscriber> Subscriber;
826+ std::shared_ptr<NOlap::NDataAccessorControl::IDataAccessorsManager> DataAccessorsManager;
827+
828+ virtual void DoOnAllocationSuccess (const std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard>& guard) override {
829+ Subscriber->SetResourcesGuard (guard);
830+ Request->RegisterSubscriber (Subscriber);
831+ DataAccessorsManager->AskData (Request);
832+ }
833+
834+ public:
835+ TAccessorsMemorySubscriber (const ui64 memory, const TString& externalTaskId, const NOlap::NResourceBroker::NSubscribe::TTaskContext& context,
836+ std::shared_ptr<NOlap::TDataAccessorsRequest>&& request, const std::shared_ptr<TDataAccessorsSubscriber>& subscriber,
837+ const std::shared_ptr<NOlap::NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager)
838+ : TBase(0 , memory, externalTaskId, context)
839+ , Request(std::move(request))
840+ , Subscriber(subscriber)
841+ , DataAccessorsManager(dataAccessorsManager) {
842+ }
843+ };
844+
804845class TCompactionDataAccessorsSubscriber : public TDataAccessorsSubscriberWithRead {
805846private:
806847 using TBase = TDataAccessorsSubscriberWithRead;
@@ -811,10 +852,9 @@ class TCompactionDataAccessorsSubscriber: public TDataAccessorsSubscriberWithRea
811852 AFL_DEBUG (NKikimrServices::TX_COLUMNSHARD)(" event" , " compaction" )(" external_task_id" , externalTaskId);
812853
813854 auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(VersionedIndex, Changes, CacheDataAfterWrite);
814- auto readSubscriber = std::make_shared<NOlap::NBlobOperations::NRead::ITask::TReadSubscriber>(
815- std::make_shared<TCompactChangesReadTask>(std::move (ev), ShardActorId, ShardTabletId, Counters, SnapshotModification), 0 ,
816- Changes->CalcMemoryForUsage (), externalTaskId, TaskSubscriptionContext);
817- NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription (ResourceSubscribeActor, readSubscriber);
855+ ev->IndexChanges ->ResourcesGuard = ExtractResourcesGuard ();
856+ TActorContext::AsActorContext ().Register (new NOlap::NBlobOperations::NRead::TActor (
857+ std::make_shared<TCompactChangesReadTask>(std::move (ev), ShardActorId, ShardTabletId, Counters, SnapshotModification)));
818858 }
819859
820860public:
@@ -837,10 +877,14 @@ void TColumnShard::StartCompaction(const std::shared_ptr<NPrioritiesQueue::TAllo
837877
838878 auto actualIndexInfo = std::make_shared<NOlap::TVersionedIndex>(TablesManager.GetPrimaryIndex ()->GetVersionedIndex ());
839879 auto request = compaction->ExtractDataAccessorsRequest ();
840- request->RegisterSubscriber (std::make_shared<TCompactionDataAccessorsSubscriber>(ResourceSubscribeActor, indexChanges, actualIndexInfo,
880+ const ui64 accessorsMemory = request->PredictAccessorsMemory (TablesManager.GetPrimaryIndex ()->GetVersionedIndex ().GetLastSchema ()) +
881+ indexChanges->CalcMemoryForUsage ();
882+ const auto subscriber = std::make_shared<TCompactionDataAccessorsSubscriber>(ResourceSubscribeActor, indexChanges, actualIndexInfo,
841883 Settings.CacheDataAfterCompaction , SelfId (), TabletID (), Counters.GetCompactionCounters (), GetLastCompletedTx (),
842- CompactTaskSubscription));
843- TablesManager.GetPrimaryIndex ()->FetchDataAccessors (request);
884+ CompactTaskSubscription);
885+ NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription (
886+ ResourceSubscribeActor, std::make_shared<TAccessorsMemorySubscriber>(accessorsMemory, indexChanges->GetTaskIdentifier (),
887+ CompactTaskSubscription, std::move(request), subscriber, DataAccessorsManager.GetObjectPtrVerified()));
844888}
845889
846890class TWriteEvictPortionsDataAccessorsSubscriber : public TDataAccessorsSubscriberWithRead {
@@ -851,11 +895,9 @@ class TWriteEvictPortionsDataAccessorsSubscriber: public TDataAccessorsSubscribe
851895 virtual void DoOnRequestsFinishedImpl () override {
852896 ACFL_DEBUG (" background" , " ttl" )(" need_writes" , true );
853897 auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(VersionedIndex, Changes, false );
854- auto readSubscriber = std::make_shared<NOlap::NBlobOperations::NRead::ITask::TReadSubscriber>(
855- std::make_shared<TTTLChangesReadTask>(std::move (ev), ShardActorId, ShardTabletId, Counters, SnapshotModification), 0 ,
856- Changes->CalcMemoryForUsage (), Changes->GetTaskIdentifier (), TaskSubscriptionContext);
857-
858- NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription (ResourceSubscribeActor, readSubscriber);
898+ ev->IndexChanges ->ResourcesGuard = ExtractResourcesGuard ();
899+ TActorContext::AsActorContext ().Register (new NOlap::NBlobOperations::NRead::TActor (
900+ std::make_shared<TTTLChangesReadTask>(std::move (ev), ShardActorId, ShardTabletId, Counters, SnapshotModification)));
859901 }
860902
861903public:
@@ -911,7 +953,8 @@ void TColumnShard::SetupMetadata() {
911953}
912954
913955bool TColumnShard::SetupTtl (const THashMap<ui64, NOlap::TTiering>& pathTtls) {
914- if (!AppDataVerified ().ColumnShardConfig .GetTTLEnabled () || !NYDBTest::TControllers::GetColumnShardController ()->IsBackgroundEnabled (NYDBTest::ICSController::EBackground::TTL)) {
956+ if (!AppDataVerified ().ColumnShardConfig .GetTTLEnabled () ||
957+ !NYDBTest::TControllers::GetColumnShardController ()->IsBackgroundEnabled (NYDBTest::ICSController::EBackground::TTL)) {
915958 AFL_WARN (NKikimrServices::TX_COLUMNSHARD)(" event" , " skip_ttl" )(" reason" , " disabled" );
916959 return false ;
917960 }
@@ -922,7 +965,8 @@ bool TColumnShard::SetupTtl(const THashMap<ui64, NOlap::TTiering>& pathTtls) {
922965 }
923966
924967 const ui64 memoryUsageLimit = HasAppData () ? AppDataVerified ().ColumnShardConfig .GetTieringsMemoryLimit () : ((ui64)512 * 1024 * 1024 );
925- std::vector<std::shared_ptr<NOlap::TTTLColumnEngineChanges>> indexChanges = TablesManager.MutablePrimaryIndex ().StartTtl (eviction, DataLocksManager, memoryUsageLimit);
968+ std::vector<std::shared_ptr<NOlap::TTTLColumnEngineChanges>> indexChanges =
969+ TablesManager.MutablePrimaryIndex ().StartTtl (eviction, DataLocksManager, memoryUsageLimit);
926970
927971 if (indexChanges.empty ()) {
928972 ACFL_DEBUG (" background" , " ttl" )(" skip_reason" , " no_changes" );
@@ -933,14 +977,21 @@ bool TColumnShard::SetupTtl(const THashMap<ui64, NOlap::TTiering>& pathTtls) {
933977 for (auto && i : indexChanges) {
934978 i->Start (*this );
935979 auto request = i->ExtractDataAccessorsRequest ();
980+ ui64 memoryUsage = 0 ;
981+ std::shared_ptr<TDataAccessorsSubscriber> subscriber;
936982 if (i->NeedConstruction ()) {
937- request->RegisterSubscriber (std::make_shared<TWriteEvictPortionsDataAccessorsSubscriber>(ResourceSubscribeActor, i,
938- actualIndexInfo, Settings.CacheDataAfterCompaction , SelfId (), TabletID (), Counters.GetEvictionCounters (), GetLastCompletedTx (),
939- TTLTaskSubscription));
983+ subscriber = std::make_shared<TWriteEvictPortionsDataAccessorsSubscriber>(ResourceSubscribeActor, i, actualIndexInfo,
984+ Settings.CacheDataAfterCompaction , SelfId (), TabletID (), Counters.GetEvictionCounters (), GetLastCompletedTx (),
985+ TTLTaskSubscription);
986+ memoryUsage = i->CalcMemoryForUsage ();
940987 } else {
941- request-> RegisterSubscriber ( std::make_shared<TNoWriteEvictPortionsDataAccessorsSubscriber>(SelfId (), i, actualIndexInfo) );
988+ subscriber = std::make_shared<TNoWriteEvictPortionsDataAccessorsSubscriber>(SelfId (), i, actualIndexInfo);
942989 }
943- TablesManager.GetPrimaryIndex ()->FetchDataAccessors (request);
990+ const ui64 accessorsMemory =
991+ request->PredictAccessorsMemory (TablesManager.GetPrimaryIndex ()->GetVersionedIndex ().GetLastSchema ()) + memoryUsage;
992+ NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription (
993+ ResourceSubscribeActor, std::make_shared<TAccessorsMemorySubscriber>(accessorsMemory, i->GetTaskIdentifier (), TTLTaskSubscription,
994+ std::move(request), subscriber, DataAccessorsManager.GetObjectPtrVerified()));
944995 }
945996 return true ;
946997}
@@ -953,6 +1004,7 @@ class TCleanupPortionsDataAccessorsSubscriber: public TDataAccessorsSubscriber {
9531004 virtual void DoOnRequestsFinishedImpl () override {
9541005 AFL_DEBUG (NKikimrServices::TX_COLUMNSHARD)(" background" , " cleanup" )(" changes_info" , Changes->DebugString ());
9551006 auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(VersionedIndex, Changes, false );
1007+ ev->IndexChanges ->ResourcesGuard = ExtractResourcesGuard ();
9561008 ev->SetPutStatus (NKikimrProto::OK); // No new blobs to write
9571009 NActors::TActivationContext::Send (ShardActorId, std::move (ev));
9581010 }
@@ -982,8 +1034,12 @@ void TColumnShard::SetupCleanupPortions() {
9821034
9831035 auto request = changes->ExtractDataAccessorsRequest ();
9841036 auto actualIndexInfo = std::make_shared<NOlap::TVersionedIndex>(TablesManager.GetPrimaryIndex ()->GetVersionedIndex ());
985- request->RegisterSubscriber (std::make_shared<TCleanupPortionsDataAccessorsSubscriber>(SelfId (), changes, actualIndexInfo));
986- TablesManager.GetPrimaryIndex ()->FetchDataAccessors (request);
1037+ const ui64 accessorsMemory = request->PredictAccessorsMemory (TablesManager.GetPrimaryIndex ()->GetVersionedIndex ().GetLastSchema ());
1038+ const auto subscriber = std::make_shared<TCleanupPortionsDataAccessorsSubscriber>(SelfId (), changes, actualIndexInfo);
1039+
1040+ NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription (
1041+ ResourceSubscribeActor, std::make_shared<TAccessorsMemorySubscriber>(accessorsMemory, changes->GetTaskIdentifier (), TTLTaskSubscription,
1042+ std::move(request), subscriber, DataAccessorsManager.GetObjectPtrVerified()));
9871043}
9881044
9891045void TColumnShard::SetupCleanupTables () {
0 commit comments