Skip to content

Commit 22f51fb

Browse files
authored
Merge 774abac into fe08785
2 parents fe08785 + 774abac commit 22f51fb

File tree

16 files changed

+718
-162
lines changed

16 files changed

+718
-162
lines changed

ydb/core/persqueue/partition_scale_manager.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ namespace NPQ {
55

66

77
TPartitionScaleManager::TPartitionScaleManager(
8-
const TString& topicName,
9-
const TString& databasePath,
8+
const TString& topicName,
9+
const TString& databasePath,
1010
NKikimrPQ::TUpdateBalancerConfig& balancerConfig
1111
)
1212
: TopicName(topicName)
@@ -34,7 +34,7 @@ void TPartitionScaleManager::TrySendScaleRequest(const TActorContext& ctx) {
3434
if (splitMergePair.first.empty() && splitMergePair.second.empty()) {
3535
return;
3636
}
37-
37+
3838
RequestInflight = true;
3939
CurrentScaleRequest = ctx.Register(new TPartitionScaleRequest(
4040
TopicName,
@@ -55,7 +55,7 @@ std::pair<std::vector<TPartitionSplit>, std::vector<TPartitionMerge>> TPartition
5555
std::vector<TPartitionSplit> splitsToApply;
5656
std::vector<TPartitionMerge> mergesToApply;
5757

58-
size_t allowedSplitsCount = BalancerConfig.PartitionCountLimit > BalancerConfig.CurPartitions ? BalancerConfig.PartitionCountLimit - BalancerConfig.CurPartitions : 0;
58+
size_t allowedSplitsCount = BalancerConfig.MaxActivePartitions > BalancerConfig.CurPartitions ? BalancerConfig.MaxActivePartitions - BalancerConfig.CurPartitions : 0;
5959
auto itSplit = PartitionsToSplit.begin();
6060
while (allowedSplitsCount > 0 && itSplit != PartitionsToSplit.end()) {
6161
const auto partitionId = itSplit->first;

ydb/core/persqueue/partition_scale_manager.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,23 +28,23 @@ class TPartitionScaleManager {
2828
NSchemeShard::TTopicTabletInfo::TKeyRange KeyRange;
2929
};
3030

31-
private:
31+
private:
3232
struct TBalancerConfig {
3333
TBalancerConfig(
3434
NKikimrPQ::TUpdateBalancerConfig& config
3535
)
3636
: PathId(config.GetPathId())
3737
, PathVersion(config.GetVersion())
3838
, PartitionGraph(MakePartitionGraph(config))
39-
, PartitionCountLimit(config.GetTabletConfig().GetPartitionStrategy().GetMaxPartitionCount())
39+
, MaxActivePartitions(config.GetTabletConfig().GetPartitionStrategy().GetMaxPartitionCount())
4040
, MinActivePartitions(config.GetTabletConfig().GetPartitionStrategy().GetMinPartitionCount())
4141
, CurPartitions(config.PartitionsSize()) {
4242
}
4343

4444
ui64 PathId;
4545
int PathVersion;
4646
TPartitionGraph PartitionGraph;
47-
ui64 PartitionCountLimit;
47+
ui64 MaxActivePartitions;
4848
ui64 MinActivePartitions;
4949
ui64 CurPartitions;
5050
};
@@ -59,7 +59,7 @@ class TPartitionScaleManager {
5959
void UpdateBalancerConfig(NKikimrPQ::TUpdateBalancerConfig& config);
6060
void UpdateDatabasePath(const TString& dbPath);
6161
void Die(const TActorContext& ctx);
62-
62+
6363
static TString GetRangeMid(const TString& from, const TString& to);
6464

6565
private:
@@ -74,7 +74,7 @@ class TPartitionScaleManager {
7474
private:
7575
static const ui32 MIN_SCALE_REQUEST_REPEAT_SECONDS_TIMEOUT = 10;
7676
static const ui32 MAX_SCALE_REQUEST_REPEAT_SECONDS_TIMEOUT = 1000;
77-
77+
7878
const TString TopicName;
7979
TString DatabasePath = "";
8080
TActorId CurrentScaleRequest;

ydb/core/persqueue/partition_scale_request.cpp

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,12 @@ namespace NKikimr {
44
namespace NPQ {
55

66
TPartitionScaleRequest::TPartitionScaleRequest(
7-
TString topicName,
8-
TString databasePath,
9-
ui64 pathId,
10-
ui64 pathVersion,
11-
std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionSplit> splits,
12-
const std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionMerge> merges,
7+
TString topicName,
8+
TString databasePath,
9+
ui64 pathId,
10+
ui64 pathVersion,
11+
std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionSplit> splits,
12+
const std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionMerge> merges,
1313
NActors::TActorId parentActorId
1414
)
1515
: Topic(topicName)
@@ -19,7 +19,7 @@ TPartitionScaleRequest::TPartitionScaleRequest(
1919
, Splits(splits)
2020
, Merges(merges)
2121
, ParentActorId(parentActorId) {
22-
22+
2323
}
2424

2525
void TPartitionScaleRequest::Bootstrap(const NActors::TActorContext &ctx) {
@@ -70,14 +70,14 @@ void TPartitionScaleRequest::PassAway() {
7070

7171
void TPartitionScaleRequest::Handle(TEvTabletPipe::TEvClientConnected::TPtr &ev, const TActorContext &ctx) {
7272
if (ev->Get()->Status != NKikimrProto::OK) {
73-
auto scaleRequestResult = std::make_unique<TEvPartitionScaleRequestDone>(TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ProxyShardNotAvailable);//savnik: проверить, какой статус тут приходит
73+
auto scaleRequestResult = std::make_unique<TEvPartitionScaleRequestDone>(TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ProxyShardNotAvailable);
7474
Send(ParentActorId, scaleRequestResult.release());
7575
Die(ctx);
7676
}
7777
}
7878

7979
void TPartitionScaleRequest::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr&, const TActorContext &ctx) {
80-
auto scaleRequestResult = std::make_unique<TEvPartitionScaleRequestDone>(TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ProxyShardNotAvailable);//savnik: проверить, какой статус тут приходит
80+
auto scaleRequestResult = std::make_unique<TEvPartitionScaleRequestDone>(TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ProxyShardNotAvailable);
8181
Send(ParentActorId, scaleRequestResult.release());
8282
Die(ctx);
8383
}
@@ -90,11 +90,10 @@ void TPartitionScaleRequest::Handle(NSchemeShard::TEvSchemeShard::TEvNotifyTxCom
9090

9191
void TPartitionScaleRequest::Handle(TEvTxUserProxy::TEvProposeTransactionStatus::TPtr& ev, const NActors::TActorContext& ctx) {
9292
auto msg = ev->Get();
93-
//Cerr << "SAVDBG" << msg->Record.GetIssues()[0].Getmessage(); //savnik: log err
9493

9594
auto status = static_cast<TEvTxUserProxy::TEvProposeTransactionStatus::EStatus>(msg->Record.GetStatus());
9695
if (status != TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecInProgress) {
97-
auto scaleRequestResult = std::make_unique<TEvPartitionScaleRequestDone>(status);//savnik: проверить, какой статус тут приходит
96+
auto scaleRequestResult = std::make_unique<TEvPartitionScaleRequestDone>(status);
9897
Send(ParentActorId, scaleRequestResult.release());
9998
Die(ctx);
10099
} else {

ydb/core/persqueue/partition_write.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -541,9 +541,14 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) {
541541
NKikimrPQ::EScaleStatus TPartition::CheckScaleStatus(const TActorContext& /*ctx*/) {
542542
auto const writeSpeedUsagePercent = SplitMergeAvgWriteBytes->GetValue() * 100.0 / Config.GetPartitionStrategy().GetScaleThresholdSeconds() / TotalPartitionWriteSpeed;
543543

544-
if (writeSpeedUsagePercent >= Config.GetPartitionStrategy().GetScaleUpPartitionWriteSpeedThresholdPercent()) {
544+
auto splitEnabled = Config.GetPartitionStrategy().GetPartitionStrategyType() == ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT
545+
|| Config.GetPartitionStrategy().GetPartitionStrategyType() == ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT_AND_MERGE;
546+
547+
auto mergeEnabled = Config.GetPartitionStrategy().GetPartitionStrategyType() == ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT_AND_MERGE;
548+
549+
if (splitEnabled && writeSpeedUsagePercent >= Config.GetPartitionStrategy().GetScaleUpPartitionWriteSpeedThresholdPercent()) {
545550
return NKikimrPQ::EScaleStatus::NEED_SPLIT;
546-
} else if (writeSpeedUsagePercent <= Config.GetPartitionStrategy().GetScaleDownPartitionWriteSpeedThresholdPercent()) {
551+
} else if (mergeEnabled && writeSpeedUsagePercent <= Config.GetPartitionStrategy().GetScaleDownPartitionWriteSpeedThresholdPercent()) {
547552
return NKikimrPQ::EScaleStatus::NEED_MERGE;
548553
}
549554
return NKikimrPQ::EScaleStatus::NORMAL;

ydb/core/persqueue/ut/autoscaling_ut.cpp

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -498,6 +498,77 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
498498
status = client.CommitOffset(TEST_TOPIC, 0, TEST_CONSUMER, 0).GetValueSync();
499499
UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::BAD_REQUEST, status.GetStatus(), "The consumer cannot commit an offset for inactive, read-to-the-end partitions.");
500500
}
501+
502+
Y_UNIT_TEST(CreateAlterDescribe) {
503+
auto autoscalingTestTopic = "autoscalit-topic";
504+
TTopicSdkTestSetup setup = CreateSetup();
505+
TTopicClient client = setup.MakeClient();
506+
507+
auto minParts = 5;
508+
auto maxParts = 10;
509+
auto scaleUpPercent = 80;
510+
auto scaleDownPercent = 20;
511+
auto threshold = 500;
512+
auto strategy = EAutoscalingStrategy::ScaleUp;
513+
514+
TCreateTopicSettings createSettings;
515+
createSettings
516+
.BeginConfigurePartitioningSettings()
517+
.MinActivePartitions(minParts)
518+
.MaxActivePartitions(maxParts)
519+
.BeginConfigureAutoscalingSettings()
520+
.ScaleUpThresholdPercent(scaleUpPercent)
521+
.ScaleDownThresholdPercent(scaleDownPercent)
522+
.ThresholdTime(TDuration::Seconds(threshold))
523+
.Strategy(strategy)
524+
.EndConfigureAutoscalingSettings()
525+
.EndConfigurePartitioningSettings();
526+
client.CreateTopic(autoscalingTestTopic, createSettings).Wait();
527+
528+
TDescribeTopicSettings descSettings;
529+
530+
auto describe = client.DescribeTopic(autoscalingTestTopic, descSettings).GetValueSync();
531+
UNIT_ASSERT_VALUES_EQUAL_C(describe.GetStatus(), NYdb::EStatus::SUCCESS, describe.GetIssues().ToString());
532+
533+
534+
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetMinActivePartitions(), minParts);
535+
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetMaxActivePartitions(), maxParts);
536+
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetStrategy(), strategy);
537+
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetScaleDownThresholdPercent(), scaleDownPercent);
538+
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetScaleUpThresholdPercent(), scaleUpPercent);
539+
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetThresholdTime().Seconds(), threshold);
540+
541+
auto alterMinParts = 10;
542+
auto alterMaxParts = 20;
543+
auto alterScaleUpPercent = 90;
544+
auto alterScaleDownPercent = 10;
545+
auto alterThreshold = 700;
546+
auto alterStrategy = EAutoscalingStrategy::ScaleUpAndDown;
547+
548+
TAlterTopicSettings alterSettings;
549+
alterSettings
550+
.BeginAlterPartitioningSettings()
551+
.MinActivePartitions(alterMinParts)
552+
.MaxActivePartitions(alterMaxParts)
553+
.BeginAlterAutoscalingSettings()
554+
.ScaleDownThresholdPercent(alterScaleDownPercent)
555+
.ScaleUpThresholdPercent(alterScaleUpPercent)
556+
.ThresholdTime(TDuration::Seconds(alterThreshold))
557+
.Strategy(alterStrategy)
558+
.EndAlterAutoscalingSettings()
559+
.EndAlterTopicPartitioningSettings();
560+
561+
client.AlterTopic(autoscalingTestTopic, alterSettings).Wait();
562+
563+
auto describeAfterAlter = client.DescribeTopic(autoscalingTestTopic).GetValueSync();
564+
565+
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetMinActivePartitions(), alterMinParts);
566+
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetMaxActivePartitions(), alterMaxParts);
567+
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetStrategy(), alterStrategy);
568+
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetScaleDownThresholdPercent(), alterScaleDownPercent);
569+
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetScaleUpThresholdPercent(), alterScaleUpPercent);
570+
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetThresholdTime().Seconds(), alterThreshold);
571+
}
501572
}
502573

503574
} // namespace NKikimr

