Skip to content

Commit 42be2c5

Browse files
Merge df462bf into 231dff1
2 parents 231dff1 + df462bf commit 42be2c5

File tree

18 files changed

+140
-31
lines changed

18 files changed

+140
-31
lines changed

ydb/core/protos/config.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1440,6 +1440,7 @@ message TColumnShardConfig {
14401440
optional uint32 WritingBufferDurationMs = 8 [default = 0];
14411441
optional bool UseChunkedMergeOnCompaction = 9 [default = true];
14421442
optional uint64 CompactionMemoryLimit = 10 [default = 536870912];
1443+
optional uint64 TieringsMemoryLimit = 11 [default = 536870912];
14431444
}
14441445

14451446
message TSchemeShardConfig {

ydb/core/tablet/resource_broker.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1269,6 +1269,7 @@ NKikimrResourceBroker::TResourceBrokerConfig MakeDefaultConfig()
12691269
const ui64 KqpRmQueueCPU = 4;
12701270
const ui64 KqpRmQueueMemory = 10ULL << 30;
12711271

1272+
const ui64 CSTTLCompactionMemoryLimit = 1ULL << 30;
12721273
const ui64 CSInsertCompactionMemoryLimit = 1ULL << 30;
12731274
const ui64 CSGeneralCompactionMemoryLimit = 3ULL << 30;
12741275
const ui64 CSScanMemoryLimit = 3ULL << 30;
@@ -1314,6 +1315,12 @@ NKikimrResourceBroker::TResourceBrokerConfig MakeDefaultConfig()
13141315
queue->MutableLimit()->SetCpu(3);
13151316
queue->MutableLimit()->SetMemory(CSInsertCompactionMemoryLimit);
13161317

1318+
queue = config.AddQueues();
1319+
queue->SetName("queue_cs_ttl");
1320+
queue->SetWeight(100);
1321+
queue->MutableLimit()->SetCpu(3);
1322+
queue->MutableLimit()->SetMemory(CSTTLCompactionMemoryLimit);
1323+
13171324
queue = config.AddQueues();
13181325
queue->SetName("queue_cs_general");
13191326
queue->SetWeight(100);
@@ -1413,6 +1420,11 @@ NKikimrResourceBroker::TResourceBrokerConfig MakeDefaultConfig()
14131420
task->SetQueueName("queue_compaction_borrowed");
14141421
task->SetDefaultDuration(TDuration::Minutes(10).GetValue());
14151422

1423+
task = config.AddTasks();
1424+
task->SetName("CS::TTL");
1425+
task->SetQueueName("queue_cs_ttl");
1426+
task->SetDefaultDuration(TDuration::Minutes(10).GetValue());
1427+
14161428
task = config.AddTasks();
14171429
task->SetName("CS::INDEXATION");
14181430
task->SetQueueName("queue_cs_indexation");

ydb/core/tx/columnshard/columnshard_impl.cpp

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet)
8989
, SubscribeCounters(std::make_shared<NOlap::NResourceBroker::NSubscribe::TSubscriberCounters>())
9090
, InsertTaskSubscription(NOlap::TInsertColumnEngineChanges::StaticTypeName(), SubscribeCounters)
9191
, CompactTaskSubscription(NOlap::TCompactColumnEngineChanges::StaticTypeName(), SubscribeCounters)
92+
, TTLTaskSubscription(NOlap::TTTLColumnEngineChanges::StaticTypeName(), SubscribeCounters)
9293
, ReadCounters("Read")
9394
, ScanCounters("Scan")
9495
, WritesMonitor(*this)
@@ -744,21 +745,25 @@ bool TColumnShard::SetupTtl(const THashMap<ui64, NOlap::TTiering>& pathTtls, con
744745
}
745746

746747
auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndex();
747-
std::shared_ptr<NOlap::TTTLColumnEngineChanges> indexChanges = TablesManager.MutablePrimaryIndex().StartTtl(eviction, BackgroundController.GetConflictTTLPortions());
748+
const ui64 memoryUsageLimit = HasAppData() ? AppDataVerified().ColumnShardConfig.GetTieringsMemoryLimit() : ((ui64)512 * 1024 * 1024);
749+
std::shared_ptr<NOlap::TTTLColumnEngineChanges> indexChanges = TablesManager.MutablePrimaryIndex().StartTtl(
750+
eviction, BackgroundController.GetConflictTTLPortions(), memoryUsageLimit);
748751

749752
if (!indexChanges) {
750753
ACFL_DEBUG("background", "ttl")("skip_reason", "no_changes");
751754
return false;
752755
}
753-
756+
const TString externalTaskId = indexChanges->GetTaskIdentifier();
754757
const bool needWrites = indexChanges->NeedConstruction();
755758
ACFL_DEBUG("background", "ttl")("need_writes", needWrites);
756759

