Skip to content

Commit cbfb499

Browse files
authored
Merge 310fb92 into 480f9da
2 parents 480f9da + 310fb92 commit cbfb499

14 files changed

+200
-4
lines changed

ydb/core/protos/counters_columnshard.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,4 +203,6 @@ enum ETxTypes {
203203
TXTYPE_START_INTERNAL_SCAN = 36 [(TxTypeOpts) = {Name: "TxStartInternalScan"}];
204204
TXTYPE_DATA_SHARING_START_SOURCE_CURSOR = 37 [(TxTypeOpts) = {Name: "TxDataSharingStartSourceCursor"}];
205205
TXTYPE_ASK_PORTION_METADATA = 38 [(TxTypeOpts) = {Name: "TxAskPortionMetadata"}];
206+
TXTYPE_PERSIST_SUBDOMAIN_OUT_OF_SPACE = 39 [(TxTypeOpts) = {Name: "TxPersistSubDomainOutOfSpace"}];
207+
TXTYPE_PERSIST_SUBDOMAIN_PATH_ID = 40 [(TxTypeOpts) = {Name: "TxPersistSubDomainPathId"}];
206208
}

ydb/core/protos/tx_columnshard.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ message TEvProposeTransaction {
162162
optional NKikimrSubDomains.TProcessingParams ProcessingParams = 6;
163163
optional uint64 Flags = 7;
164164
optional NKikimrTx.TMessageSeqNo SeqNo = 8;
165+
optional uint64 SubDomainPathId = 9;
165166
}
166167

167168
message TEvCheckPlannedTransaction {

ydb/core/tx/columnshard/columnshard__init.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,11 @@ void TTxInit::Complete(const TActorContext& ctx) {
107107
Self->ProgressTxController->OnTabletInit();
108108
Self->SwitchToWork(ctx);
109109
NYDBTest::TControllers::GetColumnShardController()->OnTabletInitCompleted(*Self);
110+
if (Self->SubDomainPathId) {
111+
Self->StartWatchingSubDomainPathId();
112+
} else {
113+
Self->StartFindSubDomainPathId();
114+
}
110115
}
111116

112117
class TTxUpdateSchema: public TTransactionBase<TColumnShard> {

ydb/core/tx/columnshard/columnshard__propose_transaction.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,14 @@ class TTxProposeTransaction: public NTabletFlatExecutor::TTransactionBase<TColum
5555
AFL_VERIFY(Self->CurrentSchemeShardId == record.GetSchemeShardId());
5656
}
5757
}
58+
if (txKind == NKikimrTxColumnShard::TX_KIND_SCHEMA) {
59+
ui64 subDomainPathId = record.GetSubDomainPathId();
60+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "propose")("subdomain_id", subDomainPathId);
61+
if (subDomainPathId) {
62+
Self->PersistSubDomainPathId(record.GetSchemeShardId(), subDomainPathId, txc);
63+
Self->StartWatchingSubDomainPathId();
64+
}
65+
}
5866
std::optional<TMessageSeqNo> msgSeqNo;
5967
if (Ev->Get()->Record.HasSeqNo()) {
6068
TMessageSeqNo seqNo;

ydb/core/tx/columnshard/columnshard__write.cpp

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -203,14 +203,20 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
203203
writeMeta.SetLongTxId(NLongTxService::TLongTxId::FromProto(record.GetLongTxId()));
204204
writeMeta.SetWritePartId(record.GetWritePartId());
205205

206-
const auto returnFail = [&](const NColumnShard::ECumulativeCounters signalIndex, const EWriteFailReason reason) {
206+
const auto returnFail = [&](const NColumnShard::ECumulativeCounters signalIndex, const EWriteFailReason reason, NKikimrTxColumnShard::EResultStatus resultStatus = NKikimrTxColumnShard::EResultStatus::ERROR) {
207207
Counters.GetTabletCounters()->IncCounter(signalIndex);
208208

209-
ctx.Send(source, std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, NKikimrTxColumnShard::EResultStatus::ERROR));
209+
ctx.Send(source, std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, resultStatus));
210210
Counters.GetCSCounters().OnFailedWriteResponse(reason);
211211
return;
212212
};
213213

