Skip to content

Commit fef7e41

Browse files
author
Vladislav Gogov
authored
Default compression setting via CS config (#13203)
1 parent cbefb3a commit fef7e41

File tree

10 files changed

+191
-16
lines changed

10 files changed

+191
-16
lines changed
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
#include "validators.h"
2+
3+
#include <ydb/core/formats/arrow/serializer/parsing.h>
4+
#include <ydb/core/formats/arrow/serializer/utils.h>
5+
#include <ydb/core/protos/config.pb.h>
6+
7+
#include <util/generic/string.h>
8+
#include <util/string/builder.h>
9+
10+
#include <vector>
11+
12+
namespace NKikimr::NConfig {
13+
namespace {
14+
15+
EValidationResult ValidateDefaultCompression(const NKikimrConfig::TColumnShardConfig& columnShardConfig, std::vector<TString>& msg) {
16+
if (!columnShardConfig.HasDefaultCompression() && !columnShardConfig.HasDefaultCompressionLevel()) {
17+
return EValidationResult::Ok;
18+
}
19+
if (!columnShardConfig.HasDefaultCompression() && columnShardConfig.HasDefaultCompressionLevel()) {
20+
msg.push_back("ColumnShardConfig: compression level is set without compression type");
21+
return EValidationResult::Error;
22+
}
23+
std::optional<arrow::Compression::type> codec = NArrow::CompressionFromProto(columnShardConfig.GetDefaultCompression());
24+
if (!codec.has_value()) {
25+
msg.push_back("ColumnShardConfig: Unknown compression");
26+
return EValidationResult::Error;
27+
}
28+
if (columnShardConfig.HasDefaultCompressionLevel()) {
29+
if (!NArrow::SupportsCompressionLevel(codec.value())) {
30+
TString messageErr = TStringBuilder() << "ColumnShardConfig: compression `" << NArrow::CompressionToString(codec.value())
31+
<< "` does not support compression level";
32+
msg.push_back(messageErr);
33+
return EValidationResult::Error;
34+
} else if (!NArrow::SupportsCompressionLevel(codec.value(), columnShardConfig.GetDefaultCompressionLevel())) {
35+
TString messageErr = TStringBuilder()
36+
<< "ColumnShardConfig: compression `" << NArrow::CompressionToString(codec.value())
37+
<< "` does not support compression level = " << std::to_string(columnShardConfig.GetDefaultCompressionLevel());
38+
msg.push_back(messageErr);
39+
return EValidationResult::Error;
40+
}
41+
}
42+
return EValidationResult::Ok;
43+
}
44+
45+
} // namespace
46+
47+
EValidationResult ValidateColumnShardConfig(const NKikimrConfig::TColumnShardConfig& columnShardConfig, std::vector<TString>& msg) {
48+
EValidationResult validateDefaultCompressionResult = ValidateDefaultCompression(columnShardConfig, msg);
49+
if (validateDefaultCompressionResult == EValidationResult::Error) {
50+
return EValidationResult::Error;
51+
}
52+
return EValidationResult::Ok;
53+
}
54+
55+
} // namespace NKikimr::NConfig
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
#include <ydb/core/config/validation/validators.h>
2+
#include <ydb/core/protos/config.pb.h>
3+
#include <ydb/core/protos/flat_scheme_op.pb.h>
4+
5+
#include <library/cpp/testing/unittest/registar.h>
6+
7+
#include <vector>
8+
9+
using namespace NKikimr::NConfig;
10+
11+
Y_UNIT_TEST_SUITE(ColumnShardConfigValidation) {
12+
Y_UNIT_TEST(AcceptDefaultCompression) {
13+
NKikimrConfig::TColumnShardConfig CSConfig;
14+
std::vector<TString> error;
15+
EValidationResult result = ValidateColumnShardConfig(CSConfig, error);
16+
UNIT_ASSERT_EQUAL(result, EValidationResult::Ok);
17+
UNIT_ASSERT_C(error.empty(), "Should not be errors");
18+
}
19+
20+
Y_UNIT_TEST(NotAcceptDefaultCompression) {
21+
NKikimrConfig::TColumnShardConfig CSConfig;
22+
std::vector<TString> error;
23+
CSConfig.SetDefaultCompressionLevel(2);
24+
EValidationResult result = ValidateColumnShardConfig(CSConfig, error);
25+
UNIT_ASSERT_EQUAL(result, EValidationResult::Error);
26+
UNIT_ASSERT_VALUES_EQUAL(error.size(), 1);
27+
UNIT_ASSERT_STRINGS_EQUAL(error.front(), "ColumnShardConfig: compression level is set without compression type");
28+
}
29+
30+
Y_UNIT_TEST(CorrectPlainCompression) {
31+
NKikimrConfig::TColumnShardConfig CSConfig;
32+
std::vector<TString> error;
33+
CSConfig.SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain);
34+
EValidationResult result = ValidateColumnShardConfig(CSConfig, error);
35+
UNIT_ASSERT_EQUAL(result, EValidationResult::Ok);
36+
UNIT_ASSERT_C(error.empty(), "Should not be errors");
37+
}
38+
39+
Y_UNIT_TEST(NotCorrectPlainCompression) {
40+
NKikimrConfig::TColumnShardConfig CSConfig;
41+
std::vector<TString> error;
42+
CSConfig.SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain);
43+
CSConfig.SetDefaultCompressionLevel(1);
44+
EValidationResult result = ValidateColumnShardConfig(CSConfig, error);
45+
UNIT_ASSERT_EQUAL(result, EValidationResult::Error);
46+
UNIT_ASSERT_VALUES_EQUAL(error.size(), 1);
47+
UNIT_ASSERT_STRINGS_EQUAL(error.front(), "ColumnShardConfig: compression `uncompressed` does not support compression level");
48+
}
49+
50+
Y_UNIT_TEST(CorrectLZ4Compression) {
51+
NKikimrConfig::TColumnShardConfig CSConfig;
52+
std::vector<TString> error;
53+
CSConfig.SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4);
54+
EValidationResult result = ValidateColumnShardConfig(CSConfig, error);
55+
UNIT_ASSERT_EQUAL(result, EValidationResult::Ok);
56+
UNIT_ASSERT_C(error.empty(), "Should not be errors");
57+
}
58+
59+
Y_UNIT_TEST(NotCorrectLZ4Compression) {
60+
NKikimrConfig::TColumnShardConfig CSConfig;
61+
std::vector<TString> error;
62+
CSConfig.SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4);
63+
CSConfig.SetDefaultCompressionLevel(1);
64+
EValidationResult result = ValidateColumnShardConfig(CSConfig, error);
65+
UNIT_ASSERT_EQUAL(result, EValidationResult::Error);
66+
UNIT_ASSERT_VALUES_EQUAL(error.size(), 1);
67+
UNIT_ASSERT_STRINGS_EQUAL(error.front(), "ColumnShardConfig: compression `lz4` does not support compression level");
68+
}
69+
70+
Y_UNIT_TEST(CorrectZSTDCompression) {
71+
NKikimrConfig::TColumnShardConfig CSConfig;
72+
std::vector<TString> error;
73+
CSConfig.SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecZSTD);
74+
EValidationResult result = ValidateColumnShardConfig(CSConfig, error);
75+
UNIT_ASSERT_EQUAL(result, EValidationResult::Ok);
76+
UNIT_ASSERT_C(error.empty(), "Should not be errors");
77+
CSConfig.SetDefaultCompressionLevel(0);
78+
result = ValidateColumnShardConfig(CSConfig, error);
79+
UNIT_ASSERT_EQUAL(result, EValidationResult::Ok);
80+
UNIT_ASSERT_C(error.empty(), "Should not be errors");
81+
CSConfig.SetDefaultCompressionLevel(-100);
82+
result = ValidateColumnShardConfig(CSConfig, error);
83+
UNIT_ASSERT_EQUAL(result, EValidationResult::Ok);
84+
UNIT_ASSERT_C(error.empty(), "Should not be errors");
85+
}
86+
87+
Y_UNIT_TEST(NotCorrectZSTDCompression) {
88+
NKikimrConfig::TColumnShardConfig CSConfig;
89+
std::vector<TString> error;
90+
CSConfig.SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecZSTD);
91+
CSConfig.SetDefaultCompressionLevel(100);
92+
EValidationResult result = ValidateColumnShardConfig(CSConfig, error);
93+
UNIT_ASSERT_EQUAL(result, EValidationResult::Error);
94+
UNIT_ASSERT_VALUES_EQUAL(error.size(), 1);
95+
UNIT_ASSERT_STRINGS_EQUAL(error.front(), "ColumnShardConfig: compression `zstd` does not support compression level = 100");
96+
}
97+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
UNITTEST_FOR(ydb/core/config/validation)
2+
3+
SRC(
4+
column_shard_config_validator_ut.cpp
5+
)
6+
7+
END()

