Skip to content

Commit 9e3f068

Browse files
fix actualization(ttl, tiering, sync) tasks count limit usage (#3234)
1 parent ad07cba commit 9e3f068

File tree

5 files changed

+37
-8
lines changed

5 files changed

+37
-8
lines changed

ydb/core/tx/columnshard/columnshard_impl.cpp

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -595,7 +595,7 @@ class TChangesReadTask: public NOlap::NBlobOperations::NRead::ITask {
595595
}
596596
}
597597
virtual bool DoOnError(const TString& storageId, const NOlap::TBlobRange& range, const NOlap::IBlobsReadingAction::TErrorStatus& status) override {
598-
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "DoOnError")("storage_id", storageId)("blob_id", range)("status", status.GetErrorMessage())("status_code", status.GetStatus());
598+
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "DoOnError")("storage_id", storageId)("blob_id", range)("status", status.GetErrorMessage())("status_code", status.GetStatus());
599599
AFL_VERIFY(false)("blob_id", range)("status", status.GetStatus())("error", status.GetErrorMessage())("type", TxEvent->IndexChanges->TypeString())("task_id", TxEvent->IndexChanges->GetTaskIdentifier())
600600
("debug", TxEvent->IndexChanges->DebugString());
601601
TxEvent->SetPutStatus(NKikimrProto::ERROR);
@@ -614,6 +614,27 @@ class TChangesReadTask: public NOlap::NBlobOperations::NRead::ITask {
614614
}
615615
};
616616

617+
class TInsertChangesReadTask: public TChangesReadTask, public TMonitoringObjectsCounter<TInsertChangesReadTask> {
618+
private:
619+
using TBase = TChangesReadTask;
620+
public:
621+
using TBase::TBase;
622+
};
623+
624+
class TCompactChangesReadTask: public TChangesReadTask, public TMonitoringObjectsCounter<TCompactChangesReadTask> {
625+
private:
626+
using TBase = TChangesReadTask;
627+
public:
628+
using TBase::TBase;
629+
};
630+
631+
class TTTLChangesReadTask: public TChangesReadTask, public TMonitoringObjectsCounter<TTTLChangesReadTask> {
632+
private:
633+
using TBase = TChangesReadTask;
634+
public:
635+
using TBase::TBase;
636+
};
637+
617638
void TColumnShard::StartIndexTask(std::vector<const NOlap::TInsertedData*>&& dataToIndex, const i64 bytesToIndex) {
618639
CSCounters.IndexationInput(bytesToIndex);
619640

@@ -637,7 +658,7 @@ void TColumnShard::StartIndexTask(std::vector<const NOlap::TInsertedData*>&& dat
637658

638659
NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription(
639660
ResourceSubscribeActor, std::make_shared<NOlap::NBlobOperations::NRead::ITask::TReadSubscriber>(
640-
std::make_shared<TChangesReadTask>(std::move(ev), SelfId(), TabletID(), IndexationCounters), 0, indexChanges->CalcMemoryForUsage(), externalTaskId, InsertTaskSubscription));
661+
std::make_shared<TInsertChangesReadTask>(std::move(ev), SelfId(), TabletID(), IndexationCounters), 0, indexChanges->CalcMemoryForUsage(), externalTaskId, InsertTaskSubscription));
641662
}
642663

643664
void TColumnShard::SetupIndexation() {
@@ -714,7 +735,7 @@ void TColumnShard::SetupCompaction() {
714735

715736
NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription(
716737
ResourceSubscribeActor, std::make_shared<NOlap::NBlobOperations::NRead::ITask::TReadSubscriber>(
717-
std::make_shared<TChangesReadTask>(std::move(ev), SelfId(), TabletID(), CompactionCounters), 0, indexChanges->CalcMemoryForUsage(), externalTaskId, CompactTaskSubscription));
738+
std::make_shared<TCompactChangesReadTask>(std::move(ev), SelfId(), TabletID(), CompactionCounters), 0, indexChanges->CalcMemoryForUsage(), externalTaskId, CompactTaskSubscription));
718739
}
719740