214+
if (SubDomainOutOfSpace && record.HasModificationType() && (record.GetModificationType() != NKikimrTxColumnShard::TEvWrite::OPERATION_DELETE)) {
215+
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_writing")("reason", "quota_exceeded");
216+
Counters.GetTabletCounters()->IncCounter(COUNTER_OUT_OF_SPACE);
217+
return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::Overload, NKikimrTxColumnShard::EResultStatus::OVERLOADED);
218+
}
219+
214220
if (!AppDataVerified().ColumnShardConfig.GetWritingEnabled()) {
215221
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_writing")("reason", "disabled");
216222
return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::Disabled);

ydb/core/tx/columnshard/columnshard_impl.cpp

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,64 @@ namespace NKikimr::NColumnShard {
5858
// But in unittests we want to test both scenarios
5959
bool gAllowLogBatchingDefaultValue = true;
6060

61+
class TTxPersistSubDomainOutOfSpace : public NTabletFlatExecutor::TTransactionBase<TColumnShard> {
62+
public:
63+
TTxPersistSubDomainOutOfSpace(TColumnShard* self, bool outOfSpace)
64+
: TTransactionBase(self)
65+
, OutOfSpace(outOfSpace)
66+
{ }
67+
68+
TTxType GetTxType() const override { return TXTYPE_PERSIST_SUBDOMAIN_OUT_OF_SPACE; }
69+
70+
bool Execute(TTransactionContext& txc, const TActorContext&) override {
71+
NIceDb::TNiceDb db(txc.DB);
72+
73+
if (Self->SubDomainOutOfSpace != OutOfSpace) {
74+
Schema::SaveSpecialValue(db, Schema::EValueIds::SubDomainOutOfSpace, ui64(OutOfSpace ? 1 : 0));
75+
Self->SubDomainOutOfSpace = OutOfSpace;
76+
}
77+
78+
return true;
79+
}
80+
81+
void Complete(const TActorContext&) override {
82+
// nothing
83+
}
84+
85+
private:
86+
const bool OutOfSpace = false;
87+
};
88+
89+
class TTxPersistSubDomainPathId : public NTabletFlatExecutor::TTransactionBase<TColumnShard> {
90+
public:
91+
TTxPersistSubDomainPathId(TColumnShard* self, ui64 schemeShardId, ui64 localPathId)
92+
: TTransactionBase(self)
93+
, SchemeShardId(schemeShardId)
94+
, LocalPathId(localPathId)
95+
{ }
96+
97+
TTxType GetTxType() const override { return TXTYPE_PERSIST_SUBDOMAIN_PATH_ID; }
98+
99+
bool Execute(TTransactionContext& txc, const TActorContext&) override {
100+
if (Self->CurrentSchemeShardId == SchemeShardId &&
101+
!Self->SubDomainPathId || Self->SubDomainPathId->OwnerId != SchemeShardId)
102+
{
103+
Self->PersistSubDomainPathId(SchemeShardId, LocalPathId, txc);
104+
Self->StartWatchingSubDomainPathId();
105+
}
106+
return true;
107+
}
108+
109+
void Complete(const TActorContext&) override {
110+
// nothing
111+
}
112+
113+
private:
114+
const ui64 SchemeShardId;
115+
const ui64 LocalPathId;
116+
};
117+
118+
61119
namespace {
62120

63121
NTabletPipe::TClientConfig GetPipeClientConfig() {
@@ -1463,4 +1521,73 @@ TDuration TColumnShard::GetMaxReadStaleness() {
14631521
return NYDBTest::TControllers::GetColumnShardController()->GetReadTimeoutClean();
14641522
}
14651523

1524+
void TColumnShard::PersistSubDomainPathId(ui64 ownerId, ui64 localPathId,
1525+
NTabletFlatExecutor::TTransactionContext &txc) {
1526+
SubDomainPathId.emplace(ownerId, localPathId);
1527+
NIceDb::TNiceDb db(txc.DB);
1528+
Schema::SaveSpecialValue(db, Schema::EValueIds::SubDomainOwnerId, ownerId);
1529+
Schema::SaveSpecialValue(db, Schema::EValueIds::SubDomainLocalPathId, localPathId);
1530+
}
1531+
1532+
void TColumnShard::StopWatchingSubDomainPathId() {
1533+
if (WatchingSubDomainPathId) {
1534+
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchRemove());
1535+
WatchingSubDomainPathId.reset();
1536+
}
1537+
}
1538+
1539+
void TColumnShard::StartWatchingSubDomainPathId() {
1540+
if (!SubDomainPathId || SubDomainPathId->OwnerId != CurrentSchemeShardId) {
1541+
return;
1542+
}
1543+
1544+
if (WatchingSubDomainPathId && *WatchingSubDomainPathId != *SubDomainPathId) {
1545+
StopWatchingSubDomainPathId();
1546+
}
1547+
1548+
if (!WatchingSubDomainPathId) {
1549+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("started_watching_subdomain", *SubDomainPathId);
1550+
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchPathId(*SubDomainPathId));
1551+
WatchingSubDomainPathId = *SubDomainPathId;
1552+
}
1553+
}
1554+
1555+
void TColumnShard::Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev, const TActorContext& ctx) {
1556+
const auto* msg = ev->Get();
1557+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("notify_subdomain", msg->PathId);
1558+
if (SubDomainPathId && msg->PathId == *SubDomainPathId) {
1559+
const bool outOfSpace = msg->Result->GetPathDescription()
1560+
.GetDomainDescription()
1561+
.GetDomainState()
1562+
.GetDiskQuotaExceeded();
1563+
1564+
LOG_DEBUG_S(ctx, NKikimrServices::TX_COLUMNSHARD,
1565+
"Discovered subdomain " << msg->PathId << " state, outOfSpace = " << outOfSpace
1566+
<< " at columnshard " << TabletID());
1567+
1568+
Execute(new TTxPersistSubDomainOutOfSpace(this, outOfSpace), ctx);
1569+
}
1570+
}
1571+
1572+
static constexpr TDuration MaxFindSubDomainPathIdDelay = TDuration::Minutes(10);
1573+
1574+
void TColumnShard::StartFindSubDomainPathId(bool delayFirstRequest) {
1575+
if (!FindSubDomainPathIdActor &&
1576+
CurrentSchemeShardId != 0 &&
1577+
(!SubDomainPathId || SubDomainPathId->OwnerId != CurrentSchemeShardId))
1578+
{
1579+
FindSubDomainPathIdActor = Register(CreateFindSubDomainPathIdActor(SelfId(), TabletID(), CurrentSchemeShardId, delayFirstRequest, MaxFindSubDomainPathIdDelay));
1580+
}
1581+
}
1582+
1583+
1584+
void TColumnShard::Handle(NSchemeShard::TEvSchemeShard::TEvSubDomainPathIdFound::TPtr& ev, const TActorContext& ctx) {
1585+
const auto* msg = ev->Get();
1586+
if (FindSubDomainPathIdActor == ev->Sender) {
1587+
FindSubDomainPathIdActor = { };
1588+
}
1589+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "subdomain_found")("scheme_shard_id", msg->SchemeShardId)("local_path_id", msg->LocalPathId);
1590+
Execute(new TTxPersistSubDomainPathId(this, msg->SchemeShardId, msg->LocalPathId), ctx);
1591+
}
1592+
14661593
} // namespace NKikimr::NColumnShard

