Skip to content

Stop writing to column tables if quota is exceeded #11580

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 43 commits into from
Mar 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
4a18254
Stop writing to column tables if quota is exceeded
aavdonkin Nov 13, 2024
f1a2ced
Correctly resolved conflict
aavdonkin Nov 14, 2024
ce8abf7
Increment OutOfSpace counter if disk quota is exceeded
aavdonkin Nov 14, 2024
4a732fa
Correclty resloved conflict
aavdonkin Nov 15, 2024
fd1e1f0
Moved quota functionality to separate file
aavdonkin Nov 21, 2024
82e4eae
Resolved issues
aavdonkin Nov 25, 2024
1c66736
Added test for quota exceedance
aavdonkin Nov 27, 2024
c1811e8
Correclty resolved conflict
aavdonkin Nov 27, 2024
66f9111
Fixed test
aavdonkin Nov 28, 2024
cb63e00
Added functional test for quotas in column tables
aavdonkin Dec 4, 2024
6a93448
Fixed style
aavdonkin Dec 4, 2024
a2eae61
Check that upsert into fails if disk quota is exceeded
aavdonkin Dec 5, 2024
dfc3129
Corrected conflict resolution
aavdonkin Dec 5, 2024
f590339
Fixed style
aavdonkin Dec 6, 2024
603f320
Fixed build
aavdonkin Jan 24, 2025
a5b6a55
Fixed build
aavdonkin Jan 24, 2025
2464f7f
Handle new events in TColumnShard::Enqueue too
aavdonkin Jan 27, 2025
b58b233
Fixed error message in test
aavdonkin Jan 27, 2025
2d98683
Fix
aavdonkin Jan 28, 2025
1d576da
Correctly resolved conflict
aavdonkin Jan 28, 2025
1d8ff00
Correctly resolved conflict
aavdonkin Jan 29, 2025
c50d30e
Fixed issues
aavdonkin Jan 30, 2025
cbb8ece
Fixed issues
aavdonkin Jan 30, 2025
2f63fdf
Removed conditions
aavdonkin Jan 31, 2025
13e5eb8
Fixed issues
aavdonkin Jan 31, 2025
193a5af
Removed SchemeShard from SubDomainPathId
aavdonkin Feb 3, 2025
281d8a3
Restored condition for CurrentSchemeShardId
aavdonkin Feb 3, 2025
9eb9855
Fixed issues
aavdonkin Feb 6, 2025
9e9d5ae
Fixed test
aavdonkin Feb 7, 2025
010566f
Fixed issues
aavdonkin Feb 7, 2025
35ceb15
Removed unnecessary comment
aavdonkin Feb 13, 2025
2d9ddd3
Changed error code
aavdonkin Feb 17, 2025
6e3f726
Handle SubdomainPathId and OutOfSpace in separate actor
aavdonkin Feb 24, 2025
f2ece2a
Merge branch 'main' into columnshard_quota
aavdonkin Feb 24, 2025
75501cf
Fixed broken tests
aavdonkin Feb 25, 2025
469051d
Fixed tests
aavdonkin Feb 26, 2025
f853af9
TEvWatchNotifyUpdated directly to TColumnShard
aavdonkin Feb 27, 2025
d9a561f
Fixed issues
aavdonkin Mar 3, 2025
520f594
Do not assign subDomainPathId equal to 0
aavdonkin Mar 3, 2025
bfa74fb
Removed default parameter
aavdonkin Mar 4, 2025
ffbb808
Fixed build
aavdonkin Mar 4, 2025
a6440bd
Fixed build
aavdonkin Mar 4, 2025
8469711
Removed parameter subDomainPathId from one constructor TEvProposeTran…
aavdonkin Mar 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ydb/core/protos/counters_columnshard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -205,4 +205,6 @@ enum ETxTypes {
TXTYPE_ASK_PORTION_METADATA = 38 [(TxTypeOpts) = {Name: "TxAskPortionMetadata"}];
TXTYPE_WRITE_PORTIONS_FINISHED = 39 [(TxTypeOpts) = {Name: "TxWritePortionsFinished"}];
TXTYPE_WRITE_PORTIONS_FAILED = 40 [(TxTypeOpts) = {Name: "TxWritePortionsFailed"}];
TXTYPE_PERSIST_SUBDOMAIN_OUT_OF_SPACE = 41 [(TxTypeOpts) = {Name: "TxPersistSubDomainOutOfSpace"}];
TXTYPE_PERSIST_SUBDOMAIN_PATH_ID = 42 [(TxTypeOpts) = {Name: "TxPersistSubDomainPathId"}];
}
1 change: 1 addition & 0 deletions ydb/core/protos/tx_columnshard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ message TEvProposeTransaction {
optional NKikimrSubDomains.TProcessingParams ProcessingParams = 6;
optional uint64 Flags = 7;
optional NKikimrTx.TMessageSeqNo SeqNo = 8;
optional uint64 SubDomainPathId = 9;
}

