Skip to content

Commit e07d7bf

Browse files
committed
fix for commit to past
1 parent 1874976 commit e07d7bf

File tree

2 files changed

+248
-3
lines changed

2 files changed

+248
-3
lines changed

ydb/core/persqueue/partition.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2536,6 +2536,10 @@ void TPartition::CommitTransaction(TSimpleSharedPtr<TTransaction>& t)
25362536
Y_ABORT_UNLESS(userInfo.Offset == (i64)operation.GetCommitOffsetsBegin());
25372537
}
25382538

2539+
if ((i64)operation.GetCommitOffsetsEnd() < userInfo.Offset && !operation.GetReadSessionId().empty()) {
2540+
continue; // this is stale request, answer ok for it
2541+
}
2542+
25392543
if (operation.GetCommitOffsetsEnd() <= StartOffset) {
25402544
userInfo.AnyCommits = false;
25412545
userInfo.Offset = StartOffset;
@@ -2546,6 +2550,7 @@ void TPartition::CommitTransaction(TSimpleSharedPtr<TTransaction>& t)
25462550
userInfo.AnyCommits = true;
25472551
userInfo.Offset = operation.GetCommitOffsetsEnd();
25482552
}
2553+
25492554
if (operation.GetKillReadSession()) {
25502555
userInfo.Session = "";
25512556
userInfo.PartitionSessionId = 0;

ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp

Lines changed: 243 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1348,7 +1348,7 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
13481348
UNIT_ASSERT(false);
13491349
}
13501350

