Skip to content

Commit 8d9694b

Browse files
nshestakov authored and zinal committed
Optimize CPU usage when read blob (ydb-platform#12153) (ydb-platform#12221)
1 parent a88a7ee commit 8d9694b

File tree

5 files changed

+46
-49
lines changed

5 files changed

+46
-49
lines changed

ydb/core/persqueue/blob.cpp

+1 -1
Original file line number | Diff line number | Diff line change
@@ -459,7 +459,7 @@ void TBatch::UnpackToType1(TVector<TClientBlob> *blobs) const {
459459
ui32 sourceIdCount = 0;
460460
TVector<TString> sourceIds;
461461

462-
NScheme::TTypeCodecs ui32Codecs(NScheme::NTypeIds::Uint32), ui64Codecs(NScheme::NTypeIds::Uint64), stringCodecs(NScheme::NTypeIds::String);
462+
static NScheme::TTypeCodecs ui32Codecs(NScheme::NTypeIds::Uint32), ui64Codecs(NScheme::NTypeIds::Uint64), stringCodecs(NScheme::NTypeIds::String);
463463
//read order
464464
{
465465
auto chunk = NScheme::IChunkDecoder::ReadChunk(GetChunk(data, dataEnd), &ui32Codecs);

ydb/core/persqueue/partition_read.cpp

+4 -10
Original file line number | Diff line number | Diff line change
@@ -451,7 +451,7 @@ TReadAnswer TReadInfo::FormAnswer(
451451
Y_ABORT_UNLESS(offset < Offset || partNo <= PartNo);
452452
TKey key(TKeyPrefix::TypeData, TPartitionId(0), offset, partNo, count, internalPartsCount, false);
453453
ui64 firstHeaderOffset = GetFirstHeaderOffset(key, blobValue);
454-
for (TBlobIterator it(key, blobValue); it.IsValid() && !needStop; it.Next()) {
454+
for (TBlobIterator it(key, blobValue); it.IsValid(); it.Next()) {
455455
TBatch batch = it.GetBatch();
456456
auto& header = batch.Header;
457457
batch.Unpack();
@@ -473,12 +473,11 @@ TReadAnswer TReadInfo::FormAnswer(
473473
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "FormAnswer processing batch offset "
474474
<< (offset - header.GetCount()) << " totakecount " << count << " count " << header.GetCount() << " size " << header.GetPayloadSize() << " from pos " << pos << " cbcount " << batch.Blobs.size());
475475

476-
ui32 i = 0;
477-
for (i = pos; i < batch.Blobs.size(); ++i) {
476+
for (size_t i = pos; i < batch.Blobs.size(); ++i) {
478477
TClientBlob &res = batch.Blobs[i];
479478
VERIFY_RESULT_BLOB(res, i);
480479

481-
Y_ABORT_UNLESS(PartNo == res.GetPartNo(), "pos %" PRIu32 " i %" PRIu32 " Offset %" PRIu64 " PartNo %" PRIu16 " offset %" PRIu64 " partNo %" PRIu16,
480+
Y_ABORT_UNLESS(PartNo == res.GetPartNo(), "pos %" PRIu32 " i %" PRIu64 " Offset %" PRIu64 " PartNo %" PRIu16 " offset %" PRIu64 " partNo %" PRIu16,
482481
pos, i, Offset, PartNo, offset, res.GetPartNo());
483482

484483
if (userInfo) {
@@ -495,13 +494,8 @@ TReadAnswer TReadInfo::FormAnswer(
495494
} else {
496495
++PartNo;
497496
}
498-
if (updateUsage(res)) {
499-
break;
500-
}
501-
}
502497

503-
if (i != batch.Blobs.size()) {//not fully processed batch - next definetely will not be processed
504-
needStop = true;
498+
needStop = updateUsage(res);
505499
}
506500
}
507501
}

ydb/core/persqueue/ut/common/pq_ut_common.cpp

+4 -4
Original file line number | Diff line number | Diff line change
@@ -916,7 +916,7 @@ bool CheckCmdReadResult(const TPQCmdReadSettings& settings, TEvPersQueue::TEvRes
916916
UNIT_ASSERT_C(result->Record.GetPartitionResponse().HasCmdReadResult(), result->Record.GetPartitionResponse().DebugString());
917917
auto res = result->Record.GetPartitionResponse().GetCmdReadResult();
918918

919-
UNIT_ASSERT_EQUAL(res.ResultSize(), settings.ResCount);
919+
UNIT_ASSERT_VALUES_EQUAL(res.ResultSize(), settings.ResCount);
920920
ui64 off = settings.Offset;
921921

922922
for (ui32 i = 0; i < settings.ResCount; ++i) {
@@ -926,10 +926,10 @@ bool CheckCmdReadResult(const TPQCmdReadSettings& settings, TEvPersQueue::TEvRes
926926
UNIT_ASSERT_EQUAL((ui64)r.GetOffset(), off);
927927
}
928928
UNIT_ASSERT(r.GetSourceId().size() == 9 && r.GetSourceId().StartsWith("sourceid"));
929-
UNIT_ASSERT_EQUAL(ui32(r.GetData()[0]), off);
930-
UNIT_ASSERT_EQUAL(ui32((unsigned char)r.GetData().back()), r.GetSeqNo() % 256);
929+
UNIT_ASSERT_VALUES_EQUAL(ui32(r.GetData()[0]), off);
930+
UNIT_ASSERT_VALUES_EQUAL(ui32((unsigned char)r.GetData().back()), r.GetSeqNo() % 256);
931931
++off;
932-
} else {
932+
} else if (settings.Offsets.size() > i) {
933933
UNIT_ASSERT(settings.Offsets[i] == (i64)r.GetOffset());
934934
}
935935
}

ydb/core/persqueue/ut/pq_ut.cpp

+21 -19
Original file line number | Diff line number | Diff line change
@@ -1174,7 +1174,7 @@ Y_UNIT_TEST(TestWritePQBigMessage) {
11741174
CmdWrite(0, "sourceid0", data, tc, false, {}, true);
11751175
PQGetPartInfo(0, 27, tc);
11761176

1177-
CmdRead(0, 0, Max<i32>(), Max<i32>(), 1, false, tc);
1177+
CmdRead(0, 0, Max<i32>(), Max<i32>(), 13, false, tc);
11781178
CmdRead(0, 1, Max<i32>(), Max<i32>(), 25, false, tc);
11791179
CmdRead(0, 24, Max<i32>(), Max<i32>(), 2, false, tc);
11801180
CmdRead(0, 26, Max<i32>(), Max<i32>(), 1, false, tc);
@@ -1602,17 +1602,18 @@ Y_UNIT_TEST(TestPQRead) {
16021602
CmdRead(0, 26, Max<i32>(), Max<i32>(), 0, true, tc);
16031603

16041604
CmdRead(0, 0, Max<i32>(), Max<i32>(), 25, false, tc);
1605-
CmdRead(0, 0, 10, 100_MB, 10, false, tc);
1606-
CmdRead(0, 9, 1, 100_MB, 1, false, tc);
1605+
CmdRead(0, 0, 10, 100_MB, 15, false, tc);
1606+
CmdRead(0, 9, 1, 100_MB, 6, false, tc);
16071607
CmdRead(0, 23, 3, 100_MB, 3, false, tc);
16081608

1609-
CmdRead(0, 3, 1000, 511_KB, 1, false, tc);
1610-
CmdRead(0, 3, 1000, 1_KB, 1, false, tc); //at least one message will be readed always
1609+
CmdRead(0, 3, 1000, 511_KB, 4, false, tc);
1610+
CmdRead(0, 3, 1000, 511_KB, 4, false, tc);
1611+
CmdRead(0, 3, 1000, 1_KB, 4, false, tc); //at least one message will be readed always
16111612
CmdRead(0, 25, 1000, 1_KB, 1, false, tc); //at least one message will be readed always, from head
16121613

16131614
activeZone = true;
1614-
CmdRead(0, 9, 1000, 3_MB, 3, false, tc);
1615-
CmdRead(0, 9, 1000, 3_MB - 10_KB, 3, false, tc);
1615+
CmdRead(0, 9, 1000, 3_MB, 6, false, tc);
1616+
CmdRead(0, 9, 1000, 3_MB - 10_KB, 6, false, tc);
16161617
CmdRead(0, 25, 1000, 512_KB, 1, false, tc); //from head
16171618
CmdRead(0, 24, 1000, 512_KB, 1, false, tc); //from head
16181619

@@ -1701,11 +1702,12 @@ Y_UNIT_TEST(TestPQReadAhead) {
17011702
CmdWrite(0, "sourceid0", data, tc, false, {}, true); //now 1 blob
17021703
PQGetPartInfo(0, 22, tc);
17031704
activeZone = true;
1704-
CmdRead(0, 0, 1, 100_MB, 1, false, tc);
1705-
CmdRead(0, 1, 1, 100_MB, 1, false, tc);
1706-
CmdRead(0, 2, 1, 100_MB, 1, false, tc);
1707-
CmdRead(0, 3, 1, 100_MB, 1, false, tc);
1708-
CmdRead(0, 4, 10, 100_MB, 10, false, tc);
1705+
1706+
CmdRead(0, 0, 1, 100_MB, 12, false, tc);
1707+
CmdRead(0, 1, 1, 100_MB, 11, false, tc);
1708+
CmdRead(0, 2, 1, 100_MB, 10, false, tc);
1709+
CmdRead(0, 3, 1, 100_MB, 9, false, tc);
1710+
CmdRead(0, 4, 10, 100_MB, 16, false, tc);
17091711
});
17101712
}
17111713

@@ -2021,7 +2023,7 @@ Y_UNIT_TEST(TestPQCacheSizeManagement) {
20212023

20222024
TAutoPtr<IEventHandle> handle;
20232025
for (ui32 i = 0; i < 10; ++i) {
2024-
CmdRead(0, 0, 1, 100_MB, 1, false, tc);
2026+
CmdRead(0, 0, 1, 100_MB, 7, false, tc);
20252027
PQTabletRestart(tc);
20262028
}
20272029
});
@@ -2071,22 +2073,22 @@ Y_UNIT_TEST(TestMaxTimeLagRewind) {
20712073
tc.Runtime->UpdateCurrentTime(tc.Runtime->GetCurrentTime() + TDuration::Minutes(1));
20722074
}
20732075
const auto ts = tc.Runtime->GetCurrentTime();
2074-
CmdRead(0, 0, 1, Max<i32>(), 1, false, tc, {0});
2075-
CmdRead(0, 0, 1, Max<i32>(), 1, false, tc, {21}, TDuration::Minutes(3).MilliSeconds());
2076-
CmdRead(0, 22, 1, Max<i32>(), 1, false, tc, {22}, TDuration::Minutes(3).MilliSeconds());
2076+
2077+
CmdRead(0, 0, 1, Max<i32>(), 7, false, tc, {0});
2078+
CmdRead(0, 0, 1, Max<i32>(), 7, false, tc, {21}, TDuration::Minutes(3).MilliSeconds());
2079+
CmdRead(0, 22, 1, Max<i32>(), 6, false, tc, {22}, TDuration::Minutes(3).MilliSeconds());
20772080
CmdRead(0, 4, 1, Max<i32>(), 1, false, tc, {34}, 1000);
20782081

2079-
CmdRead(0, 0, 1, Max<i32>(), 1, false, tc, {21}, 0,
2082+
CmdRead(0, 0, 1, Max<i32>(), 7, false, tc, {21}, 0,
20802083
(ts - TDuration::Minutes(3)).MilliSeconds());
2081-
CmdRead(0, 22, 1, Max<i32>(), 1, false, tc, {22}, 0,
2084+
CmdRead(0, 22, 1, Max<i32>(), 6, false, tc, {22}, 0,
20822085
(ts - TDuration::Minutes(3)).MilliSeconds());
20832086
CmdRead(0, 4, 1, Max<i32>(), 1, false, tc, {34}, 0,
20842087
(ts - TDuration::Seconds(1)).MilliSeconds());
20852088

20862089
PQTabletPrepare({.readFromTimestampsMs=(ts - TDuration::Seconds(1)).MilliSeconds()},
20872090
{{"aaa", true}}, tc);
20882091
CmdRead(0, 0, 1, Max<i32>(), 1, false, tc, {34});
2089-
20902092
});
20912093
}
20922094

ydb/services/persqueue_v1/persqueue_ut.cpp

+16 -15
Original file line number | Diff line number | Diff line change
@@ -883,7 +883,8 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
883883
void DoRead(ui64 assignId, ui64& nextReadId, ui32& currTotalMessages, ui32 messageLimit) {
884884
// Get DirectReadResponse messages, send DirectReadAck messages.
885885

886-
while (currTotalMessages < messageLimit) {
886+
auto endTime = TInstant::Now() + TDuration::Seconds(10);
887+
while (currTotalMessages < messageLimit && endTime > TInstant::Now()) {
887888
Cerr << "Wait for direct read id: " << nextReadId << ", currently have " << currTotalMessages << " messages" << Endl;
888889

889890
Ydb::Topic::StreamDirectReadMessage::FromServer resp;
@@ -974,7 +975,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
974975
THolder<TEvPQ::TEvGetFullDirectReadData> RequestCacheData(TTestActorRuntime* runtime, TEvPQ::TEvGetFullDirectReadData* request) {
975976
const auto& edgeId = runtime->AllocateEdgeActor();
976977
runtime->Send(NPQ::MakePQDReadCacheServiceActorId(), edgeId, request);
977-
auto resp = runtime->GrabEdgeEvent<TEvPQ::TEvGetFullDirectReadData>();
978+
auto resp = runtime->GrabEdgeEvent<TEvPQ::TEvGetFullDirectReadData>(TDuration::Seconds(10));
978979
UNIT_ASSERT(resp);
979980
return resp;
980981
}
@@ -1027,7 +1028,7 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}};
10271028
ui64 nextReadId = 1;
10281029
Sleep(TDuration::Seconds(3));
10291030
setup.DoWrite(pqClient->GetDriver(), "acc/topic1", 1_MB, 50);
1030-
setup.DoRead(assignId, nextReadId, totalMsg, 40);
1031+
setup.DoRead(assignId, nextReadId, totalMsg, 42);
10311032

10321033
Topic::StreamReadMessage::FromClient req;
10331034
req.mutable_read_request()->set_bytes_size(40_MB);
@@ -1186,8 +1187,6 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}};
11861187
UNIT_ASSERT_VALUES_EQUAL(pair.first, 0);
11871188
auto assignId = pair.second;
11881189
setup.SendReadSessionAssign(assignId);
1189-
// auto cachedData = RequestCacheData(runtime, new TEvPQ::TEvGetFullDirectReadData());
1190-
// UNIT_ASSERT_VALUES_EQUAL(cachedData->Data.size(), 1);
11911190
setup.DoWrite(pqClient->GetDriver(), "acc/topic2", 10_MB, 1);
11921191
Ydb::Topic::StreamDirectReadMessage::FromServer resp;
11931192
Cerr << "Request initial read data\n";
@@ -1196,12 +1195,14 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}};
11961195
Cerr << "Request cache data\n";
11971196
auto cachedData = RequestCacheData(runtime, new TEvPQ::TEvGetFullDirectReadData());
11981197
UNIT_ASSERT_VALUES_EQUAL(cachedData->Data.size(), 1);
1198+
11991199
Cerr << "Kill the tablet\n";
12001200
server.Server->AnnoyingClient->KillTablet(*(server.Server->CleverServer), tabletId);
12011201
Cerr << "Get session closure\n";
1202+
12021203
resp.Clear();
12031204
UNIT_ASSERT(setup.DirectStream->Read(&resp));
1204-
UNIT_ASSERT_C(resp.status() == Ydb::StatusIds::SESSION_EXPIRED, resp.status());
1205+
12051206
Cerr << "Check caching service data empty\n";
12061207
cachedData = RequestCacheData(runtime, new TEvPQ::TEvGetFullDirectReadData());
12071208
UNIT_ASSERT_VALUES_EQUAL(cachedData->Data.size(), 0);
@@ -2542,7 +2543,7 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}};
25422543

