|
| 1 | +#include "columnshard_impl.h" |
| 2 | + |
| 3 | +namespace NKikimr::NColumnShard { |
| 4 | + |
| 5 | +class TTxPersistSubDomainOutOfSpace : public NTabletFlatExecutor::TTransactionBase<TColumnShard> { |
| 6 | +public: |
| 7 | + TTxPersistSubDomainOutOfSpace(TColumnShard* self, bool outOfSpace) |
| 8 | + : TTransactionBase(self) |
| 9 | + , OutOfSpace(outOfSpace) |
| 10 | + { } |
| 11 | + |
| 12 | + TTxType GetTxType() const override { return TXTYPE_PERSIST_SUBDOMAIN_OUT_OF_SPACE; } |
| 13 | + |
| 14 | + bool Execute(TTransactionContext& txc, const TActorContext&) override { |
| 15 | + NIceDb::TNiceDb db(txc.DB); |
| 16 | + |
| 17 | + if (Self->SpaceWatcher->SubDomainOutOfSpace != OutOfSpace) { |
| 18 | + Schema::SaveSpecialValue(db, Schema::EValueIds::SubDomainOutOfSpace, ui64(OutOfSpace ? 1 : 0)); |
| 19 | + Self->SpaceWatcher->SubDomainOutOfSpace = OutOfSpace; |
| 20 | + } |
| 21 | + |
| 22 | + return true; |
| 23 | + } |
| 24 | + |
| 25 | + void Complete(const TActorContext&) override { |
| 26 | + // nothing |
| 27 | + } |
| 28 | + |
| 29 | +private: |
| 30 | + const bool OutOfSpace; |
| 31 | +}; |
| 32 | + |
| 33 | +class TTxPersistSubDomainPathId : public NTabletFlatExecutor::TTransactionBase<TColumnShard> { |
| 34 | +public: |
| 35 | + TTxPersistSubDomainPathId(TColumnShard* self, ui64 localPathId) |
| 36 | + : TTransactionBase(self) |
| 37 | + , LocalPathId(localPathId) |
| 38 | + { } |
| 39 | + |
| 40 | + TTxType GetTxType() const override { return TXTYPE_PERSIST_SUBDOMAIN_PATH_ID; } |
| 41 | + |
| 42 | + bool Execute(TTransactionContext& txc, const TActorContext&) override { |
| 43 | + if (!Self->SpaceWatcher->SubDomainPathId) { |
| 44 | + Self->SpaceWatcher->PersistSubDomainPathId(LocalPathId, txc); |
| 45 | + Self->SpaceWatcher->StartWatchingSubDomainPathId(); |
| 46 | + } |
| 47 | + return true; |
| 48 | + } |
| 49 | + |
| 50 | + void Complete(const TActorContext&) override { |
| 51 | + // nothing |
| 52 | + } |
| 53 | + |
| 54 | +private: |
| 55 | + const ui64 LocalPathId; |
| 56 | +}; |
| 57 | + |
| 58 | +void TSpaceWatcher::PersistSubDomainPathId(ui64 localPathId, |
| 59 | + NTabletFlatExecutor::TTransactionContext &txc) { |
| 60 | + SubDomainPathId = localPathId; |
| 61 | + NIceDb::TNiceDb db(txc.DB); |
| 62 | + Schema::SaveSpecialValue(db, Schema::EValueIds::SubDomainLocalPathId, localPathId); |
| 63 | +} |
| 64 | + |
| 65 | +void TSpaceWatcher::StopWatchingSubDomainPathId() { |
| 66 | + if (WatchingSubDomainPathId) { |
| 67 | + Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchRemove()); |
| 68 | + WatchingSubDomainPathId.reset(); |
| 69 | + } |
| 70 | +} |
| 71 | + |
| 72 | +void TSpaceWatcher::StartWatchingSubDomainPathId() { |
| 73 | + if (!SubDomainPathId) { |
| 74 | + return; |
| 75 | + } |
| 76 | + |
| 77 | + if (!WatchingSubDomainPathId) { |
| 78 | + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("started_watching_subdomain", *SubDomainPathId); |
| 79 | + Self->Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchPathId(TPathId(Self->CurrentSchemeShardId, *SubDomainPathId))); |
| 80 | + WatchingSubDomainPathId = *SubDomainPathId; |
| 81 | + } |
| 82 | +} |
| 83 | + |
| 84 | +void TSpaceWatcher::Handle(NActors::TEvents::TEvPoison::TPtr& , const TActorContext& ctx) { |
| 85 | + Die(ctx); |
| 86 | +} |
| 87 | + |
| 88 | +void TColumnShard::Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev, const TActorContext& ctx) { |
| 89 | + const auto* msg = ev->Get(); |
| 90 | + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("notify_subdomain", msg->PathId); |
| 91 | + const bool outOfSpace = msg->Result->GetPathDescription() |
| 92 | + .GetDomainDescription() |
| 93 | + .GetDomainState() |
| 94 | + .GetDiskQuotaExceeded(); |
| 95 | + |
| 96 | + Execute(new TTxPersistSubDomainOutOfSpace(this, outOfSpace), ctx); |
| 97 | +} |
| 98 | + |
| 99 | +static constexpr TDuration MaxFindSubDomainPathIdDelay = TDuration::Minutes(10); |
| 100 | + |
| 101 | +void TSpaceWatcher::StartFindSubDomainPathId(bool delayFirstRequest) { |
| 102 | + if (!FindSubDomainPathIdActor && |
| 103 | + Self->CurrentSchemeShardId != 0 && |
| 104 | + (!SubDomainPathId)) |
| 105 | + { |
| 106 | + FindSubDomainPathIdActor = Register(CreateFindSubDomainPathIdActor(SelfId(), Self->TabletID(), Self->CurrentSchemeShardId, delayFirstRequest, MaxFindSubDomainPathIdDelay)); |
| 107 | + } |
| 108 | +} |
| 109 | + |
| 110 | + |
| 111 | +void TSpaceWatcher::Handle(NSchemeShard::TEvSchemeShard::TEvSubDomainPathIdFound::TPtr& ev, const TActorContext& ctx) { |
| 112 | + const auto* msg = ev->Get(); |
| 113 | + if (FindSubDomainPathIdActor == ev->Sender) { |
| 114 | + FindSubDomainPathIdActor = { }; |
| 115 | + } |
| 116 | + AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "subdomain_found")("scheme_shard_id", msg->SchemeShardId)("local_path_id", msg->LocalPathId); |
| 117 | + Self->Execute(new TTxPersistSubDomainPathId(Self, msg->LocalPathId), ctx); |
| 118 | +} |
| 119 | + |
| 120 | +} |
0 commit comments