ydb/core/persqueue/ut/partition_chooser_ut.cpp

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ void AddPartition(NKikimrSchemeOp::TPersQueueGroupDescription& conf,
2020
ui32 id,
2121
const std::optional<TString>&& boundaryFrom = std::nullopt,
2222
const std::optional<TString>&& boundaryTo = std::nullopt,
23-
std::vector<ui32> children = {}) {
23+
std::vector<ui32> children = {}) {
2424
auto* p = conf.AddPartitions();
2525
p->SetPartitionId(id);
2626
p->SetTabletId(1000 + id);
@@ -36,15 +36,21 @@ void AddPartition(NKikimrSchemeOp::TPersQueueGroupDescription& conf,
3636
}
3737
}
3838

39-
NKikimrSchemeOp::TPersQueueGroupDescription CreateConfig0(bool SplitMergeEnabled) {
39+
NKikimrSchemeOp::TPersQueueGroupDescription CreateConfig0(bool splitMergeEnabled) {
4040
NKikimrSchemeOp::TPersQueueGroupDescription result;
4141
NKikimrPQ::TPQTabletConfig* config = result.MutablePQTabletConfig();
4242

4343
result.SetBalancerTabletID(999);
4444

4545
auto* partitionStrategy = config->MutablePartitionStrategy();
4646
partitionStrategy->SetMinPartitionCount(3);
47-
partitionStrategy->SetMaxPartitionCount(SplitMergeEnabled ? 10 : 0);
47+
partitionStrategy->SetMaxPartitionCount(10);
48+
if (splitMergeEnabled) {
49+
partitionStrategy->SetPartitionStrategyType(::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT);
50+
} else {
51+
partitionStrategy->SetPartitionStrategyType(::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_DISABLED);
52+
}
53+
4854

4955
config->SetTopicName("/Root/topic-1");
5056
config->SetTopicPath("/Root");
@@ -181,7 +187,7 @@ struct TWriteSessionMock: public NActors::TActorBootstrapped<TWriteSessionMock>
181187
hFunc(TEvPartitionChooser::TEvChooseResult, Handle);
182188
hFunc(TEvPartitionChooser::TEvChooseError, Handle);
183189
}
184-
}
190+
}
185191
};
186192

