Skip to content

Commit ac73a05

Browse files
authored
Merge 69c4eac into b76c6f8
2 parents b76c6f8 + 69c4eac commit ac73a05

15 files changed

+205
-5
lines changed

ydb/core/protos/counters_columnshard.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,4 +203,6 @@ enum ETxTypes {
203203
TXTYPE_START_INTERNAL_SCAN = 36 [(TxTypeOpts) = {Name: "TxStartInternalScan"}];
204204
TXTYPE_DATA_SHARING_START_SOURCE_CURSOR = 37 [(TxTypeOpts) = {Name: "TxDataSharingStartSourceCursor"}];
205205
TXTYPE_ASK_PORTION_METADATA = 38 [(TxTypeOpts) = {Name: "TxAskPortionMetadata"}];
206+
TXTYPE_PERSIST_SUBDOMAIN_OUT_OF_SPACE = 39 [(TxTypeOpts) = {Name: "TxPersistSubDomainOutOfSpace"}];
207+
TXTYPE_PERSIST_SUBDOMAIN_PATH_ID = 40 [(TxTypeOpts) = {Name: "TxPersistSubDomainPathId"}];
206208
}

ydb/core/protos/tx_columnshard.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ message TEvProposeTransaction {
162162
optional NKikimrSubDomains.TProcessingParams ProcessingParams = 6;
163163
optional uint64 Flags = 7;
164164
optional NKikimrTx.TMessageSeqNo SeqNo = 8;
165+
optional uint64 SubDomainPathId = 9;
165166
}
166167

