Skip to content

Commit bd0211b

Browse files
committed
fix
1 parent d695e4b commit bd0211b

File tree

3 files changed

+11
-42
lines changed

3 files changed

+11
-42
lines changed

ydb/core/persqueue/partition_read.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -430,7 +430,7 @@ TReadAnswer TReadInfo::FormAnswer(
430430
Y_ABORT_UNLESS(blobs.size() == Blobs.size());
431431
response->Check();
432432
bool needStop = false;
433-
for (ui32 pos = 0; pos < blobs.size() && !needStop && size < Size; ++pos) {
433+
for (ui32 pos = 0; pos < blobs.size() && !needStop; ++pos) {
434434
Y_ABORT_UNLESS(Blobs[pos].Offset == blobs[pos].Offset, "Mismatch %" PRIu64 " vs %" PRIu64, Blobs[pos].Offset, blobs[pos].Offset);
435435
Y_ABORT_UNLESS(Blobs[pos].Count == blobs[pos].Count, "Mismatch %" PRIu32 " vs %" PRIu32, Blobs[pos].Count, blobs[pos].Count);
436436

ydb/core/persqueue/ut/pq_ut.cpp

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1606,14 +1606,14 @@ Y_UNIT_TEST(TestPQRead) {
16061606
CmdRead(0, 9, 1, 100_MB, 1, false, tc);
16071607
CmdRead(0, 23, 3, 100_MB, 3, false, tc);
16081608

1609-
CmdRead(0, 3, 1000, 511_KB, 4, false, tc);
1610-
CmdRead(0, 3, 1000, 511_KB, 4, false, tc);
1611-
CmdRead(0, 3, 1000, 1_KB, 4, false, tc); //at least one message will be readed always
1609+
CmdRead(0, 3, 1000, 511_KB, 12, false, tc);
1610+
CmdRead(0, 3, 1000, 511_KB, 12, false, tc);
1611+
CmdRead(0, 3, 1000, 1_KB, 12, false, tc); //at least one message will be readed always
16121612
CmdRead(0, 25, 1000, 1_KB, 1, false, tc); //at least one message will be readed always, from head
16131613

16141614
activeZone = true;
1615-
CmdRead(0, 9, 1000, 3_MB, 6, false, tc);
1616-
CmdRead(0, 9, 1000, 3_MB - 10_KB, 6, false, tc);
1615+
CmdRead(0, 9, 1000, 3_MB, 14, false, tc);
1616+
CmdRead(0, 9, 1000, 3_MB - 10_KB, 14, false, tc);
16171617
CmdRead(0, 25, 1000, 512_KB, 1, false, tc); //from head
16181618
CmdRead(0, 24, 1000, 512_KB, 1, false, tc); //from head
16191619

@@ -1709,9 +1709,9 @@ Y_UNIT_TEST(TestPQReadAhead) {
17091709
CmdRead(0, 4, 10, 100_MB, 10, false, tc);
17101710

17111711
CmdRead(0, 0, Max<i32>(), 100_KB, 12, false, tc);
1712-
CmdRead(0, 1, Max<i32>(), 100_KB, 11, false, tc);
1713-
CmdRead(0, 2, Max<i32>(), 100_KB, 10, false, tc);
1714-
CmdRead(0, 3, Max<i32>(), 100_KB, 9, false, tc);
1712+
CmdRead(0, 1, Max<i32>(), 100_KB, 19, false, tc);
1713+
CmdRead(0, 2, Max<i32>(), 100_KB, 18, false, tc);
1714+
CmdRead(0, 3, Max<i32>(), 100_KB, 17, false, tc);
17151715
});
17161716
}
17171717

ydb/services/persqueue_v1/persqueue_ut.cpp

Lines changed: 2 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -880,12 +880,12 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
880880
writer->Close();
881881
}
882882

883-
void DoRead(ui64 assignId, ui64& nextReadId, ui32& currTotalMessages, ui32 messageLimit) {
883+
void DoRead(ui64 assignId, ui64& nextReadId, ui32& currTotalMessages, const ui32 messageLimit) {
884884
// Get DirectReadResponse messages, send DirectReadAck messages.
885885

886886
auto endTime = TInstant::Now() + TDuration::Seconds(10);
887887
while (currTotalMessages < messageLimit && endTime > TInstant::Now()) {
888-
Cerr << "Wait for direct read id: " << nextReadId << ", currently have " << currTotalMessages << " messages" << Endl;
888+
Cerr << "Wait for direct read id: " << nextReadId << ", currently have " << currTotalMessages << " messages, limit is " << messageLimit << Endl;
889889

890890
Ydb::Topic::StreamDirectReadMessage::FromServer resp;
891891
UNIT_ASSERT(DirectStream->Read(&resp));
@@ -1013,37 +1013,6 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}};
10131013
UNIT_ASSERT_VALUES_EQUAL(cachedData->Data.begin()->second.Reads.size(), 0);
10141014
}
10151015

1016-
Y_UNIT_TEST(DirectReadNotCached) {
1017-
TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}};
1018-
SET_LOCALS;
1019-
TDirectReadTestSetup setup{server};
1020-
1021-
setup.InitControlSession("acc/topic1");
1022-
auto [partitionId, assignId] = setup.GetNextAssign("acc/topic1");
1023-
UNIT_ASSERT_VALUES_EQUAL(partitionId, 0);
1024-
setup.InitDirectSession("acc/topic1");
1025-
setup.SendReadSessionAssign(assignId);
1026-
1027-
ui32 totalMsg = 0;
1028-
ui64 nextReadId = 1;
1029-
Sleep(TDuration::Seconds(3));
1030-
setup.DoWrite(pqClient->GetDriver(), "acc/topic1", 1_MB, 50);
1031-
setup.DoRead(assignId, nextReadId, totalMsg, 46);
1032-
1033-
Topic::StreamReadMessage::FromClient req;
1034-
req.mutable_read_request()->set_bytes_size(40_MB);
1035-
if (!setup.ControlStream->Write(req)) {
1036-
ythrow yexception() << "write fail";
1037-
}
1038-
setup.DoRead(assignId, nextReadId, totalMsg, 50);
1039-
1040-
Sleep(TDuration::Seconds(1));
1041-
auto cachedData = RequestCacheData(runtime, new TEvPQ::TEvGetFullDirectReadData());
1042-
UNIT_ASSERT_VALUES_EQUAL(cachedData->Data.size(), 1);
1043-
UNIT_ASSERT_VALUES_EQUAL(cachedData->Data.begin()->second.StagedReads.size(), 0);
1044-
UNIT_ASSERT_VALUES_EQUAL(cachedData->Data.begin()->second.Reads.size(), 0);
1045-
}
1046-
10471016
Y_UNIT_TEST(DirectReadBadCases) {
10481017
TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}};
10491018
SET_LOCALS;

0 commit comments

Comments
 (0)