ydb/core/config/validation/validators.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,12 @@ EValidationResult ValidateConfig(const NKikimrConfig::TAppConfig& config, std::v
168168
return EValidationResult::Error;
169169
}
170170
}
171+
if (config.HasColumnShardConfig()) {
172+
NKikimr::NConfig::EValidationResult result = NKikimr::NConfig::ValidateColumnShardConfig(config.GetColumnShardConfig(), msg);
173+
if (result == NKikimr::NConfig::EValidationResult::Error) {
174+
return EValidationResult::Error;
175+
}
176+
}
171177
if (msg.size() > 0) {
172178
return EValidationResult::Warn;
173179
}

ydb/core/config/validation/validators.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ EValidationResult ValidateAuthConfig(
4242
const NKikimrProto::TAuthConfig& authConfig,
4343
std::vector<TString>& msg);
4444

45+
EValidationResult ValidateColumnShardConfig(
46+
const NKikimrConfig::TColumnShardConfig& columnShardConfig,
47+
std::vector<TString>& msg);
48+
4549
EValidationResult ValidateConfig(
4650
const NKikimrConfig::TAppConfig& config,
4751
std::vector<TString>& msg);

ydb/core/config/validation/ya.make

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,18 @@ SRCS(
44
validators.h
55
validators.cpp
66
auth_config_validator.cpp
7+
column_shard_config_validator.cpp
78
)
89

