Skip to content

Commit 4f34c1a

Browse files
compaction. lc levels configuration (#12273)
1 parent ff307c7 commit 4f34c1a

File tree

11 files changed

+227
-17
lines changed

11 files changed

+227
-17
lines changed

ydb/core/kqp/ut/olap/kqp_olap_ut.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2698,6 +2698,18 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
26982698
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::GENERIC_ERROR, alterResult.GetIssues().ToString());
26992699
}
27002700

2701+
{
2702+
auto alterQuery =
2703+
TStringBuilder() <<
2704+
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_OPTIONS, `COMPACTION_PLANNER.CLASS_NAME`=`lc-buckets`, `COMPACTION_PLANNER.FEATURES`=`
2705+
{"levels" : [{"class_name" : "Zero", "portions_live_duration" : "180s", "expected_blobs_size" : 2048000},
2706+
{"class_name" : "Zero", "expected_blobs_size" : 2048000}, {"class_name" : "Zero"}]}`);
2707+
)";
2708+
auto session = tableClient.CreateSession().GetValueSync().GetSession();
2709+
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
2710+
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
2711+
}
2712+
27012713
{
27022714
auto it = tableClient.StreamExecuteScanQuery(R"(
27032715
--!syntax_v1

ydb/core/protos/flat_scheme_op.proto

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -460,6 +460,20 @@ message TStorageTierConfig {
460460
optional TCompressionOptions Compression = 3;
461461
}
462462

463+
message TCompactionLevelConstructorContainer {
464+
optional string ClassName = 1;
465+
466+
message TZeroLevel {
467+
optional uint32 PortionsLiveDurationSeconds = 1;
468+
optional uint64 ExpectedBlobsSize = 2;
469+
}
470+
471+
oneof Implementation {
472+
TZeroLevel ZeroLevel = 10;
473+
}
474+
475+
}
476+
463477
message TCompactionPlannerConstructorContainer {
464478
optional string ClassName = 1;
465479

@@ -473,7 +487,7 @@ message TCompactionPlannerConstructorContainer {
473487
}
474488

475489
message TLCOptimizer {
476-
490+
repeated TCompactionLevelConstructorContainer Levels = 1;
477491
}
478492

479493
oneof Implementation {

ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/constructor.cpp

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets {
55

66
NKikimr::TConclusion<std::shared_ptr<NKikimr::NOlap::NStorageOptimizer::IOptimizerPlanner>> TOptimizerPlannerConstructor::DoBuildPlanner(const TBuildContext& context) const {
7-
return std::make_shared<TOptimizerPlanner>(context.GetPathId(), context.GetStorages(), context.GetPKSchema());
7+
return std::make_shared<TOptimizerPlanner>(context.GetPathId(), context.GetStorages(), context.GetPKSchema(), Levels);
88
}
99

1010
bool TOptimizerPlannerConstructor::DoApplyToCurrentObject(IOptimizerPlanner& current) const {
@@ -23,14 +23,50 @@ bool TOptimizerPlannerConstructor::DoIsEqualTo(const IOptimizerPlannerConstructo
2323

2424
void TOptimizerPlannerConstructor::DoSerializeToProto(TProto& proto) const {
2525
*proto.MutableLCBuckets() = NKikimrSchemeOp::TCompactionPlannerConstructorContainer::TLCOptimizer();
26+
for (auto&& i : Levels) {
27+
*proto.MutableLCBuckets()->AddLevels() = i.SerializeToProto();
28+
}
2629
}
2730

2831
bool TOptimizerPlannerConstructor::DoDeserializeFromProto(const TProto& proto) {
2932
if (!proto.HasLCBuckets()) {
3033
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("error", "cannot parse lc-buckets optimizer from proto")("proto", proto.DebugString());
3134
return false;
3235
}
36+
for (auto&& i : proto.GetLCBuckets().GetLevels()) {
37+
TLevelConstructorContainer lContainer;
38+
if (!lContainer.DeserializeFromProto(i)) {
39+
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("error", "cannot parse lc-bucket level")("proto", i.DebugString());
40+
return false;
41+
}
42+
Levels.emplace_back(std::move(lContainer));
43+
}
3344
return true;
3445
}
3546

47+
NKikimr::TConclusionStatus TOptimizerPlannerConstructor::DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) {
48+
if (!jsonInfo.Has("levels")) {
49+
return TConclusionStatus::Fail("no levels description");
50+
}
51+
if (!jsonInfo["levels"].IsArray()) {
52+
return TConclusionStatus::Fail("levels have to been array in json description");
53+
}
54+
auto& arr = jsonInfo["levels"].GetArray();
55+
if (!arr.size()) {
56+
return TConclusionStatus::Fail("no objects in json array 'levels'");
57+
}
58+
for (auto&& i : arr) {
59+
const auto className = i["class_name"].GetStringRobust();
60+
auto level = ILevelConstructor::TFactory::MakeHolder(className);
61+
if (!level) {
62+
return TConclusionStatus::Fail("incorrect level class_name: " + className);
63+
}
64+
if (!level->DeserializeFromJson(i["description"])) {
65+
return TConclusionStatus::Fail("cannot parse level: " + className + ": " + i["description"].GetStringRobust());
66+
}
67+
Levels.emplace_back(TLevelConstructorContainer(std::shared_ptr<ILevelConstructor>(level.Release())));
68+
}
69+
return TConclusionStatus::Success();
70+
}
71+
3672
} // namespace NKikimr::NOlap::NStorageOptimizer::NLBuckets
Lines changed: 50 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,75 @@
11
#pragma once
22
#include <ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h>
3+
#include <ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/counters.h>
4+
#include <ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/abstract.h>
35

46
namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets {
57

8+
class ILevelConstructor {
9+
private:
10+
virtual std::shared_ptr<IPortionsLevel> DoBuildLevel(
11+
const std::shared_ptr<IPortionsLevel>& nextLevel, const ui32 indexLevel, const TLevelCounters& counters) const = 0;
12+
virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& json) = 0;
13+
virtual bool DoDeserializeFromProto(const NKikimrSchemeOp::TCompactionLevelConstructorContainer& proto) = 0;
14+
virtual void DoSerializeToProto(NKikimrSchemeOp::TCompactionLevelConstructorContainer& proto) const = 0;
15+
16+
public:
17+
using TFactory = NObjectFactory::TObjectFactory<ILevelConstructor, TString>;
18+
using TProto = NKikimrSchemeOp::TCompactionLevelConstructorContainer;
19+
20+
virtual ~ILevelConstructor() = default;
21+
22+
std::shared_ptr<IPortionsLevel> BuildLevel(
23+
const std::shared_ptr<IPortionsLevel>& nextLevel, const ui32 indexLevel, const TLevelCounters& counters) const {
24+
return DoBuildLevel(nextLevel, indexLevel, counters);
25+
}
26+
27+
TConclusionStatus DeserializeFromJson(const NJson::TJsonValue& json) {
28+
return DoDeserializeFromJson(json);
29+
}
30+
31+
bool DeserializeFromProto(const TProto& proto) {
32+
return DoDeserializeFromProto(proto);
33+
}
34+
void SerializeToProto(NKikimrSchemeOp::TCompactionLevelConstructorContainer& proto) const {
35+
return DoSerializeToProto(proto);
36+
}
37+
virtual TString GetClassName() const = 0;
38+
};
39+
40+
class TLevelConstructorContainer: public NBackgroundTasks::TInterfaceProtoContainer<ILevelConstructor> {
41+
private:
42+
using TBase = NBackgroundTasks::TInterfaceProtoContainer<ILevelConstructor>;
43+
44+
public:
45+
using TBase::TBase;
46+
};
47+
648
class TOptimizerPlannerConstructor: public IOptimizerPlannerConstructor {
749
public:
850
static TString GetClassNameStatic() {
951
return "lc-buckets";
1052
}
53+
1154
private:
12-
static inline const TFactory::TRegistrator<TOptimizerPlannerConstructor> Registrator = TFactory::TRegistrator<TOptimizerPlannerConstructor>(GetClassNameStatic());
55+
std::vector<TLevelConstructorContainer> Levels;
56+
57+
static inline const TFactory::TRegistrator<TOptimizerPlannerConstructor> Registrator =
58+
TFactory::TRegistrator<TOptimizerPlannerConstructor>(GetClassNameStatic());
1359

1460
virtual void DoSerializeToProto(TProto& proto) const override;
1561

1662
virtual bool DoDeserializeFromProto(const TProto& proto) override;
17-
virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& /*jsonInfo*/) override {
18-
return TConclusionStatus::Success();
19-
}
63+
virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) override;
2064
virtual bool DoApplyToCurrentObject(IOptimizerPlanner& current) const override;
2165

2266
virtual TConclusion<std::shared_ptr<IOptimizerPlanner>> DoBuildPlanner(const TBuildContext& context) const override;
2367
virtual bool DoIsEqualTo(const IOptimizerPlannerConstructor& item) const override;
68+
2469
public:
2570
virtual TString GetClassName() const override {
2671
return GetClassNameStatic();
2772
}
28-
2973
};
3074

31-
} // namespace NKikimr::NOlap::NStorageOptimizer::NLBuckets
75+
} // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets

ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ LIBRARY()
22

33
SRCS(
44
GLOBAL constructor.cpp
5+
GLOBAL zero_level.cpp
56
)
67

78
PEERDIR(
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
#include "zero_level.h"
2+
3+
#include <ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/zero_level.h>
4+
5+
namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets {
6+
7+
TConclusionStatus TZeroLevelConstructor::DoDeserializeFromJson(const NJson::TJsonValue& json) {
8+
if (json.Has("portions_live_duration")) {
9+
const auto& jsonValue = json["portions_live_duration"];
10+
if (!jsonValue.IsString()) {
11+
return TConclusionStatus::Fail("incorrect portions_live_duration value (have to be similar as 10s, 20m, 30d, etc)");
12+
}
13+
TDuration d;
14+
if (!TDuration::TryParse(jsonValue.GetString(), d)) {
15+
return TConclusionStatus::Fail("cannot parse portions_live_duration value " + jsonValue.GetString());
16+
}
17+
PortionsLiveDuration = d;
18+
}
19+
if (json.Has("expected_blobs_size")) {
20+
const auto& jsonValue = json["expected_blobs_size"];
21+
if (!jsonValue.IsUInteger()) {
22+
return TConclusionStatus::Fail("incorrect expected_blobs_size value (have to be unsigned int)");
23+
}
24+
ExpectedBlobsSize = jsonValue.GetUInteger();
25+
}
26+
return TConclusionStatus::Success();
27+
}
28+
29+
bool TZeroLevelConstructor::DoDeserializeFromProto(const NKikimrSchemeOp::TCompactionLevelConstructorContainer& proto) {
30+
if (!proto.HasZeroLevel()) {
31+
return true;
32+
}
33+
if (proto.GetZeroLevel().HasPortionsLiveDurationSeconds()) {
34+
PortionsLiveDuration = TDuration::Seconds(proto.GetZeroLevel().GetPortionsLiveDurationSeconds());
35+
}
36+
if (proto.GetZeroLevel().HasExpectedBlobsSize()) {
37+
ExpectedBlobsSize = proto.GetZeroLevel().GetExpectedBlobsSize();
38+
}
39+
return true;
40+
}
41+
42+
void TZeroLevelConstructor::DoSerializeToProto(NKikimrSchemeOp::TCompactionLevelConstructorContainer& proto) const {
43+
if (PortionsLiveDuration) {
44+
proto.MutableZeroLevel()->SetPortionsLiveDurationSeconds(PortionsLiveDuration->Seconds());
45+
}
46+
if (ExpectedBlobsSize) {
47+
proto.MutableZeroLevel()->SetExpectedBlobsSize(*ExpectedBlobsSize);
48+
}
49+
}
50+
51+
std::shared_ptr<NKikimr::NOlap::NStorageOptimizer::NLCBuckets::IPortionsLevel> TZeroLevelConstructor::DoBuildLevel(
52+
const std::shared_ptr<IPortionsLevel>& nextLevel, const ui32 indexLevel, const TLevelCounters& counters) const {
53+
return std::make_shared<TZeroLevelPortions>(
54+
indexLevel, nextLevel, counters, PortionsLiveDuration.value_or(TDuration::Max()), ExpectedBlobsSize.value_or((ui64)1 << 20));
55+
}
56+
57+
} // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
#pragma once
2+
#include "constructor.h"
3+
4+
namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets {
5+
6+
class TZeroLevelConstructor: public ILevelConstructor {
7+
public:
8+
static TString GetClassNameStatic() {
9+
return "Zero";
10+
}
11+
12+
private:
13+
std::optional<TDuration> PortionsLiveDuration;
14+
std::optional<ui64> ExpectedBlobsSize;
15+
16+
virtual std::shared_ptr<IPortionsLevel> DoBuildLevel(
17+
const std::shared_ptr<IPortionsLevel>& nextLevel, const ui32 indexLevel, const TLevelCounters& counters) const override;
18+
virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& json) override;
19+
virtual bool DoDeserializeFromProto(const NKikimrSchemeOp::TCompactionLevelConstructorContainer& proto) override;
20+
virtual void DoSerializeToProto(NKikimrSchemeOp::TCompactionLevelConstructorContainer& proto) const override;
21+
22+
static const inline TFactory::TRegistrator<TZeroLevelConstructor> Registrator = TFactory::TRegistrator<TZeroLevelConstructor>(GetClassNameStatic());
23+
24+
public:
25+
virtual TString GetClassName() const override {
26+
return GetClassNameStatic();
27+
}
28+
};
29+
30+
} // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets

ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.cpp

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@
33
#include "optimizer.h"
44
#include "zero_level.h"
55

6+
#include <ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/constructor.h>
7+
68
#include <util/string/join.h>
79

810
namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets {
911

10-
TOptimizerPlanner::TOptimizerPlanner(
11-
const ui64 pathId, const std::shared_ptr<IStoragesManager>& storagesManager, const std::shared_ptr<arrow::Schema>& primaryKeysSchema)
12+
TOptimizerPlanner::TOptimizerPlanner(const ui64 pathId, const std::shared_ptr<IStoragesManager>& storagesManager,
13+
const std::shared_ptr<arrow::Schema>& primaryKeysSchema, const std::vector<TLevelConstructorContainer>& levelConstructors)
1214
: TBase(pathId)
1315
, Counters(std::make_shared<TCounters>())
1416
, StoragesManager(storagesManager)
@@ -19,9 +21,19 @@ TOptimizerPlanner::TOptimizerPlanner(
1921
Levels.emplace_back(
2022
std::make_shared<TLevelPortions>(2, 0.9, maxPortionBlobBytes, nullptr, PortionsInfo, Counters->GetLevelCounters(2)));
2123
*/
22-
Levels.emplace_back(std::make_shared<TZeroLevelPortions>(2, nullptr, Counters->GetLevelCounters(2), TDuration::Max()));
23-
Levels.emplace_back(std::make_shared<TZeroLevelPortions>(1, Levels.back(), Counters->GetLevelCounters(1), TDuration::Max()));
24-
Levels.emplace_back(std::make_shared<TZeroLevelPortions>(0, Levels.back(), Counters->GetLevelCounters(0), TDuration::Seconds(180)));
24+
if (levelConstructors.size()) {
25+
std::shared_ptr<IPortionsLevel> nextLevel;
26+
ui32 idx = levelConstructors.size();
27+
for (auto it = levelConstructors.rbegin(); it != levelConstructors.rend(); ++it) {
28+
--idx;
29+
Levels.emplace_back((*it)->BuildLevel(nextLevel, idx, Counters->GetLevelCounters(idx)));
30+
}
31+
} else {
32+
Levels.emplace_back(std::make_shared<TZeroLevelPortions>(2, nullptr, Counters->GetLevelCounters(2), TDuration::Max(), 1 << 20));
33+
Levels.emplace_back(std::make_shared<TZeroLevelPortions>(1, Levels.back(), Counters->GetLevelCounters(1), TDuration::Max(), 1 << 20));
34+
Levels.emplace_back(
35+
std::make_shared<TZeroLevelPortions>(0, Levels.back(), Counters->GetLevelCounters(0), TDuration::Seconds(180), 1 << 20));
36+
}
2537
std::reverse(Levels.begin(), Levels.end());
2638
RefreshWeights();
2739
}

ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets {
66

7+
class TLevelConstructorContainer;
8+
79
class TOptimizerPlanner: public IOptimizerPlanner {
810
private:
911
using TBase = IOptimizerPlanner;
@@ -144,8 +146,8 @@ class TOptimizerPlanner: public IOptimizerPlanner {
144146
return result;
145147
}
146148

147-
TOptimizerPlanner(
148-
const ui64 pathId, const std::shared_ptr<IStoragesManager>& storagesManager, const std::shared_ptr<arrow::Schema>& primaryKeysSchema);
149+
TOptimizerPlanner(const ui64 pathId, const std::shared_ptr<IStoragesManager>& storagesManager,
150+
const std::shared_ptr<arrow::Schema>& primaryKeysSchema, const std::vector<TLevelConstructorContainer>& levelConstructors);
149151
};
150152

151153
} // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets

ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/zero_level.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ ui64 TZeroLevelPortions::DoGetWeight() const {
2626
return 0;
2727
}
2828
if (PredOptimization && TInstant::Now() - *PredOptimization < DurationToDrop) {
29-
if (PortionsInfo.PredictPackedBlobBytes(GetPackKff()) < (1 << 20)) {
29+
if (PortionsInfo.PredictPackedBlobBytes(GetPackKff()) < ExpectedBlobsSize) {
3030
return 0;
3131
}
3232
}

0 commit comments

Comments
 (0)