Skip to content

Commit 55a593f

Browse files
The values of PartNo must be added (#13869)
1 parent d0b724a commit 55a593f

File tree

2 files changed

+53
-1
lines changed

2 files changed

+53
-1
lines changed

ydb/core/persqueue/partition.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2290,7 +2290,7 @@ void TPartition::CommitWriteOperations(TTransaction& t)
22902290

22912291
if (!t.WriteInfo->BlobsFromHead.empty()) {
22922292
auto& first = t.WriteInfo->BlobsFromHead.front();
2293-
NewHead.PartNo = first.GetPartNo();
2293+
NewHead.PartNo += first.GetPartNo();
22942294

22952295
Parameters->HeadCleared = Parameters->HeadCleared || !t.WriteInfo->BodyKeys.empty();
22962296

ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2377,6 +2377,58 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_48, TFixture)
23772377
UNIT_ASSERT_GT(topicDescription.GetTotalPartitionsCount(), 2);
23782378
}
23792379

2380+
Y_UNIT_TEST_F(Write_Random_Sized_Messages_In_Wide_Transactions, TFixture)
2381+
{
2382+
// The test verifies the simultaneous execution of several transactions. There is a topic
2383+
// with PARTITIONS_COUNT partitions. In each transaction, the test writes to all the partitions.
2384+
// The size of the messages is random. Such that both large blobs in the body and small ones in
2385+
// the head of the partition are obtained.
2386+
2387+
const size_t PARTITIONS_COUNT = 20;
2388+
const size_t TXS_COUNT = 100;
2389+
2390+
CreateTopic("topic_A", TEST_CONSUMER, PARTITIONS_COUNT);
2391+
2392+
TVector<NTable::TSession> sessions;
2393+
TVector<NTable::TTransaction> transactions;
2394+
2395+
// We open TXS_COUNT transactions and write messages to the topic.
2396+
for (size_t i = 0; i < TXS_COUNT; ++i) {
2397+
sessions.push_back(CreateTableSession());
2398+
auto& session = sessions.back();
2399+
2400+
transactions.push_back(BeginTx(session));
2401+
auto& tx = transactions.back();
2402+
2403+
for (size_t j = 0; j < PARTITIONS_COUNT; ++j) {
2404+
TString sourceId = TEST_MESSAGE_GROUP_ID;
2405+
sourceId += "_";
2406+
sourceId += ToString(i);
2407+
sourceId += "_";
2408+
sourceId += ToString(j);
2409+
2410+
size_t count = RandomNumber<size_t>(20) + 3;
2411+
WriteToTopic("topic_A", sourceId, TString(512 * 1000 * count, 'x'), &tx, j);
2412+
2413+
WaitForAcks("topic_A", sourceId);
2414+
}
2415+
}
2416+
2417+
// We are doing an asynchronous commit of transactions. They will be executed simultaneously.
2418+
TVector<NTable::TAsyncCommitTransactionResult> futures;
2419+
2420+
for (size_t i = 0; i < TXS_COUNT; ++i) {
2421+
futures.push_back(transactions[i].Commit());
2422+
}
2423+
2424+
// All transactions must be completed successfully.
2425+
for (size_t i = 0; i < TXS_COUNT; ++i) {
2426+
futures[i].Wait();
2427+
const auto& result = futures[i].GetValueSync();
2428+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
2429+
}
2430+
}
2431+
23802432
}
23812433

23822434
}

0 commit comments

Comments
 (0)