910
PEERDIR(
1011
ydb/core/protos
12+
ydb/core/formats/arrow/serializer
1113
)
1214

1315
END()
1416

1517
RECURSE_FOR_TESTS(
1618
ut
1719
auth_config_validator_ut
20+
column_shard_config_validator_ut
1821
)

ydb/core/formats/arrow/serializer/utils.cpp

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,18 @@
66
#include <contrib/libs/apache/arrow/cpp/src/arrow/util/compression.h>
77

88
namespace NKikimr::NArrow {
9-
bool SupportsCompressionLevel(const arrow::Compression::type compression) {
10-
return arrow::util::Codec::SupportsCompressionLevel(compression);
11-
}
12-
13-
bool SupportsCompressionLevel(const NKikimrSchemeOp::EColumnCodec compression) {
14-
return SupportsCompressionLevel(CompressionFromProto(compression).value());
9+
bool SupportsCompressionLevel(const arrow::Compression::type compression, const std::optional<i32>& compressionLevel) {
10+
if (!arrow::util::Codec::SupportsCompressionLevel(compression)) {
11+
return false;
12+
}
13+
if (compressionLevel.has_value()) {
14+
int minLevel = MinimumCompressionLevel(compression).value();
15+
int maxLevel = MaximumCompressionLevel(compression).value();
16+
if (compressionLevel < minLevel || compressionLevel > maxLevel) {
17+
return false;
18+
}
19+
}
20+
return true;
1521
}
1622

1723
std::optional<int> MinimumCompressionLevel(const arrow::Compression::type compression) {

ydb/core/formats/arrow/serializer/utils.h

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,12 @@
11
#pragma once
22

3-
#include <ydb/core/protos/flat_scheme_op.pb.h>
4-
53
#include <contrib/libs/apache/arrow/cpp/src/arrow/util/type_fwd.h>
6-
#include <util/system/yassert.h>
4+
#include <util/system/types.h>
75

86
#include <optional>
97

108
namespace NKikimr::NArrow {
11-
bool SupportsCompressionLevel(const arrow::Compression::type compression);
12-
bool SupportsCompressionLevel(const NKikimrSchemeOp::EColumnCodec compression);
9+
bool SupportsCompressionLevel(const arrow::Compression::type compression, const std::optional<i32>& compressionLevel = {});
1310

1411
std::optional<int> MinimumCompressionLevel(const arrow::Compression::type compression);
1512
std::optional<int> MaximumCompressionLevel(const arrow::Compression::type compression);

ydb/core/protos/config.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1792,6 +1792,8 @@ message TColumnShardConfig {
17921792
optional uint64 WritingBufferVolumeMb = 33 [default = 32];
17931793
optional uint64 WritingInFlightRequestsCountLimit = 34 [default = 1000];
17941794
optional uint64 WritingInFlightRequestBytesLimit = 35 [default = 128000000];
1795+
optional NKikimrSchemeOp.EColumnCodec DefaultCompression = 36;
1796+
optional int32 DefaultCompressionLevel = 37;
17951797
}
17961798

17971799
message TSchemeShardConfig {

ydb/core/tx/schemeshard/olap/column_families/update.cpp

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,12 @@ NKikimr::TConclusion<NKikimrSchemeOp::TOlapColumn::TSerializer> ConvertFamilyDes
2424
<< "` is not support compression level");
2525
}
2626
if (familyDescription.HasColumnCodecLevel()) {
27-
int level = familyDescription.GetColumnCodecLevel();
28-
int minLevel = NArrow::MinimumCompressionLevel(codec.value()).value();
29-
int maxLevel = NArrow::MaximumCompressionLevel(codec.value()).value();
30-
if (level < minLevel || level > maxLevel) {
27+
if (!NArrow::SupportsCompressionLevel(codec.value(), familyDescription.GetColumnCodecLevel())) {
3128
return NKikimr::TConclusionStatus::Fail(TStringBuilder()
3229
<< "family `" << familyDescription.GetName() << "`: incorrect level for codec `"
3330
<< NArrow::CompressionToString(familyDescription.GetColumnCodec()) << "`. expected: ["
34-
<< minLevel << ":" << maxLevel << "]");
31+
<< NArrow::MinimumCompressionLevel(codec.value()).value() << ":"
32+
<< NArrow::MaximumCompressionLevel(codec.value()).value() << "]");
3533
}
3634
}
3735

0 commit comments

Comments
 (0)