Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions ydb/core/tx/columnshard/engines/column_engine_logs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,14 +413,15 @@ std::vector<std::shared_ptr<TTTLColumnEngineChanges>> TColumnEngineForLogs::Star

TSaverContext saverContext(StoragesManager);
NActualizer::TTieringProcessContext context(memoryUsageLimit, saverContext, dataLocksManager, SignalCounters, ActualizationController);
const TDuration actualizationLag = NYDBTest::TControllers::GetColumnShardController()->GetActualizationTasksLag(TDuration::Seconds(1));
for (auto&& i : pathEviction) {
auto g = GetGranuleOptional(i.first);
if (g) {
if (!ActualizationStarted) {
g->StartActualizationIndex();
}
g->RefreshTiering(i.second);
g->BuildActualizationTasks(context);
g->BuildActualizationTasks(context, actualizationLag);
}
}

Expand All @@ -430,7 +431,7 @@ std::vector<std::shared_ptr<TTTLColumnEngineChanges>> TColumnEngineForLogs::Star
if (pathEviction.contains(i.first)) {
continue;
}
i.second->BuildActualizationTasks(context);
i.second->BuildActualizationTasks(context, actualizationLag);
}
} else {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "StartTtl")("skip", "not_ready_tiers");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,16 @@ void TTieringActualizer::DoRemovePortion(const ui64 portionId) {
void TTieringActualizer::DoExtractTasks(TTieringProcessContext& tasksContext, const TExternalTasksContext& externalContext, TInternalTasksContext& /*internalContext*/) {
THashSet<ui64> portionIds;
for (auto&& [address, addressPortions] : PortionIdByWaitDuration) {
if (addressPortions.GetPortions().size() && tasksContext.Now - StartInstant < addressPortions.GetPortions().begin()->first) {
Counters.SkipEvictionForLimit->Add(1);
continue;
}
if (!tasksContext.IsRWAddressAvailable(address)) {
Counters.SkipEvictionForLimit->Add(1);
continue;
}
for (auto&& [duration, portions] : addressPortions.GetPortions()) {
if (duration - (tasksContext.Now - StartInstant) > TDuration::Zero()) {
if (tasksContext.Now - StartInstant < duration) {
break;
}
bool limitEnriched = false;
Expand Down Expand Up @@ -140,28 +144,29 @@ void TTieringActualizer::DoExtractTasks(TTieringProcessContext& tasksContext, co
}
}
}
if (portionIds.size()) {
ui64 waitDurationEvict = 0;
ui64 waitQueueEvict = 0;
ui64 waitDurationDelete = 0;
ui64 waitQueueDelete = 0;
for (auto&& i : PortionIdByWaitDuration) {
std::shared_ptr<NColumnShard::TValueAggregationClient> waitDurationSignal;
std::shared_ptr<NColumnShard::TValueAggregationClient> queueSizeSignal;
if (i.first.WriteIs(NTiering::NCommon::DeleteTierName)) {
i.second.CorrectSignals(waitQueueDelete, waitDurationDelete, tasksContext.Now - StartInstant);
} else {
i.second.CorrectSignals(waitQueueEvict, waitDurationEvict, tasksContext.Now - StartInstant);
}
}
Counters.DifferenceWaitToDelete->SetValue(waitDurationDelete);
Counters.DifferenceWaitToEvict->SetValue(waitDurationEvict);
Counters.QueueSizeToDelete->SetValue(waitQueueDelete);
Counters.QueueSizeToEvict->SetValue(waitQueueEvict);
}
for (auto&& i : portionIds) {
RemovePortion(i);
}

ui64 waitDurationEvict = 0;
ui64 waitQueueEvict = 0;
ui64 waitDurationDelete = 0;
ui64 waitQueueDelete = 0;
for (auto&& i : PortionIdByWaitDuration) {
std::shared_ptr<NColumnShard::TValueAggregationClient> waitDurationSignal;
std::shared_ptr<NColumnShard::TValueAggregationClient> queueSizeSignal;
if (i.first.WriteIs(NTiering::NCommon::DeleteTierName)) {
i.second.CorrectSignals(waitQueueDelete, waitDurationDelete, tasksContext.Now - StartInstant);
} else {
i.second.CorrectSignals(waitQueueEvict, waitDurationEvict, tasksContext.Now - StartInstant);
}
}
Counters.DifferenceWaitToDelete->SetValue(waitDurationDelete);
Counters.DifferenceWaitToEvict->SetValue(waitDurationEvict);
Counters.QueueSizeToDelete->SetValue(waitQueueDelete);
Counters.QueueSizeToEvict->SetValue(waitQueueEvict);

}

void TTieringActualizer::Refresh(const std::optional<TTiering>& info, const TAddExternalContext& externalContext) {
Expand Down
10 changes: 10 additions & 0 deletions ydb/core/tx/columnshard/engines/storage/granule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include <ydb/library/actors/core/log.h>
#include <ydb/core/tx/columnshard/columnshard_schema.h>
#include <ydb/core/tx/columnshard/engines/changes/actualization/construction/context.h>

namespace NKikimr::NOlap {

Expand Down Expand Up @@ -157,4 +158,13 @@ std::shared_ptr<TPortionInfo> TGranuleMeta::UpsertPortionOnLoad(TPortionInfo&& p
return emplaceInfo.first->second;
}

void TGranuleMeta::BuildActualizationTasks(NActualizer::TTieringProcessContext& context, const TDuration actualizationLag) const {
if (context.Now - LastActualizations < actualizationLag) {
return;
}
NActualizer::TExternalTasksContext extTasks(Portions);
ActualizationIndex->ExtractActualizationTasks(context, extTasks);
LastActualizations = context.Now;
}

} // namespace NKikimr::NOlap
10 changes: 4 additions & 6 deletions ydb/core/tx/columnshard/engines/storage/granule.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ class TGranuleMeta: TNonCopyable {
std::shared_ptr<TGranulesStat> Stats;
std::shared_ptr<NStorageOptimizer::IOptimizerPlanner> OptimizerPlanner;
std::shared_ptr<NActualizer::TGranuleActualizationIndex> ActualizationIndex;
mutable TInstant LastActualizations = TInstant::Zero();
std::map<NArrow::TReplaceKey, THashMap<ui64, std::shared_ptr<TPortionInfo>>> PortionsByPK;

void OnBeforeChangePortion(const std::shared_ptr<TPortionInfo> portionBefore);
Expand Down Expand Up @@ -197,10 +198,7 @@ class TGranuleMeta: TNonCopyable {
LastCompactionInstant = TMonotonic::Now();
}

void BuildActualizationTasks(NActualizer::TTieringProcessContext& context) const {
NActualizer::TExternalTasksContext extTasks(Portions);
ActualizationIndex->ExtractActualizationTasks(context, extTasks);
}
void BuildActualizationTasks(NActualizer::TTieringProcessContext& context, const TDuration actualizationLag) const;

std::shared_ptr<TColumnEngineChanges> GetOptimizationTask(std::shared_ptr<TGranuleMeta> self, const std::shared_ptr<NDataLocks::TManager>& locksManager) const {
return OptimizerPlanner->GetOptimizationTask(self, locksManager);
Expand Down Expand Up @@ -248,8 +246,8 @@ class TGranuleMeta: TNonCopyable {
return OptimizerPlanner->IsLocked(dataLocksManager);
}

void ActualizeOptimizer(const TInstant currentInstant) const {
if (currentInstant - OptimizerPlanner->GetActualizationInstant() >= NYDBTest::TControllers::GetColumnShardController()->GetCompactionActualizationLag(TDuration::Seconds(1))) {
void ActualizeOptimizer(const TInstant currentInstant, const TDuration recalcLag) const {
if (currentInstant - OptimizerPlanner->GetActualizationInstant() >= recalcLag) {
OptimizerPlanner->Actualize(currentInstant);
}
}
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/tx/columnshard/engines/storage/storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ std::shared_ptr<NKikimr::NOlap::TGranuleMeta> TGranulesStorage::GetGranuleForCom
std::map<NStorageOptimizer::TOptimizationPriority, std::shared_ptr<TGranuleMeta>> granulesSorted;
ui32 countChecker = 0;
std::optional<NStorageOptimizer::TOptimizationPriority> priorityChecker;
const TDuration actualizationLag = NYDBTest::TControllers::GetColumnShardController()->GetCompactionActualizationLag(TDuration::Seconds(1));
for (auto&& i : Tables) {
NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("path_id", i.first);
i.second->ActualizeOptimizer(now);
i.second->ActualizeOptimizer(now, actualizationLag);
auto gPriority = i.second->GetCompactionPriority();
if (gPriority.IsZero() || (priorityChecker && gPriority < *priorityChecker)) {
continue;
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/hooks/abstract/abstract.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ class ICSController {
return def;
}

virtual TDuration GetActualizationTasksLag(const TDuration d) const {
return d;
}

virtual ui64 GetReduceMemoryIntervalLimit(const ui64 def) const {
return def;
}
Expand Down
11 changes: 11 additions & 0 deletions ydb/core/tx/columnshard/hooks/testing/controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ class TController: public TReadOnlyController {
YDB_ACCESSOR(std::optional<TDuration>, StatsReportInterval, std::nullopt);
YDB_ACCESSOR(std::optional<ui64>, GuaranteeIndexationStartBytesLimit, 0);
YDB_ACCESSOR(std::optional<TDuration>, OptimizerFreshnessCheckDuration, TDuration::Zero());
YDB_ACCESSOR_DEF(std::optional<TDuration>, CompactionActualizationLag);
YDB_ACCESSOR_DEF(std::optional<TDuration>, TasksActualizationLag);
EOptimizerCompactionWeightControl CompactionControl = EOptimizerCompactionWeightControl::Force;

YDB_ACCESSOR(std::optional<ui64>, OverrideReduceMemoryIntervalLimit, 1024);
Expand Down Expand Up @@ -131,11 +133,20 @@ class TController: public TReadOnlyController {
return LagForCompactionBeforeTierings.value_or(def);
}

virtual TDuration GetCompactionActualizationLag(const TDuration def) const override {
return CompactionActualizationLag.value_or(def);
}


virtual bool IsBackgroundEnabled(const EBackground id) const override {
TGuard<TMutex> g(Mutex);
return !DisabledBackgrounds.contains(id);
}

virtual TDuration GetActualizationTasksLag(const TDuration d) const override {
return TasksActualizationLag.value_or(d);
}

virtual void DoOnTabletInitCompleted(const ::NKikimr::NColumnShard::TColumnShard& shard) override;
virtual void DoOnTabletStopped(const ::NKikimr::NColumnShard::TColumnShard& shard) override;
virtual void DoOnAfterGCAction(const ::NKikimr::NColumnShard::TColumnShard& shard, const NOlap::IBlobsGCAction& action) override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,6 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString
auto csControllerGuard = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard<TDisableCompactionController>();
TTestBasicRuntime runtime;
TTester::Setup(runtime);
auto csDefaultControllerGuard = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard<TDefaultTestsController>();

runtime.SetLogPriority(NKikimrServices::BLOB_CACHE, NActors::NLog::PRI_DEBUG);
runtime.SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG);
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
{
auto csControllerGuard = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>();
csControllerGuard->DisableBackground(NKikimr::NYDBTest::ICSController::EBackground::Compaction);
csControllerGuard->SetTasksActualizationLag(TDuration::Zero());
std::vector<ui64> ts = {1600000000, 1620000000};

ui32 ttlIncSeconds = 1;
Expand Down Expand Up @@ -512,6 +513,7 @@ std::vector<std::pair<ui32, ui64>> TestTiers(bool reboots, const std::vector<TSt

auto csControllerGuard = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>();
csControllerGuard->DisableBackground(NYDBTest::ICSController::EBackground::TTL);
csControllerGuard->SetTasksActualizationLag(TDuration::Zero());
TTestBasicRuntime runtime;
TTester::Setup(runtime);

Expand Down