Skip to content

Commit b55e463

Browse files
committed
Fix and ut
1 parent 4b411a1 commit b55e463

File tree

4 files changed

+37
-5
lines changed

4 files changed

+37
-5
lines changed

ydb/core/persqueue/partition_scale_request.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@ void TPartitionScaleRequest::FillProposeRequest(TEvTxUserProxy::TEvProposeTransa
4141

4242
auto applyIf = modifyScheme.AddApplyIf();
4343
applyIf->SetPathId(PathId);
44-
applyIf->SetPathVersion(PathVersion);
45-
//applyIf->SetCheckGeneralVersion(false);
44+
applyIf->SetPathVersion(PathVersion == 0 ? 1 : PathVersion);
45+
applyIf->SetCheckEntityVersion(true);
4646

4747
NKikimrSchemeOp::TPersQueueGroupDescription groupDescription;
4848
groupDescription.SetName(topicName);
@@ -94,6 +94,11 @@ void TPartitionScaleRequest::Handle(TEvTxUserProxy::TEvProposeTransactionStatus:
9494
auto status = static_cast<TEvTxUserProxy::TEvProposeTransactionStatus::EStatus>(msg->Record.GetStatus());
9595
if (status != TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecInProgress) {
9696
auto scaleRequestResult = std::make_unique<TEvPartitionScaleRequestDone>(status);
97+
TStringBuilder issues;
98+
for (auto& issue : ev->Get()->Record.GetIssues()) {
99+
issues << issue.ShortDebugString() + ", ";
100+
}
101+
Cerr << "\n SAVDGB " << issues << "\n";
97102
Send(ParentActorId, scaleRequestResult.release());
98103
Die(ctx);
99104
} else {

ydb/core/persqueue/ut/autoscaling_ut.cpp

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -569,6 +569,33 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
569569
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetScaleUpThresholdPercent(), alterScaleUpPercent);
570570
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetThresholdTime().Seconds(), alterThreshold);
571571
}
572+
573+
Y_UNIT_TEST(PartitionSplit_AutosplitByLoad) {
574+
TTopicSdkTestSetup setup = CreateSetup();
575+
TTopicClient client = setup.MakeClient();
576+
577+
TCreateTopicSettings createSettings;
578+
createSettings
579+
.BeginConfigurePartitioningSettings()
580+
.MinActivePartitions(1)
581+
.MaxActivePartitions(100)
582+
.BeginConfigureAutoscalingSettings()
583+
.ScaleUpThresholdPercent(2)
584+
.ScaleDownThresholdPercent(1)
585+
.ThresholdTime(TDuration::Seconds(1))
586+
.Strategy(EAutoscalingStrategy::ScaleUp)
587+
.EndConfigureAutoscalingSettings()
588+
.EndConfigurePartitioningSettings();
589+
client.CreateTopic(TEST_TOPIC, createSettings).Wait();
590+
591+
auto msg = TString("a", 1_MB);
592+
auto writeSession = CreateWriteSession(client, "producer-1", 0);
593+
UNIT_ASSERT(writeSession->Write(Msg(msg, 1)));
594+
UNIT_ASSERT(writeSession->Write(Msg(msg, 2)));
595+
Sleep(TDuration::Seconds(5));
596+
auto describe = client.DescribeTopic(TEST_TOPIC).GetValueSync();
597+
UNIT_ASSERT_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 3);
598+
}
572599
}
573600

574601
} // namespace NKikimr

ydb/core/persqueue/ut/common/autoscaling_ut_common.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,9 +105,9 @@ TTopicSdkTestSetup CreateSetup() {
105105
return setup;
106106
}
107107

108-
std::shared_ptr<ISimpleBlockingWriteSession> CreateWriteSession(TTopicClient& client, const TString& producer, std::optional<ui32> partition) {
108+
std::shared_ptr<ISimpleBlockingWriteSession> CreateWriteSession(TTopicClient& client, const TString& producer, std::optional<ui32> partition, TString topic) {
109109
auto writeSettings = TWriteSessionSettings()
110-
.Path(TEST_TOPIC)
110+
.Path(topic)
111111
.ProducerId(producer);
112112
if (partition) {
113113
writeSettings.PartitionId(*partition);

ydb/core/persqueue/ut/common/autoscaling_ut_common.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ TWriteMessage Msg(const TString& data, ui64 seqNo);
2727

2828
TTopicSdkTestSetup CreateSetup();
2929

30-
std::shared_ptr<ISimpleBlockingWriteSession> CreateWriteSession(TTopicClient& client, const TString& producer, std::optional<ui32> partition = std::nullopt);
30+
std::shared_ptr<ISimpleBlockingWriteSession> CreateWriteSession(TTopicClient& client, const TString& producer, std::optional<ui32> partition = std::nullopt, TString topic = TEST_TOPIC);
3131

3232
struct TTestReadSession {
3333
struct MsgInfo {

0 commit comments

Comments
 (0)