Skip to content

Commit ab5341b

Browse files
authored
Merge bd0211b into 254d1a5
2 parents 254d1a5 + bd0211b commit ab5341b

File tree

5 files changed

+101
-64
lines changed

5 files changed

+101
-64
lines changed

ydb/core/persqueue/blob.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -466,7 +466,7 @@ void TBatch::UnpackToType1(TVector<TClientBlob> *blobs) const {
466466
ui32 sourceIdCount = 0;
467467
TVector<TString> sourceIds;
468468

469-
static NScheme::TTypeCodecs ui32Codecs(NScheme::NTypeIds::Uint32), ui64Codecs(NScheme::NTypeIds::Uint64), stringCodecs(NScheme::NTypeIds::String);
469+
static const NScheme::TTypeCodecs ui32Codecs(NScheme::NTypeIds::Uint32), ui64Codecs(NScheme::NTypeIds::Uint64), stringCodecs(NScheme::NTypeIds::String);
470470
//read order
471471
{
472472
auto chunk = NScheme::IChunkDecoder::ReadChunk(GetChunk(data, dataEnd), &ui32Codecs);

ydb/core/persqueue/partition_read.cpp

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -421,9 +421,10 @@ TReadAnswer TReadInfo::FormAnswer(
421421
size -= lastBlobSize;
422422
}
423423
lastBlobSize = 0;
424-
return (size >= Size || cnt >= Count);
424+
return cnt >= Count;
425425
}
426-
return !AppData()->PQConfig.GetTopicsAreFirstClassCitizen() && (size >= Size || cnt >= Count);
426+
// For backward compatibility, we keep the behavior for older clients for non-FirstClassCitizen
427+
return !AppData()->PQConfig.GetTopicsAreFirstClassCitizen() && cnt >= Count;
427428
};
428429

429430
Y_ABORT_UNLESS(blobs.size() == Blobs.size());
@@ -466,7 +467,7 @@ TReadAnswer TReadInfo::FormAnswer(
466467
Y_ABORT_UNLESS(offset < Offset || partNo <= PartNo);
467468
TKey key(TKeyPrefix::TypeData, TPartitionId(0), offset, partNo, count, internalPartsCount, false);
468469
ui64 firstHeaderOffset = GetFirstHeaderOffset(key, blobValue);
469-
for (TBlobIterator it(key, blobValue); it.IsValid(); it.Next()) {
470+
for (TBlobIterator it(key, blobValue); it.IsValid() && !needStop; it.Next()) {
470471
TBatch batch = it.GetBatch();
471472
auto& header = batch.Header;
472473
batch.Unpack();
@@ -510,7 +511,10 @@ TReadAnswer TReadInfo::FormAnswer(
510511
++PartNo;
511512
}
512513

513-
needStop = updateUsage(res);
514+
if (updateUsage(res)) {
515+
needStop = true;
516+
break;
517+
}
514518
}
515519
}
516520
}

