Skip to content

Commit f0e01da

Browse files
authored
Merge 9931a63 into 1709b73
2 parents 1709b73 + 9931a63 commit f0e01da

File tree

5 files changed

+182
-71
lines changed

5 files changed

+182
-71
lines changed

ydb/core/persqueue/partition_key_range/partition_key_range.cpp

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,86 @@
55
namespace NKikimr {
66
namespace NPQ {
77

8+
TString MiddleOf(const TString& from, const TString& to) {
9+
auto GetChar = [](const TString& str, size_t i, unsigned char defaultValue) {
10+
if (i >= str.size()) {
11+
return defaultValue;
12+
}
13+
return static_cast<unsigned char>(str[i]);
14+
};
15+
16+
TStringBuilder result;
17+
if (from.empty() && to.empty()) {
18+
result << static_cast<unsigned char>(0x7F);
19+
return result;
20+
}
21+
22+
bool splitted = false;
23+
bool diffFound = false;
24+
25+
size_t maxSize = std::max(from.size(), to.size());
26+
result.reserve(maxSize + 1);
27+
for (size_t i = 0; i < maxSize; ++i) {
28+
ui16 f = GetChar(from, i, 0);
29+
ui16 t = GetChar(to, i, 0xFF);
30+
31+
if (f != t) {
32+
diffFound = true;
33+
}
34+
35+
if (!splitted) {
36+
if (!diffFound) {
37+
result << static_cast<unsigned char>(f);
38+
continue;
39+
}
40+
41+
if (f < t) {
42+
auto m = (f + t) / 2u;
43+
result << static_cast<unsigned char>(m);
44+
splitted = m != f;
45+
continue;
46+
}
47+
auto n = (f + t + 0x100u) / 2u;
48+
if (n < 0x100) {
49+
result << static_cast<unsigned char>(n);
50+
splitted = n != f;
51+
continue;
52+
}
53+
54+
for(size_t j = i; j > 0;) {
55+
--j;
56+
result.pop_back();
57+
58+
ui16 prev = GetChar(from, j, 0);
59+
if (prev == 0xFFu) {
60+
continue;
61+
}
62+
63+
++j;
64+
result << static_cast<unsigned char>(prev + 1u);
65+
66+
for (; j < i; ++j) {
67+
result << static_cast<unsigned char>(0u);
68+
}
69+
70+
break;
71+
}
72+
result << static_cast<unsigned char>(n - 0x100u);
73+
splitted = true;
74+
} else {
75+
auto n = f < t ? (f + t) / 2u : std::min<ui16>(0xFFu, (f + t + 0x100u) / 2u);
76+
result << static_cast<unsigned char>(n);
77+
break;
78+
}
79+
}
80+
81+
if (result == from) {
82+
result << static_cast<unsigned char>(diffFound ? 0xFFu: 0x7Fu);
83+
}
84+
85+
return result;
86+
}
87+
888
TPartitionKeyRange TPartitionKeyRange::Parse(const NKikimrPQ::TPartitionKeyRange& proto) {
989
TPartitionKeyRange result;
1090

ydb/core/persqueue/partition_key_range/partition_key_range.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ TString AsKeyBound(const NYql::TWide<Type>& value) {
5151
return key;
5252
}
5353

54+
TString MiddleOf(const TString& fromBound, const TString& toBound);
5455

5556
struct TPartitionKeyRange {
5657
TMaybe<TSerializedCellVec> FromBound; // inclusive

ydb/core/persqueue/partition_scale_manager.cpp

Lines changed: 16 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
#include "ydb/core/persqueue/partition_scale_manager.h"
1+
#include "partition_scale_manager.h"
2+
3+
#include <ydb/core/persqueue/partition_key_range/partition_key_range.h>
24

35
namespace NKikimr {
46
namespace NPQ {
@@ -55,6 +57,10 @@ void TPartitionScaleManager::TrySendScaleRequest(const TActorContext& ctx) {
5557
using TPartitionSplit = NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionSplit;
5658
using TPartitionMerge = NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionMerge;
5759

60+
const TString ToHex(const TString& value) {
61+
return TStringBuilder() << HexText(TBasicStringBuf(value));
62+
}
63+
5864
std::pair<std::vector<TPartitionSplit>, std::vector<TPartitionMerge>> TPartitionScaleManager::BuildScaleRequest(const TActorContext& ctx) {
5965
std::vector<TPartitionSplit> splitsToApply;
6066
std::vector<TPartitionMerge> mergesToApply;
@@ -68,13 +74,17 @@ std::pair<std::vector<TPartitionSplit>, std::vector<TPartitionMerge>> TPartition
6874
if (BalancerConfig.PartitionGraph.GetPartition(partitionId)->Children.empty()) {
6975
auto from = partition.KeyRange.FromBound ? *partition.KeyRange.FromBound : "";
7076
auto to = partition.KeyRange.ToBound ?*partition.KeyRange.ToBound : "";
71-
auto mid = GetRangeMid(from, to);
77+
auto mid = MiddleOf(from, to);
7278
if (mid.empty()) {
7379
itSplit = PartitionsToSplit.erase(itSplit);
74-
LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "TPartitionScaleManager::BuildScaleRequest wrong partition key range. Can't get mid. Topic# " << TopicName << ", partition# " << partitionId);
80+
LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER,
81+
"TPartitionScaleManager::BuildScaleRequest wrong partition key range. Can't get mid. Topic# " << TopicName << ", partition# " << partitionId);
7582
continue;
7683
}
77-
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "TPartitionScaleManager::BuildScaleRequest partition split ranges. From# '" << from << "'. To# '" << to << "'. Mid# '" << mid <<"'. Topic# " << TopicName << ". Partition# " << partitionId);
84+
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER,
85+
"TPartitionScaleManager::BuildScaleRequest partition split ranges. From# '" << ToHex(from)
86+
<< "'. To# '" << ToHex(to) << "'. Mid# '" << ToHex(mid)
87+
<< "'. Topic# " << TopicName << ". Partition# " << partitionId);
7888

7989
TPartitionSplit split;
8090
split.set_partition(partition.Id);
@@ -95,7 +105,8 @@ void TPartitionScaleManager::HandleScaleRequestResult(TPartitionScaleRequest::TE
95105
RequestInflight = false;
96106
LastResponseTime = ctx.Now();
97107
auto result = ev->Get();
98-
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "TPartitionScaleManager::HandleScaleRequestResult scale request result: " << result->Status << ". Topic# " << TopicName);
108+
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER,
109+
"TPartitionScaleManager::HandleScaleRequestResult scale request result: " << result->Status << ". Topic# " << TopicName);
99110
if (result->Status == TEvTxUserProxy::TResultStatus::ExecComplete) {
100111
TrySendScaleRequest(ctx);
101112
} else {
@@ -119,31 +130,5 @@ void TPartitionScaleManager::UpdateDatabasePath(const TString& dbPath) {
119130
DatabasePath = dbPath;
120131
}
121132

122-
TString TPartitionScaleManager::GetRangeMid(const TString& from, const TString& to) {
123-
if (from > to && to.size() != 0) {
124-
return "";
125-
}
126-
127-
TStringBuilder result;
128-
129-
unsigned char fromPadding = 0;
130-
unsigned char toPadding = 255;
131-
132-
size_t maxSize = std::max(from.size(), to.size());
133-
for (size_t i = 0; i < maxSize; ++i) {
134-
ui16 fromChar = i < from.size() ? static_cast<ui16>(from[i]) : fromPadding;
135-
unsigned char toChar = i < to.size() ? static_cast<unsigned char>(to[i]) : toPadding;
136-
137-
ui16 sum = fromChar + toChar;
138-
139-
result += static_cast<unsigned char>(sum / 2);
140-
}
141-
142-
if (result == from) {
143-
result += static_cast<unsigned char>(127);
144-
}
145-
return result;
146-
}
147-
148133
} // namespace NPQ
149134
} // namespace NKikimr

ydb/core/persqueue/partition_scale_manager.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,6 @@ class TPartitionScaleManager {
6060
void UpdateDatabasePath(const TString& dbPath);
6161
void Die(const TActorContext& ctx);
6262

63-
static TString GetRangeMid(const TString& from, const TString& to);
64-
6563
private:
6664
using TPartitionSplit = NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionSplit;
6765
using TPartitionMerge = NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionMerge;

ydb/core/persqueue/ut/autoscaling_ut.cpp

Lines changed: 85 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.h>
44

55
#include <library/cpp/testing/unittest/registar.h>
6+
#include <ydb/core/persqueue/partition_key_range/partition_key_range.h>
67
#include <ydb/core/persqueue/partition_scale_manager.h>
78
#include <ydb/core/tx/schemeshard/ut_helpers/helpers.h>
89
#include <ydb/core/tx/schemeshard/ut_helpers/test_env.h>
@@ -636,44 +637,90 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
636637
}
637638

638639
Y_UNIT_TEST(MidOfRange) {
639-
TString a = "a";
640-
TString b = "c";
641-
auto res = NKikimr::NPQ::TPartitionScaleManager::GetRangeMid(a,b);
642-
643-
b = "b";
644-
res = NKikimr::NPQ::TPartitionScaleManager::GetRangeMid(a,b);
645-
UNIT_ASSERT(a < res);
646-
UNIT_ASSERT(b > res);
647-
648-
a = {};
649-
b = "b";
650-
res = NKikimr::NPQ::TPartitionScaleManager::GetRangeMid(a,b);
651-
UNIT_ASSERT(a < res);
652-
UNIT_ASSERT(b > res);
653-
654-
a = "a";
655-
b = {};
656-
res = NKikimr::NPQ::TPartitionScaleManager::GetRangeMid(a,b);
657-
UNIT_ASSERT(a < res);
658-
UNIT_ASSERT(b != res);
659-
660-
a = "aa";
661-
b = {};
662-
res = NKikimr::NPQ::TPartitionScaleManager::GetRangeMid(a,b);
663-
UNIT_ASSERT(a < res);
664-
UNIT_ASSERT(b != res);
665-
666-
a = "aaa";
667-
b = "b";
668-
res = NKikimr::NPQ::TPartitionScaleManager::GetRangeMid(a,b);
669-
UNIT_ASSERT(a < res);
670-
UNIT_ASSERT(b > res);
671-
672-
a = "aaa";
673-
b = "aab";
674-
res = NKikimr::NPQ::TPartitionScaleManager::GetRangeMid(a,b);
675-
UNIT_ASSERT(a < res);
676-
UNIT_ASSERT(b > res);
640+
auto AsString = [](std::vector<ui16> vs) {
641+
TStringBuilder a;
642+
for (auto v : vs) {
643+
a << static_cast<unsigned char>(v);
644+
}
645+
return a;
646+
};
647+
648+
auto ToHex = [](const TString& value) {
649+
return TStringBuilder() << HexText(TBasicStringBuf(value));
650+
};
651+
652+
{
653+
auto res = NKikimr::NPQ::MiddleOf("", "");
654+
UNIT_ASSERT_VALUES_EQUAL(ToHex(res), "7F");
655+
}
656+
657+
{
658+
auto res = NKikimr::NPQ::MiddleOf("", AsString({0x7F}));
659+
UNIT_ASSERT_VALUES_EQUAL(ToHex(res), "3F");
660+
}
661+
662+
{
663+
auto res = NKikimr::NPQ::MiddleOf(AsString({0x7F}), "");
664+
UNIT_ASSERT_VALUES_EQUAL(ToHex(res), "BF");
665+
}
666+
667+
{
668+
auto res = NKikimr::NPQ::MiddleOf(AsString({0x7F}), AsString({0xBF}));
669+
UNIT_ASSERT_VALUES_EQUAL(ToHex(res), "9F");
670+
}
671+
672+
{
673+
auto res = NKikimr::NPQ::MiddleOf(AsString({0x01}), AsString({0x02}));
674+
UNIT_ASSERT_VALUES_EQUAL(ToHex(res), "01 FF");
675+
}
676+
677+
{
678+
auto res = NKikimr::NPQ::MiddleOf(AsString({0x01, 0x7F}), AsString({0x02}));
679+
UNIT_ASSERT_VALUES_EQUAL(ToHex(res), "01 BF");
680+
}
681+
682+
{
683+
auto res = NKikimr::NPQ::MiddleOf(AsString({0x02}), AsString({0x03, 0x7F}));
684+
UNIT_ASSERT_VALUES_EQUAL(ToHex(res), "02 3F");
685+
}
686+
687+
{
688+
auto res = NKikimr::NPQ::MiddleOf(AsString({0x01, 0xFF}), AsString({0x02, 0x00}));
689+
UNIT_ASSERT_VALUES_EQUAL(ToHex(res), "01 FF FF");
690+
}
691+
692+
{
693+
auto res = NKikimr::NPQ::MiddleOf(AsString({0x03, 0xFF}), AsString({0x04, 0x20}));
694+
UNIT_ASSERT_VALUES_EQUAL(ToHex(res), "04 0F");
695+
}
696+
{
697+
auto res = NKikimr::NPQ::MiddleOf(AsString({0x03, 0x40}), AsString({0x04, 0x40}));
698+
UNIT_ASSERT_VALUES_EQUAL(ToHex(res), "03 C0");
699+
}
700+
{
701+
auto res = NKikimr::NPQ::MiddleOf(AsString({0x03, 0x20}), AsString({0x04, 0x10}));
702+
UNIT_ASSERT_VALUES_EQUAL(ToHex(res), "03 98");
703+
}
704+
{
705+
auto res = NKikimr::NPQ::MiddleOf(AsString({0x04, 0xFF, 0xFF}), AsString({0x05, 0x20}));
706+
UNIT_ASSERT_VALUES_EQUAL(ToHex(res), "05 0F FF");
707+
}
708+
{
709+
auto res = NKikimr::NPQ::MiddleOf(AsString({0x03, 0x40, 0x7F}), AsString({0x04, 0x40}));
710+
UNIT_ASSERT_VALUES_EQUAL(ToHex(res), "03 C0 BF");
711+
}
712+
{
713+
auto res = NKikimr::NPQ::MiddleOf(AsString({0x03, 0x40, 0x30}), AsString({0x04, 0x40, 0x20}));
714+
UNIT_ASSERT_VALUES_EQUAL(ToHex(res), "03 C0 A8");
715+
}
716+
{
717+
auto res = NKikimr::NPQ::MiddleOf(AsString({0x01, 0xFF, 0xFF, 0xFF}), AsString({0x02, 0x00, 0x00, 0x10}));
718+
UNIT_ASSERT_VALUES_EQUAL(ToHex(res), "02 00 00 07");
719+
}
720+
{
721+
auto res = NKikimr::NPQ::MiddleOf(AsString({0xFF, 0xFF}), AsString({0xFF}));
722+
UNIT_ASSERT_VALUES_EQUAL(ToHex(res), "FF FF 7F");
723+
}
677724
}
678725
}
679726

0 commit comments

Comments
 (0)