167168
message TEvCheckPlannedTransaction {

ydb/core/tx/columnshard/columnshard__init.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,11 @@ void TTxInit::Complete(const TActorContext& ctx) {
107107
Self->ProgressTxController->OnTabletInit();
108108
Self->SwitchToWork(ctx);
109109
NYDBTest::TControllers::GetColumnShardController()->OnTabletInitCompleted(*Self);
110+
if (Self->SubDomainPathId) {
111+
Self->StartWatchingSubDomainPathId();
112+
} else {
113+
Self->StartFindSubDomainPathId();
114+
}
110115
}
111116

112117
class TTxUpdateSchema: public TTransactionBase<TColumnShard> {

ydb/core/tx/columnshard/columnshard__propose_transaction.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,14 @@ class TTxProposeTransaction: public NTabletFlatExecutor::TTransactionBase<TColum
5555
AFL_VERIFY(Self->CurrentSchemeShardId == record.GetSchemeShardId());
5656
}
5757
}
58+
if (txKind == NKikimrTxColumnShard::TX_KIND_SCHEMA) {
59+
ui64 subDomainPathId = record.GetSubDomainPathId();
60+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "propose")("subdomain_id", subDomainPathId);
61+
if (subDomainPathId) {
62+
Self->PersistSubDomainPathId(record.GetSchemeShardId(), subDomainPathId, txc);
63+
Self->StartWatchingSubDomainPathId();
64+
}
65+
}
5866
std::optional<TMessageSeqNo> msgSeqNo;
5967
if (Ev->Get()->Record.HasSeqNo()) {
6068
TMessageSeqNo seqNo;
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
#include "columnshard_impl.h"
2+
3+
namespace NKikimr::NColumnShard {
4+
5+
class TTxPersistSubDomainOutOfSpace : public NTabletFlatExecutor::TTransactionBase<TColumnShard> {
6+
public:
7+
TTxPersistSubDomainOutOfSpace(TColumnShard* self, bool outOfSpace)
8+
: TTransactionBase(self)
9+
, OutOfSpace(outOfSpace)
10+
{ }
11+
12+
TTxType GetTxType() const override { return TXTYPE_PERSIST_SUBDOMAIN_OUT_OF_SPACE; }
13+
14+
bool Execute(TTransactionContext& txc, const TActorContext&) override {
15+
NIceDb::TNiceDb db(txc.DB);
16+
17+
if (Self->SubDomainOutOfSpace != OutOfSpace) {
18+
Schema::SaveSpecialValue(db, Schema::EValueIds::SubDomainOutOfSpace, ui64(OutOfSpace ? 1 : 0));
19+
Self->SubDomainOutOfSpace = OutOfSpace;
20+
}
21+
22+
return true;
23+
}
24+
25+
void Complete(const TActorContext&) override {
26+
// nothing
27+
}
28+
29+
private:
30+
const bool OutOfSpace = false;
31+
};
32+
33+
class TTxPersistSubDomainPathId : public NTabletFlatExecutor::TTransactionBase<TColumnShard> {
34+
public:
35+
TTxPersistSubDomainPathId(TColumnShard* self, ui64 schemeShardId, ui64 localPathId)
36+
: TTransactionBase(self)
37+
, SchemeShardId(schemeShardId)
38+
, LocalPathId(localPathId)
39+
{ }
40+
41+
TTxType GetTxType() const override { return TXTYPE_PERSIST_SUBDOMAIN_PATH_ID; }
42+
43+
bool Execute(TTransactionContext& txc, const TActorContext&) override {
44+
if (Self->CurrentSchemeShardId == SchemeShardId &&
45+
!Self->SubDomainPathId || Self->SubDomainPathId->OwnerId != SchemeShardId)
46+
{
47+
Self->PersistSubDomainPathId(SchemeShardId, LocalPathId, txc);
48+
Self->StartWatchingSubDomainPathId();
49+
}
50+
return true;
51+
}
52+
53+
void Complete(const TActorContext&) override {
54+
// nothing
55+
}
56+
57+
private:
58+
const ui64 SchemeShardId;
59+
const ui64 LocalPathId;
60+
};
61+
62+
void TColumnShard::PersistSubDomainPathId(ui64 ownerId, ui64 localPathId,
63+
NTabletFlatExecutor::TTransactionContext &txc) {
64+
SubDomainPathId.emplace(ownerId, localPathId);
65+
NIceDb::TNiceDb db(txc.DB);
66+
Schema::SaveSpecialValue(db, Schema::EValueIds::SubDomainOwnerId, ownerId);
67+
Schema::SaveSpecialValue(db, Schema::EValueIds::SubDomainLocalPathId, localPathId);
68+
}
69+
70+
void TColumnShard::StopWatchingSubDomainPathId() {
71+
if (WatchingSubDomainPathId) {
72+
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchRemove());
73+
WatchingSubDomainPathId.reset();
74+
}
75+
}
76+
77+
void TColumnShard::StartWatchingSubDomainPathId() {
78+
if (!SubDomainPathId || SubDomainPathId->OwnerId != CurrentSchemeShardId) {
79+
return;
80+
}
81+
82+
if (WatchingSubDomainPathId && *WatchingSubDomainPathId != *SubDomainPathId) {
83+
StopWatchingSubDomainPathId();
84+
}
85+
86+
if (!WatchingSubDomainPathId) {
87+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("started_watching_subdomain", *SubDomainPathId);
88+
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchPathId(*SubDomainPathId));
89+
WatchingSubDomainPathId = *SubDomainPathId;
90+
}
91+
}
92+
93+
void TColumnShard::Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev, const TActorContext& ctx) {
94+
const auto* msg = ev->Get();
95+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("notify_subdomain", msg->PathId);
96+
if (SubDomainPathId && msg->PathId == *SubDomainPathId) {
97+
const bool outOfSpace = msg->Result->GetPathDescription()
98+
.GetDomainDescription()
99+
.GetDomainState()
100+
.GetDiskQuotaExceeded();
101+
102+
LOG_DEBUG_S(ctx, NKikimrServices::TX_COLUMNSHARD,
103+
"Discovered subdomain " << msg->PathId << " state, outOfSpace = " << outOfSpace
104+
<< " at columnshard " << TabletID());
105+
106+
Execute(new TTxPersistSubDomainOutOfSpace(this, outOfSpace), ctx);
107+
}
108+
}
109+
110+
static constexpr TDuration MaxFindSubDomainPathIdDelay = TDuration::Minutes(10);
111+
112+
void TColumnShard::StartFindSubDomainPathId(bool delayFirstRequest) {
113+
if (!FindSubDomainPathIdActor &&
114+
CurrentSchemeShardId != 0 &&
115+
(!SubDomainPathId || SubDomainPathId->OwnerId != CurrentSchemeShardId))
116+
{
117+
FindSubDomainPathIdActor = Register(CreateFindSubDomainPathIdActor(SelfId(), TabletID(), CurrentSchemeShardId, delayFirstRequest, MaxFindSubDomainPathIdDelay));
118+
}
119+
}
120+
121+
122+
void TColumnShard::Handle(NSchemeShard::TEvSchemeShard::TEvSubDomainPathIdFound::TPtr& ev, const TActorContext& ctx) {
123+
const auto* msg = ev->Get();
124+
if (FindSubDomainPathIdActor == ev->Sender) {
125+
FindSubDomainPathIdActor = { };
126+
}
127+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "subdomain_found")("scheme_shard_id", msg->SchemeShardId)("local_path_id", msg->LocalPathId);
128+
Execute(new TTxPersistSubDomainPathId(this, msg->SchemeShardId, msg->LocalPathId), ctx);
129+
}
130+
131+
}