ydb/core/persqueue/ut/pq_ut.cpp

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1174,7 +1174,7 @@ Y_UNIT_TEST(TestWritePQBigMessage) {
11741174
CmdWrite(0, "sourceid0", data, tc, false, {}, true);
11751175
PQGetPartInfo(0, 27, tc);
11761176

1177-
CmdRead(0, 0, Max<i32>(), Max<i32>(), 13, false, tc);
1177+
CmdRead(0, 0, Max<i32>(), Max<i32>(), 1, false, tc);
11781178
CmdRead(0, 1, Max<i32>(), Max<i32>(), 25, false, tc);
11791179
CmdRead(0, 24, Max<i32>(), Max<i32>(), 2, false, tc);
11801180
CmdRead(0, 26, Max<i32>(), Max<i32>(), 1, false, tc);
@@ -1602,18 +1602,18 @@ Y_UNIT_TEST(TestPQRead) {
16021602
CmdRead(0, 26, Max<i32>(), Max<i32>(), 0, true, tc);
16031603

16041604
CmdRead(0, 0, Max<i32>(), Max<i32>(), 25, false, tc);
1605-
CmdRead(0, 0, 10, 100_MB, 15, false, tc);
1606-
CmdRead(0, 9, 1, 100_MB, 6, false, tc);
1605+
CmdRead(0, 0, 10, 100_MB, 10, false, tc);
1606+
CmdRead(0, 9, 1, 100_MB, 1, false, tc);
16071607
CmdRead(0, 23, 3, 100_MB, 3, false, tc);
16081608

1609-
CmdRead(0, 3, 1000, 511_KB, 4, false, tc);
1610-
CmdRead(0, 3, 1000, 511_KB, 4, false, tc);
1611-
CmdRead(0, 3, 1000, 1_KB, 4, false, tc); //at least one message will be readed always
1609+
CmdRead(0, 3, 1000, 511_KB, 12, false, tc);
1610+
CmdRead(0, 3, 1000, 511_KB, 12, false, tc);
1611+
CmdRead(0, 3, 1000, 1_KB, 12, false, tc); //at least one message will be readed always
16121612
CmdRead(0, 25, 1000, 1_KB, 1, false, tc); //at least one message will be readed always, from head
16131613

16141614
activeZone = true;
1615-
CmdRead(0, 9, 1000, 3_MB, 6, false, tc);
1616-
CmdRead(0, 9, 1000, 3_MB - 10_KB, 6, false, tc);
1615+
CmdRead(0, 9, 1000, 3_MB, 14, false, tc);
1616+
CmdRead(0, 9, 1000, 3_MB - 10_KB, 14, false, tc);
16171617
CmdRead(0, 25, 1000, 512_KB, 1, false, tc); //from head
16181618
CmdRead(0, 24, 1000, 512_KB, 1, false, tc); //from head
16191619

@@ -1703,11 +1703,15 @@ Y_UNIT_TEST(TestPQReadAhead) {
17031703
PQGetPartInfo(0, 22, tc);
17041704
activeZone = true;
17051705

1706-
CmdRead(0, 0, 1, 100_MB, 12, false, tc);
1707-
CmdRead(0, 1, 1, 100_MB, 11, false, tc);
1708-
CmdRead(0, 2, 1, 100_MB, 10, false, tc);
1709-
CmdRead(0, 3, 1, 100_MB, 9, false, tc);
1710-
CmdRead(0, 4, 10, 100_MB, 16, false, tc);
1706+
CmdRead(0, 0, 1, 100_MB, 1, false, tc);
1707+
CmdRead(0, 1, 1, 100_MB, 1, false, tc);
1708+
CmdRead(0, 2, 1, 100_MB, 1, false, tc);
1709+
CmdRead(0, 4, 10, 100_MB, 10, false, tc);
1710+
1711+
CmdRead(0, 0, Max<i32>(), 100_KB, 12, false, tc);
1712+
CmdRead(0, 1, Max<i32>(), 100_KB, 19, false, tc);
1713+
CmdRead(0, 2, Max<i32>(), 100_KB, 18, false, tc);
1714+
CmdRead(0, 3, Max<i32>(), 100_KB, 17, false, tc);
17111715
});
17121716
}
17131717

@@ -2023,7 +2027,7 @@ Y_UNIT_TEST(TestPQCacheSizeManagement) {
20232027

20242028
TAutoPtr<IEventHandle> handle;
20252029
for (ui32 i = 0; i < 10; ++i) {
2026-
CmdRead(0, 0, 1, 100_MB, 7, false, tc);
2030+
CmdRead(0, 0, 1, 100_MB, 1, false, tc);
20272031
PQTabletRestart(tc);
20282032
}
20292033
});
@@ -2074,14 +2078,14 @@ Y_UNIT_TEST(TestMaxTimeLagRewind) {
20742078
}
20752079
const auto ts = tc.Runtime->GetCurrentTime();
20762080

2077-
CmdRead(0, 0, 1, Max<i32>(), 7, false, tc, {0});
2078-
CmdRead(0, 0, 1, Max<i32>(), 7, false, tc, {21}, TDuration::Minutes(3).MilliSeconds());
2079-
CmdRead(0, 22, 1, Max<i32>(), 6, false, tc, {22}, TDuration::Minutes(3).MilliSeconds());
2081+
CmdRead(0, 0, 1, Max<i32>(), 1, false, tc, {0});
2082+
CmdRead(0, 0, 1, Max<i32>(), 1, false, tc, {21}, TDuration::Minutes(3).MilliSeconds());
2083+
CmdRead(0, 22, 1, Max<i32>(), 1, false, tc, {22}, TDuration::Minutes(3).MilliSeconds());
20802084
CmdRead(0, 4, 1, Max<i32>(), 1, false, tc, {34}, 1000);
20812085