ydb/core/tx/columnshard/columnshard_impl.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,8 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
174174
friend class TTxRemoveSharedBlobs;
175175
friend class TTxFinishAsyncTransaction;
176176
friend class TWaitEraseTablesTxSubscriber;
177+
friend class TTxPersistSubDomainOutOfSpace;
178+
friend class TTxPersistSubDomainPathId;
177179

178180
friend class NOlap::TCleanupPortionsColumnEngineChanges;
179181
friend class NOlap::TCleanupTablesColumnEngineChanges;
@@ -283,6 +285,8 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
283285
void Handle(NOlap::NDataSharing::NEvents::TEvAckFinishToSource::TPtr& ev, const TActorContext& ctx);
284286
void Handle(NOlap::NDataSharing::NEvents::TEvAckFinishFromInitiator::TPtr& ev, const TActorContext& ctx);
285287

288+
void Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev, const TActorContext& ctx);
289+
void Handle(NSchemeShard::TEvSchemeShard::TEvSubDomainPathIdFound::TPtr& ev, const TActorContext&);
286290
void Handle(NOlap::NDataAccessorControl::TEvAskTabletDataAccessors::TPtr& ev, const TActorContext& ctx);
287291

288292
ITransaction* CreateTxInitSchema();
@@ -362,6 +366,11 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
362366
return true;
363367
}
364368