187193
NPersQueue::TTopicConverterPtr CreateTopicConverter() {
@@ -197,7 +203,7 @@ TWriteSessionMock* ChoosePartition(NPersQueue::TTestServer& server,
197203
TWriteSessionMock* mock = new TWriteSessionMock();
198204

199205
NActors::TActorId parentId = server.GetRuntime()->Register(mock);
200-
server.GetRuntime()->Register(NKikimr::NPQ::CreatePartitionChooserActorM(parentId,
206+
server.GetRuntime()->Register(NKikimr::NPQ::CreatePartitionChooserActorM(parentId,
201207
config,
202208
fullConverter,
203209
sourceId,
@@ -265,7 +271,7 @@ void WriteToTable(NPersQueue::TTestServer& server, const TString& sourceId, ui32
265271
if (pqConfig.GetTopicsAreFirstClassCitizen()) {
266272
query = TStringBuilder() << "--!syntax_v1\n"
267273
"UPSERT INTO `//Root/.metadata/TopicPartitionsMapping` (Hash, Topic, ProducerId, CreateTime, AccessTime, Partition, SeqNo) VALUES "
268-
"(" << encoded.KeysHash << ", \"" << fullConverter->GetClientsideName() << "\", \""
274+
"(" << encoded.KeysHash << ", \"" << fullConverter->GetClientsideName() << "\", \""
269275
<< encoded.EscapedSourceId << "\", "<< TInstant::Now().MilliSeconds() << ", "
270276
<< TInstant::Now().MilliSeconds() << ", " << partitionId << ", " << seqNo << ");";
271277
} else {
@@ -293,14 +299,14 @@ TMaybe<NYdb::TResultSet> SelectTable(NPersQueue::TTestServer& server, const TStr
293299
query = TStringBuilder() << "--!syntax_v1\n"
294300
"SELECT Partition, SeqNo "
295301
"FROM `//Root/.metadata/TopicPartitionsMapping` "
296-
"WHERE Hash = " << encoded.KeysHash <<
302+
"WHERE Hash = " << encoded.KeysHash <<
297303
" AND Topic = \"" << fullConverter->GetClientsideName() << "\"" <<
298304
" AND ProducerId = \"" << encoded.EscapedSourceId << "\"";
299305
} else {
300306
query = TStringBuilder() << "--!syntax_v1\n"
301307
"SELECT Partition, SeqNo "
302308
"FROM `/Root/PQ/SourceIdMeta2` "
303-
"WHERE Hash = " << encoded.KeysHash <<
309+
"WHERE Hash = " << encoded.KeysHash <<
304310
" AND Topic = \"" << fullConverter->GetClientsideName() << "\"" <<
305311
" AND SourceId = \"" << encoded.EscapedSourceId << "\"";
306312
}
@@ -320,7 +326,7 @@ void AssertTable(NPersQueue::TTestServer& server, const TString& sourceId, ui32
320326

321327
UNIT_ASSERT(result);
322328
UNIT_ASSERT_VALUES_EQUAL_C(result->RowsCount(), 1, "Table must contains SourceId='" << sourceId << "'");
323-
329+
324330
NYdb::TResultSetParser parser(*result);
325331
UNIT_ASSERT(parser.TryNextRow());
326332
NYdb::TValueParser p(parser.GetValue(0));
@@ -407,7 +413,7 @@ Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_NewSourceId_Test) {
407413
}
408414

409415
Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_SourceId_PartitionActive_BoundaryTrue_Test) {
410-
// We check the partition selection scenario when we have already written with the
416+
// We check the partition selection scenario when we have already written with the
411417
// specified SourceID, the partition to which we wrote is active, and the partition
412418
// boundaries coincide with the distribution.
413419
NPersQueue::TTestServer server = CreateServer();
@@ -427,7 +433,7 @@ Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_SourceId_PartitionActive_Bo
427433
}
428434

429435
Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_SourceId_PartitionActive_BoundaryFalse_Test) {
430-
// We check the partition selection scenario when we have already written with the
436+
// We check the partition selection scenario when we have already written with the
431437
// specified SourceID, the partition to which we wrote is active, and the partition
432438
// boundaries is not coincide with the distribution.
433439
NPersQueue::TTestServer server = CreateServer();

ydb/core/persqueue/utils.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ ui64 TopicPartitionReserveThroughput(const NKikimrPQ::TPQTabletConfig& config) {
3333
}
3434

3535
bool SplitMergeEnabled(const NKikimrPQ::TPQTabletConfig& config) {
36-
return 0 < config.GetPartitionStrategy().GetMaxPartitionCount();
36+
return config.has_partitionstrategy() && config.partitionstrategy().has_partitionstrategytype() && config.partitionstrategy().partitionstrategytype() != ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_DISABLED;
3737
}
3838

3939
static constexpr ui64 PUT_UNIT_SIZE = 40960u; // 40Kb

0 commit comments

Comments
 (0)