ydb/core/tx/columnshard/columnshard__write.cpp

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -203,14 +203,20 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
203203
writeMeta.SetLongTxId(NLongTxService::TLongTxId::FromProto(record.GetLongTxId()));
204204
writeMeta.SetWritePartId(record.GetWritePartId());
205205

206-
const auto returnFail = [&](const NColumnShard::ECumulativeCounters signalIndex, const EWriteFailReason reason) {
206+
const auto returnFail = [&](const NColumnShard::ECumulativeCounters signalIndex, const EWriteFailReason reason, NKikimrTxColumnShard::EResultStatus resultStatus = NKikimrTxColumnShard::EResultStatus::ERROR) {
207207
Counters.GetTabletCounters()->IncCounter(signalIndex);
208208

209-
ctx.Send(source, std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, NKikimrTxColumnShard::EResultStatus::ERROR));
209+
ctx.Send(source, std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, resultStatus));
210210
Counters.GetCSCounters().OnFailedWriteResponse(reason);
211211
return;
212212
};
213213

214+
if (SubDomainOutOfSpace && record.HasModificationType() && (record.GetModificationType() != NKikimrTxColumnShard::TEvWrite::OPERATION_DELETE)) {
215+
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_writing")("reason", "quota_exceeded");
216+
Counters.GetTabletCounters()->IncCounter(COUNTER_OUT_OF_SPACE);
217+
return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::Overload, NKikimrTxColumnShard::EResultStatus::OVERLOADED);
218+
}
219+
214220
if (!AppDataVerified().ColumnShardConfig.GetWritingEnabled()) {
215221
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_writing")("reason", "disabled");
216222
return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::Disabled);

ydb/core/tx/columnshard/columnshard_impl.h

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,8 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
174174
friend class TTxRemoveSharedBlobs;
175175
friend class TTxFinishAsyncTransaction;
176176
friend class TWaitEraseTablesTxSubscriber;
177+
friend class TTxPersistSubDomainOutOfSpace;
178+
friend class TTxPersistSubDomainPathId;
177179

178180
friend class NOlap::TCleanupPortionsColumnEngineChanges;
179181
friend class NOlap::TCleanupTablesColumnEngineChanges;
@@ -282,8 +284,9 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
282284
void Handle(NOlap::NDataSharing::NEvents::TEvFinishedFromSource::TPtr& ev, const TActorContext& ctx);
283285
void Handle(NOlap::NDataSharing::NEvents::TEvAckFinishToSource::TPtr& ev, const TActorContext& ctx);
284286
void Handle(NOlap::NDataSharing::NEvents::TEvAckFinishFromInitiator::TPtr& ev, const TActorContext& ctx);
285-
286287
void Handle(NOlap::NDataAccessorControl::TEvAskTabletDataAccessors::TPtr& ev, const TActorContext& ctx);
288+
void Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev, const TActorContext& ctx);
289+
void Handle(NSchemeShard::TEvSchemeShard::TEvSubDomainPathIdFound::TPtr& ev, const TActorContext&);
287290

288291
ITransaction* CreateTxInitSchema();
289292