1351-
Y_UNIT_TEST(PartitionSplit_DistributedTxCommit_CheckSessionNotResetedAfterCommitWithSessionId) {
1351+
Y_UNIT_TEST(PartitionSplit_DistributedTxCommit_CheckOffsetCommitForDifferentCases_SplitedTopic) {
13521352
TTopicSdkTestSetup setup = CreateSetup();
13531353
TTopicClient client = setup.MakeClient();
13541354

@@ -1383,6 +1383,8 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
13831383
}
13841384

13851385
{
1386+
UNIT_ASSERT(writeSession_1->Write(Msg(msg, seqNo++)));
1387+
UNIT_ASSERT(writeSession_1->Write(Msg(msg, seqNo++)));
13861388
UNIT_ASSERT(writeSession_1->Write(Msg(msg, seqNo++)));
13871389
UNIT_ASSERT(writeSession_1->Write(Msg(msg, seqNo++)));
13881390
UNIT_ASSERT(writeSession_1->Write(Msg(msg, seqNo++)));
@@ -1405,26 +1407,97 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
14051407
TInstant deadlineTime = TInstant::Now() + TDuration::Seconds(5);
14061408

14071409
auto commitSent = false;
1410+
TString readSessionId = "";
14081411
while(deadlineTime > TInstant::Now()) {
14091412
for (auto event : reader->GetEvents(false)) {
14101413
if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent>(&event)) {
14111414
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
14121415
auto& messages = x->GetMessages();
14131416
for (size_t i = 0u; i < messages.size(); ++i) {
14141417
auto& message = messages[i];
1415-
message.Commit();
14161418
Cerr << "SESSION EVENT READ SeqNo: " << message.GetSeqNo() << Endl << Flush;
1419+
1420+
if (commitSent) {
1421+
// read session not changed
1422+
UNIT_ASSERT_EQUAL(readSessionId, message.GetPartitionSession()->GetReadSessionId());
1423+
}
1424+
14171425
// check we NOT get this SeqNo two times
14181426
if (message.GetSeqNo() == 6) {
14191427
if (!commitSent) {
14201428
commitSent = true;
14211429
Sleep(TDuration::MilliSeconds(300));
1430+
1431+
{
1432+
auto describeConsumerSettings = TDescribeConsumerSettings().IncludeStats(true);
1433+
auto result = client.DescribeConsumer(TEST_TOPIC, TEST_CONSUMER, describeConsumerSettings).GetValueSync();
1434+
UNIT_ASSERT(result.IsSuccess());
1435+
1436+
auto description = result.GetConsumerDescription();
1437+
1438+
auto stats = description.GetPartitions().at(0).GetPartitionConsumerStats();
1439+
UNIT_ASSERT(stats);
1440+
UNIT_ASSERT(stats->GetCommittedOffset() >= 5);
1441+
}
1442+
1443+
readSessionId = message.GetPartitionSession()->GetReadSessionId();
14221444
TCommitOffsetSettings commitSettings {.ReadSessionId_ = message.GetPartitionSession()->GetReadSessionId()};
1423-
auto status = client.CommitOffset(TEST_TOPIC, 0, TEST_CONSUMER, 0, commitSettings).GetValueSync();
1445+
auto status = client.CommitOffset(TEST_TOPIC, 0, TEST_CONSUMER, 8, commitSettings).GetValueSync();
14241446
UNIT_ASSERT(status.IsSuccess());
1447+
1448+
{
1449+
auto describeConsumerSettings = TDescribeConsumerSettings().IncludeStats(true);
1450+
auto result = client.DescribeConsumer(TEST_TOPIC, TEST_CONSUMER, describeConsumerSettings).GetValueSync();
1451+
UNIT_ASSERT(result.IsSuccess());
1452+
1453+
auto description = result.GetConsumerDescription();
1454+
1455+
auto stats = description.GetPartitions().at(0).GetPartitionConsumerStats();
1456+
UNIT_ASSERT(stats);
1457+
1458+
UNIT_ASSERT(stats->GetCommittedOffset() == 8);
1459+
}
1460+
1461+
// must be ignored, because commit to past
1462+
TCommitOffsetSettings commitToPastSettings {.ReadSessionId_ = message.GetPartitionSession()->GetReadSessionId()};
1463+
auto commitToPastStatus = client.CommitOffset(TEST_TOPIC, 0, TEST_CONSUMER, 0, commitToPastSettings).GetValueSync();
1464+
UNIT_ASSERT(commitToPastStatus.IsSuccess());
1465+
1466+
{
1467+
auto describeConsumerSettings = TDescribeConsumerSettings().IncludeStats(true);
1468+
auto result = client.DescribeConsumer(TEST_TOPIC, TEST_CONSUMER, describeConsumerSettings).GetValueSync();
1469+
UNIT_ASSERT(result.IsSuccess());
1470+
1471+
auto description = result.GetConsumerDescription();
1472+
1473+
auto stats = description.GetPartitions().at(0).GetPartitionConsumerStats();
1474+
UNIT_ASSERT(stats);
1475+
1476+
UNIT_ASSERT(stats->GetCommittedOffset() == 8);
1477+
}
1478+
1479+
TCommitOffsetSettings commitSettingsWrongSession {.ReadSessionId_ = "random_session"};
1480+
auto statusWrongSession = client.CommitOffset(TEST_TOPIC, 0, TEST_CONSUMER, 0, commitSettingsWrongSession).GetValueSync();
1481+
UNIT_ASSERT(!statusWrongSession.IsSuccess());
1482+
1483+
{
1484+
auto describeConsumerSettings = TDescribeConsumerSettings().IncludeStats(true);
1485+
auto result = client.DescribeConsumer(TEST_TOPIC, TEST_CONSUMER, describeConsumerSettings).GetValueSync();
1486+
UNIT_ASSERT(result.IsSuccess());
1487+
1488+
auto description = result.GetConsumerDescription();
1489+
1490+
auto stats = description.GetPartitions().at(0).GetPartitionConsumerStats();
1491+
UNIT_ASSERT(stats);
1492+
1493+
UNIT_ASSERT(stats->GetCommittedOffset() == 8);
1494+
}
1495+
14251496
} else {
14261497
UNIT_ASSERT(false);
14271498
}
1499+
} else {
1500+
message.Commit();
14281501
}
14291502
}
14301503
UNIT_ASSERT(writeSession_3->Write(Msg(TStringBuilder() << "message-" << seqNo, seqNo++)));
@@ -1451,9 +1524,176 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
14511524
}
14521525
Sleep(TDuration::MilliSeconds(250));
14531526
}
1527+
}
1528+
1529+
Y_UNIT_TEST(PartitionSplit_DistributedTxCommit_CheckOffsetCommitForDifferentCases_NotSplitedTopic) {
1530+
TTopicSdkTestSetup setup = CreateSetup();
1531+
TTopicClient client = setup.MakeClient();
1532+
1533+
TCreateTopicSettings createSettings;
1534+
createSettings
1535+
.BeginConfigurePartitioningSettings()
1536+
.MinActivePartitions(1)
1537+
.MaxActivePartitions(100)
1538+
.BeginConfigureAutoPartitioningSettings()
1539+
.UpUtilizationPercent(2)
1540+
.DownUtilizationPercent(1)
1541+
.StabilizationWindow(TDuration::Seconds(2))
1542+
.Strategy(EAutoPartitioningStrategy::ScaleUp)
1543+
.EndConfigureAutoPartitioningSettings()
1544+
.EndConfigurePartitioningSettings()
1545+
.BeginAddConsumer()
1546+
.ConsumerName(TEST_CONSUMER);
1547+
1548+
client.CreateTopic(TEST_TOPIC, createSettings).Wait();
1549+
1550+
auto msg = TString(1_MB, 'a');
1551+
1552+
auto writeSession_1 = CreateWriteSession(client, "producer-1", 0, std::string{TEST_TOPIC}, false);
1553+
auto seqNo = 1;
1554+
{
1555+
UNIT_ASSERT(writeSession_1->Write(Msg(msg, seqNo++)));
1556+
UNIT_ASSERT(writeSession_1->Write(Msg(msg, seqNo++)));
1557+
UNIT_ASSERT(writeSession_1->Write(Msg(msg, seqNo++)));
1558+
UNIT_ASSERT(writeSession_1->Write(Msg(msg, seqNo++)));
1559+
UNIT_ASSERT(writeSession_1->Write(Msg(msg, seqNo++)));
1560+
UNIT_ASSERT(writeSession_1->Write(Msg(msg, seqNo++)));
1561+
UNIT_ASSERT(writeSession_1->Write(Msg(msg, seqNo++)));
1562+
UNIT_ASSERT(writeSession_1->Write(Msg(msg, seqNo++)));
1563+
Sleep(TDuration::Seconds(15));
1564+
auto describe = client.DescribeTopic(TEST_TOPIC).GetValueSync();
1565+
UNIT_ASSERT_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 1);
1566+
}
1567+
1568+
auto reader = client.CreateReadSession(
1569+
TReadSessionSettings()
1570+
.AutoPartitioningSupport(true)
1571+
.AppendTopics(TTopicReadSettings(TEST_TOPIC))
1572+
.ConsumerName(TEST_CONSUMER));
1573+
1574+
TInstant deadlineTime = TInstant::Now() + TDuration::Seconds(5);
1575+
1576+
auto commitSent = false;
1577+
TString readSessionId = "";
1578+
while(deadlineTime > TInstant::Now()) {
1579+
for (auto event : reader->GetEvents(false)) {
1580+
if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent>(&event)) {
1581+
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
1582+
auto& messages = x->GetMessages();
1583+
for (size_t i = 0u; i < messages.size(); ++i) {
1584+
auto& message = messages[i];
1585+
1586+
if (commitSent) {
1587+
// read session not changed
1588+
UNIT_ASSERT_EQUAL(readSessionId, message.GetPartitionSession()->GetReadSessionId());
1589+
}
1590+
1591+
// check we NOT get this SeqNo two times
1592+
if (message.GetSeqNo() == 6) {
1593+
if (!commitSent) {
1594+
commitSent = true;
1595+
Sleep(TDuration::MilliSeconds(1000));
1596+
1597+
{
1598+
auto describeConsumerSettings = TDescribeConsumerSettings().IncludeStats(true);
1599+
auto result = client.DescribeConsumer(TEST_TOPIC, TEST_CONSUMER, describeConsumerSettings).GetValueSync();
1600+
UNIT_ASSERT(result.IsSuccess());
1601+
1602+
auto description = result.GetConsumerDescription();
1603+
1604+
auto stats = description.GetPartitions().at(0).GetPartitionConsumerStats();
1605+
UNIT_ASSERT(stats);
1606+
UNIT_ASSERT(stats->GetCommittedOffset() == 5);
1607+
}
1608+
1609+
readSessionId = message.GetPartitionSession()->GetReadSessionId();
1610+
TCommitOffsetSettings commitSettings {.ReadSessionId_ = message.GetPartitionSession()->GetReadSessionId()};
1611+
auto status = client.CommitOffset(TEST_TOPIC, 0, TEST_CONSUMER, 8, commitSettings).GetValueSync();
1612+
UNIT_ASSERT(status.IsSuccess());
1613+
1614+
{
1615+
auto describeConsumerSettings = TDescribeConsumerSettings().IncludeStats(true);
1616+
auto result = client.DescribeConsumer(TEST_TOPIC, TEST_CONSUMER, describeConsumerSettings).GetValueSync();
1617+
UNIT_ASSERT(result.IsSuccess());
1618+
1619+
auto description = result.GetConsumerDescription();
1620+
1621+
auto stats = description.GetPartitions().at(0).GetPartitionConsumerStats();
1622+
UNIT_ASSERT(stats);
1623+
1624+
UNIT_ASSERT(stats->GetCommittedOffset() == 8);
1625+
}
1626+
1627+
// must be ignored, because commit to past
1628+
TCommitOffsetSettings commitToPastSettings {.ReadSessionId_ = message.GetPartitionSession()->GetReadSessionId()};
1629+
auto commitToPastStatus = client.CommitOffset(TEST_TOPIC, 0, TEST_CONSUMER, 0, commitToPastSettings).GetValueSync();
1630+
UNIT_ASSERT(commitToPastStatus.IsSuccess());
1631+
1632+
{
1633+
auto describeConsumerSettings = TDescribeConsumerSettings().IncludeStats(true);
1634+
auto result = client.DescribeConsumer(TEST_TOPIC, TEST_CONSUMER, describeConsumerSettings).GetValueSync();
1635+
UNIT_ASSERT(result.IsSuccess());
1636+
1637+
auto description = result.GetConsumerDescription();
1638+
1639+
auto stats = description.GetPartitions().at(0).GetPartitionConsumerStats();
1640+
UNIT_ASSERT(stats);
1641+
1642+
UNIT_ASSERT(stats->GetCommittedOffset() == 8);
1643+
}
1644+
1645+
TCommitOffsetSettings commitSettingsWrongSession {.ReadSessionId_ = "random_session"};
1646+
auto statusWrongSession = client.CommitOffset(TEST_TOPIC, 0, TEST_CONSUMER, 0, commitSettingsWrongSession).GetValueSync();
1647+
UNIT_ASSERT(!statusWrongSession.IsSuccess());
14541648

1649+
{
1650+
auto describeConsumerSettings = TDescribeConsumerSettings().IncludeStats(true);
1651+
auto result = client.DescribeConsumer(TEST_TOPIC, TEST_CONSUMER, describeConsumerSettings).GetValueSync();
1652+
UNIT_ASSERT(result.IsSuccess());
1653+
1654+
auto description = result.GetConsumerDescription();
1655+
1656+
auto stats = description.GetPartitions().at(0).GetPartitionConsumerStats();
1657+
UNIT_ASSERT(stats);
1658+
1659+
UNIT_ASSERT(stats->GetCommittedOffset() == 8);
1660+
}
1661+
1662+
} else {
1663+
UNIT_ASSERT(false);
1664+
}
1665+
} else {
1666+
message.Commit();
1667+
}
1668+
}
1669+
UNIT_ASSERT(writeSession_1->Write(Msg(msg, seqNo++)));
1670+
} else if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&event)) {
1671+
x->Confirm();
1672+
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
1673+
} else if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent>(&event)) {
1674+
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
1675+
} else if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TPartitionSessionStatusEvent>(&event)) {
1676+
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
1677+
} else if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TStopPartitionSessionEvent>(&event)) {
1678+
x->Confirm();
1679+
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
1680+
} else if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TPartitionSessionClosedEvent>(&event)) {
1681+
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
1682+
} else if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TEndPartitionSessionEvent>(&event)) {
1683+
x->Confirm();
1684+
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
1685+
} else if (auto* sessionClosedEvent = std::get_if<NYdb::NTopic::TSessionClosedEvent>(&event)) {
1686+
Cerr << sessionClosedEvent->DebugString() << Endl << Flush;
1687+
} else {
1688+
Cerr << "SESSION EVENT unhandled \n";
1689+
}
1690+
}
1691+
Sleep(TDuration::MilliSeconds(250));
1692+
}
14551693
}
14561694

1695+
1696+
14571697
Y_UNIT_TEST(PartitionSplit_AutosplitByLoad) {
14581698
TTopicSdkTestSetup setup = CreateSetup();
14591699
TTopicClient client = setup.MakeClient();

0 commit comments

Comments
 (0)