369+
void PersistSubDomainPathId(ui64 ownerId, ui64 localPathId, NTabletFlatExecutor::TTransactionContext &txc);
370+
void StopWatchingSubDomainPathId();
371+
void StartWatchingSubDomainPathId();
372+
void StartFindSubDomainPathId(bool delayFirstRequest = true);
373+
365374
private:
366375
void OverloadWriteFail(const EOverloadStatus overloadReason, const NEvWrite::TWriteMeta& writeMeta, const ui64 writeSize, const ui64 cookie,
367376
std::unique_ptr<NActors::IEventBase>&& event, const TActorContext& ctx);
@@ -444,6 +453,8 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
444453
HFunc(NOlap::NDataSharing::NEvents::TEvFinishedFromSource, Handle);
445454
HFunc(NOlap::NDataSharing::NEvents::TEvAckFinishToSource, Handle);
446455
HFunc(NOlap::NDataSharing::NEvents::TEvAckFinishFromInitiator, Handle);
456+
HFunc(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle);
457+
HFunc(NSchemeShard::TEvSchemeShard::TEvSubDomainPathIdFound, Handle);
447458
HFunc(NOlap::NDataAccessorControl::TEvAskTabletDataAccessors, Handle);
448459

449460
default:
@@ -531,6 +542,11 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
531542
TLimits Limits;
532543
NOlap::TNormalizationController NormalizerController;
533544
NDataShard::TSysLocks SysLocks;
545+
TActorId FindSubDomainPathIdActor;
546+
std::optional<TPathId> SubDomainPathId;
547+
std::optional<TPathId> WatchingSubDomainPathId;
548+
bool SubDomainOutOfSpace = false;
549+
534550
static TDuration GetMaxReadStaleness();
535551

536552
void TryRegisterMediatorTimeCast();

ydb/core/tx/columnshard/columnshard_schema.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,10 @@ struct Schema : NIceDb::Schema {
8383
LastCompletedTxId = 14,
8484
LastNormalizerSequentialId = 15,
8585
GCBarrierPreparationGen = 16,
86-
GCBarrierPreparationStep = 17
86+
GCBarrierPreparationStep = 17,
87+
SubDomainOwnerId = 18,
88+
SubDomainLocalPathId = 19,
89+
SubDomainOutOfSpace = 20
8790
};
8891

