@@ -1218,6 +1218,113 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
12181218 UNIT_ASSERT (stats1->GetCommittedOffset () == 4 );
12191219 }
12201220
1221+ Y_UNIT_TEST (PartitionSplit_DistributedTxCommit_CheckSessionReset) {
1222+ TTopicSdkTestSetup setup = CreateSetup ();
1223+ TTopicClient client = setup.MakeClient ();
1224+
1225+ TCreateTopicSettings createSettings;
1226+ createSettings
1227+ .BeginConfigurePartitioningSettings ()
1228+ .MinActivePartitions (1 )
1229+ .MaxActivePartitions (100 )
1230+ .BeginConfigureAutoPartitioningSettings ()
1231+ .UpUtilizationPercent (2 )
1232+ .DownUtilizationPercent (1 )
1233+ .StabilizationWindow (TDuration::Seconds (2 ))
1234+ .Strategy (EAutoPartitioningStrategy::ScaleUp)
1235+ .EndConfigureAutoPartitioningSettings ()
1236+ .EndConfigurePartitioningSettings ()
1237+ .BeginAddConsumer ()
1238+ .ConsumerName (TEST_CONSUMER);
1239+
1240+ client.CreateTopic (TEST_TOPIC, createSettings).Wait ();
1241+
1242+ auto msg = TString (1_MB, ' a' );
1243+
1244+ auto writeSession_1 = CreateWriteSession (client, " producer-1" , 0 , TEST_TOPIC, false );
1245+ auto writeSession_2 = CreateWriteSession (client, " producer-2" , 0 , TEST_TOPIC, false );
1246+ auto seqNo = 1 ;
1247+ {
1248+ UNIT_ASSERT (writeSession_1->Write (Msg (msg, seqNo++)));
1249+ UNIT_ASSERT (writeSession_1->Write (Msg (msg, seqNo++)));
1250+ Sleep (TDuration::Seconds (5 ));
1251+ auto describe = client.DescribeTopic (TEST_TOPIC).GetValueSync ();
1252+ UNIT_ASSERT_EQUAL (describe.GetTopicDescription ().GetPartitions ().size (), 1 );
1253+ }
1254+
1255+ {
1256+ UNIT_ASSERT (writeSession_1->Write (Msg (msg, seqNo++)));
1257+ UNIT_ASSERT (writeSession_2->Write (Msg (msg, seqNo++)));
1258+ UNIT_ASSERT (writeSession_1->Write (Msg (msg, seqNo++)));
1259+ UNIT_ASSERT (writeSession_2->Write (Msg (msg, seqNo++)));
1260+ Sleep (TDuration::Seconds (5 ));
1261+ auto describe = client.DescribeTopic (TEST_TOPIC).GetValueSync ();
1262+ UNIT_ASSERT_EQUAL (describe.GetTopicDescription ().GetPartitions ().size (), 3 );
1263+ }
1264+
1265+ auto writeSession_3 = CreateWriteSession (client, " producer-2" , 1 , TEST_TOPIC, false );
1266+ UNIT_ASSERT (writeSession_3->Write (Msg (" message" , seqNo++)));
1267+ UNIT_ASSERT (writeSession_3->Write (Msg (" message" , seqNo++)));
1268+
1269+ auto reader = client.CreateReadSession (
1270+ TReadSessionSettings ()
1271+ .AutoPartitioningSupport (true )
1272+ .AppendTopics (TTopicReadSettings (TEST_TOPIC))
1273+ .ConsumerName (TEST_CONSUMER));
1274+
1275+ TInstant deadlineTime = TInstant::Now () + TDuration::Seconds (15 );
1276+
1277+ auto commitSent = false ;
1278+
1279+ while (deadlineTime > TInstant::Now ()) {
1280+ for (auto event : reader->GetEvents (false )) {
1281+ if (auto * x = std::get_if<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent>(&event)) {
1282+ Cerr << " SESSION EVENT " << x->DebugString () << Endl << Flush;
1283+ auto & messages = x->GetMessages ();
1284+ for (size_t i = 0u ; i < messages.size (); ++i) {
1285+ auto & message = messages[i];
1286+ message.Commit ();
1287+
1288+ if (message.GetSeqNo () == 8 ) {
1289+ if (!commitSent) {
1290+ commitSent = true ;
1291+ Sleep (TDuration::MilliSeconds (300 ));
1292+ auto status = client.CommitOffset (TEST_TOPIC, 1 , TEST_CONSUMER, 0 ).GetValueSync ();
1293+ UNIT_ASSERT (status.IsSuccess ());
1294+ } else {
1295+ return ;
1296+ }
1297+ }
1298+ }
1299+ UNIT_ASSERT (writeSession_3->Write (Msg (TStringBuilder () << " message-" << seqNo, seqNo)));
1300+ seqNo++;
1301+ } else if (auto * x = std::get_if<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&event)) {
1302+ x->Confirm ();
1303+ Cerr << " SESSION EVENT " << x->DebugString () << Endl << Flush;
1304+ } else if (auto * x = std::get_if<NYdb::NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent>(&event)) {
1305+ Cerr << " SESSION EVENT " << x->DebugString () << Endl << Flush;
1306+ } else if (auto * x = std::get_if<NYdb::NTopic::TReadSessionEvent::TPartitionSessionStatusEvent>(&event)) {
1307+ Cerr << " SESSION EVENT " << x->DebugString () << Endl << Flush;
1308+ } else if (auto * x = std::get_if<NYdb::NTopic::TReadSessionEvent::TStopPartitionSessionEvent>(&event)) {
1309+ x->Confirm ();
1310+ Cerr << " SESSION EVENT " << x->DebugString () << Endl << Flush;
1311+ } else if (auto * x = std::get_if<NYdb::NTopic::TReadSessionEvent::TPartitionSessionClosedEvent>(&event)) {
1312+ Cerr << " SESSION EVENT " << x->DebugString () << Endl << Flush;
1313+ } else if (auto * x = std::get_if<NYdb::NTopic::TReadSessionEvent::TEndPartitionSessionEvent>(&event)) {
1314+ x->Confirm ();
1315+ Cerr << " SESSION EVENT " << x->DebugString () << Endl << Flush;
1316+ } else if (auto * sessionClosedEvent = std::get_if<NYdb::NTopic::TSessionClosedEvent>(&event)) {
1317+ Cerr << sessionClosedEvent->DebugString () << Endl << Flush;
1318+ } else {
1319+ Cerr << " SESSION EVENT unhandled \n " ;
1320+ }
1321+ }
1322+ Sleep (TDuration::MilliSeconds (250 ));
1323+ }
1324+
1325+ UNIT_ASSERT (false );
1326+ }
1327+
12211328 Y_UNIT_TEST (PartitionSplit_AutosplitByLoad) {
12221329 TTopicSdkTestSetup setup = CreateSetup ();
12231330 TTopicClient client = setup.MakeClient ();
0 commit comments