Skip to content

Commit 9e18b4b

Browse files
don't start actualization tasks construction so often (#5108)
1 parent 4ca8675 commit 9e18b4b

File tree

9 files changed

+60
-29
lines changed

9 files changed

+60
-29
lines changed

ydb/core/tx/columnshard/engines/column_engine_logs.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -413,14 +413,15 @@ std::vector<std::shared_ptr<TTTLColumnEngineChanges>> TColumnEngineForLogs::Star
413413

414414
TSaverContext saverContext(StoragesManager);
415415
NActualizer::TTieringProcessContext context(memoryUsageLimit, saverContext, dataLocksManager, SignalCounters, ActualizationController);
416+
const TDuration actualizationLag = NYDBTest::TControllers::GetColumnShardController()->GetActualizationTasksLag(TDuration::Seconds(1));
416417
for (auto&& i : pathEviction) {
417418
auto g = GetGranuleOptional(i.first);
418419
if (g) {
419420
if (!ActualizationStarted) {
420421
g->StartActualizationIndex();
421422
}
422423
g->RefreshTiering(i.second);
423-
g->BuildActualizationTasks(context);
424+
g->BuildActualizationTasks(context, actualizationLag);
424425
}
425426
}
426427

@@ -430,7 +431,7 @@ std::vector<std::shared_ptr<TTTLColumnEngineChanges>> TColumnEngineForLogs::Star
430431
if (pathEviction.contains(i.first)) {
431432
continue;
432433
}
433-
i.second->BuildActualizationTasks(context);
434+
i.second->BuildActualizationTasks(context, actualizationLag);
434435
}
435436
} else {
436437
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "StartTtl")("skip", "not_ready_tiers");

ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.cpp

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -105,12 +105,16 @@ void TTieringActualizer::DoRemovePortion(const ui64 portionId) {
105105
void TTieringActualizer::DoExtractTasks(TTieringProcessContext& tasksContext, const TExternalTasksContext& externalContext, TInternalTasksContext& /*internalContext*/) {
106106
THashSet<ui64> portionIds;
107107
for (auto&& [address, addressPortions] : PortionIdByWaitDuration) {
108+
if (addressPortions.GetPortions().size() && tasksContext.Now - StartInstant < addressPortions.GetPortions().begin()->first) {
109+
Counters.SkipEvictionForLimit->Add(1);
110+
continue;
111+
}
108112
if (!tasksContext.IsRWAddressAvailable(address)) {
109113
Counters.SkipEvictionForLimit->Add(1);
110114
continue;
111115
}
112116
for (auto&& [duration, portions] : addressPortions.GetPortions()) {
113-
if (duration - (tasksContext.Now - StartInstant) > TDuration::Zero()) {
117+
if (tasksContext.Now - StartInstant < duration) {
114118
break;
115119
}
116120
bool limitEnriched = false;
@@ -140,28 +144,29 @@ void TTieringActualizer::DoExtractTasks(TTieringProcessContext& tasksContext, co
140144
}
141145
}
142146
}
147+
if (portionIds.size()) {
148+
ui64 waitDurationEvict = 0;
149+
ui64 waitQueueEvict = 0;
150+
ui64 waitDurationDelete = 0;
151+
ui64 waitQueueDelete = 0;
152+
for (auto&& i : PortionIdByWaitDuration) {
153+
std::shared_ptr<NColumnShard::TValueAggregationClient> waitDurationSignal;
154+
std::shared_ptr<NColumnShard::TValueAggregationClient> queueSizeSignal;
155+
if (i.first.WriteIs(NTiering::NCommon::DeleteTierName)) {
156+
i.second.CorrectSignals(waitQueueDelete, waitDurationDelete, tasksContext.Now - StartInstant);
157+
} else {
158+
i.second.CorrectSignals(waitQueueEvict, waitDurationEvict, tasksContext.Now - StartInstant);
159+
}
160+
}
161+
Counters.DifferenceWaitToDelete->SetValue(waitDurationDelete);
162+
Counters.DifferenceWaitToEvict->SetValue(waitDurationEvict);
163+
Counters.QueueSizeToDelete->SetValue(waitQueueDelete);
164+
Counters.QueueSizeToEvict->SetValue(waitQueueEvict);
165+
}
143166
for (auto&& i : portionIds) {
144167
RemovePortion(i);
145168
}
146169

147-
ui64 waitDurationEvict = 0;
148-
ui64 waitQueueEvict = 0;
149-
ui64 waitDurationDelete = 0;
150-
ui64 waitQueueDelete = 0;
151-
for (auto&& i : PortionIdByWaitDuration) {
152-
std::shared_ptr<NColumnShard::TValueAggregationClient> waitDurationSignal;
153-
std::shared_ptr<NColumnShard::TValueAggregationClient> queueSizeSignal;
154-
if (i.first.WriteIs(NTiering::NCommon::DeleteTierName)) {
155-
i.second.CorrectSignals(waitQueueDelete, waitDurationDelete, tasksContext.Now - StartInstant);
156-
} else {
157-
i.second.CorrectSignals(waitQueueEvict, waitDurationEvict, tasksContext.Now - StartInstant);
158-
}
159-
}
160-
Counters.DifferenceWaitToDelete->SetValue(waitDurationDelete);
161-
Counters.DifferenceWaitToEvict->SetValue(waitDurationEvict);
162-
Counters.QueueSizeToDelete->SetValue(waitQueueDelete);
163-
Counters.QueueSizeToEvict->SetValue(waitQueueEvict);
164-
165170
}
166171

167172
void TTieringActualizer::Refresh(const std::optional<TTiering>& info, const TAddExternalContext& externalContext) {

ydb/core/tx/columnshard/engines/storage/granule.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
#include <ydb/library/actors/core/log.h>
66
#include <ydb/core/tx/columnshard/columnshard_schema.h>
7+
#include <ydb/core/tx/columnshard/engines/changes/actualization/construction/context.h>
78

89
namespace NKikimr::NOlap {
910

@@ -157,4 +158,13 @@ std::shared_ptr<TPortionInfo> TGranuleMeta::UpsertPortionOnLoad(TPortionInfo&& p
157158
return emplaceInfo.first->second;
158159
}
159160

161+
void TGranuleMeta::BuildActualizationTasks(NActualizer::TTieringProcessContext& context, const TDuration actualizationLag) const {
162+
if (context.Now - LastActualizations < actualizationLag) {
163+
return;
164+
}
165+
NActualizer::TExternalTasksContext extTasks(Portions);
166+
ActualizationIndex->ExtractActualizationTasks(context, extTasks);
167+
LastActualizations = context.Now;
168+
}
169+
160170
} // namespace NKikimr::NOlap

ydb/core/tx/columnshard/engines/storage/granule.h

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ class TGranuleMeta: TNonCopyable {
150150
std::shared_ptr<TGranulesStat> Stats;
151151
std::shared_ptr<NStorageOptimizer::IOptimizerPlanner> OptimizerPlanner;
152152
std::shared_ptr<NActualizer::TGranuleActualizationIndex> ActualizationIndex;
153+
mutable TInstant LastActualizations = TInstant::Zero();
153154
std::map<NArrow::TReplaceKey, THashMap<ui64, std::shared_ptr<TPortionInfo>>> PortionsByPK;
154155

155156
void OnBeforeChangePortion(const std::shared_ptr<TPortionInfo> portionBefore);
@@ -197,10 +198,7 @@ class TGranuleMeta: TNonCopyable {
197198
LastCompactionInstant = TMonotonic::Now();
198199
}
199200

200-
void BuildActualizationTasks(NActualizer::TTieringProcessContext& context) const {
201-
NActualizer::TExternalTasksContext extTasks(Portions);
202-
ActualizationIndex->ExtractActualizationTasks(context, extTasks);
203-
}
201+
void BuildActualizationTasks(NActualizer::TTieringProcessContext& context, const TDuration actualizationLag) const;
204202

205203
std::shared_ptr<TColumnEngineChanges> GetOptimizationTask(std::shared_ptr<TGranuleMeta> self, const std::shared_ptr<NDataLocks::TManager>& locksManager) const {
206204
return OptimizerPlanner->GetOptimizationTask(self, locksManager);
@@ -248,8 +246,8 @@ class TGranuleMeta: TNonCopyable {
248246
return OptimizerPlanner->IsLocked(dataLocksManager);
249247
}
250248

251-
void ActualizeOptimizer(const TInstant currentInstant) const {
252-
if (currentInstant - OptimizerPlanner->GetActualizationInstant() >= NYDBTest::TControllers::GetColumnShardController()->GetCompactionActualizationLag(TDuration::Seconds(1))) {
249+
void ActualizeOptimizer(const TInstant currentInstant, const TDuration recalcLag) const {
250+
if (currentInstant - OptimizerPlanner->GetActualizationInstant() >= recalcLag) {
253251
OptimizerPlanner->Actualize(currentInstant);
254252
}
255253
}

ydb/core/tx/columnshard/engines/storage/storage.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,10 @@ std::shared_ptr<NKikimr::NOlap::TGranuleMeta> TGranulesStorage::GetGranuleForCom
88
std::map<NStorageOptimizer::TOptimizationPriority, std::shared_ptr<TGranuleMeta>> granulesSorted;
99
ui32 countChecker = 0;
1010
std::optional<NStorageOptimizer::TOptimizationPriority> priorityChecker;
11+
const TDuration actualizationLag = NYDBTest::TControllers::GetColumnShardController()->GetCompactionActualizationLag(TDuration::Seconds(1));
1112
for (auto&& i : Tables) {
1213
NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("path_id", i.first);
13-
i.second->ActualizeOptimizer(now);
14+
i.second->ActualizeOptimizer(now, actualizationLag);
1415
auto gPriority = i.second->GetCompactionPriority();
1516
if (gPriority.IsZero() || (priorityChecker && gPriority < *priorityChecker)) {
1617
continue;

ydb/core/tx/columnshard/hooks/abstract/abstract.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,10 @@ class ICSController {
104104
return def;
105105
}
106106

107+
virtual TDuration GetActualizationTasksLag(const TDuration d) const {
108+
return d;
109+
}
110+
107111
virtual ui64 GetReduceMemoryIntervalLimit(const ui64 def) const {
108112
return def;
109113
}

ydb/core/tx/columnshard/hooks/testing/controller.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ class TController: public TReadOnlyController {
1818
YDB_ACCESSOR(std::optional<TDuration>, StatsReportInterval, std::nullopt);
1919
YDB_ACCESSOR(std::optional<ui64>, GuaranteeIndexationStartBytesLimit, 0);
2020
YDB_ACCESSOR(std::optional<TDuration>, OptimizerFreshnessCheckDuration, TDuration::Zero());
21+
YDB_ACCESSOR_DEF(std::optional<TDuration>, CompactionActualizationLag);
22+
YDB_ACCESSOR_DEF(std::optional<TDuration>, TasksActualizationLag);
2123
EOptimizerCompactionWeightControl CompactionControl = EOptimizerCompactionWeightControl::Force;
2224

2325
YDB_ACCESSOR(std::optional<ui64>, OverrideReduceMemoryIntervalLimit, 1024);
@@ -131,11 +133,20 @@ class TController: public TReadOnlyController {
131133
return LagForCompactionBeforeTierings.value_or(def);
132134
}
133135

136+
virtual TDuration GetCompactionActualizationLag(const TDuration def) const override {
137+
return CompactionActualizationLag.value_or(def);
138+
}
139+
140+
134141
virtual bool IsBackgroundEnabled(const EBackground id) const override {
135142
TGuard<TMutex> g(Mutex);
136143
return !DisabledBackgrounds.contains(id);
137144
}
138145

146+
virtual TDuration GetActualizationTasksLag(const TDuration d) const override {
147+
return TasksActualizationLag.value_or(d);
148+
}
149+
139150
virtual void DoOnTabletInitCompleted(const ::NKikimr::NColumnShard::TColumnShard& shard) override;
140151
virtual void DoOnTabletStopped(const ::NKikimr::NColumnShard::TColumnShard& shard) override;
141152
virtual void DoOnAfterGCAction(const ::NKikimr::NColumnShard::TColumnShard& shard, const NOlap::IBlobsGCAction& action) override;

ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -615,7 +615,6 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString
615615
auto csControllerGuard = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard<TDisableCompactionController>();
616616
TTestBasicRuntime runtime;
617617
TTester::Setup(runtime);
618-
auto csDefaultControllerGuard = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard<TDefaultTestsController>();
619618

620619
runtime.SetLogPriority(NKikimrServices::BLOB_CACHE, NActors::NLog::PRI_DEBUG);
621620
runtime.SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG);

ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
161161
{
162162
auto csControllerGuard = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>();
163163
csControllerGuard->DisableBackground(NKikimr::NYDBTest::ICSController::EBackground::Compaction);
164+
csControllerGuard->SetTasksActualizationLag(TDuration::Zero());
164165
std::vector<ui64> ts = {1600000000, 1620000000};
165166

166167
ui32 ttlIncSeconds = 1;
@@ -512,6 +513,7 @@ std::vector<std::pair<ui32, ui64>> TestTiers(bool reboots, const std::vector<TSt
512513

513514
auto csControllerGuard = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>();
514515
csControllerGuard->DisableBackground(NYDBTest::ICSController::EBackground::TTL);
516+
csControllerGuard->SetTasksActualizationLag(TDuration::Zero());
515517
TTestBasicRuntime runtime;
516518
TTester::Setup(runtime);
517519

0 commit comments

Comments
 (0)