Skip to content

Commit ed3e338

Browse files
Memory control and search fix (#1649)
* memory control and program data fetching for select fixes * fix * add queue for ttl * fix test build
1 parent 36c23f3 commit ed3e338

File tree

18 files changed

+140
-31
lines changed

18 files changed

+140
-31
lines changed

ydb/core/protos/config.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1489,6 +1489,7 @@ message TColumnShardConfig {
14891489
optional uint32 WritingBufferDurationMs = 8 [default = 0];
14901490
optional bool UseChunkedMergeOnCompaction = 9 [default = true];
14911491
optional uint64 CompactionMemoryLimit = 10 [default = 536870912];
1492+
optional uint64 TieringsMemoryLimit = 11 [default = 536870912];
14921493
}
14931494

14941495
message TSchemeShardConfig {

ydb/core/tablet/resource_broker.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1269,6 +1269,7 @@ NKikimrResourceBroker::TResourceBrokerConfig MakeDefaultConfig()
12691269
const ui64 KqpRmQueueCPU = 4;
12701270
const ui64 KqpRmQueueMemory = 10ULL << 30;
12711271

1272+
const ui64 CSTTLCompactionMemoryLimit = 1ULL << 30;
12721273
const ui64 CSInsertCompactionMemoryLimit = 1ULL << 30;
12731274
const ui64 CSGeneralCompactionMemoryLimit = 3ULL << 30;
12741275
const ui64 CSScanMemoryLimit = 3ULL << 30;
@@ -1314,6 +1315,12 @@ NKikimrResourceBroker::TResourceBrokerConfig MakeDefaultConfig()
13141315
queue->MutableLimit()->SetCpu(3);
13151316
queue->MutableLimit()->SetMemory(CSInsertCompactionMemoryLimit);
13161317

1318+
queue = config.AddQueues();
1319+
queue->SetName("queue_cs_ttl");
1320+
queue->SetWeight(100);
1321+
queue->MutableLimit()->SetCpu(3);
1322+
queue->MutableLimit()->SetMemory(CSTTLCompactionMemoryLimit);
1323+
13171324
queue = config.AddQueues();
13181325
queue->SetName("queue_cs_general");
13191326
queue->SetWeight(100);
@@ -1413,6 +1420,11 @@ NKikimrResourceBroker::TResourceBrokerConfig MakeDefaultConfig()
14131420
task->SetQueueName("queue_compaction_borrowed");
14141421
task->SetDefaultDuration(TDuration::Minutes(10).GetValue());
14151422

1423+
task = config.AddTasks();
1424+
task->SetName("CS::TTL");
1425+
task->SetQueueName("queue_cs_ttl");
1426+
task->SetDefaultDuration(TDuration::Minutes(10).GetValue());
1427+
14161428
task = config.AddTasks();
14171429
task->SetName("CS::INDEXATION");
14181430
task->SetQueueName("queue_cs_indexation");

ydb/core/tx/columnshard/columnshard_impl.cpp

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet)
8989
, SubscribeCounters(std::make_shared<NOlap::NResourceBroker::NSubscribe::TSubscriberCounters>())
9090
, InsertTaskSubscription(NOlap::TInsertColumnEngineChanges::StaticTypeName(), SubscribeCounters)
9191
, CompactTaskSubscription(NOlap::TCompactColumnEngineChanges::StaticTypeName(), SubscribeCounters)
92+
, TTLTaskSubscription(NOlap::TTTLColumnEngineChanges::StaticTypeName(), SubscribeCounters)
9293
, ReadCounters("Read")
9394
, ScanCounters("Scan")
9495
, WritesMonitor(*this)
@@ -743,21 +744,25 @@ bool TColumnShard::SetupTtl(const THashMap<ui64, NOlap::TTiering>& pathTtls, con
743744
}
744745

745746
auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndex();
746-
std::shared_ptr<NOlap::TTTLColumnEngineChanges> indexChanges = TablesManager.MutablePrimaryIndex().StartTtl(eviction, BackgroundController.GetConflictTTLPortions());
747+
const ui64 memoryUsageLimit = HasAppData() ? AppDataVerified().ColumnShardConfig.GetTieringsMemoryLimit() : ((ui64)512 * 1024 * 1024);
748+
std::shared_ptr<NOlap::TTTLColumnEngineChanges> indexChanges = TablesManager.MutablePrimaryIndex().StartTtl(
749+
eviction, BackgroundController.GetConflictTTLPortions(), memoryUsageLimit);
747750

748751
if (!indexChanges) {
749752
ACFL_DEBUG("background", "ttl")("skip_reason", "no_changes");
750753
return false;
751754
}
752-
755+
const TString externalTaskId = indexChanges->GetTaskIdentifier();
753756
const bool needWrites = indexChanges->NeedConstruction();
754757
ACFL_DEBUG("background", "ttl")("need_writes", needWrites);
755758

756759
indexChanges->Start(*this);
757760
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges, false);
758761
NYDBTest::TControllers::GetColumnShardController()->OnWriteIndexStart(TabletID(), indexChanges->TypeString());
759762
if (needWrites) {
760-
ActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(std::make_shared<TChangesReadTask>(std::move(ev), SelfId(), TabletID(), CompactionCounters)));
763+
NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription(
764+
ResourceSubscribeActor, std::make_shared<NOlap::NBlobOperations::NRead::ITask::TReadSubscriber>(
765+
std::make_shared<TChangesReadTask>(std::move(ev), SelfId(), TabletID(), CompactionCounters), 0, indexChanges->CalcMemoryForUsage(), externalTaskId, TTLTaskSubscription));
761766
} else {
762767
ev->SetPutStatus(NKikimrProto::OK);
763768
ActorContext().Send(SelfId(), std::move(ev));

ydb/core/tx/columnshard/columnshard_impl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -392,6 +392,7 @@ class TColumnShard
392392
std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TSubscriberCounters> SubscribeCounters;
393393
NOlap::NResourceBroker::NSubscribe::TTaskContext InsertTaskSubscription;
394394
NOlap::NResourceBroker::NSubscribe::TTaskContext CompactTaskSubscription;
395+
NOlap::NResourceBroker::NSubscribe::TTaskContext TTLTaskSubscription;
395396
const TScanCounters ReadCounters;
396397
const TScanCounters ScanCounters;
397398
const TIndexationCounters CompactionCounters = TIndexationCounters("GeneralCompaction");
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package NKikimrColumnShardProto;
2+
3+
message TSnapshot {
4+
optional uint64 PlanStep = 1;
5+
optional uint64 TxId = 2;
6+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
PROTO_LIBRARY()
2+
3+
SRCS(
4+
snapshot.proto
5+
)
6+
7+
PEERDIR(
8+
)
9+
10+
END()

ydb/core/tx/columnshard/common/snapshot.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "snapshot.h"
2+
#include <ydb/core/tx/columnshard/common/protos/snapshot.pb.h>
23
#include <util/string/builder.h>
34

45
namespace NKikimr::NOlap {
@@ -7,4 +8,10 @@ TString TSnapshot::DebugString() const {
78
return TStringBuilder() << "plan_step=" << PlanStep << ";tx_id=" << TxId << ";";
89
}
910

11+
NKikimrColumnShardProto::TSnapshot TSnapshot::SerializeToProto() const {
12+
NKikimrColumnShardProto::TSnapshot result;
13+
SerializeToProto(result);
14+
return result;
15+
}
16+
1017
};

ydb/core/tx/columnshard/common/snapshot.h

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
#pragma once
22
#include <util/stream/output.h>
33
#include <util/string/cast.h>
4+
#include <ydb/library/conclusion/status.h>
5+
6+
namespace NKikimrColumnShardProto {
7+
class TSnapshot;
8+
}
49

510
namespace NKikimr::NOlap {
611

@@ -47,6 +52,27 @@ class TSnapshot {
4752
return out << "{" << s.PlanStep << ':' << (s.TxId == std::numeric_limits<ui64>::max() ? "max" : ::ToString(s.TxId)) << "}";
4853
}
4954

55+
template <class TProto>
56+
void SerializeToProto(TProto& result) const {
57+
result.SetPlanStep(PlanStep);
58+
result.SetTxId(TxId);
59+
}
60+
61+
NKikimrColumnShardProto::TSnapshot SerializeToProto() const;
62+
63+
template <class TProto>
64+
TConclusionStatus DeserializeFromProto(const TProto& proto) {
65+
PlanStep = proto.GetPlanStep();
66+
TxId = proto.GetTxId();
67+
if (!PlanStep) {
68+
return TConclusionStatus::Fail("incorrect planStep in proto");
69+
}
70+
if (!TxId) {
71+
return TConclusionStatus::Fail("incorrect txId in proto");
72+
}
73+
return TConclusionStatus::Success();
74+
}
75+
5076
TString DebugString() const;
5177
};
5278

ydb/core/tx/columnshard/common/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ PEERDIR(
1111
ydb/core/protos
1212
contrib/libs/apache/arrow
1313
ydb/core/formats/arrow
14+
ydb/core/tx/columnshard/common/protos
1415
)
1516

1617
GENERATE_ENUM_SERIALIZATION(portion.h)

ydb/core/tx/columnshard/engines/changes/abstract/abstract.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,12 @@ class TColumnEngineChanges {
173173
const TString TaskIdentifier = TGUID::Create().AsGuidString();
174174
virtual ui64 DoCalcMemoryForUsage() const = 0;
175175
public:
176+
class IMemoryPredictor {
177+
public:
178+
virtual ui64 AddPortion(const TPortionInfo& portionInfo) = 0;
179+
virtual ~IMemoryPredictor() = default;
180+
};
181+
176182
ui64 CalcMemoryForUsage() const {
177183
return DoCalcMemoryForUsage();
178184
}

0 commit comments

Comments
 (0)