25432544
// expects that L2 size is 32Mb
25442545
Y_UNIT_TEST(Cache) {
2545-
NPersQueue::TTestServer server(PQSettings(0).SetDomainName("Root").SetGrpcMaxMessageSize(18_MB));
2546+
NPersQueue::TTestServer server(PQSettings(0).SetDomainName("Root").SetGrpcMaxMessageSize(48_MB));
25462547
server.AnnoyingClient->CreateTopic(DEFAULT_TOPIC_NAME, 1, 8_MB, 86400);
25472548

25482549
server.EnableLogs({ NKikimrServices::FLAT_TX_SCHEMESHARD, NKikimrServices::PERSQUEUE });
@@ -2551,7 +2552,7 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}};
25512552
for (ui32 i = 0; i < 32; ++i)
25522553
server.AnnoyingClient->WriteToPQ({DEFAULT_TOPIC_NAME, 0, "source1", i}, value);
25532554

2554-
auto info0 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 16, "user"}, 16);
2555+
auto info0 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 16, "user"}, 23);
25552556
auto info16 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 16, 16, "user"}, 16);
25562557

25572558
UNIT_ASSERT_VALUES_EQUAL(info0.BlobsFromCache, 3);
@@ -2561,8 +2562,8 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}};
25612562
for (ui32 i = 0; i < 8; ++i)
25622563
server.AnnoyingClient->WriteToPQ({DEFAULT_TOPIC_NAME, 0, "source1", 32+i}, value);
25632564