757760
indexChanges->Start(*this);
758761
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges, false);
759762
NYDBTest::TControllers::GetColumnShardController()->OnWriteIndexStart(TabletID(), indexChanges->TypeString());
760763
if (needWrites) {
761-
ActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(std::make_shared<TChangesReadTask>(std::move(ev), SelfId(), TabletID(), CompactionCounters)));
764+
NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription(
765+
ResourceSubscribeActor, std::make_shared<NOlap::NBlobOperations::NRead::ITask::TReadSubscriber>(
766+
std::make_shared<TChangesReadTask>(std::move(ev), SelfId(), TabletID(), CompactionCounters), 0, indexChanges->CalcMemoryForUsage(), externalTaskId, TTLTaskSubscription));
762767
} else {
763768
ev->SetPutStatus(NKikimrProto::OK);
764769
ActorContext().Send(SelfId(), std::move(ev));

ydb/core/tx/columnshard/columnshard_impl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,7 @@ class TColumnShard
421421
std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TSubscriberCounters> SubscribeCounters;
422422
NOlap::NResourceBroker::NSubscribe::TTaskContext InsertTaskSubscription;
423423
NOlap::NResourceBroker::NSubscribe::TTaskContext CompactTaskSubscription;
424+
NOlap::NResourceBroker::NSubscribe::TTaskContext TTLTaskSubscription;
424425
const TScanCounters ReadCounters;
425426
const TScanCounters ScanCounters;
426427
const TIndexationCounters CompactionCounters = TIndexationCounters("GeneralCompaction");
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package NKikimrColumnShardProto;
2+
3+
message TSnapshot {
4+
optional uint64 PlanStep = 1;
5+
optional uint64 TxId = 2;
6+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
PROTO_LIBRARY()
2+
3+
SRCS(
4+
snapshot.proto
5+
)
6+
7+
PEERDIR(
8+
)
9+
10+
END()

ydb/core/tx/columnshard/common/snapshot.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "snapshot.h"
2+
#include <ydb/core/tx/columnshard/common/protos/snapshot.pb.h>
23
#include <util/string/builder.h>
34

45
namespace NKikimr::NOlap {
@@ -7,4 +8,10 @@ TString TSnapshot::DebugString() const {
78
return TStringBuilder() << "plan_step=" << PlanStep << ";tx_id=" << TxId << ";";
89
}
910

11+
NKikimrColumnShardProto::TSnapshot TSnapshot::SerializeToProto() const {
12+
NKikimrColumnShardProto::TSnapshot result;
13+
SerializeToProto(result);
14+
return result;
15+
}
16+
1017
};

ydb/core/tx/columnshard/common/snapshot.h

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
#pragma once
22
#include <util/stream/output.h>
33
#include <util/string/cast.h>
4+
#include <ydb/library/conclusion/status.h>
5+
6+
namespace NKikimrColumnShardProto {
7+
class TSnapshot;
8+
}
49

510
namespace NKikimr::NOlap {
611

@@ -47,6 +52,27 @@ class TSnapshot {
4752
return out << "{" << s.PlanStep << ':' << (s.TxId == std::numeric_limits<ui64>::max() ? "max" : ::ToString(s.TxId)) << "}";
4853
}
4954

55+
template <class TProto>
56+
void SerializeToProto(TProto& result) const {
57+
result.SetPlanStep(PlanStep);
58+
result.SetTxId(TxId);
59+
}
60+
61+
NKikimrColumnShardProto::TSnapshot SerializeToProto() const;
62+
63+
template <class TProto>
64+
TConclusionStatus DeserializeFromProto(const TProto& proto) {
65+
PlanStep = proto.GetPlanStep();
66+
TxId = proto.GetTxId();
67+
if (!PlanStep) {
68+
return TConclusionStatus::Fail("incorrect planStep in proto");
69+
}
70+
if (!TxId) {
71+
return TConclusionStatus::Fail("incorrect txId in proto");
72+
}
73+
return TConclusionStatus::Success();
74+
}
75+
5076
TString DebugString() const;
5177
};
5278

ydb/core/tx/columnshard/common/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ PEERDIR(
1111
ydb/core/protos
1212
contrib/libs/apache/arrow
1313
ydb/core/formats/arrow
14+
ydb/core/tx/columnshard/common/protos
1415
)
1516

1617
GENERATE_ENUM_SERIALIZATION(portion.h)

ydb/core/tx/columnshard/engines/changes/abstract/abstract.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,12 @@ class TColumnEngineChanges {
173173
const TString TaskIdentifier = TGUID::Create().AsGuidString();
174174
virtual ui64 DoCalcMemoryForUsage() const = 0;
175175
public:
176+
class IMemoryPredictor {
177+
public:
178+
virtual ui64 AddPortion(const TPortionInfo& portionInfo) = 0;
179+
virtual ~IMemoryPredictor() = default;
180+
};
181+
176182
ui64 CalcMemoryForUsage() const {
177183
return DoCalcMemoryForUsage();
178184
}

0 commit comments

Comments
 (0)