Skip to content

Commit 50195e2

Browse files
committed
ut
1 parent 18f911f commit 50195e2

File tree

2 files changed

+112
-5
lines changed

2 files changed

+112
-5
lines changed

ydb/core/persqueue/read_balancer__balancing.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -681,7 +681,7 @@ bool IsRoot(const TPartitionGraph::Node* node, const std::unordered_set<ui32>& p
681681
if (node->IsRoot()) {
682682
return true;
683683
}
684-
for (auto* p : node->Parents) {
684+
for (auto* p : node->DirectParents) {
685685
if (partitions.contains(p->Id)) {
686686
return false;
687687
}
@@ -966,10 +966,10 @@ bool TConsumer::IsReadable(ui32 partitionId) {
966966
}
967967

968968
if (Partitions.empty()) {
969-
return node->Parents.empty();
969+
return node->DirectParents.empty();
970970
}
971971

972-
for(auto* parent : node->HierarhicalParents) {
972+
for(auto* parent : node->AllParents) {
973973
if (!IsInactive(parent->Id)) {
974974
return false;
975975
}
@@ -1035,9 +1035,9 @@ bool TConsumer::ProccessReadingFinished(ui32 partitionId, bool wasInactive, cons
10351035
if (family->CanAttach(std::vector{id})) {
10361036
auto* node = GetPartitionGraph().GetPartition(id);
10371037
bool allParentsMerged = true;
1038-
if (node->Parents.size() > 1) {
1038+
if (node->DirectParents.size() > 1) {
10391039
// The partition was obtained as a result of the merge.
1040-
for (auto* c : node->Parents) {
1040+
for (auto* c : node->DirectParents) {
10411041
auto* other = FindFamily(c->Id);
10421042
if (!other) {
10431043
allParentsMerged = false;

ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1218,6 +1218,113 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
12181218
UNIT_ASSERT(stats1->GetCommittedOffset() == 4);
12191219
}
12201220

1221+
Y_UNIT_TEST(PartitionSplit_DistributedTxCommit_CheckSessionReset) {
1222+
TTopicSdkTestSetup setup = CreateSetup();
1223+
TTopicClient client = setup.MakeClient();
1224+
1225+
TCreateTopicSettings createSettings;
1226+
createSettings
1227+
.BeginConfigurePartitioningSettings()
1228+
.MinActivePartitions(1)
1229+
.MaxActivePartitions(100)
1230+
.BeginConfigureAutoPartitioningSettings()
1231+
.UpUtilizationPercent(2)
1232+
.DownUtilizationPercent(1)
1233+
.StabilizationWindow(TDuration::Seconds(2))
1234+
.Strategy(EAutoPartitioningStrategy::ScaleUp)
1235+
.EndConfigureAutoPartitioningSettings()
1236+
.EndConfigurePartitioningSettings()
1237+
.BeginAddConsumer()
1238+
.ConsumerName(TEST_CONSUMER);
1239+
1240+
client.CreateTopic(TEST_TOPIC, createSettings).Wait();
1241+
1242+
auto msg = TString(1_MB, 'a');
1243+
1244+
auto writeSession_1 = CreateWriteSession(client, "producer-1", 0, TEST_TOPIC, false);
1245+
auto writeSession_2 = CreateWriteSession(client, "producer-2", 0, TEST_TOPIC, false);
1246+
auto seqNo = 1;
1247+
{
1248+
UNIT_ASSERT(writeSession_1->Write(Msg(msg, seqNo++)));
1249+
UNIT_ASSERT(writeSession_1->Write(Msg(msg, seqNo++)));
1250+
Sleep(TDuration::Seconds(5));
1251+
auto describe = client.DescribeTopic(TEST_TOPIC).GetValueSync();
1252+
UNIT_ASSERT_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 1);
1253+
}
1254+
1255+
{
1256+
UNIT_ASSERT(writeSession_1->Write(Msg(msg, seqNo++)));
1257+
UNIT_ASSERT(writeSession_2->Write(Msg(msg, seqNo++)));
1258+
UNIT_ASSERT(writeSession_1->Write(Msg(msg, seqNo++)));
1259+
UNIT_ASSERT(writeSession_2->Write(Msg(msg, seqNo++)));
1260+
Sleep(TDuration::Seconds(5));
1261+
auto describe = client.DescribeTopic(TEST_TOPIC).GetValueSync();
1262+
UNIT_ASSERT_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 3);
1263+
}
1264+
1265+
auto writeSession_3 = CreateWriteSession(client, "producer-2", 1, TEST_TOPIC, false);
1266+
UNIT_ASSERT(writeSession_3->Write(Msg("message", seqNo++)));
1267+
UNIT_ASSERT(writeSession_3->Write(Msg("message", seqNo++)));
1268+
1269+
auto reader = client.CreateReadSession(
1270+
TReadSessionSettings()
1271+
.AutoPartitioningSupport(true)
1272+
.AppendTopics(TTopicReadSettings(TEST_TOPIC))
1273+
.ConsumerName(TEST_CONSUMER));
1274+
1275+
TInstant deadlineTime = TInstant::Now() + TDuration::Seconds(15);
1276+
1277+
auto commitSent = false;
1278+
1279+
while(deadlineTime > TInstant::Now()) {
1280+
for (auto event : reader->GetEvents(false)) {
1281+
if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent>(&event)) {
1282+
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
1283+
auto& messages = x->GetMessages();
1284+
for (size_t i = 0u; i < messages.size(); ++i) {
1285+
auto& message = messages[i];
1286+
message.Commit();
1287+
1288+
if (message.GetSeqNo() == 8) {
1289+
if (!commitSent) {
1290+
commitSent = true;
1291+
Sleep(TDuration::MilliSeconds(300));
1292+
auto status = client.CommitOffset(TEST_TOPIC, 1, TEST_CONSUMER, 0).GetValueSync();
1293+
UNIT_ASSERT(status.IsSuccess());
1294+
} else {
1295+
return;
1296+
}
1297+
}
1298+
}
1299+
UNIT_ASSERT(writeSession_3->Write(Msg(TStringBuilder() << "message-" << seqNo, seqNo)));
1300+
seqNo++;
1301+
} else if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&event)) {
1302+
x->Confirm();
1303+
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
1304+
} else if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent>(&event)) {
1305+
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
1306+
} else if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TPartitionSessionStatusEvent>(&event)) {
1307+
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
1308+
} else if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TStopPartitionSessionEvent>(&event)) {
1309+
x->Confirm();
1310+
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
1311+
} else if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TPartitionSessionClosedEvent>(&event)) {
1312+
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
1313+
} else if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TEndPartitionSessionEvent>(&event)) {
1314+
x->Confirm();
1315+
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
1316+
} else if (auto* sessionClosedEvent = std::get_if<NYdb::NTopic::TSessionClosedEvent>(&event)) {
1317+
Cerr << sessionClosedEvent->DebugString() << Endl << Flush;
1318+
} else {
1319+
Cerr << "SESSION EVENT unhandled \n";
1320+
}
1321+
}
1322+
Sleep(TDuration::MilliSeconds(250));
1323+
}
1324+
1325+
UNIT_ASSERT(false);
1326+
}
1327+
12211328
Y_UNIT_TEST(PartitionSplit_AutosplitByLoad) {
12221329
TTopicSdkTestSetup setup = CreateSetup();
12231330
TTopicClient client = setup.MakeClient();

0 commit comments

Comments
 (0)