2564-
info0 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 16, "user"}, 16);
2565-
info16 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 16, 16, "user"}, 16);
2565+
info0 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 16, "user"}, 23);
2566+
info16 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 16, 16, "user"}, 22);
25662567

25672568
ui32 fromDisk = info0.BlobsFromDisk + info16.BlobsFromDisk;
25682569
ui32 fromCache = info0.BlobsFromCache + info16.BlobsFromCache;
@@ -2602,7 +2603,7 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}};
26022603
}
26032604

26042605
Y_UNIT_TEST(SameOffset) {
2605-
NPersQueue::TTestServer server(PQSettings(0).SetDomainName("Root"));
2606+
NPersQueue::TTestServer server(PQSettings(0).SetDomainName("Root").SetGrpcMaxMessageSize(48_MB));
26062607
server.AnnoyingClient->CreateTopic(DEFAULT_TOPIC_NAME, 1, 6_MB, 86400);
26072608
TString secondTopic = DEFAULT_TOPIC_NAME + "2";
26082609
server.AnnoyingClient->CreateTopic(secondTopic, 1, 6_MB, 86400);
@@ -2622,13 +2623,13 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}};
26222623
server.AnnoyingClient->WriteToPQ({secondTopic, 0, "source1", i}, mb);
26232624
}
26242625

