-
Notifications
You must be signed in to change notification settings - Fork 694
Stop writing to column tables if quota is exceeded #11580
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
4a18254
f1a2ced
ce8abf7
4a732fa
fd1e1f0
82e4eae
1c66736
c1811e8
66f9111
cb63e00
6a93448
a2eae61
dfc3129
f590339
603f320
a5b6a55
2464f7f
b58b233
2d98683
1d576da
1d8ff00
c50d30e
cbb8ece
2f63fdf
13e5eb8
193a5af
281d8a3
9eb9855
9e9d5ae
010566f
35ceb15
2d9ddd3
6e3f726
f2ece2a
75501cf
469051d
f853af9
d9a561f
520f594
bfa74fb
ffbb808
a6440bd
8469711
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -203,24 +203,30 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex | |
writeMeta.SetLongTxId(NLongTxService::TLongTxId::FromProto(record.GetLongTxId())); | ||
writeMeta.SetWritePartId(record.GetWritePartId()); | ||
|
||
const auto returnFail = [&](const NColumnShard::ECumulativeCounters signalIndex, const EWriteFailReason reason) { | ||
const auto returnFail = [&](const NColumnShard::ECumulativeCounters signalIndex, const EWriteFailReason reason, NKikimrTxColumnShard::EResultStatus resultStatus) { | ||
Counters.GetTabletCounters()->IncCounter(signalIndex); | ||
|
||
ctx.Send(source, std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, NKikimrTxColumnShard::EResultStatus::ERROR)); | ||
ctx.Send(source, std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, resultStatus)); | ||
Counters.GetCSCounters().OnFailedWriteResponse(reason); | ||
return; | ||
}; | ||
|
||
if (SpaceWatcher->SubDomainOutOfSpace && (!record.HasModificationType() || (record.GetModificationType() != NKikimrTxColumnShard::TEvWrite::OPERATION_DELETE))) { | ||
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_writing")("reason", "quota_exceeded"); | ||
Counters.GetTabletCounters()->IncCounter(COUNTER_OUT_OF_SPACE); | ||
return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::Overload, NKikimrTxColumnShard::EResultStatus::OVERLOADED); | ||
} | ||
|
||
if (!AppDataVerified().ColumnShardConfig.GetWritingEnabled()) { | ||
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "skip_writing")("reason", "disabled"); | ||
return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::Disabled); | ||
return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::Disabled, NKikimrTxColumnShard::EResultStatus::ERROR); | ||
} | ||
|
||
if (!TablesManager.IsReadyForStartWrite(pathId, false)) { | ||
LOG_S_NOTICE("Write (fail) into pathId:" << writeMeta.GetTableId() << (TablesManager.HasPrimaryIndex() ? "" : " no index") | ||
<< " at tablet " << TabletID()); | ||
|
||
return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::NoTable); | ||
return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::NoTable, NKikimrTxColumnShard::EResultStatus::ERROR); | ||
} | ||
|
||
{ | ||
|
@@ -230,7 +236,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex | |
.CheckWriteData(); | ||
if (status.IsFail()) { | ||
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "writing_fail_through_compaction")("reason", status.GetErrorMessage()); | ||
return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::CompactionCriteria); | ||
return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::CompactionCriteria, NKikimrTxColumnShard::EResultStatus::ERROR); | ||
} | ||
} | ||
|
||
|
@@ -239,7 +245,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex | |
if (!arrowData->ParseFromProto(record)) { | ||
LOG_S_ERROR( | ||
"Write (fail) " << record.GetData().size() << " bytes into pathId " << writeMeta.GetTableId() << " at tablet " << TabletID()); | ||
return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::IncorrectSchema); | ||
return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::IncorrectSchema, NKikimrTxColumnShard::EResultStatus::ERROR); | ||
} | ||
|
||
NEvWrite::TWriteData writeData(writeMetaPtr, arrowData, snapshotSchema->GetIndexInfo().GetReplaceKey(), | ||
|
@@ -564,7 +570,11 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor | |
return; | ||
} | ||
|
||
auto overloadStatus = CheckOverloadedImmediate(pathId); | ||
const bool outOfSpace = SpaceWatcher->SubDomainOutOfSpace && (*mType != NEvWrite::EModificationType::Delete); | ||
if (outOfSpace) { | ||
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_writing")("reason", "quota_exceeded")("source", "dataevent"); | ||
aavdonkin marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
auto overloadStatus = outOfSpace ? EOverloadStatus::Disk : CheckOverloadedImmediate(pathId); | ||
if (overloadStatus != EOverloadStatus::None) { | ||
std::unique_ptr<NActors::IEventBase> result = NEvents::TDataEvents::TEvWriteResult::BuildError( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. в TEvWriteResult есть оцпия STATUS_DISK_SPACE_EXHAUSTED = 10. Можно посмотреть в DS, какой они флаг возвращают при разных ошибках. Не нужно ли, например, при ChannelYellowStop также возвращать STATUS_DISK_SPACE_EXHAUSTED? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. На каналах - согласен. А при исчерпании квоты? |
||
TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, "overload data error"); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,120 @@ | ||
#include "columnshard_impl.h" | ||
|
||
namespace NKikimr::NColumnShard { | ||
|
||
class TTxPersistSubDomainOutOfSpace : public NTabletFlatExecutor::TTransactionBase<TColumnShard> { | ||
public: | ||
TTxPersistSubDomainOutOfSpace(TColumnShard* self, bool outOfSpace) | ||
: TTransactionBase(self) | ||
, OutOfSpace(outOfSpace) | ||
{ } | ||
|
||
TTxType GetTxType() const override { return TXTYPE_PERSIST_SUBDOMAIN_OUT_OF_SPACE; } | ||
|
||
bool Execute(TTransactionContext& txc, const TActorContext&) override { | ||
NIceDb::TNiceDb db(txc.DB); | ||
|
||
if (Self->SpaceWatcher->SubDomainOutOfSpace != OutOfSpace) { | ||
Schema::SaveSpecialValue(db, Schema::EValueIds::SubDomainOutOfSpace, ui64(OutOfSpace ? 1 : 0)); | ||
Self->SpaceWatcher->SubDomainOutOfSpace = OutOfSpace; | ||
} | ||
|
||
return true; | ||
} | ||
|
||
void Complete(const TActorContext&) override { | ||
// nothing | ||
} | ||
|
||
private: | ||
const bool OutOfSpace; | ||
}; | ||
|
||
class TTxPersistSubDomainPathId : public NTabletFlatExecutor::TTransactionBase<TColumnShard> { | ||
public: | ||
TTxPersistSubDomainPathId(TColumnShard* self, ui64 localPathId) | ||
: TTransactionBase(self) | ||
, LocalPathId(localPathId) | ||
{ } | ||
|
||
TTxType GetTxType() const override { return TXTYPE_PERSIST_SUBDOMAIN_PATH_ID; } | ||
|
||
bool Execute(TTransactionContext& txc, const TActorContext&) override { | ||
if (!Self->SpaceWatcher->SubDomainPathId) { | ||
Self->SpaceWatcher->PersistSubDomainPathId(LocalPathId, txc); | ||
Self->SpaceWatcher->StartWatchingSubDomainPathId(); | ||
} | ||
return true; | ||
} | ||
|
||
void Complete(const TActorContext&) override { | ||
// nothing | ||
} | ||
|
||
private: | ||
const ui64 LocalPathId; | ||
}; | ||
|
||
void TSpaceWatcher::PersistSubDomainPathId(ui64 localPathId, | ||
NTabletFlatExecutor::TTransactionContext &txc) { | ||
SubDomainPathId = localPathId; | ||
NIceDb::TNiceDb db(txc.DB); | ||
Schema::SaveSpecialValue(db, Schema::EValueIds::SubDomainLocalPathId, localPathId); | ||
} | ||
|
||
void TSpaceWatcher::StopWatchingSubDomainPathId() { | ||
if (WatchingSubDomainPathId) { | ||
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchRemove()); | ||
WatchingSubDomainPathId.reset(); | ||
} | ||
} | ||
|
||
void TSpaceWatcher::StartWatchingSubDomainPathId() { | ||
if (!SubDomainPathId) { | ||
return; | ||
} | ||
|
||
if (!WatchingSubDomainPathId) { | ||
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("started_watching_subdomain", *SubDomainPathId); | ||
Self->Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchPathId(TPathId(Self->CurrentSchemeShardId, *SubDomainPathId))); | ||
WatchingSubDomainPathId = *SubDomainPathId; | ||
} | ||
} | ||
|
||
void TSpaceWatcher::Handle(NActors::TEvents::TEvPoison::TPtr& , const TActorContext& ctx) { | ||
Die(ctx); | ||
} | ||
|
||
void TColumnShard::Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev, const TActorContext& ctx) { | ||
const auto* msg = ev->Get(); | ||
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("notify_subdomain", msg->PathId); | ||
const bool outOfSpace = msg->Result->GetPathDescription() | ||
.GetDomainDescription() | ||
.GetDomainState() | ||
.GetDiskQuotaExceeded(); | ||
|
||
Execute(new TTxPersistSubDomainOutOfSpace(this, outOfSpace), ctx); | ||
} | ||
|
||
static constexpr TDuration MaxFindSubDomainPathIdDelay = TDuration::Minutes(10); | ||
|
||
void TSpaceWatcher::StartFindSubDomainPathId(bool delayFirstRequest) { | ||
if (!FindSubDomainPathIdActor && | ||
aavdonkin marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Self->CurrentSchemeShardId != 0 && | ||
(!SubDomainPathId)) | ||
{ | ||
FindSubDomainPathIdActor = Register(CreateFindSubDomainPathIdActor(SelfId(), Self->TabletID(), Self->CurrentSchemeShardId, delayFirstRequest, MaxFindSubDomainPathIdDelay)); | ||
} | ||
} | ||
|
||
|
||
void TSpaceWatcher::Handle(NSchemeShard::TEvSchemeShard::TEvSubDomainPathIdFound::TPtr& ev, const TActorContext& ctx) { | ||
const auto* msg = ev->Get(); | ||
if (FindSubDomainPathIdActor == ev->Sender) { | ||
FindSubDomainPathIdActor = { }; | ||
} | ||
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "subdomain_found")("scheme_shard_id", msg->SchemeShardId)("local_path_id", msg->LocalPathId); | ||
Self->Execute(new TTxPersistSubDomainPathId(Self, msg->LocalPathId), ctx); | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
#pragma once | ||
|
||
#include <ydb/library/actors/core/actorid.h> | ||
#include <ydb/core/tx/scheme_cache/scheme_cache.h> | ||
|
||
namespace NKikimr::NColumnShard::NLoading { | ||
class TSpecialValuesInitializer; | ||
}; | ||
|
||
namespace NKikimr::NColumnShard { | ||
|
||
class TSpaceWatcher : public TActorBootstrapped<TSpaceWatcher> { | ||
TColumnShard* Self; | ||
NActors::TActorId FindSubDomainPathIdActor; | ||
std::optional<NKikimr::TLocalPathId> SubDomainPathId; | ||
std::optional<NKikimr::TLocalPathId> WatchingSubDomainPathId; | ||
bool SubDomainOutOfSpace = false; | ||
|
||
public: | ||
friend class TColumnShard; | ||
friend class TTxInit; | ||
friend class TTxPersistSubDomainOutOfSpace; | ||
friend class TTxPersistSubDomainPathId; | ||
friend class NKikimr::NColumnShard::NLoading::TSpecialValuesInitializer; | ||
|
||
public: | ||
TSpaceWatcher(TColumnShard* self) | ||
: Self(self) { | ||
} | ||
|
||
void PersistSubDomainPathId(ui64 localPathId, NTabletFlatExecutor::TTransactionContext &txc); | ||
void StopWatchingSubDomainPathId(); | ||
void StartWatchingSubDomainPathId(); | ||
void StartFindSubDomainPathId(bool delayFirstRequest = true); | ||
|
||
void Bootstrap(const TActorContext& /*ctx*/) { | ||
Become(&TThis::StateWork); | ||
} | ||
|
||
void Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev, const TActorContext& ctx); | ||
void Handle(NSchemeShard::TEvSchemeShard::TEvSubDomainPathIdFound::TPtr& ev, const TActorContext&); | ||
void Handle(NActors::TEvents::TEvPoison::TPtr& ev, const TActorContext&); | ||
|
||
STFUNC(StateWork) { | ||
switch (ev->GetTypeRewrite()) { | ||
HFunc(NSchemeShard::TEvSchemeShard::TEvSubDomainPathIdFound, Handle); | ||
HFunc(NActors::TEvents::TEvPoison, Handle); | ||
default: | ||
LOG_S_WARN("TSpaceWatcher.StateWork at " << " unhandled event type: " << ev->GetTypeName() | ||
<< " event: " << ev->ToString()); | ||
break; | ||
} | ||
} | ||
}; | ||
|
||
} |
Uh oh!
There was an error while loading. Please reload this page.