Skip to content

Commit e85ed6b

Browse files
authored
Merge e647250 into 3a59d95
2 parents 3a59d95 + e647250 commit e85ed6b

File tree

15 files changed

+701
-151
lines changed

15 files changed

+701
-151
lines changed

ydb/core/persqueue/partition_scale_manager.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ namespace NPQ {
55

66

77
TPartitionScaleManager::TPartitionScaleManager(
8-
const TString& topicName,
9-
const TString& databasePath,
8+
const TString& topicName,
9+
const TString& databasePath,
1010
NKikimrPQ::TUpdateBalancerConfig& balancerConfig
1111
)
1212
: TopicName(topicName)
@@ -34,7 +34,7 @@ void TPartitionScaleManager::TrySendScaleRequest(const TActorContext& ctx) {
3434
if (splitMergePair.first.empty() && splitMergePair.second.empty()) {
3535
return;
3636
}
37-
37+
3838
RequestInflight = true;
3939
CurrentScaleRequest = ctx.Register(new TPartitionScaleRequest(
4040
TopicName,
@@ -55,7 +55,7 @@ std::pair<std::vector<TPartitionSplit>, std::vector<TPartitionMerge>> TPartition
5555
std::vector<TPartitionSplit> splitsToApply;
5656
std::vector<TPartitionMerge> mergesToApply;
5757

58-
size_t allowedSplitsCount = BalancerConfig.PartitionCountLimit > BalancerConfig.CurPartitions ? BalancerConfig.PartitionCountLimit - BalancerConfig.CurPartitions : 0;
58+
size_t allowedSplitsCount = BalancerConfig.MaxActivePartitions > BalancerConfig.CurPartitions ? BalancerConfig.MaxActivePartitions - BalancerConfig.CurPartitions : 0;
5959
auto itSplit = PartitionsToSplit.begin();
6060
while (allowedSplitsCount > 0 && itSplit != PartitionsToSplit.end()) {
6161
const auto partitionId = itSplit->first;

ydb/core/persqueue/partition_scale_manager.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,23 +28,23 @@ class TPartitionScaleManager {
2828
NSchemeShard::TTopicTabletInfo::TKeyRange KeyRange;
2929
};
3030

31-
private:
31+
private:
3232
struct TBalancerConfig {
3333
TBalancerConfig(
3434
NKikimrPQ::TUpdateBalancerConfig& config
3535
)
3636
: PathId(config.GetPathId())
3737
, PathVersion(config.GetVersion())
3838
, PartitionGraph(MakePartitionGraph(config))
39-
, PartitionCountLimit(config.GetTabletConfig().GetPartitionStrategy().GetMaxPartitionCount())
39+
, MaxActivePartitions(config.GetTabletConfig().GetPartitionStrategy().GetMaxPartitionCount())
4040
, MinActivePartitions(config.GetTabletConfig().GetPartitionStrategy().GetMinPartitionCount())
4141
, CurPartitions(config.PartitionsSize()) {
4242
}
4343

4444
ui64 PathId;
4545
int PathVersion;
4646
TPartitionGraph PartitionGraph;
47-
ui64 PartitionCountLimit;
47+
ui64 MaxActivePartitions;
4848
ui64 MinActivePartitions;
4949
ui64 CurPartitions;
5050
};
@@ -59,7 +59,7 @@ class TPartitionScaleManager {
5959
void UpdateBalancerConfig(NKikimrPQ::TUpdateBalancerConfig& config);
6060
void UpdateDatabasePath(const TString& dbPath);
6161
void Die(const TActorContext& ctx);
62-
62+
6363
static TString GetRangeMid(const TString& from, const TString& to);
6464

6565
private:
@@ -74,7 +74,7 @@ class TPartitionScaleManager {
7474
private:
7575
static const ui32 MIN_SCALE_REQUEST_REPEAT_SECONDS_TIMEOUT = 10;
7676
static const ui32 MAX_SCALE_REQUEST_REPEAT_SECONDS_TIMEOUT = 1000;
77-
77+
7878
const TString TopicName;
7979
TString DatabasePath = "";
8080
TActorId CurrentScaleRequest;

ydb/core/persqueue/partition_scale_request.cpp

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,12 @@ namespace NKikimr {
44
namespace NPQ {
55

66
TPartitionScaleRequest::TPartitionScaleRequest(
7-
TString topicName,
8-
TString databasePath,
9-
ui64 pathId,
10-
ui64 pathVersion,
11-
std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionSplit> splits,
12-
const std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionMerge> merges,
7+
TString topicName,
8+
TString databasePath,
9+
ui64 pathId,
10+
ui64 pathVersion,
11+
std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionSplit> splits,
12+
const std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionMerge> merges,
1313
NActors::TActorId parentActorId
1414
)
1515
: Topic(topicName)
@@ -19,7 +19,7 @@ TPartitionScaleRequest::TPartitionScaleRequest(
1919
, Splits(splits)
2020
, Merges(merges)
2121
, ParentActorId(parentActorId) {
22-
22+
2323
}
2424

2525
void TPartitionScaleRequest::Bootstrap(const NActors::TActorContext &ctx) {
@@ -70,14 +70,14 @@ void TPartitionScaleRequest::PassAway() {
7070

7171
void TPartitionScaleRequest::Handle(TEvTabletPipe::TEvClientConnected::TPtr &ev, const TActorContext &ctx) {
7272
if (ev->Get()->Status != NKikimrProto::OK) {
73-
auto scaleRequestResult = std::make_unique<TEvPartitionScaleRequestDone>(TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ProxyShardNotAvailable);//savnik: проверить, какой статус тут приходит
73+
auto scaleRequestResult = std::make_unique<TEvPartitionScaleRequestDone>(TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ProxyShardNotAvailable);
7474
Send(ParentActorId, scaleRequestResult.release());
7575
Die(ctx);
7676
}
7777
}
7878

7979
void TPartitionScaleRequest::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr&, const TActorContext &ctx) {
80-
auto scaleRequestResult = std::make_unique<TEvPartitionScaleRequestDone>(TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ProxyShardNotAvailable);//savnik: проверить, какой статус тут приходит
80+
auto scaleRequestResult = std::make_unique<TEvPartitionScaleRequestDone>(TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ProxyShardNotAvailable);
8181
Send(ParentActorId, scaleRequestResult.release());
8282
Die(ctx);
8383
}
@@ -90,11 +90,10 @@ void TPartitionScaleRequest::Handle(NSchemeShard::TEvSchemeShard::TEvNotifyTxCom
9090

9191
void TPartitionScaleRequest::Handle(TEvTxUserProxy::TEvProposeTransactionStatus::TPtr& ev, const NActors::TActorContext& ctx) {
9292
auto msg = ev->Get();
93-
//Cerr << "SAVDBG" << msg->Record.GetIssues()[0].Getmessage(); //savnik: log err
9493

9594
auto status = static_cast<TEvTxUserProxy::TEvProposeTransactionStatus::EStatus>(msg->Record.GetStatus());
9695
if (status != TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecInProgress) {
97-
auto scaleRequestResult = std::make_unique<TEvPartitionScaleRequestDone>(status);//savnik: проверить, какой статус тут приходит
96+
auto scaleRequestResult = std::make_unique<TEvPartitionScaleRequestDone>(status);
9897
Send(ParentActorId, scaleRequestResult.release());
9998
Die(ctx);
10099
} else {

ydb/core/persqueue/partition_write.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -541,9 +541,14 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) {
541541
NKikimrPQ::EScaleStatus TPartition::CheckScaleStatus(const TActorContext& /*ctx*/) {
542542
auto const writeSpeedUsagePercent = SplitMergeAvgWriteBytes->GetValue() * 100.0 / Config.GetPartitionStrategy().GetScaleThresholdSeconds() / TotalPartitionWriteSpeed;
543543

544-
if (writeSpeedUsagePercent >= Config.GetPartitionStrategy().GetScaleUpPartitionWriteSpeedThresholdPercent()) {
544+
auto splitEnabled = Config.GetPartitionStrategy().GetPartitionStrategyType() == ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT
545+
|| Config.GetPartitionStrategy().GetPartitionStrategyType() == ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT_AND_MERGE;
546+
547+
auto mergeEnabled = Config.GetPartitionStrategy().GetPartitionStrategyType() == ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT_AND_MERGE;
548+
549+
if (splitEnabled && writeSpeedUsagePercent >= Config.GetPartitionStrategy().GetScaleUpPartitionWriteSpeedThresholdPercent()) {
545550
return NKikimrPQ::EScaleStatus::NEED_SPLIT;
546-
} else if (writeSpeedUsagePercent <= Config.GetPartitionStrategy().GetScaleDownPartitionWriteSpeedThresholdPercent()) {
551+
} else if (mergeEnabled && writeSpeedUsagePercent <= Config.GetPartitionStrategy().GetScaleDownPartitionWriteSpeedThresholdPercent()) {
547552
return NKikimrPQ::EScaleStatus::NEED_MERGE;
548553
}
549554
return NKikimrPQ::EScaleStatus::NORMAL;

ydb/core/persqueue/ut/autoscaling_ut.cpp

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -498,6 +498,77 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
498498
status = client.CommitOffset(TEST_TOPIC, 0, TEST_CONSUMER, 0).GetValueSync();
499499
UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::BAD_REQUEST, status.GetStatus(), "The consumer cannot commit an offset for inactive, read-to-the-end partitions.");
500500
}
501+
502+
Y_UNIT_TEST(CreateAlterDescribe) {
503+
auto autoscalingTestTopic = "autoscalit-topic";
504+
TTopicSdkTestSetup setup = CreateSetup();
505+
TTopicClient client = setup.MakeClient();
506+
507+
auto minParts = 5;
508+
auto maxParts = 10;
509+
auto scaleUpPercent = 80;
510+
auto scaleDownPercent = 20;
511+
auto threshold = 500;
512+
auto strategy = EAutoscalingStrategy::ScaleUp;
513+
514+
TCreateTopicSettings createSettings;
515+
createSettings
516+
.BeginConfigurePartitioningSettings()
517+
.MinActivePartitions(minParts)
518+
.MaxActivePartitions(maxParts)
519+
.BeginConfigureAutoscalingSettings()
520+
.ScaleUpThresholdPercent(scaleUpPercent)
521+
.ScaleDownThresholdPercent(scaleDownPercent)
522+
.ThresholdTime(TDuration::Seconds(threshold))
523+
.Strategy(strategy)
524+
.EndConfigureAutoscalingSettings()
525+
.EndConfigurePartitioningSettings();
526+
client.CreateTopic(autoscalingTestTopic, createSettings).Wait();
527+
528+
TDescribeTopicSettings descSettings;
529+
530+
auto describe = client.DescribeTopic(autoscalingTestTopic, descSettings).GetValueSync();
531+
UNIT_ASSERT_VALUES_EQUAL_C(describe.GetStatus(), NYdb::EStatus::SUCCESS, describe.GetIssues().ToString());
532+
533+
534+
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetMinActivePartitions(), minParts);
535+
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetMaxActivePartitions(), maxParts);
536+
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetStrategy(), strategy);
537+
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetScaleDownThresholdPercent(), scaleDownPercent);
538+
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetScaleUpThresholdPercent(), scaleUpPercent);
539+
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetThresholdTime().Seconds(), threshold);
540+
541+
auto alterMinParts = 10;
542+
auto alterMaxParts = 20;
543+
auto alterScaleUpPercent = 90;
544+
auto alterScaleDownPercent = 10;
545+
auto alterThreshold = 700;
546+
auto alterStrategy = EAutoscalingStrategy::ScaleUpAndDown;
547+
548+
TAlterTopicSettings alterSettings;
549+
alterSettings
550+
.BeginAlterPartitioningSettings()
551+
.MinActivePartitions(alterMinParts)
552+
.MaxActivePartitions(alterMaxParts)
553+
.BeginAlterAutoscalingSettings()
554+
.ScaleDownThresholdPercent(alterScaleDownPercent)
555+
.ScaleUpThresholdPercent(alterScaleUpPercent)
556+
.ThresholdTime(TDuration::Seconds(alterThreshold))
557+
.Strategy(alterStrategy)
558+
.EndAlterAutoscalingSettings()
559+
.EndAlterTopicPartitioningSettings();
560+
561+
client.AlterTopic(autoscalingTestTopic, alterSettings).Wait();
562+
563+
auto describeAfterAlter = client.DescribeTopic(autoscalingTestTopic).GetValueSync();
564+
565+
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetMinActivePartitions(), alterMinParts);
566+
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetMaxActivePartitions(), alterMaxParts);
567+
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetStrategy(), alterStrategy);
568+
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetScaleDownThresholdPercent(), alterScaleDownPercent);
569+
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetScaleUpThresholdPercent(), alterScaleUpPercent);
570+
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetThresholdTime().Seconds(), alterThreshold);
571+
}
501572
}
502573

503574
} // namespace NKikimr