2082-
CmdRead(0, 0, 1, Max<i32>(), 7, false, tc, {21}, 0,
2086+
CmdRead(0, 0, 1, Max<i32>(), 1, false, tc, {21}, 0,
20832087
(ts - TDuration::Minutes(3)).MilliSeconds());
2084-
CmdRead(0, 22, 1, Max<i32>(), 6, false, tc, {22}, 0,
2088+
CmdRead(0, 22, 1, Max<i32>(), 1, false, tc, {22}, 0,
20852089
(ts - TDuration::Minutes(3)).MilliSeconds());
20862090
CmdRead(0, 4, 1, Max<i32>(), 1, false, tc, {34}, 0,
20872091
(ts - TDuration::Seconds(1)).MilliSeconds());

ydb/services/datastreams/datastreams_ut.cpp

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2353,6 +2353,66 @@ Y_UNIT_TEST_SUITE(DataStreams) {
23532353

23542354
}
23552355

2356+
Y_UNIT_TEST(TestGetRecordsWithCount) {
2357+
TInsecureDatastreamsTestServer testServer;
2358+
const TString streamName = TStringBuilder() << "stream_" << Y_UNIT_TEST_NAME;
2359+
{
2360+
auto result = testServer.DataStreamsClient->CreateStream(streamName,
2361+
NYDS_V1::TCreateStreamSettings().ShardCount(1)).ExtractValueSync();
2362+
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
2363+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
2364+
}
2365+
2366+
const ui32 recordsCount = 16;
2367+
std::vector<ui64> timestamps;
2368+
{
2369+
std::string data;
2370+
data.resize(1_MB); // big messages. compaction must will be completed.
2371+
std::iota(data.begin(), data.end(), 1);
2372+
std::random_device rd;
2373+
std::mt19937 generator{rd()};
2374+
2375+
for (ui32 i = 1; i <= recordsCount; ++i) {
2376+
std::shuffle(data.begin(), data.end(), generator);
2377+
{
2378+
TString id = Sprintf("%04u", i);
2379+
NYDS_V1::TDataRecord dataRecord{{data.begin(), data.end()}, id, ""};
2380+
//
2381+
// we make sure that the value of WriteTimestampMs is between neighboring timestamps
2382+
//
2383+
timestamps.push_back(TInstant::Now().MilliSeconds());
2384+
Sleep(TDuration::MilliSeconds(500));
2385+
auto result = testServer.DataStreamsClient->PutRecord(streamName, dataRecord).ExtractValueSync();
2386+
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
2387+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
2388+
}
2389+
Sleep(TDuration::MilliSeconds(500));
2390+
}
2391+
}
2392+
2393+
for (ui32 i = 0; i < recordsCount; ++i) {
2394+
TString shardIterator;
2395+
2396+
{
2397+
auto result = testServer.DataStreamsClient->GetShardIterator(streamName, "shard-000000",
2398+
YDS_V1::ShardIteratorType::AT_TIMESTAMP,
2399+
NYDS_V1::TGetShardIteratorSettings().Timestamp(timestamps[i])).ExtractValueSync();
2400+
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
2401+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
2402+
shardIterator = result.GetResult().shard_iterator();
2403+
}
2404+
2405+
{
2406+
auto result = testServer.DataStreamsClient->GetRecords(shardIterator,
2407+
NYDS_V1::TGetRecordsSettings().Limit(1)).ExtractValueSync();
2408+
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
2409+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
2410+
UNIT_ASSERT_VALUES_EQUAL(result.GetResult().records().size(), 1);
2411+
UNIT_ASSERT_VALUES_EQUAL(std::stoi(result.GetResult().records().begin()->sequence_number()), i);
2412+
}
2413+
}
2414+
}
2415+
23562416
Y_UNIT_TEST(TestGetRecordsStreamWithMultipleShards) {
23572417
TInsecureDatastreamsTestServer testServer;
23582418
const TString streamName = TStringBuilder() << "stream_" << Y_UNIT_TEST_NAME;

ydb/services/persqueue_v1/persqueue_ut.cpp

Lines changed: 9 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -880,12 +880,12 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
880880
writer->Close();
881881
}
882882

883-
void DoRead(ui64 assignId, ui64& nextReadId, ui32& currTotalMessages, ui32 messageLimit) {
883+
void DoRead(ui64 assignId, ui64& nextReadId, ui32& currTotalMessages, const ui32 messageLimit) {
884884
// Get DirectReadResponse messages, send DirectReadAck messages.
885885

886886
auto endTime = TInstant::Now() + TDuration::Seconds(10);
887887
while (currTotalMessages < messageLimit && endTime > TInstant::Now()) {
888-
Cerr << "Wait for direct read id: " << nextReadId << ", currently have " << currTotalMessages << " messages" << Endl;
888+
Cerr << "Wait for direct read id: " << nextReadId << ", currently have " << currTotalMessages << " messages, limit is " << messageLimit << Endl;
889889

890890
Ydb::Topic::StreamDirectReadMessage::FromServer resp;
891891
UNIT_ASSERT(DirectStream->Read(&resp));
@@ -1013,37 +1013,6 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}};
10131013
UNIT_ASSERT_VALUES_EQUAL(cachedData->Data.begin()->second.Reads.size(), 0);
10141014
}
10151015

1016-
Y_UNIT_TEST(DirectReadNotCached) {
1017-
TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}};
1018-
SET_LOCALS;
1019-
TDirectReadTestSetup setup{server};
1020-
1021-
setup.InitControlSession("acc/topic1");
1022-
auto [partitionId, assignId] = setup.GetNextAssign("acc/topic1");
1023-
UNIT_ASSERT_VALUES_EQUAL(partitionId, 0);
1024-
setup.InitDirectSession("acc/topic1");
1025-
setup.SendReadSessionAssign(assignId);
1026-
1027-
ui32 totalMsg = 0;
1028-
ui64 nextReadId = 1;
1029-
Sleep(TDuration::Seconds(3));
1030-
setup.DoWrite(pqClient->GetDriver(), "acc/topic1", 1_MB, 50);
1031-
setup.DoRead(assignId, nextReadId, totalMsg, 42);
1032-
1033-
Topic::StreamReadMessage::FromClient req;
1034-
req.mutable_read_request()->set_bytes_size(40_MB);
1035-
if (!setup.ControlStream->Write(req)) {
1036-
ythrow yexception() << "write fail";
1037-
}
1038-
setup.DoRead(assignId, nextReadId, totalMsg, 50);
1039-
1040-
Sleep(TDuration::Seconds(1));
1041-
auto cachedData = RequestCacheData(runtime, new TEvPQ::TEvGetFullDirectReadData());
1042-
UNIT_ASSERT_VALUES_EQUAL(cachedData->Data.size(), 1);
1043-
UNIT_ASSERT_VALUES_EQUAL(cachedData->Data.begin()->second.StagedReads.size(), 0);
1044-
UNIT_ASSERT_VALUES_EQUAL(cachedData->Data.begin()->second.Reads.size(), 0);
1045-
}
1046-
10471016
Y_UNIT_TEST(DirectReadBadCases) {
10481017
TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}};
10491018
SET_LOCALS;
@@ -2552,7 +2521,7 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}};
25522521
for (ui32 i = 0; i < 32; ++i)
25532522
server.AnnoyingClient->WriteToPQ({DEFAULT_TOPIC_NAME, 0, "source1", i}, value);
25542523