message TEvCheckPlannedTransaction {
Expand Down
9 changes: 6 additions & 3 deletions ydb/core/tx/columnshard/columnshard.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,16 +140,19 @@ namespace TEvColumnShard {
}

TEvProposeTransaction(NKikimrTxColumnShard::ETransactionKind txKind, ui64 ssId, const TActorId& source,
ui64 txId, TString txBody, const ui32 flags = 0)
ui64 txId, TString txBody, const ui32 flags, ui64 subDomainPathId)
: TEvProposeTransaction(txKind, source, txId, std::move(txBody), flags)
{
// Y_ABORT_UNLESS(txKind == NKikimrTxColumnShard::TX_KIND_SCHEMA);
Record.SetSchemeShardId(ssId);
if (subDomainPathId != 0) {
Record.SetSubDomainPathId(subDomainPathId);
}
}

TEvProposeTransaction(NKikimrTxColumnShard::ETransactionKind txKind, ui64 ssId, const TActorId& source,
ui64 txId, TString txBody, const TMessageSeqNo& seqNo, const NKikimrSubDomains::TProcessingParams& processingParams, const ui32 flags = 0)
: TEvProposeTransaction(txKind, ssId, source, txId, std::move(txBody), flags)
ui64 txId, TString txBody, const TMessageSeqNo& seqNo, const NKikimrSubDomains::TProcessingParams& processingParams, const ui32 flags, ui64 subDomainPathId)
: TEvProposeTransaction(txKind, ssId, source, txId, std::move(txBody), flags, subDomainPathId)
{
Record.MutableProcessingParams()->CopyFrom(processingParams);
*Record.MutableSeqNo() = seqNo.SerializeToProto();
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/tx/columnshard/columnshard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ void TTxInit::Complete(const TActorContext& ctx) {
AFL_VERIFY(!Self->IsTxInitFinished);
Self->IsTxInitFinished = true;
Self->TrySwitchToWork(ctx);
if (Self->SpaceWatcher->SubDomainPathId) {
Self->SpaceWatcher->StartWatchingSubDomainPathId();
} else {
Self->SpaceWatcher->StartFindSubDomainPathId();
}
}

class TTxUpdateSchema: public TTransactionBase<TColumnShard> {
Expand Down
10 changes: 10 additions & 0 deletions ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,16 @@ class TTxProposeTransaction: public NTabletFlatExecutor::TTransactionBase<TColum
} else {
AFL_VERIFY(Self->CurrentSchemeShardId == record.GetSchemeShardId());
}
if (txKind == NKikimrTxColumnShard::TX_KIND_SCHEMA) {
if (record.HasSubDomainPathId()) {
ui64 subDomainPathId = record.GetSubDomainPathId();
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "propose")("subdomain_id", subDomainPathId);
Self->SpaceWatcher->PersistSubDomainPathId(subDomainPathId, txc);
Self->SpaceWatcher->StartWatchingSubDomainPathId();
} else {
Self->SpaceWatcher->StartFindSubDomainPathId();
}
}
}
std::optional<TMessageSeqNo> msgSeqNo;
if (Ev->Get()->Record.HasSeqNo()) {
Expand Down
24 changes: 17 additions & 7 deletions ydb/core/tx/columnshard/columnshard__write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,24 +203,30 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
writeMeta.SetLongTxId(NLongTxService::TLongTxId::FromProto(record.GetLongTxId()));
writeMeta.SetWritePartId(record.GetWritePartId());

const auto returnFail = [&](const NColumnShard::ECumulativeCounters signalIndex, const EWriteFailReason reason) {
const auto returnFail = [&](const NColumnShard::ECumulativeCounters signalIndex, const EWriteFailReason reason, NKikimrTxColumnShard::EResultStatus resultStatus) {
Counters.GetTabletCounters()->IncCounter(signalIndex);

ctx.Send(source, std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, NKikimrTxColumnShard::EResultStatus::ERROR));
ctx.Send(source, std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, resultStatus));
Counters.GetCSCounters().OnFailedWriteResponse(reason);
return;
};