ydb/core/persqueue/utils.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ ui64 TopicPartitionReserveThroughput(const NKikimrPQ::TPQTabletConfig& config) {
3333
}
3434

3535
bool SplitMergeEnabled(const NKikimrPQ::TPQTabletConfig& config) {
36-
return 0 < config.GetPartitionStrategy().GetMaxPartitionCount();
36+
return config.has_partitionstrategy() && config.partitionstrategy().has_partitionstrategytype() && config.partitionstrategy().partitionstrategytype() != ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_DISABLED;
3737
}
3838

3939
static constexpr ui64 PUT_UNIT_SIZE = 40960u; // 40Kb

ydb/core/protos/pqconfig.proto

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -385,17 +385,27 @@ message TPQTabletConfig {
385385
}
386386
optional EMeteringMode MeteringMode = 34;
387387

388+
enum TPartitionStrategyType {
389+
// The autoscaling is disabled.
390+
DISABLED = 0;
391+
// The autoscaling algorithm will increase partitions count depending on the load characteristics.
392+
// The autoscaling algorithm will never decrease the number of partitions.
393+
CAN_SPLIT = 1;
394+
// The autoscaling algorithm will both increase and decrease partitions count depending on the load characteristics.
395+
CAN_SPLIT_AND_MERGE = 2;
396+
}
397+
388398
// Strategy for automatically changing the number of topic partitions depending on the load
389399
message TPartitionStrategy {
390400
// The minimum number of partitions that will be supported by the strategy
391401
optional uint32 MinPartitionCount = 1 [default = 1];
392402
// The maximum number of partitions that will be supported by the strategy. The strategy will not create partitions if the specified
393403
// amount is reached, even if the load exceeds the current capabilities of the topic.
394-
required uint32 MaxPartitionCount = 2;
395-
optional uint32 MaxActivePartitionCount = 3;
396-
optional uint32 ScaleThresholdSeconds = 4 [default = 300];
397-
optional uint32 ScaleUpPartitionWriteSpeedThresholdPercent = 5 [default = 80];
398-
optional uint32 ScaleDownPartitionWriteSpeedThresholdPercent = 6 [default = 20];
404+
required uint32 MaxPartitionCount = 2 [default = 1];
405+
optional uint32 ScaleThresholdSeconds = 3 [default = 300];
406+
optional uint32 ScaleUpPartitionWriteSpeedThresholdPercent = 4 [default = 80];
407+
optional uint32 ScaleDownPartitionWriteSpeedThresholdPercent = 5 [default = 20];
408+
optional TPartitionStrategyType PartitionStrategyType = 6;
399409
}
400410
optional TPartitionStrategy PartitionStrategy = 35;
401411

ydb/public/api/protos/ydb_topic.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -872,7 +872,7 @@ message AlterPartitioningSettings {
872872
optional int64 set_min_active_partitions = 1 [(Ydb.value) = ">= 0"];
873873
// Maximum partition count auto merge would stop working at.
874874
// Zero value means default - 1.
875-
optional int64 max_active_partitions = 3 [(Ydb.value) = ">= 0"];
875+
optional int64 set_max_active_partitions = 3 [(Ydb.value) = ">= 0"];
876876
// Limit for total partition count, including active (open for write) and read-only partitions.
877877
// Zero value means default - 100.
878878
// Use set_max_active_partitions

0 commit comments

Comments
 (0)