Skip to content

Auto split v2 #3769

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions ydb/core/persqueue/events/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ struct TEvPQ {
EvReadingPartitionStatusRequest,
EvProcessChangeOwnerRequests,
EvWakeupReleasePartition,
EvPartitionScaleStatusChanged,
EvPartitionScaleRequestDone,
EvEnd
};

Expand Down Expand Up @@ -1112,6 +1114,15 @@ struct TEvPQ {
ui32 PartitionId;
ui64 Cookie;
};

// Protobuf-backed event (record type NKikimrPQ::TEvPartitionScaleStatusChanged,
// event id EvPartitionScaleStatusChanged) carrying a partition id and its scale status.
struct TEvPartitionScaleStatusChanged
    : public TEventPB<
          TEvPartitionScaleStatusChanged,
          NKikimrPQ::TEvPartitionScaleStatusChanged,
          EvPartitionScaleStatusChanged>
{
    // Leaves Record untouched (default-constructed).
    TEvPartitionScaleStatusChanged() = default;

    // Stores both arguments into Record.
    TEvPartitionScaleStatusChanged(ui32 partitionId, NKikimrPQ::EScaleStatus scaleStatus)
    {
        Record.SetPartitionId(partitionId);
        Record.SetScaleStatus(scaleStatus);
    }
};
};

} //NKikimr
7 changes: 7 additions & 0 deletions ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -789,6 +789,9 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext
}
}
}

result.SetScaleStatus(SplitMergeEnabled(TabletConfig) ? ScaleStatus :NKikimrPQ::EScaleStatus::NORMAL);

ctx.Send(ev->Get()->Sender, new TEvPQ::TEvPartitionStatusResponse(result, Partition));
}

Expand Down Expand Up @@ -2073,6 +2076,10 @@ void TPartition::EndChangePartitionConfig(NKikimrPQ::TPQTabletConfig&& config,

Y_ABORT_UNLESS(Config.GetPartitionConfig().GetTotalPartitions() > 0);

if (Config.GetPartitionStrategy().GetScaleThresholdSeconds() != SplitMergeAvgWriteBytes->GetDuration().Seconds()) {
InitSplitMergeSlidingWindow();
}

Send(ReadQuotaTrackerActor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config));
Send(WriteQuotaTrackerActor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config));
TotalPartitionWriteSpeed = config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond();
Expand Down
10 changes: 9 additions & 1 deletion ydb/core/persqueue/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ class TPartition : public TActorBootstrapped<TPartition> {

private:
static const ui32 MAX_ERRORS_COUNT_TO_STORE = 10;
static const ui32 SCALE_REQUEST_REPEAT_MIN_SECONDS = 60;

private:
struct THasDataReq;
Expand Down Expand Up @@ -333,7 +334,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
const TActorContext& ctx);

void Initialize(const TActorContext& ctx);