720741
LOG_S_DEBUG("ActiveCompactions: " << BackgroundController.GetCompactionsCount() << " at tablet " << TabletID());
@@ -748,7 +769,7 @@ bool TColumnShard::SetupTtl(const THashMap<ui64, NOlap::TTiering>& pathTtls) {
748769
if (needWrites) {
749770
NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription(
750771
ResourceSubscribeActor, std::make_shared<NOlap::NBlobOperations::NRead::ITask::TReadSubscriber>(
751-
std::make_shared<TChangesReadTask>(std::move(ev), SelfId(), TabletID(), CompactionCounters), 0, i->CalcMemoryForUsage(), externalTaskId, TTLTaskSubscription));
772+
std::make_shared<TTTLChangesReadTask>(std::move(ev), SelfId(), TabletID(), CompactionCounters), 0, i->CalcMemoryForUsage(), externalTaskId, TTLTaskSubscription));
752773
} else {
753774
ev->SetPutStatus(NKikimrProto::OK);
754775
ActorContext().Send(SelfId(), std::move(ev));

ydb/core/tx/columnshard/engines/changes/actualization/controller/controller.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ class TController {
1111

1212
public:
1313
void StartActualization(const NActualizer::TRWAddress& address) {
14-
++ActualizationsInProgress[address];
14+
AFL_VERIFY(++ActualizationsInProgress[address] <= (i32)GetLimitForAddress(address));
1515
}
1616

1717
void FinishActualization(const NActualizer::TRWAddress& address) {
@@ -23,7 +23,7 @@ class TController {
2323
if (it == ActualizationsInProgress.end()) {
2424
return readyTemporaryTasks < GetLimitForAddress(address);
2525
} else {
26-
return readyTemporaryTasks < GetLimitForAddress(address) + it->second;
26+
return it->second + readyTemporaryTasks < GetLimitForAddress(address);
2727
}
2828
}
2929
};

ydb/core/tx/columnshard/engines/insert_table/data.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ class TCommittedBlob {
153153
, Last(last)
154154
{}
155155

156-
/// It uses trick then we place key wtih planStep:txId in container and find them later by BlobId only.
156+
/// It uses trick then we place key with planStep:txId in container and find them later by BlobId only.
157157
/// So hash() and equality should depend on BlobId only.
158158
bool operator == (const TCommittedBlob& key) const { return BlobRange == key.BlobRange; }
159159
ui64 Hash() const noexcept { return BlobRange.Hash(); }

ydb/core/tx/columnshard/engines/storage/actualizer/tiering/counters.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ class TTieringGlobalCounters: public NColumnShard::TCommonCountersOwner {
1616
std::shared_ptr<NColumnShard::TValueAggregationAgent> DifferenceWaitToEvict;
1717
std::shared_ptr<NColumnShard::TValueAggregationAgent> DifferenceWaitToDelete;
1818
NMonitoring::TDynamicCounters::TCounterPtr SkipEvictionForCompaction;
19+
NMonitoring::TDynamicCounters::TCounterPtr SkipEvictionForLimit;
1920
public:
2021
TTieringGlobalCounters()
2122
: TBase("TieringActualizer")
@@ -25,6 +26,11 @@ class TTieringGlobalCounters: public NColumnShard::TCommonCountersOwner {
2526
DifferenceWaitToEvict = TBase::GetValueAutoAggregations("Granule/Eviction/WaitingInSeconds");
2627
DifferenceWaitToDelete = TBase::GetValueAutoAggregations("Granule/Deletion/WaitingInSeconds");
2728
SkipEvictionForCompaction = TBase::GetDeriviative("Eviction/SkipForCompaction");
29+
SkipEvictionForLimit = TBase::GetDeriviative("Eviction/SkipForLimit");
30+
}
31+
32+
static NMonitoring::TDynamicCounters::TCounterPtr GetSkipEvictionForLimit() {
33+
return Singleton<TTieringGlobalCounters>()->SkipEvictionForLimit;
2834
}
2935

3036
static NMonitoring::TDynamicCounters::TCounterPtr GetSkipEvictionForCompaction() {
@@ -56,14 +62,15 @@ class TTieringCounters {
5662
const std::shared_ptr<NColumnShard::TValueAggregationClient> DifferenceWaitToEvict;
5763
const std::shared_ptr<NColumnShard::TValueAggregationClient> DifferenceWaitToDelete;
5864
const NMonitoring::TDynamicCounters::TCounterPtr SkipEvictionForCompaction;
65+
const NMonitoring::TDynamicCounters::TCounterPtr SkipEvictionForLimit;
5966

6067
TTieringCounters()
6168
: QueueSizeToEvict(TTieringGlobalCounters::BuildQueueSizeToEvict())
6269
, QueueSizeToDelete(TTieringGlobalCounters::BuildQueueSizeToDelete())
6370
, DifferenceWaitToEvict(TTieringGlobalCounters::BuildDifferenceWaitToEvict())
6471
, DifferenceWaitToDelete(TTieringGlobalCounters::BuildDifferenceWaitToDelete())
6572
, SkipEvictionForCompaction(TTieringGlobalCounters::GetSkipEvictionForCompaction())
66-
{
73+
, SkipEvictionForLimit(TTieringGlobalCounters::GetSkipEvictionForLimit()) {
6774
}
6875

6976
};

ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ void TTieringActualizer::DoExtractTasks(TTieringProcessContext& tasksContext, co
106106
THashSet<ui64> portionIds;
107107
for (auto&& [address, addressPortions] : PortionIdByWaitDuration) {
108108
if (!tasksContext.IsRWAddressAvailable(address)) {
109+
Counters.SkipEvictionForLimit->Add(1);
109110
continue;
110111
}
111112
for (auto&& [duration, portions] : addressPortions.GetPortions()) {

0 commit comments

Comments
 (0)