Skip to content

Commit bf97b65

Browse files
Merge d92de94 into 52b2390
2 parents 52b2390 + d92de94 commit bf97b65

File tree

2 files changed

+57
-1
lines changed

2 files changed

+57
-1
lines changed

ydb/core/persqueue/partition.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2310,7 +2310,9 @@ void TPartition::CommitWriteOperations(TTransaction& t)
23102310

23112311
if (!t.WriteInfo->BlobsFromHead.empty()) {
23122312
auto& first = t.WriteInfo->BlobsFromHead.front();
2313-
NewHead.PartNo = first.GetPartNo();
2313+
// In one operation, a partition can write blocks of several transactions. Some of them can be broken down
2314+
// into parts. We need to take this division into account.
2315+
NewHead.PartNo += first.GetPartNo();
23142316

23152317
Parameters->HeadCleared = Parameters->HeadCleared || !t.WriteInfo->BodyKeys.empty();
23162318

ydb/public/sdk/cpp/src/client/topic/ut/topic_to_table_ut.cpp

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2910,6 +2910,60 @@ Y_UNIT_TEST_F(Sinks_Olap_WriteToTopicAndTable_3, TFixtureSinks)
29102910

29112911
CheckTabletKeys("topic_A");
29122912
}
2913+
2914+
Y_UNIT_TEST_F(Write_Random_Sized_Messages_In_Wide_Transactions, TFixture)
2915+
{
2916+
// The test verifies the simultaneous execution of several transactions. There is a topic
2917+
// with PARTITIONS_COUNT partitions. In each transaction, the test writes to all the partitions.
2918+
// The size of the messages is random. Such that both large blobs in the body and small ones in
2919+
// the head of the partition are obtained. Message sizes are multiples of 500 KB. This way we
2920+
// will make sure that when committing transactions, the division into blocks is taken into account.
2921+
2922+
const size_t PARTITIONS_COUNT = 20;
2923+
const size_t TXS_COUNT = 100;
2924+
2925+
CreateTopic("topic_A", TEST_CONSUMER, PARTITIONS_COUNT);
2926+
2927+
TVector<NTable::TSession> sessions;
2928+
TVector<NTable::TTransaction> transactions;
2929+
2930+
// We open TXS_COUNT transactions and write messages to the topic.
2931+
for (size_t i = 0; i < TXS_COUNT; ++i) {
2932+
sessions.push_back(CreateTableSession());
2933+
auto& session = sessions.back();
2934+
2935+
transactions.push_back(BeginTx(session));
2936+
auto& tx = transactions.back();
2937+
2938+
for (size_t j = 0; j < PARTITIONS_COUNT; ++j) {
2939+
TString sourceId = TEST_MESSAGE_GROUP_ID;
2940+
sourceId += "_";
2941+
sourceId += ToString(i);
2942+
sourceId += "_";
2943+
sourceId += ToString(j);
2944+
2945+
size_t count = RandomNumber<size_t>(20) + 3;
2946+
WriteToTopic("topic_A", sourceId, TString(512 * 1000 * count, 'x'), &tx, j);
2947+
2948+
WaitForAcks("topic_A", sourceId);
2949+
}
2950+
}
2951+
2952+
// We are doing an asynchronous commit of transactions. They will be executed simultaneously.
2953+
TVector<NTable::TAsyncCommitTransactionResult> futures;
2954+
2955+
for (size_t i = 0; i < TXS_COUNT; ++i) {
2956+
futures.push_back(transactions[i].Commit());
2957+
}
2958+
2959+
// All transactions must be completed successfully.
2960+
for (size_t i = 0; i < TXS_COUNT; ++i) {
2961+
futures[i].Wait();
2962+
const auto& result = futures[i].GetValueSync();
2963+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
2964+
}
2965+
}
2966+
29132967
}
29142968

29152969
}

0 commit comments

Comments
 (0)