void InitSplitMergeSlidingWindow();
template <typename T>
void EmplacePendingRequest(T&& body, const TActorContext& ctx) {
const auto now = ctx.Now();
Expand All @@ -358,6 +359,9 @@ class TPartition : public TActorBootstrapped<TPartition> {

void Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx);

NKikimrPQ::EScaleStatus CheckScaleStatus(const TActorContext& ctx);
void ChangeScaleStatusIfNeeded(NKikimrPQ::EScaleStatus scaleStatus);

TString LogPrefix() const;

void Handle(TEvPQ::TEvProcessChangeOwnerRequests::TPtr& ev, const TActorContext& ctx);
Expand Down Expand Up @@ -720,6 +724,10 @@ class TPartition : public TActorBootstrapped<TPartition> {
NSlidingWindow::TSlidingWindow<NSlidingWindow::TSumOperation<ui64>> AvgReadBytes;
TVector<NSlidingWindow::TSlidingWindow<NSlidingWindow::TSumOperation<ui64>>> AvgQuotaBytes;

std::unique_ptr<NSlidingWindow::TSlidingWindow<NSlidingWindow::TSumOperation<ui64>>> SplitMergeAvgWriteBytes;
TInstant LastScaleRequestTime = TInstant::Zero();
NKikimrPQ::EScaleStatus ScaleStatus = NKikimrPQ::EScaleStatus::NORMAL;

ui64 ReservedSize;
std::deque<THolder<TEvPQ::TEvReserveBytes>> ReserveRequests;

Expand Down
6 changes: 6 additions & 0 deletions ydb/core/persqueue/partition_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,8 @@ void TPartition::Initialize(const TActorContext& ctx) {
LastUsedStorageMeterTimestamp = ctx.Now();
WriteTimestampEstimate = ManageWriteTimestampEstimate ? ctx.Now() : TInstant::Zero();

InitSplitMergeSlidingWindow();

CloudId = Config.GetYcCloudId();
DbId = Config.GetYdbDatabaseId();
DbPath = Config.GetYdbDatabasePath();
Expand Down Expand Up @@ -956,6 +958,10 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
);
}

void TPartition::InitSplitMergeSlidingWindow() {
using Tui64SumSlidingWindow = NSlidingWindow::TSlidingWindow<NSlidingWindow::TSumOperation<ui64>>;
SplitMergeAvgWriteBytes = std::make_unique<Tui64SumSlidingWindow>(TDuration::Seconds(Config.GetPartitionStrategy().GetScaleThresholdSeconds()), 1000);
}

//
// Functions
Expand Down
141 changes: 141 additions & 0 deletions ydb/core/persqueue/partition_scale_manager.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
#include "ydb/core/persqueue/partition_scale_manager.h"

namespace NKikimr {
namespace NPQ {


// Creates a manager for the given topic and database.
// The balancer config is not stored as-is: it is used to construct the
// BalancerConfig member. All remaining members keep their in-class defaults.
TPartitionScaleManager::TPartitionScaleManager(
    const TString& topicName,
    const TString& databasePath,
    NKikimrPQ::TUpdateBalancerConfig& balancerConfig)
    : TopicName(topicName)
    , DatabasePath(databasePath)
    , BalancerConfig(balancerConfig)
{
}

// Records a partition's new scale status.
// Any status other than NEED_SPLIT removes the partition from the pending-split set.
// NEED_SPLIT registers the partition (an already registered entry is left as it is)
// and then attempts to send a scale request.
void TPartitionScaleManager::HandleScaleStatusChange(const TPartitionInfo& partition, NKikimrPQ::EScaleStatus scaleStatus, const TActorContext& ctx) {
    if (scaleStatus != NKikimrPQ::EScaleStatus::NEED_SPLIT) {
        PartitionsToSplit.erase(partition.Id);
        return;
    }

    PartitionsToSplit.emplace(partition.Id, partition);
    TrySendScaleRequest(ctx);
}

void TPartitionScaleManager::TrySendScaleRequest(const TActorContext& ctx) {
TInstant delayDeadline = LastResponseTime + RequestTimeout;
if (DatabasePath.empty() || RequestInflight || delayDeadline > ctx.Now()) {
return;
}

auto splitMergePair = BuildScaleRequest();
if (splitMergePair.first.empty() && splitMergePair.second.empty()) {
return;
}

RequestInflight = true;
CurrentScaleRequest = ctx.Register(new TPartitionScaleRequest(
TopicName,
DatabasePath,
BalancerConfig.PathId,
BalancerConfig.PathVersion,
splitMergePair.first,
splitMergePair.second,
ctx.SelfID
));
}


// Short names for the protobuf split/merge descriptions returned by BuildScaleRequest().
using TPartitionSplit = NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionSplit;
using TPartitionMerge = NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionMerge;

// Builds the list of split operations for the partitions currently waiting in
// PartitionsToSplit. The merge list is always returned empty by this implementation.
//
// The number of splits is capped by PartitionCountLimit - CurPartitions (zero when the
// limit is already reached). While iterating, entries are removed from PartitionsToSplit
// when the partition already has children in the partition graph, or when no split
// boundary can be computed for its key range (GetRangeMid returned an empty string).
//
// Returns {splits, merges}.
std::pair<std::vector<TPartitionSplit>, std::vector<TPartitionMerge>> TPartitionScaleManager::BuildScaleRequest() {
    std::vector<TPartitionSplit> splitsToApply;
    std::vector<TPartitionMerge> mergesToApply;

    size_t allowedSplitsCount = BalancerConfig.PartitionCountLimit > BalancerConfig.CurPartitions
        ? BalancerConfig.PartitionCountLimit - BalancerConfig.CurPartitions
        : 0;

    auto itSplit = PartitionsToSplit.begin();
    while (allowedSplitsCount > 0 && itSplit != PartitionsToSplit.end()) {
        const auto partitionId = itSplit->first;
        const auto& partition = itSplit->second;

        const auto node = BalancerConfig.PartitionGraph.GetPartition(partitionId);
        if (!node) {
            // The current graph snapshot does not know this partition, so it cannot be
            // split at this config version. Keep the entry so that it is reconsidered
            // after the balancer config is updated, instead of dereferencing null.
            ++itSplit;
            continue;
        }

        if (!node->Children.empty()) {
            // Already split: nothing left to do for this partition.
            itSplit = PartitionsToSplit.erase(itSplit);
            continue;
        }

        // A missing bound is passed to GetRangeMid as an empty string.
        auto mid = GetRangeMid(partition.KeyRange.FromBound ? *partition.KeyRange.FromBound : "", partition.KeyRange.ToBound ? *partition.KeyRange.ToBound : "");
        if (mid.empty()) {
            itSplit = PartitionsToSplit.erase(itSplit);
            continue;
        }

        TPartitionSplit split;
        split.set_partition(partition.Id);
        split.set_splitboundary(mid);
        splitsToApply.push_back(std::move(split));

        --allowedSplitsCount;
        ++itSplit;
    }

    // Move the vectors into the pair; a braced return of lvalues would copy them.
    return {std::move(splitsToApply), std::move(mergesToApply)};
}

// Handles completion of the in-flight scale request.
//
// On ExecComplete the retry backoff is reset and the next request (if any partitions are
// still pending) is attempted immediately. On any other status the retry delay is
// increased — first to MIN_SCALE_REQUEST_REPEAT_SECONDS_TIMEOUT plus a random 0..49,
// then doubled on each further failure, capped at MAX_SCALE_REQUEST_REPEAT_SECONDS_TIMEOUT —
// and a TEvWakeup tagged TRY_SCALE_REQUEST_WAKE_UP_TAG is scheduled after that delay.
//
// NOTE(review): the *_SECONDS_TIMEOUT constants are combined with millisecond values here,
// so the effective delays are in milliseconds — confirm the intended unit.
void TPartitionScaleManager::HandleScaleRequestResult(TPartitionScaleRequest::TEvPartitionScaleRequestDone::TPtr& ev, const TActorContext& ctx) {
    RequestInflight = false;
    LastResponseTime = ctx.Now();
    auto result = ev->Get();
    if (result->Status == TEvTxUserProxy::TResultStatus::ExecComplete) {
        // Reset the backoff accumulated by earlier failures. Without this, the stale
        // non-zero RequestTimeout makes TrySendScaleRequest() bail out on its delay check
        // right after a success, and later failures keep doubling from the stale value.
        RequestTimeout = TDuration::MilliSeconds(0);
        TrySendScaleRequest(ctx);
    } else {
        ui64 newTimeout = RequestTimeout.MilliSeconds() == 0 ? MIN_SCALE_REQUEST_REPEAT_SECONDS_TIMEOUT + RandomNumber<ui64>(50) : RequestTimeout.MilliSeconds() * 2;
        RequestTimeout = TDuration::MilliSeconds(std::min(newTimeout, static_cast<ui64>(MAX_SCALE_REQUEST_REPEAT_SECONDS_TIMEOUT)));
        ctx.Schedule(RequestTimeout, new TEvents::TEvWakeup(TRY_SCALE_REQUEST_WAKE_UP_TAG));
    }
}

// Sends a poison pill to the scale request actor recorded in CurrentScaleRequest.
// Does nothing when no request actor id has been recorded.
void TPartitionScaleManager::Die(const TActorContext& ctx) {
    if (!CurrentScaleRequest) {
        return;
    }

    ctx.Send(CurrentScaleRequest, new TEvents::TEvPoisonPill());
}

// Rebuilds the cached BalancerConfig snapshot (path id/version, partition graph and
// partition count limits) from the given balancer config.
void TPartitionScaleManager::UpdateBalancerConfig(NKikimrPQ::TUpdateBalancerConfig& config) {
    BalancerConfig = TBalancerConfig{config};
}

void TPartitionScaleManager::UpdateDatabasePath(const TString& dbPath) {
DatabasePath = dbPath;
}

// Computes a split boundary between two key bounds by averaging them byte by byte.
//
// from - lower bound; an empty string means "no lower bound".
// to   - upper bound; an empty string means "no upper bound" (BuildScaleRequest passes
//        an empty string when the partition's ToBound is absent).
//
// The shorter bound is padded with 0x00 (from) or 0xFF (to) up to the longer length.
// If the averaged string equals `from`, the byte 0x7F is appended so that the result
// differs from `from`.
//
// Returns an empty string when `to` is bounded and `from` > `to`; callers treat an empty
// result as "no boundary can be produced".
//
// NOTE(review): the average is taken per byte without carrying between positions, so it is
// not an exact arithmetic midpoint of the two keys — confirm this is acceptable for
// adjacent bounds.
TString TPartitionScaleManager::GetRangeMid(const TString& from, const TString& to) {
    // An empty `to` is an unbounded upper end, not the smallest possible key, so it must
    // not be rejected by the ordering check below.
    if (!to.empty() && from > to) {
        return "";
    }

    TStringBuilder result;

    const unsigned char fromPadding = 0;
    const unsigned char toPadding = 255;

    const size_t maxSize = std::max(from.size(), to.size());
    for (size_t i = 0; i < maxSize; ++i) {
        // Go through unsigned char first: where plain char is signed, converting a byte
        // >= 0x80 directly to ui16 would sign-extend it and corrupt the sum.
        const ui16 fromChar = i < from.size() ? static_cast<unsigned char>(from[i]) : fromPadding;
        const ui16 toChar = i < to.size() ? static_cast<unsigned char>(to[i]) : toPadding;

        const ui16 sum = fromChar + toChar;

        result += static_cast<unsigned char>(sum / 2);
    }

    if (result == from) {
        result += static_cast<unsigned char>(127);
    }

    return result;
}

} // namespace NPQ
} // namespace NKikimr
91 changes: 91 additions & 0 deletions ydb/core/persqueue/partition_scale_manager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
#pragma once

#include <ydb/core/base/path.h>
#include "ydb/core/persqueue/utils.h"
#include <ydb/core/persqueue/partition_scale_request.h>
#include <ydb/core/protos/pqconfig.pb.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
#include <ydb/core/tx/schemeshard/schemeshard.h>
#include <ydb/core/tx/schemeshard/schemeshard_info_types.h>
#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/core/actorsystem.h>

#include <util/system/types.h>
#include <util/generic/fwd.h>
#include <util/generic/string.h>

#include <unordered_map>
#include <utility>

namespace NKikimr {
namespace NPQ {

// Keeps track of partitions that reported NEED_SPLIT and turns them into
// TPartitionScaleRequest actors, with at most one request in flight at a time
// and a growing retry delay after failed requests.
class TPartitionScaleManager {

public:
// Identity and key range of a partition, as passed to HandleScaleStatusChange().
struct TPartitionInfo {
ui32 Id;
NSchemeShard::TTopicTabletInfo::TKeyRange KeyRange;
};

private:
// Snapshot of the values this class reads from NKikimrPQ::TUpdateBalancerConfig.
struct TBalancerConfig {
// Copies the path id/version and partition count limits out of `config` and
// builds the partition graph from it via MakePartitionGraph().
TBalancerConfig(
NKikimrPQ::TUpdateBalancerConfig& config
)
: PathId(config.GetPathId())
, PathVersion(config.GetVersion())
, PartitionGraph(MakePartitionGraph(config))
, PartitionCountLimit(config.GetTabletConfig().GetPartitionStrategy().GetMaxPartitionCount())
, MinActivePartitions(config.GetTabletConfig().GetPartitionStrategy().GetMinPartitionCount())
, CurPartitions(config.PartitionsSize()) {
}

ui64 PathId;
int PathVersion;
TPartitionGraph PartitionGraph;
// Upper limit on the partition count (MaxPartitionCount of the partition strategy).
ui64 PartitionCountLimit;
// MinPartitionCount of the partition strategy.
ui64 MinActivePartitions;
// Number of partitions listed in the balancer config.
ui64 CurPartitions;
};

public:
// NOTE: the first parameter is named topicPath here and topicName in the definition;
// it initializes the TopicName member.
TPartitionScaleManager(const TString& topicPath, const TString& databasePath, NKikimrPQ::TUpdateBalancerConfig& balancerConfig);

public:
// Adds the partition to (NEED_SPLIT) or removes it from (any other status) the
// pending-split set; NEED_SPLIT also attempts to send a scale request.
void HandleScaleStatusChange(const TPartitionInfo& partition, NKikimrPQ::EScaleStatus scaleStatus, const TActorContext& ctx);
// Processes the completion event of the in-flight scale request.
void HandleScaleRequestResult(TPartitionScaleRequest::TEvPartitionScaleRequestDone::TPtr& ev, const TActorContext& ctx);
// Registers a new scale request actor unless one is in flight, the retry delay has not
// elapsed, the database path is empty, or there is nothing to request.
void TrySendScaleRequest(const TActorContext& ctx);
// Rebuilds the internal TBalancerConfig snapshot from `config`.
void UpdateBalancerConfig(NKikimrPQ::TUpdateBalancerConfig& config);
void UpdateDatabasePath(const TString& dbPath);
// Sends a poison pill to the recorded scale request actor, if any.
void Die(const TActorContext& ctx);

// Returns a byte-wise averaged key between `from` and `to`, or an empty string when
// `from` > `to`.
static TString GetRangeMid(const TString& from, const TString& to);

private:
using TPartitionSplit = NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionSplit;
using TPartitionMerge = NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionMerge;

// Builds the {splits, merges} lists for the next request from PartitionsToSplit.
std::pair<std::vector<TPartitionSplit>, std::vector<TPartitionMerge>> BuildScaleRequest();

public:
// Tag of the TEvWakeup event scheduled after a failed scale request.
static const ui64 TRY_SCALE_REQUEST_WAKE_UP_TAG = 10;

private:
// NOTE(review): despite "SECONDS" in the names, HandleScaleRequestResult() combines these
// values with RequestTimeout.MilliSeconds() and TDuration::MilliSeconds(), i.e. treats
// them as milliseconds — confirm the intended unit.
static const ui32 MIN_SCALE_REQUEST_REPEAT_SECONDS_TIMEOUT = 10;
static const ui32 MAX_SCALE_REQUEST_REPEAT_SECONDS_TIMEOUT = 1000;

const TString TopicName;
// No request is sent while this is empty.
TString DatabasePath = "";
// Actor id returned by ctx.Register() for the most recently registered scale request.
TActorId CurrentScaleRequest;
// Current retry delay; zero until a request fails.
TDuration RequestTimeout = TDuration::MilliSeconds(0);
// Time at which the last scale request result was handled.
TInstant LastResponseTime = TInstant::Zero();

// Partitions that reported NEED_SPLIT, keyed by partition id.
std::unordered_map<ui64, TPartitionInfo> PartitionsToSplit;

TBalancerConfig BalancerConfig;
// True between registering a scale request actor and handling its result.
bool RequestInflight = false;
};

} // namespace NPQ
} // namespace NKikimr
Loading
Loading