@@ -362,6 +365,11 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
362365
return true;
363366
}
364367

368+
void PersistSubDomainPathId(ui64 ownerId, ui64 localPathId, NTabletFlatExecutor::TTransactionContext &txc);
369+
void StopWatchingSubDomainPathId();
370+
void StartWatchingSubDomainPathId();
371+
void StartFindSubDomainPathId(bool delayFirstRequest = true);
372+
365373
private:
366374
void OverloadWriteFail(const EOverloadStatus overloadReason, const NEvWrite::TWriteMeta& writeMeta, const ui64 writeSize, const ui64 cookie,
367375
std::unique_ptr<NActors::IEventBase>&& event, const TActorContext& ctx);
@@ -445,6 +453,8 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
445453
HFunc(NOlap::NDataSharing::NEvents::TEvAckFinishToSource, Handle);
446454
HFunc(NOlap::NDataSharing::NEvents::TEvAckFinishFromInitiator, Handle);
447455
HFunc(NOlap::NDataAccessorControl::TEvAskTabletDataAccessors, Handle);
456+
HFunc(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle);
457+
HFunc(NSchemeShard::TEvSchemeShard::TEvSubDomainPathIdFound, Handle);
448458

449459
default:
450460
if (!HandleDefaultEvents(ev, SelfId())) {
@@ -531,6 +541,11 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
531541
TLimits Limits;
532542
NOlap::TNormalizationController NormalizerController;
533543
NDataShard::TSysLocks SysLocks;
544+
TActorId FindSubDomainPathIdActor;
545+
std::optional<TPathId> SubDomainPathId;
546+
std::optional<TPathId> WatchingSubDomainPathId;
547+
bool SubDomainOutOfSpace = false;
548+
534549
static TDuration GetMaxReadStaleness();
535550

536551
void TryRegisterMediatorTimeCast();

ydb/core/tx/columnshard/columnshard_schema.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,10 @@ struct Schema : NIceDb::Schema {
8484
LastCompletedTxId = 14,
8585
LastNormalizerSequentialId = 15,
8686
GCBarrierPreparationGen = 16,
87-
GCBarrierPreparationStep = 17
87+
GCBarrierPreparationStep = 17,
88+
SubDomainOwnerId = 18,
89+
SubDomainLocalPathId = 19,
90+
SubDomainOutOfSpace = 20
8891
};
8992

9093
enum class EInsertTableIds : ui8 {

ydb/core/tx/columnshard/loading/stages.cpp

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,25 @@ bool TSpecialValuesInitializer::DoExecute(NTabletFlatExecutor::TTransactionConte
172172
return false;
173173
}
174174

175+
std::optional<ui64> subDomainOwnerId;
176+
std::optional<ui64> subDomainLocalPathId;
177+
if (!Schema::GetSpecialValueOpt(db, Schema::EValueIds::SubDomainOwnerId, subDomainOwnerId)) {
178+
return false;
179+
}
180+
if (!Schema::GetSpecialValueOpt(db, Schema::EValueIds::SubDomainLocalPathId, subDomainLocalPathId)) {
181+
return false;
182+
}
183+
184+
if (subDomainOwnerId.has_value() && subDomainLocalPathId.has_value()) {
185+
Self->SubDomainPathId.emplace(*subDomainOwnerId, *subDomainLocalPathId);
186+
}
187+
188+
ui64 outOfSpace = 0;
189+
if (!Schema::GetSpecialValueOpt(db, Schema::EValueIds::SubDomainOutOfSpace, outOfSpace)) {
190+
return false;
191+
}
192+
Self->SubDomainOutOfSpace = outOfSpace;
193+
175194
{
176195
ui64 lastCompletedStep = 0;
177196
ui64 lastCompletedTx = 0;

ydb/core/tx/columnshard/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ SRCS(
1010
columnshard__progress_tx.cpp
1111
columnshard__propose_cancel.cpp
1212
columnshard__propose_transaction.cpp
13+
columnshard__quota.cpp
1314
columnshard__scan.cpp
1415
columnshard__statistics.cpp
1516
columnshard__write.cpp

0 commit comments

Comments
 (0)