if (SpaceWatcher->SubDomainOutOfSpace && (!record.HasModificationType() || (record.GetModificationType() != NKikimrTxColumnShard::TEvWrite::OPERATION_DELETE))) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_writing")("reason", "quota_exceeded");
Counters.GetTabletCounters()->IncCounter(COUNTER_OUT_OF_SPACE);
return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::Overload, NKikimrTxColumnShard::EResultStatus::OVERLOADED);
}

if (!AppDataVerified().ColumnShardConfig.GetWritingEnabled()) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "skip_writing")("reason", "disabled");
return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::Disabled);
return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::Disabled, NKikimrTxColumnShard::EResultStatus::ERROR);
}

if (!TablesManager.IsReadyForStartWrite(pathId, false)) {
LOG_S_NOTICE("Write (fail) into pathId:" << writeMeta.GetTableId() << (TablesManager.HasPrimaryIndex() ? "" : " no index")
<< " at tablet " << TabletID());

return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::NoTable);
return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::NoTable, NKikimrTxColumnShard::EResultStatus::ERROR);
}

{
Expand All @@ -230,7 +236,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
.CheckWriteData();
if (status.IsFail()) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "writing_fail_through_compaction")("reason", status.GetErrorMessage());
return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::CompactionCriteria);
return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::CompactionCriteria, NKikimrTxColumnShard::EResultStatus::ERROR);
}
}

Expand All @@ -239,7 +245,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
if (!arrowData->ParseFromProto(record)) {
LOG_S_ERROR(
"Write (fail) " << record.GetData().size() << " bytes into pathId " << writeMeta.GetTableId() << " at tablet " << TabletID());
return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::IncorrectSchema);
return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::IncorrectSchema, NKikimrTxColumnShard::EResultStatus::ERROR);
}

NEvWrite::TWriteData writeData(writeMetaPtr, arrowData, snapshotSchema->GetIndexInfo().GetReplaceKey(),
Expand Down Expand Up @@ -564,7 +570,11 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
return;
}

auto overloadStatus = CheckOverloadedImmediate(pathId);
const bool outOfSpace = SpaceWatcher->SubDomainOutOfSpace && (*mType != NEvWrite::EModificationType::Delete);
if (outOfSpace) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_writing")("reason", "quota_exceeded")("source", "dataevent");
}
auto overloadStatus = outOfSpace ? EOverloadStatus::Disk : CheckOverloadedImmediate(pathId);
if (overloadStatus != EOverloadStatus::None) {
std::unique_ptr<NActors::IEventBase> result = NEvents::TDataEvents::TEvWriteResult::BuildError(
Copy link
Collaborator

@zverevgeny zverevgeny Jan 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

в TEvWriteResult есть оцпия STATUS_DISK_SPACE_EXHAUSTED = 10. Можно посмотреть в DS, какой они флаг возвращают при разных ошибках. Не нужно ли, например, при ChannelYellowStop также возвращать STATUS_DISK_SPACE_EXHAUSTED?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

На каналах - согласен. А при исчерпании квоты?

TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, "overload data error");
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet)
, NormalizerController(StoragesManager, Counters.GetSubscribeCounters())
, SysLocks(this) {
AFL_VERIFY(TabletActivityImpl->Inc() == 1);
SpaceWatcher = new TSpaceWatcher(this);
SpaceWatcherId = TActorContext::AsActorContext().Register(SpaceWatcher);
}