2625-
auto info1 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 1, "user1"}, 1);
2626-
auto info2 = server.AnnoyingClient->ReadFromPQ({secondTopic, 0, 0, 1, "user1"}, 1);
2626+
auto info1 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 1, "user1"}, 7);
2627+
auto info2 = server.AnnoyingClient->ReadFromPQ({secondTopic, 0, 0, 1, "user1"}, 7);
26272628

26282629
UNIT_ASSERT_VALUES_EQUAL(info1.BlobsFromCache, 1);
26292630
UNIT_ASSERT_VALUES_EQUAL(info2.BlobsFromCache, 1);
2630-
UNIT_ASSERT_VALUES_EQUAL(info1.Values.size(), 1);
2631-
UNIT_ASSERT_VALUES_EQUAL(info2.Values.size(), 1);
2631+
UNIT_ASSERT_VALUES_EQUAL(info1.Values.size(), 7);
2632+
UNIT_ASSERT_VALUES_EQUAL(info2.Values.size(), 7);
26322633
UNIT_ASSERT_VALUES_EQUAL(info1.Values[0].size(), valueSize);
26332634
UNIT_ASSERT_VALUES_EQUAL(info2.Values[0].size(), valueSize);
26342635
UNIT_ASSERT(info1.Values[0] == value1);

0 commit comments

Comments
 (0)