Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions ydb/core/persqueue/ut/autoscaling_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,36 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetThresholdTime().Seconds(), alterThreshold);
}

Y_UNIT_TEST(ControlPlane_AutoscalingWithStorageSizeRetention) {
auto autoscalingTestTopic = "autoscalit-topic";
TTopicSdkTestSetup setup = CreateSetup();
TTopicClient client = setup.MakeClient();

TCreateTopicSettings createSettings;
createSettings
.RetentionStorageMb(1024)
.BeginConfigurePartitioningSettings()
.BeginConfigureAutoscalingSettings()
.Strategy(EAutoscalingStrategy::ScaleUp)
.EndConfigureAutoscalingSettings()
.EndConfigurePartitioningSettings();
auto result = client.CreateTopic(autoscalingTestTopic, createSettings).GetValueSync();

UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::BAD_REQUEST);

createSettings.RetentionStorageMb(0);
result = client.CreateTopic(autoscalingTestTopic, createSettings).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::SUCCESS);

TAlterTopicSettings alterSettings;
alterSettings
.SetRetentionStorageMb(1024);

result = client.AlterTopic(autoscalingTestTopic, alterSettings).GetValueSync();

UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::BAD_REQUEST);
}

Y_UNIT_TEST(PartitionSplit_AutosplitByLoad) {
TTopicSdkTestSetup setup = CreateSetup();
TTopicClient client = setup.MakeClient();
Expand Down
86 changes: 45 additions & 41 deletions ydb/services/lib/actors/pq_schema_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,10 @@ namespace NKikimr::NGRpcProxy::V1 {
error = TStringBuilder() << "Partition scale threshold time must be greater then 1 second, provided " << strategy.GetScaleThresholdSeconds() << " seconds";
return TYdbPqCodes(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::VALIDATION_ERROR);
}
if (strategy.GetPartitionStrategyType() != ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_DISABLED && config.GetPartitionConfig().HasStorageLimitBytes()) {
error = TStringBuilder() << "Partitions autoscaling is incompatible with retention storage bytes option";
return TYdbPqCodes(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::VALIDATION_ERROR);
}

return std::nullopt;
}
Expand All @@ -736,6 +740,29 @@ namespace NKikimr::NGRpcProxy::V1 {
auto minParts = 1;
auto* pqTabletConfig = pqDescr->MutablePQTabletConfig();
auto partConfig = pqTabletConfig->MutablePartitionConfig();

switch (settings.retention_case()) {
case Ydb::PersQueue::V1::TopicSettings::kRetentionPeriodMs: {
partConfig->SetLifetimeSeconds(Max(settings.retention_period_ms() / 1000ll, 1ll));
}
break;

case Ydb::PersQueue::V1::TopicSettings::kRetentionStorageBytes: {
if (settings.retention_storage_bytes() <= 0) {
error = TStringBuilder() << "retention_storage_bytes must be positive, provided " <<
settings.retention_storage_bytes();
return Ydb::StatusIds::BAD_REQUEST;
}
partConfig->SetStorageLimitBytes(settings.retention_storage_bytes());
}
break;

default: {
error = TStringBuilder() << "retention_storage_bytes or retention_period_ms should be set";
return Ydb::StatusIds::BAD_REQUEST;
}
}

if (settings.has_partitions_count()) {
if (settings.partitions_count() > 0) {
minParts = settings.partitions_count();
Expand Down Expand Up @@ -816,28 +843,6 @@ namespace NKikimr::NGRpcProxy::V1 {
partConfig->SetMaxSizeInPartition(settings.max_partition_storage_size() ? settings.max_partition_storage_size() : Max<i64>());
partConfig->SetMaxCountInPartition(Max<i32>());

switch (settings.retention_case()) {
case Ydb::PersQueue::V1::TopicSettings::kRetentionPeriodMs: {
partConfig->SetLifetimeSeconds(Max(settings.retention_period_ms() / 1000ll, 1ll));
}
break;

case Ydb::PersQueue::V1::TopicSettings::kRetentionStorageBytes: {
if (settings.retention_storage_bytes() <= 0) {
error = TStringBuilder() << "retention_storage_bytes must be positive, provided " <<
settings.retention_storage_bytes();
return Ydb::StatusIds::BAD_REQUEST;
}
partConfig->SetStorageLimitBytes(settings.retention_storage_bytes());
}
break;

default: {
error = TStringBuilder() << "retention_storage_bytes or retention_period_ms should be set";
return Ydb::StatusIds::BAD_REQUEST;
}
}

if (settings.message_group_seqno_retention_period_ms() > 0 && settings.message_group_seqno_retention_period_ms() < settings.retention_period_ms()) {
error = TStringBuilder() << "message_group_seqno_retention_period_ms (provided " << settings.message_group_seqno_retention_period_ms() << ") must be more then retention_period_ms (provided " << settings.retention_period_ms() << ")";
return Ydb::StatusIds::BAD_REQUEST;
Expand Down Expand Up @@ -1077,6 +1082,10 @@ namespace NKikimr::NGRpcProxy::V1 {

auto pqTabletConfig = pqDescr->MutablePQTabletConfig();
auto partConfig = pqTabletConfig->MutablePartitionConfig();

if (request.retention_storage_mb())
partConfig->SetStorageLimitBytes(request.retention_storage_mb() * 1024 * 1024);

if (request.has_partitioning_settings()) {
const auto& settings = request.partitioning_settings();
if (settings.min_active_partitions() > 0) {
Expand Down Expand Up @@ -1162,9 +1171,6 @@ namespace NKikimr::NGRpcProxy::V1 {
partConfig->SetLifetimeSeconds(TDuration::Days(1).Seconds());
}

if (request.retention_storage_mb())
partConfig->SetStorageLimitBytes(request.retention_storage_mb() * 1024 * 1024);

if (local) {
auto partSpeed = request.partition_write_speed_bytes_per_second();
if (partSpeed == 0) {
Expand Down Expand Up @@ -1237,9 +1243,16 @@ namespace NKikimr::NGRpcProxy::V1 {
auto pqTabletConfig = pqDescr.MutablePQTabletConfig();
NPQ::Migrate(*pqTabletConfig);
auto partConfig = pqTabletConfig->MutablePartitionConfig();
if (request.has_alter_partitioning_settings()) {
auto splitMergeFeatureEnabled = AppData(ctx)->FeatureFlags.GetEnableTopicSplitMerge();
auto splitMergeFeatureEnabled = AppData(ctx)->FeatureFlags.GetEnableTopicSplitMerge();

if (request.has_set_retention_storage_mb()) {
CHECK_CDC;
partConfig->ClearStorageLimitBytes();
if (request.set_retention_storage_mb())
partConfig->SetStorageLimitBytes(request.set_retention_storage_mb() * 1024 * 1024);
}

if (request.has_alter_partitioning_settings()) {
const auto& settings = request.alter_partitioning_settings();
if (settings.has_set_min_active_partitions()) {
auto minParts = IfEqualThenDefault(settings.set_min_active_partitions(), 0L, 1L);
Expand All @@ -1248,6 +1261,7 @@ namespace NKikimr::NGRpcProxy::V1 {
pqTabletConfig->MutablePartitionStrategy()->SetMinPartitionCount(minParts);
}
}

if (splitMergeFeatureEnabled) {
if (settings.has_set_max_active_partitions()) {
pqTabletConfig->MutablePartitionStrategy()->SetMaxPartitionCount(settings.set_max_active_partitions());
Expand Down Expand Up @@ -1278,11 +1292,12 @@ namespace NKikimr::NGRpcProxy::V1 {
}
}
}
if (auto code = ValidatePartitionStrategy(*pqTabletConfig, error); code) {
return code->YdbCode;
}
}
}

if (splitMergeFeatureEnabled) {
auto code = ValidatePartitionStrategy(*pqTabletConfig, error);
if (code) return code->YdbCode;
}

if (request.alter_attributes().size()) {
Expand All @@ -1299,14 +1314,6 @@ namespace NKikimr::NGRpcProxy::V1 {
partConfig->SetLifetimeSeconds(request.set_retention_period().seconds());
}


if (request.has_set_retention_storage_mb()) {
CHECK_CDC;
partConfig->ClearStorageLimitBytes();
if (request.set_retention_storage_mb())
partConfig->SetStorageLimitBytes(request.set_retention_storage_mb() * 1024 * 1024);
}

bool local = true; //todo: check locality
if (local || pqConfig.GetTopicsAreFirstClassCitizen()) {
if (request.has_set_partition_write_speed_bytes_per_second()) {
Expand Down Expand Up @@ -1434,7 +1441,4 @@ namespace NKikimr::NGRpcProxy::V1 {

return CheckConfig(*pqTabletConfig, supportedClientServiceTypes, error, ctx, Ydb::StatusIds::ALREADY_EXISTS);
}



}