Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions ydb/core/persqueue/partition_scale_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ TPartitionScaleManager::TPartitionScaleManager(

void TPartitionScaleManager::HandleScaleStatusChange(const TPartitionInfo& partition, NKikimrPQ::EScaleStatus scaleStatus, const TActorContext& ctx) {
if (scaleStatus == NKikimrPQ::EScaleStatus::NEED_SPLIT) {
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "TPartitionScaleManager::HandleScaleStatusChange "
<< "need to split partition " << partition.Id);
PartitionsToSplit.emplace(partition.Id, partition);
TrySendScaleRequest(ctx);
} else {
Expand All @@ -30,12 +32,14 @@ void TPartitionScaleManager::TrySendScaleRequest(const TActorContext& ctx) {
return;
}

auto splitMergePair = BuildScaleRequest();
auto splitMergePair = BuildScaleRequest(ctx);
if (splitMergePair.first.empty() && splitMergePair.second.empty()) {
return;
}

RequestInflight = true;
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "TPartitionScaleManager::HandleScaleStatusChange "
<< "send split request");
CurrentScaleRequest = ctx.Register(new TPartitionScaleRequest(
TopicName,
DatabasePath,
Expand All @@ -51,7 +55,7 @@ void TPartitionScaleManager::TrySendScaleRequest(const TActorContext& ctx) {
using TPartitionSplit = NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionSplit;
using TPartitionMerge = NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionMerge;

std::pair<std::vector<TPartitionSplit>, std::vector<TPartitionMerge>> TPartitionScaleManager::BuildScaleRequest() {
std::pair<std::vector<TPartitionSplit>, std::vector<TPartitionMerge>> TPartitionScaleManager::BuildScaleRequest(const TActorContext& ctx) {
std::vector<TPartitionSplit> splitsToApply;
std::vector<TPartitionMerge> mergesToApply;

Expand All @@ -62,11 +66,15 @@ std::pair<std::vector<TPartitionSplit>, std::vector<TPartitionMerge>> TPartition
const auto& partition = itSplit->second;

if (BalancerConfig.PartitionGraph.GetPartition(partitionId)->Children.empty()) {
auto mid = GetRangeMid(partition.KeyRange.FromBound ? *partition.KeyRange.FromBound : "", partition.KeyRange.ToBound ?*partition.KeyRange.ToBound : "");
auto from = partition.KeyRange.FromBound ? *partition.KeyRange.FromBound : "";
auto to = partition.KeyRange.ToBound ?*partition.KeyRange.ToBound : "";
auto mid = GetRangeMid(from, to);
if (mid.empty()) {
itSplit = PartitionsToSplit.erase(itSplit);
LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "TPartitionScaleManager::BuildScaleRequest wrong partition key range. Can't get mid. Topic# " << TopicName << ", partition# " << partitionId);
continue;
}
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "TPartitionScaleManager::BuildScaleRequest partition split ranges. From# '" << from << "'. To# '" << to << "'. Mid# '" << mid <<"'. Topic# " << TopicName << ". Partition# " << partitionId);

TPartitionSplit split;
split.set_partition(partition.Id);
Expand All @@ -87,6 +95,7 @@ void TPartitionScaleManager::HandleScaleRequestResult(TPartitionScaleRequest::TE
RequestInflight = false;
LastResponseTime = ctx.Now();
auto result = ev->Get();
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "TPartitionScaleManager::HandleScaleRequestResult scale request result: " << result->Status << ". Topic# " << TopicName);
if (result->Status == TEvTxUserProxy::TResultStatus::ExecComplete) {
TrySendScaleRequest(ctx);
} else {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/partition_scale_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class TPartitionScaleManager {
using TPartitionSplit = NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionSplit;
using TPartitionMerge = NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionMerge;

std::pair<std::vector<TPartitionSplit>, std::vector<TPartitionMerge>> BuildScaleRequest();
std::pair<std::vector<TPartitionSplit>, std::vector<TPartitionMerge>> BuildScaleRequest(const TActorContext& ctx);

public:
static const ui64 TRY_SCALE_REQUEST_WAKE_UP_TAG = 10;
Expand Down
12 changes: 8 additions & 4 deletions ydb/core/persqueue/partition_scale_request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ void TPartitionScaleRequest::Bootstrap(const NActors::TActorContext &ctx) {
void TPartitionScaleRequest::SendProposeRequest(const NActors::TActorContext &ctx) {
auto proposal = std::make_unique<TEvTxUserProxy::TEvProposeTransaction>();
proposal->Record.SetDatabaseName(CanonizePath(DatabasePath));
FillProposeRequest(*proposal, DatabasePath, Topic);
FillProposeRequest(*proposal, DatabasePath, Topic, ctx);
ctx.Send(MakeTxProxyID(), proposal.release());
}

void TPartitionScaleRequest::FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction& proposal, const TString& workingDir, const TString& topicName) {
void TPartitionScaleRequest::FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction& proposal, const TString& workingDir, const TString& topicName, const NActors::TActorContext &ctx) {
auto& modifyScheme = *proposal.Record.MutableTransaction()->MutableModifyScheme();
modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterPersQueueGroup);
modifyScheme.SetWorkingDir(workingDir);
Expand All @@ -46,11 +46,14 @@ void TPartitionScaleRequest::FillProposeRequest(TEvTxUserProxy::TEvProposeTransa

NKikimrSchemeOp::TPersQueueGroupDescription groupDescription;
groupDescription.SetName(topicName);

TStringBuilder logMessage;
logMessage << "TPartitionScaleRequest::FillProposeRequest trying to scale partitions. Spilts: ";
for(const auto& split: Splits) {
auto* newSplit = groupDescription.AddSplit();
logMessage << "partition: " << split.GetPartition() << " boundary: '" << split.GetSplitBoundary() << "' ";
*newSplit = split;
}
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, logMessage);

for(const auto& merge: Merges) {
auto* newMerge = groupDescription.AddMerge();
Expand Down Expand Up @@ -98,7 +101,8 @@ void TPartitionScaleRequest::Handle(TEvTxUserProxy::TEvProposeTransactionStatus:
for (auto& issue : ev->Get()->Record.GetIssues()) {
issues << issue.ShortDebugString() + ", ";
}
Cerr << "\n SAVDGB " << issues << "\n";
LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "TPartitionScaleRequest "
<< "SchemaShard error when trying to execute a split request: " << issues);
Send(ParentActorId, scaleRequestResult.release());
Die(ctx);
} else {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/persqueue/partition_scale_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

namespace NKikimr {
namespace NPQ {

class TPartitionScaleRequest: public NActors::TActorBootstrapped<TPartitionScaleRequest> {
using TBase = NActors::TActorBootstrapped<TPartitionScaleRequest>;

Expand Down Expand Up @@ -48,7 +48,7 @@ class TPartitionScaleRequest: public NActors::TActorBootstrapped<TPartitionScale
}
std::pair<TString, TString> SplitPath(const TString& path);
void SendProposeRequest(const NActors::TActorContext &ctx);
void FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction& proposal, const TString& workingDir, const TString& topicName);
void FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction& proposal, const TString& workingDir, const TString& topicName, const NActors::TActorContext &ctx);

private:
const TString Topic;
Expand Down
20 changes: 17 additions & 3 deletions ydb/core/persqueue/partition_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -541,17 +541,31 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) {
ProcessTimestampsForNewData(prevEndOffset, ctx);
}

NKikimrPQ::EScaleStatus TPartition::CheckScaleStatus(const TActorContext& /*ctx*/) {
NKikimrPQ::EScaleStatus TPartition::CheckScaleStatus(const TActorContext& ctx) {
auto const writeSpeedUsagePercent = SplitMergeAvgWriteBytes->GetValue() * 100.0 / Config.GetPartitionStrategy().GetScaleThresholdSeconds() / TotalPartitionWriteSpeed;

LOG_DEBUG_S(
ctx, NKikimrServices::PERSQUEUE,
"TPartition::CheckScaleStatus writeSpeedUsagePercent# " << writeSpeedUsagePercent << " Topic: \"" << TopicName() << "\"." <<
" Partition: " << Partition
);
auto splitEnabled = Config.GetPartitionStrategy().GetPartitionStrategyType() == ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT
|| Config.GetPartitionStrategy().GetPartitionStrategyType() == ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT_AND_MERGE;

auto mergeEnabled = Config.GetPartitionStrategy().GetPartitionStrategyType() == ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT_AND_MERGE;

if (splitEnabled && writeSpeedUsagePercent >= Config.GetPartitionStrategy().GetScaleUpPartitionWriteSpeedThresholdPercent()) {
LOG_DEBUG_S(
ctx, NKikimrServices::PERSQUEUE,
"TPartition::CheckScaleStatus NEED_SPLIT" << " Topic: \"" << TopicName() << "\"." <<
" Partition: " << Partition
);
return NKikimrPQ::EScaleStatus::NEED_SPLIT;
} else if (mergeEnabled && writeSpeedUsagePercent <= Config.GetPartitionStrategy().GetScaleDownPartitionWriteSpeedThresholdPercent()) {
LOG_DEBUG_S(
ctx, NKikimrServices::PERSQUEUE,
"TPartition::CheckScaleStatus NEED_MERGE" << " Topic: \"" << TopicName() << "\"." <<
" Partition: " << Partition
);
return NKikimrPQ::EScaleStatus::NEED_MERGE;
}
return NKikimrPQ::EScaleStatus::NORMAL;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/read_balancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr
Y_ABORT_UNLESS(p.GetPartition() >= prevNextPartitionId && p.GetPartition() < NextPartitionId || NextPartitionId == 0);

partitionsInfo[p.GetPartition()] = {p.GetTabletId(), {}};
if (SplitMergeEnabled(TabletConfig)) {
if (SplitMergeEnabled(TabletConfig) && p.HasKeyRange()) {
partitionsInfo[p.GetPartition()].KeyRange.DeserializeFromProto(p.GetKeyRange());
}

Expand Down
1 change: 0 additions & 1 deletion ydb/core/persqueue/ut/autoscaling_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,6 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
a = "a";
b = {};
res = NKikimr::NPQ::TPartitionScaleManager::GetRangeMid(a,b);
Cerr << "\n SAVDBG " << res << "\n";
UNIT_ASSERT(a < res);
UNIT_ASSERT(b != res);

Expand Down