void TColumnShard::OnDetach(const TActorContext& ctx) {
Expand Down Expand Up @@ -1158,6 +1160,7 @@ void TColumnShard::Die(const TActorContext& ctx) {
NTabletPipe::CloseAndForgetClient(SelfId(), StatsReportPipe);
UnregisterMediatorTimeCast();
NYDBTest::TControllers::GetColumnShardController()->OnTabletStopped(*this);
Send(SpaceWatcherId, new NActors::TEvents::TEvPoison);
IActor::Die(ctx);
}

Expand Down Expand Up @@ -1606,6 +1609,7 @@ void TColumnShard::Enqueue(STFUNC_SIG) {
HFunc(TEvPrivate::TEvTieringModified, HandleInit);
HFunc(TEvPrivate::TEvNormalizerResult, Handle);
HFunc(NOlap::NDataAccessorControl::TEvAskTabletDataAccessors, Handle);
HFunc(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle);
default:
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "unexpected event in enqueue");
return NTabletFlatExecutor::TTabletExecutedFlat::Enqueue(ev);
Expand Down
10 changes: 9 additions & 1 deletion ydb/core/tx/columnshard/columnshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "background_controller.h"
#include "columnshard.h"
#include "columnshard_private_events.h"
#include "columnshard_subdomain_path_id.h"
#include "counters.h"
#include "defs.h"
#include "inflight_request_tracker.h"
Expand Down Expand Up @@ -39,6 +40,7 @@
#include <ydb/core/tx/tiering/common.h>
#include <ydb/core/tx/time_cast/time_cast.h>
#include <ydb/core/tx/tx_processing.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>

#include <ydb/services/metadata/abstract/common.h>
#include <ydb/services/metadata/service.h>
Expand Down Expand Up @@ -182,6 +184,9 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
friend class TTxRemoveSharedBlobs;
friend class TTxFinishAsyncTransaction;
friend class TWaitEraseTablesTxSubscriber;
friend class TTxPersistSubDomainOutOfSpace;
friend class TTxPersistSubDomainPathId;
friend class TSpaceWatcher;

friend class NOlap::TCleanupPortionsColumnEngineChanges;
friend class NOlap::TCleanupTablesColumnEngineChanges;
Expand Down Expand Up @@ -295,8 +300,8 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
void Handle(NOlap::NDataSharing::NEvents::TEvFinishedFromSource::TPtr& ev, const TActorContext& ctx);
void Handle(NOlap::NDataSharing::NEvents::TEvAckFinishToSource::TPtr& ev, const TActorContext& ctx);
void Handle(NOlap::NDataSharing::NEvents::TEvAckFinishFromInitiator::TPtr& ev, const TActorContext& ctx);

void Handle(NOlap::NDataAccessorControl::TEvAskTabletDataAccessors::TPtr& ev, const TActorContext& ctx);
void Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev, const TActorContext& ctx);

void HandleInit(TEvPrivate::TEvTieringModified::TPtr& ev, const TActorContext&);

Expand Down Expand Up @@ -463,6 +468,7 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
HFunc(NOlap::NDataSharing::NEvents::TEvAckFinishToSource, Handle);
HFunc(NOlap::NDataSharing::NEvents::TEvAckFinishFromInitiator, Handle);
HFunc(NOlap::NDataAccessorControl::TEvAskTabletDataAccessors, Handle);
HFunc(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle);

