Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions ydb/core/persqueue/partition_scale_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ namespace NPQ {


TPartitionScaleManager::TPartitionScaleManager(
const TString& topicName,
const TString& databasePath,
const TString& topicName,
const TString& databasePath,
NKikimrPQ::TUpdateBalancerConfig& balancerConfig
)
: TopicName(topicName)
Expand Down Expand Up @@ -34,7 +34,7 @@ void TPartitionScaleManager::TrySendScaleRequest(const TActorContext& ctx) {
if (splitMergePair.first.empty() && splitMergePair.second.empty()) {
return;
}

RequestInflight = true;
CurrentScaleRequest = ctx.Register(new TPartitionScaleRequest(
TopicName,
Expand All @@ -55,7 +55,7 @@ std::pair<std::vector<TPartitionSplit>, std::vector<TPartitionMerge>> TPartition
std::vector<TPartitionSplit> splitsToApply;
std::vector<TPartitionMerge> mergesToApply;

size_t allowedSplitsCount = BalancerConfig.PartitionCountLimit > BalancerConfig.CurPartitions ? BalancerConfig.PartitionCountLimit - BalancerConfig.CurPartitions : 0;
size_t allowedSplitsCount = BalancerConfig.MaxActivePartitions > BalancerConfig.CurPartitions ? BalancerConfig.MaxActivePartitions - BalancerConfig.CurPartitions : 0;
auto itSplit = PartitionsToSplit.begin();
while (allowedSplitsCount > 0 && itSplit != PartitionsToSplit.end()) {
const auto partitionId = itSplit->first;
Expand Down
10 changes: 5 additions & 5 deletions ydb/core/persqueue/partition_scale_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,23 @@ class TPartitionScaleManager {
NSchemeShard::TTopicTabletInfo::TKeyRange KeyRange;
};

private:
private:
struct TBalancerConfig {
TBalancerConfig(
NKikimrPQ::TUpdateBalancerConfig& config
)
: PathId(config.GetPathId())
, PathVersion(config.GetVersion())
, PartitionGraph(MakePartitionGraph(config))
, PartitionCountLimit(config.GetTabletConfig().GetPartitionStrategy().GetMaxPartitionCount())
, MaxActivePartitions(config.GetTabletConfig().GetPartitionStrategy().GetMaxPartitionCount())
, MinActivePartitions(config.GetTabletConfig().GetPartitionStrategy().GetMinPartitionCount())
, CurPartitions(config.PartitionsSize()) {
}

ui64 PathId;
int PathVersion;
TPartitionGraph PartitionGraph;
ui64 PartitionCountLimit;
ui64 MaxActivePartitions;
ui64 MinActivePartitions;
ui64 CurPartitions;
};
Expand All @@ -59,7 +59,7 @@ class TPartitionScaleManager {
void UpdateBalancerConfig(NKikimrPQ::TUpdateBalancerConfig& config);
void UpdateDatabasePath(const TString& dbPath);
void Die(const TActorContext& ctx);

static TString GetRangeMid(const TString& from, const TString& to);

private:
Expand All @@ -74,7 +74,7 @@ class TPartitionScaleManager {
private:
static const ui32 MIN_SCALE_REQUEST_REPEAT_SECONDS_TIMEOUT = 10;
static const ui32 MAX_SCALE_REQUEST_REPEAT_SECONDS_TIMEOUT = 1000;

const TString TopicName;
TString DatabasePath = "";
TActorId CurrentScaleRequest;
Expand Down
30 changes: 17 additions & 13 deletions ydb/core/persqueue/partition_scale_request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ namespace NKikimr {
namespace NPQ {

TPartitionScaleRequest::TPartitionScaleRequest(
TString topicName,
TString databasePath,
ui64 pathId,
ui64 pathVersion,
std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionSplit> splits,
const std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionMerge> merges,
TString topicName,
TString databasePath,
ui64 pathId,
ui64 pathVersion,
std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionSplit> splits,
const std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionMerge> merges,
NActors::TActorId parentActorId
)
: Topic(topicName)
Expand All @@ -19,7 +19,7 @@ TPartitionScaleRequest::TPartitionScaleRequest(
, Splits(splits)
, Merges(merges)
, ParentActorId(parentActorId) {

}

void TPartitionScaleRequest::Bootstrap(const NActors::TActorContext &ctx) {
Expand All @@ -41,8 +41,8 @@ void TPartitionScaleRequest::FillProposeRequest(TEvTxUserProxy::TEvProposeTransa

auto applyIf = modifyScheme.AddApplyIf();
applyIf->SetPathId(PathId);
applyIf->SetPathVersion(PathVersion);
//applyIf->SetCheckGeneralVersion(false);
applyIf->SetPathVersion(PathVersion == 0 ? 1 : PathVersion);
applyIf->SetCheckEntityVersion(true);

NKikimrSchemeOp::TPersQueueGroupDescription groupDescription;
groupDescription.SetName(topicName);
Expand Down Expand Up @@ -70,14 +70,14 @@ void TPartitionScaleRequest::PassAway() {

void TPartitionScaleRequest::Handle(TEvTabletPipe::TEvClientConnected::TPtr &ev, const TActorContext &ctx) {
if (ev->Get()->Status != NKikimrProto::OK) {
auto scaleRequestResult = std::make_unique<TEvPartitionScaleRequestDone>(TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ProxyShardNotAvailable);//savnik: проверить, какой статус тут приходит
auto scaleRequestResult = std::make_unique<TEvPartitionScaleRequestDone>(TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ProxyShardNotAvailable);
Send(ParentActorId, scaleRequestResult.release());
Die(ctx);
}
}

void TPartitionScaleRequest::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr&, const TActorContext &ctx) {
auto scaleRequestResult = std::make_unique<TEvPartitionScaleRequestDone>(TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ProxyShardNotAvailable);//savnik: проверить, какой статус тут приходит
auto scaleRequestResult = std::make_unique<TEvPartitionScaleRequestDone>(TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ProxyShardNotAvailable);
Send(ParentActorId, scaleRequestResult.release());
Die(ctx);
}
Expand All @@ -90,11 +90,15 @@ void TPartitionScaleRequest::Handle(NSchemeShard::TEvSchemeShard::TEvNotifyTxCom

void TPartitionScaleRequest::Handle(TEvTxUserProxy::TEvProposeTransactionStatus::TPtr& ev, const NActors::TActorContext& ctx) {
auto msg = ev->Get();
//Cerr << "SAVDBG" << msg->Record.GetIssues()[0].Getmessage(); //savnik: log err

auto status = static_cast<TEvTxUserProxy::TEvProposeTransactionStatus::EStatus>(msg->Record.GetStatus());
if (status != TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecInProgress) {
auto scaleRequestResult = std::make_unique<TEvPartitionScaleRequestDone>(status);//savnik: проверить, какой статус тут приходит
auto scaleRequestResult = std::make_unique<TEvPartitionScaleRequestDone>(status);
TStringBuilder issues;
for (auto& issue : ev->Get()->Record.GetIssues()) {
issues << issue.ShortDebugString() + ", ";
}
Cerr << "\n SAVDGB " << issues << "\n";
Send(ParentActorId, scaleRequestResult.release());
Die(ctx);
} else {
Expand Down
9 changes: 7 additions & 2 deletions ydb/core/persqueue/partition_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -541,9 +541,14 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) {
NKikimrPQ::EScaleStatus TPartition::CheckScaleStatus(const TActorContext& /*ctx*/) {
auto const writeSpeedUsagePercent = SplitMergeAvgWriteBytes->GetValue() * 100.0 / Config.GetPartitionStrategy().GetScaleThresholdSeconds() / TotalPartitionWriteSpeed;

if (writeSpeedUsagePercent >= Config.GetPartitionStrategy().GetScaleUpPartitionWriteSpeedThresholdPercent()) {
auto splitEnabled = Config.GetPartitionStrategy().GetPartitionStrategyType() == ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT
|| Config.GetPartitionStrategy().GetPartitionStrategyType() == ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT_AND_MERGE;

auto mergeEnabled = Config.GetPartitionStrategy().GetPartitionStrategyType() == ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT_AND_MERGE;

if (splitEnabled && writeSpeedUsagePercent >= Config.GetPartitionStrategy().GetScaleUpPartitionWriteSpeedThresholdPercent()) {
return NKikimrPQ::EScaleStatus::NEED_SPLIT;
} else if (writeSpeedUsagePercent <= Config.GetPartitionStrategy().GetScaleDownPartitionWriteSpeedThresholdPercent()) {
} else if (mergeEnabled && writeSpeedUsagePercent <= Config.GetPartitionStrategy().GetScaleDownPartitionWriteSpeedThresholdPercent()) {
return NKikimrPQ::EScaleStatus::NEED_MERGE;
}
return NKikimrPQ::EScaleStatus::NORMAL;
Expand Down
98 changes: 98 additions & 0 deletions ydb/core/persqueue/ut/autoscaling_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,104 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
status = client.CommitOffset(TEST_TOPIC, 0, TEST_CONSUMER, 0).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::BAD_REQUEST, status.GetStatus(), "The consumer cannot commit an offset for inactive, read-to-the-end partitions.");
}

Y_UNIT_TEST(CreateAlterDescribe) {
auto autoscalingTestTopic = "autoscalit-topic";
TTopicSdkTestSetup setup = CreateSetup();
TTopicClient client = setup.MakeClient();

auto minParts = 5;
auto maxParts = 10;
auto scaleUpPercent = 80;
auto scaleDownPercent = 20;
auto threshold = 500;
auto strategy = EAutoscalingStrategy::ScaleUp;

TCreateTopicSettings createSettings;
createSettings
.BeginConfigurePartitioningSettings()
.MinActivePartitions(minParts)
.MaxActivePartitions(maxParts)
.BeginConfigureAutoscalingSettings()
.ScaleUpThresholdPercent(scaleUpPercent)
.ScaleDownThresholdPercent(scaleDownPercent)
.ThresholdTime(TDuration::Seconds(threshold))
.Strategy(strategy)
.EndConfigureAutoscalingSettings()
.EndConfigurePartitioningSettings();
client.CreateTopic(autoscalingTestTopic, createSettings).Wait();

TDescribeTopicSettings descSettings;

auto describe = client.DescribeTopic(autoscalingTestTopic, descSettings).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(describe.GetStatus(), NYdb::EStatus::SUCCESS, describe.GetIssues().ToString());


UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetMinActivePartitions(), minParts);
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetMaxActivePartitions(), maxParts);
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetStrategy(), strategy);
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetScaleDownThresholdPercent(), scaleDownPercent);
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetScaleUpThresholdPercent(), scaleUpPercent);
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetThresholdTime().Seconds(), threshold);

auto alterMinParts = 10;
auto alterMaxParts = 20;
auto alterScaleUpPercent = 90;
auto alterScaleDownPercent = 10;
auto alterThreshold = 700;
auto alterStrategy = EAutoscalingStrategy::ScaleUpAndDown;

TAlterTopicSettings alterSettings;
alterSettings
.BeginAlterPartitioningSettings()
.MinActivePartitions(alterMinParts)
.MaxActivePartitions(alterMaxParts)
.BeginAlterAutoscalingSettings()
.ScaleDownThresholdPercent(alterScaleDownPercent)
.ScaleUpThresholdPercent(alterScaleUpPercent)
.ThresholdTime(TDuration::Seconds(alterThreshold))
.Strategy(alterStrategy)
.EndAlterAutoscalingSettings()
.EndAlterTopicPartitioningSettings();

client.AlterTopic(autoscalingTestTopic, alterSettings).Wait();

auto describeAfterAlter = client.DescribeTopic(autoscalingTestTopic).GetValueSync();

UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetMinActivePartitions(), alterMinParts);
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetMaxActivePartitions(), alterMaxParts);
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetStrategy(), alterStrategy);
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetScaleDownThresholdPercent(), alterScaleDownPercent);
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetScaleUpThresholdPercent(), alterScaleUpPercent);
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetThresholdTime().Seconds(), alterThreshold);
}

Y_UNIT_TEST(PartitionSplit_AutosplitByLoad) {
TTopicSdkTestSetup setup = CreateSetup();
TTopicClient client = setup.MakeClient();

TCreateTopicSettings createSettings;
createSettings
.BeginConfigurePartitioningSettings()
.MinActivePartitions(1)
.MaxActivePartitions(100)
.BeginConfigureAutoscalingSettings()
.ScaleUpThresholdPercent(2)
.ScaleDownThresholdPercent(1)
.ThresholdTime(TDuration::Seconds(1))
.Strategy(EAutoscalingStrategy::ScaleUp)
.EndConfigureAutoscalingSettings()
.EndConfigurePartitioningSettings();
client.CreateTopic(TEST_TOPIC, createSettings).Wait();

auto msg = TString("a", 1_MB);
auto writeSession = CreateWriteSession(client, "producer-1", 0);
UNIT_ASSERT(writeSession->Write(Msg(msg, 1)));
UNIT_ASSERT(writeSession->Write(Msg(msg, 2)));
Sleep(TDuration::Seconds(5));
auto describe = client.DescribeTopic(TEST_TOPIC).GetValueSync();
UNIT_ASSERT_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 3);
}
}

} // namespace NKikimr
4 changes: 2 additions & 2 deletions ydb/core/persqueue/ut/common/autoscaling_ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ TTopicSdkTestSetup CreateSetup() {
return setup;
}

std::shared_ptr<ISimpleBlockingWriteSession> CreateWriteSession(TTopicClient& client, const TString& producer, std::optional<ui32> partition) {
std::shared_ptr<ISimpleBlockingWriteSession> CreateWriteSession(TTopicClient& client, const TString& producer, std::optional<ui32> partition, TString topic) {
auto writeSettings = TWriteSessionSettings()
.Path(TEST_TOPIC)
.Path(topic)
.ProducerId(producer);
if (partition) {
writeSettings.PartitionId(*partition);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/ut/common/autoscaling_ut_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ TWriteMessage Msg(const TString& data, ui64 seqNo);

TTopicSdkTestSetup CreateSetup();

std::shared_ptr<ISimpleBlockingWriteSession> CreateWriteSession(TTopicClient& client, const TString& producer, std::optional<ui32> partition = std::nullopt);
std::shared_ptr<ISimpleBlockingWriteSession> CreateWriteSession(TTopicClient& client, const TString& producer, std::optional<ui32> partition = std::nullopt, TString topic = TEST_TOPIC);

struct TTestReadSession {
struct MsgInfo {
Expand Down
Loading