2555-
auto info0 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 16, "user"}, 23);
2524+
auto info0 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 16, "user"}, 16);
25562525
auto info16 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 16, 16, "user"}, 16);
25572526

25582527
UNIT_ASSERT_VALUES_EQUAL(info0.BlobsFromCache, 3);
@@ -2562,8 +2531,8 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}};
25622531
for (ui32 i = 0; i < 8; ++i)
25632532
server.AnnoyingClient->WriteToPQ({DEFAULT_TOPIC_NAME, 0, "source1", 32+i}, value);
25642533

2565-
info0 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 16, "user"}, 23);
2566-
info16 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 16, 16, "user"}, 22);
2534+
info0 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 16, "user"}, 16);
2535+
info16 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 16, 16, "user"}, 16);
25672536

25682537
ui32 fromDisk = info0.BlobsFromDisk + info16.BlobsFromDisk;
25692538
ui32 fromCache = info0.BlobsFromCache + info16.BlobsFromCache;
@@ -2623,13 +2592,13 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}};
26232592
server.AnnoyingClient->WriteToPQ({secondTopic, 0, "source1", i}, mb);
26242593
}
26252594

2626-
auto info1 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 1, "user1"}, 7);
2627-
auto info2 = server.AnnoyingClient->ReadFromPQ({secondTopic, 0, 0, 1, "user1"}, 7);
2595+
auto info1 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 1, "user1"}, 1);
2596+
auto info2 = server.AnnoyingClient->ReadFromPQ({secondTopic, 0, 0, 1, "user1"}, 1);
26282597

26292598
UNIT_ASSERT_VALUES_EQUAL(info1.BlobsFromCache, 1);
26302599
UNIT_ASSERT_VALUES_EQUAL(info2.BlobsFromCache, 1);
2631-
UNIT_ASSERT_VALUES_EQUAL(info1.Values.size(), 7);
2632-
UNIT_ASSERT_VALUES_EQUAL(info2.Values.size(), 7);
2600+
UNIT_ASSERT_VALUES_EQUAL(info1.Values.size(), 1);
2601+
UNIT_ASSERT_VALUES_EQUAL(info2.Values.size(), 1);
26332602
UNIT_ASSERT_VALUES_EQUAL(info1.Values[0].size(), valueSize);
26342603
UNIT_ASSERT_VALUES_EQUAL(info2.Values[0].size(), valueSize);
26352604
UNIT_ASSERT(info1.Values[0] == value1);

0 commit comments

Comments
 (0)