default:
if (!HandleDefaultEvents(ev, SelfId())) {
Expand Down Expand Up @@ -552,6 +558,8 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
TLimits Limits;
NOlap::TNormalizationController NormalizerController;
NDataShard::TSysLocks SysLocks;
TSpaceWatcher* SpaceWatcher;
TActorId SpaceWatcherId;

void TryRegisterMediatorTimeCast();
void UnregisterMediatorTimeCast();
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/columnshard/columnshard_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ struct Schema : NIceDb::Schema {
LastCompletedTxId = 14,
LastNormalizerSequentialId = 15,
GCBarrierPreparationGen = 16,
GCBarrierPreparationStep = 17
GCBarrierPreparationStep = 17,
SubDomainLocalPathId = 18,
SubDomainOutOfSpace = 19
};

enum class EInsertTableIds : ui8 {
Expand Down
120 changes: 120 additions & 0 deletions ydb/core/tx/columnshard/columnshard_subdomain_path_id.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
#include "columnshard_impl.h"

namespace NKikimr::NColumnShard {

class TTxPersistSubDomainOutOfSpace : public NTabletFlatExecutor::TTransactionBase<TColumnShard> {
public:
TTxPersistSubDomainOutOfSpace(TColumnShard* self, bool outOfSpace)
: TTransactionBase(self)
, OutOfSpace(outOfSpace)
{ }

TTxType GetTxType() const override { return TXTYPE_PERSIST_SUBDOMAIN_OUT_OF_SPACE; }

bool Execute(TTransactionContext& txc, const TActorContext&) override {
NIceDb::TNiceDb db(txc.DB);

if (Self->SpaceWatcher->SubDomainOutOfSpace != OutOfSpace) {
Schema::SaveSpecialValue(db, Schema::EValueIds::SubDomainOutOfSpace, ui64(OutOfSpace ? 1 : 0));
Self->SpaceWatcher->SubDomainOutOfSpace = OutOfSpace;
}

return true;
}

void Complete(const TActorContext&) override {
// nothing
}

private:
const bool OutOfSpace;
};

class TTxPersistSubDomainPathId : public NTabletFlatExecutor::TTransactionBase<TColumnShard> {
public:
TTxPersistSubDomainPathId(TColumnShard* self, ui64 localPathId)
: TTransactionBase(self)
, LocalPathId(localPathId)
{ }

TTxType GetTxType() const override { return TXTYPE_PERSIST_SUBDOMAIN_PATH_ID; }

bool Execute(TTransactionContext& txc, const TActorContext&) override {
if (!Self->SpaceWatcher->SubDomainPathId) {
Self->SpaceWatcher->PersistSubDomainPathId(LocalPathId, txc);
Self->SpaceWatcher->StartWatchingSubDomainPathId();
}
return true;
}

void Complete(const TActorContext&) override {
// nothing
}

private:
const ui64 LocalPathId;
};

void TSpaceWatcher::PersistSubDomainPathId(ui64 localPathId,
NTabletFlatExecutor::TTransactionContext &txc) {
SubDomainPathId = localPathId;
NIceDb::TNiceDb db(txc.DB);
Schema::SaveSpecialValue(db, Schema::EValueIds::SubDomainLocalPathId, localPathId);
}

void TSpaceWatcher::StopWatchingSubDomainPathId() {
if (WatchingSubDomainPathId) {
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchRemove());
WatchingSubDomainPathId.reset();
}
}

void TSpaceWatcher::StartWatchingSubDomainPathId() {
if (!SubDomainPathId) {
return;
}

if (!WatchingSubDomainPathId) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("started_watching_subdomain", *SubDomainPathId);
Self->Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchPathId(TPathId(Self->CurrentSchemeShardId, *SubDomainPathId)));
WatchingSubDomainPathId = *SubDomainPathId;
}
}