8992
enum class EInsertTableIds : ui8 {

ydb/core/tx/columnshard/loading/stages.cpp

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,25 @@ bool TSpecialValuesInitializer::DoExecute(NTabletFlatExecutor::TTransactionConte
172172
return false;
173173
}
174174

175+
std::optional<ui64> subDomainOwnerId;
176+
std::optional<ui64> subDomainLocalPathId;
177+
if (!Schema::GetSpecialValueOpt(db, Schema::EValueIds::SubDomainOwnerId, subDomainOwnerId)) {
178+
return false;
179+
}
180+
if (!Schema::GetSpecialValueOpt(db, Schema::EValueIds::SubDomainLocalPathId, subDomainLocalPathId)) {
181+
return false;
182+
}
183+
184+
if (subDomainOwnerId.has_value() && subDomainLocalPathId.has_value()) {
185+
Self->SubDomainPathId.emplace(*subDomainOwnerId, *subDomainLocalPathId);
186+
}
187+
188+
ui64 outOfSpace = 0;
189+
if (!Schema::GetSpecialValueOpt(db, Schema::EValueIds::SubDomainOutOfSpace, outOfSpace)) {
190+
return false;
191+
}
192+
Self->SubDomainOutOfSpace = outOfSpace;
193+
175194
{
176195
ui64 lastCompletedStep = 0;
177196
ui64 lastCompletedTx = 0;

ydb/core/tx/datashard/datashard__init.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,10 @@ void TDataShard::TTxInit::Complete(const TActorContext &ctx) {
104104
// Find subdomain path id if needed
105105
if (Self->State == TShardState::Ready) {
106106
if (Self->SubDomainPathId) {
107+
AFL_DEBUG(NKikimrServices::TX_DATASHARD)("event", "start_watching_subdomain");
107108
Self->StartWatchingSubDomainPathId();
108109
} else {
110+
AFL_DEBUG(NKikimrServices::TX_DATASHARD)("event", "start_find_subdomain");
109111
Self->StartFindSubDomainPathId();
110112
}
111113
}

ydb/core/tx/datashard/datashard_subdomain_path_id.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ class TDataShard::TTxPersistSubDomainPathId : public NTabletFlatExecutor::TTrans
5656
};
5757

5858
void TDataShard::Handle(NSchemeShard::TEvSchemeShard::TEvSubDomainPathIdFound::TPtr& ev, const TActorContext& ctx) {
59+
AFL_DEBUG(NKikimrServices::TX_DATASHARD)("event", "subdomain_found");
5960
const auto* msg = ev->Get();
6061

6162
if (FindSubDomainPathIdActor == ev->Sender) {

ydb/core/tx/schemeshard/olap/operations/create_store.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,9 @@ class TConfigureParts: public TSubOperationState {
100100
ui64(OperationId.GetTxId()),
101101
columnShardTxBody, seqNo,
102102
context.SS->SelectProcessingParams(txState->TargetPathId));
103+
if (const ui64 subDomainPathId = context.SS->ResolvePathIdForDomain(txState->TargetPathId).LocalPathId) {
104+
event->Record.SetSubDomainPathId(subDomainPathId);
105+
}
103106

104107
context.OnComplete.BindMsgToPipe(OperationId, tabletId, shard.Idx, event.release());
105108
} else {

ydb/core/tx/schemeshard/olap/operations/create_table.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,9 @@ class TConfigureParts: public TSubOperationState {
317317
ui64(OperationId.GetTxId()),
318318
columnShardTxBody, seqNo,
319319
context.SS->SelectProcessingParams(txState->TargetPathId));
320-
320+
if (const ui64 subDomainPathId = context.SS->ResolvePathIdForDomain(txState->TargetPathId).LocalPathId) {
321+
event->Record.SetSubDomainPathId(subDomainPathId);
322+
}
321323
context.OnComplete.BindMsgToPipe(OperationId, tabletId, shard.Idx, event.release());
322324
} else {
323325
LOG_ERROR_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, DebugHint() << " unexpected tablet type");

ydb/core/tx/schemeshard/schemeshard_info_types.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ bool TSubDomainInfo::CheckDiskSpaceQuotas(IQuotaCounters* counters) {
158158
// they are not accounted for when we make a decision to change the state of the subdomain.
159159
ui64 totalUsage = TotalDiskSpaceUsage();
160160

161+
LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::FLAT_TX_SCHEMESHARD, "Total usage: " << totalUsage << " soft quota: " << quotas.SoftQuota << " hard quota: " << quotas.HardQuota);
161162
// If a quota is equal to zero, then it sets no limit on the disk space usage.
162163
const bool isHardQuotaExceeded = quotas.HardQuota && totalUsage > quotas.HardQuota;
163164
const bool isTotalUsageBelowSoftQuota = !quotas.SoftQuota || totalUsage < quotas.SoftQuota;

0 commit comments

Comments
 (0)