void TSpaceWatcher::Handle(NActors::TEvents::TEvPoison::TPtr& , const TActorContext& ctx) {
Die(ctx);
}

void TColumnShard::Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev, const TActorContext& ctx) {
const auto* msg = ev->Get();
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("notify_subdomain", msg->PathId);
const bool outOfSpace = msg->Result->GetPathDescription()
.GetDomainDescription()
.GetDomainState()
.GetDiskQuotaExceeded();

Execute(new TTxPersistSubDomainOutOfSpace(this, outOfSpace), ctx);
}

static constexpr TDuration MaxFindSubDomainPathIdDelay = TDuration::Minutes(10);

void TSpaceWatcher::StartFindSubDomainPathId(bool delayFirstRequest) {
if (!FindSubDomainPathIdActor &&
Self->CurrentSchemeShardId != 0 &&
(!SubDomainPathId))
{
FindSubDomainPathIdActor = Register(CreateFindSubDomainPathIdActor(SelfId(), Self->TabletID(), Self->CurrentSchemeShardId, delayFirstRequest, MaxFindSubDomainPathIdDelay));
}
}


void TSpaceWatcher::Handle(NSchemeShard::TEvSchemeShard::TEvSubDomainPathIdFound::TPtr& ev, const TActorContext& ctx) {
const auto* msg = ev->Get();
if (FindSubDomainPathIdActor == ev->Sender) {
FindSubDomainPathIdActor = { };
}
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "subdomain_found")("scheme_shard_id", msg->SchemeShardId)("local_path_id", msg->LocalPathId);
Self->Execute(new TTxPersistSubDomainPathId(Self, msg->LocalPathId), ctx);
}

}
56 changes: 56 additions & 0 deletions ydb/core/tx/columnshard/columnshard_subdomain_path_id.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#pragma once

#include <ydb/library/actors/core/actorid.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>

namespace NKikimr::NColumnShard::NLoading {
class TSpecialValuesInitializer;
};

namespace NKikimr::NColumnShard {

class TSpaceWatcher : public TActorBootstrapped<TSpaceWatcher> {
TColumnShard* Self;
NActors::TActorId FindSubDomainPathIdActor;
std::optional<NKikimr::TLocalPathId> SubDomainPathId;
std::optional<NKikimr::TLocalPathId> WatchingSubDomainPathId;
bool SubDomainOutOfSpace = false;

public:
friend class TColumnShard;
friend class TTxInit;
friend class TTxPersistSubDomainOutOfSpace;
friend class TTxPersistSubDomainPathId;
friend class NKikimr::NColumnShard::NLoading::TSpecialValuesInitializer;

public:
TSpaceWatcher(TColumnShard* self)
: Self(self) {
}

void PersistSubDomainPathId(ui64 localPathId, NTabletFlatExecutor::TTransactionContext &txc);
void StopWatchingSubDomainPathId();
void StartWatchingSubDomainPathId();
void StartFindSubDomainPathId(bool delayFirstRequest = true);

void Bootstrap(const TActorContext& /*ctx*/) {
Become(&TThis::StateWork);
}

void Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev, const TActorContext& ctx);
void Handle(NSchemeShard::TEvSchemeShard::TEvSubDomainPathIdFound::TPtr& ev, const TActorContext&);
void Handle(NActors::TEvents::TEvPoison::TPtr& ev, const TActorContext&);

STFUNC(StateWork) {
switch (ev->GetTypeRewrite()) {
HFunc(NSchemeShard::TEvSchemeShard::TEvSubDomainPathIdFound, Handle);
HFunc(NActors::TEvents::TEvPoison, Handle);
default:
LOG_S_WARN("TSpaceWatcher.StateWork at " << " unhandled event type: " << ev->GetTypeName()
<< " event: " << ev->ToString());
break;
}
}
};

}
Loading
Loading