Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/persqueue/blob.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ void TBatch::UnpackToType1(TVector<TClientBlob> *blobs) const {
ui32 sourceIdCount = 0;
TVector<TString> sourceIds;

static NScheme::TTypeCodecs ui32Codecs(NScheme::NTypeIds::Uint32), ui64Codecs(NScheme::NTypeIds::Uint64), stringCodecs(NScheme::NTypeIds::String);
static const NScheme::TTypeCodecs ui32Codecs(NScheme::NTypeIds::Uint32), ui64Codecs(NScheme::NTypeIds::Uint64), stringCodecs(NScheme::NTypeIds::String);
//read order
{
auto chunk = NScheme::IChunkDecoder::ReadChunk(GetChunk(data, dataEnd), &ui32Codecs);
Expand Down
12 changes: 8 additions & 4 deletions ydb/core/persqueue/partition_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -421,9 +421,10 @@ TReadAnswer TReadInfo::FormAnswer(
size -= lastBlobSize;
}
lastBlobSize = 0;
return (size >= Size || cnt >= Count);
return cnt >= Count;
}
return !AppData()->PQConfig.GetTopicsAreFirstClassCitizen() && (size >= Size || cnt >= Count);
// For backward compatibility, we keep the behavior for older clients for non-FirstClassCitizen
return !AppData()->PQConfig.GetTopicsAreFirstClassCitizen() && cnt >= Count;
};

Y_ABORT_UNLESS(blobs.size() == Blobs.size());
Expand Down Expand Up @@ -466,7 +467,7 @@ TReadAnswer TReadInfo::FormAnswer(
Y_ABORT_UNLESS(offset < Offset || partNo <= PartNo);
TKey key(TKeyPrefix::TypeData, TPartitionId(0), offset, partNo, count, internalPartsCount, false);
ui64 firstHeaderOffset = GetFirstHeaderOffset(key, blobValue);
for (TBlobIterator it(key, blobValue); it.IsValid(); it.Next()) {
for (TBlobIterator it(key, blobValue); it.IsValid() && !needStop; it.Next()) {
TBatch batch = it.GetBatch();
auto& header = batch.Header;
batch.Unpack();
Expand Down Expand Up @@ -510,7 +511,10 @@ TReadAnswer TReadInfo::FormAnswer(
++PartNo;
}

needStop = updateUsage(res);
if (updateUsage(res)) {
needStop = true;
break;
}
}
}
}
Expand Down
42 changes: 23 additions & 19 deletions ydb/core/persqueue/ut/pq_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1174,7 +1174,7 @@ Y_UNIT_TEST(TestWritePQBigMessage) {
CmdWrite(0, "sourceid0", data, tc, false, {}, true);
PQGetPartInfo(0, 27, tc);

CmdRead(0, 0, Max<i32>(), Max<i32>(), 13, false, tc);
CmdRead(0, 0, Max<i32>(), Max<i32>(), 1, false, tc);
CmdRead(0, 1, Max<i32>(), Max<i32>(), 25, false, tc);
CmdRead(0, 24, Max<i32>(), Max<i32>(), 2, false, tc);
CmdRead(0, 26, Max<i32>(), Max<i32>(), 1, false, tc);
Expand Down Expand Up @@ -1602,18 +1602,18 @@ Y_UNIT_TEST(TestPQRead) {
CmdRead(0, 26, Max<i32>(), Max<i32>(), 0, true, tc);

CmdRead(0, 0, Max<i32>(), Max<i32>(), 25, false, tc);
CmdRead(0, 0, 10, 100_MB, 15, false, tc);
CmdRead(0, 9, 1, 100_MB, 6, false, tc);
CmdRead(0, 0, 10, 100_MB, 10, false, tc);
CmdRead(0, 9, 1, 100_MB, 1, false, tc);
CmdRead(0, 23, 3, 100_MB, 3, false, tc);

CmdRead(0, 3, 1000, 511_KB, 4, false, tc);
CmdRead(0, 3, 1000, 511_KB, 4, false, tc);
CmdRead(0, 3, 1000, 1_KB, 4, false, tc); //at least one message will be readed always
CmdRead(0, 3, 1000, 511_KB, 12, false, tc);
CmdRead(0, 3, 1000, 511_KB, 12, false, tc);
CmdRead(0, 3, 1000, 1_KB, 12, false, tc); //at least one message will be readed always
CmdRead(0, 25, 1000, 1_KB, 1, false, tc); //at least one message will be readed always, from head

activeZone = true;
CmdRead(0, 9, 1000, 3_MB, 6, false, tc);
CmdRead(0, 9, 1000, 3_MB - 10_KB, 6, false, tc);
CmdRead(0, 9, 1000, 3_MB, 14, false, tc);
CmdRead(0, 9, 1000, 3_MB - 10_KB, 14, false, tc);
CmdRead(0, 25, 1000, 512_KB, 1, false, tc); //from head
CmdRead(0, 24, 1000, 512_KB, 1, false, tc); //from head

Expand Down Expand Up @@ -1703,11 +1703,15 @@ Y_UNIT_TEST(TestPQReadAhead) {
PQGetPartInfo(0, 22, tc);
activeZone = true;

CmdRead(0, 0, 1, 100_MB, 12, false, tc);
CmdRead(0, 1, 1, 100_MB, 11, false, tc);
CmdRead(0, 2, 1, 100_MB, 10, false, tc);
CmdRead(0, 3, 1, 100_MB, 9, false, tc);
CmdRead(0, 4, 10, 100_MB, 16, false, tc);
CmdRead(0, 0, 1, 100_MB, 1, false, tc);
CmdRead(0, 1, 1, 100_MB, 1, false, tc);
CmdRead(0, 2, 1, 100_MB, 1, false, tc);
CmdRead(0, 4, 10, 100_MB, 10, false, tc);

CmdRead(0, 0, Max<i32>(), 100_KB, 12, false, tc);
CmdRead(0, 1, Max<i32>(), 100_KB, 19, false, tc);
CmdRead(0, 2, Max<i32>(), 100_KB, 18, false, tc);
CmdRead(0, 3, Max<i32>(), 100_KB, 17, false, tc);
});
}

Expand Down Expand Up @@ -2023,7 +2027,7 @@ Y_UNIT_TEST(TestPQCacheSizeManagement) {

TAutoPtr<IEventHandle> handle;
for (ui32 i = 0; i < 10; ++i) {
CmdRead(0, 0, 1, 100_MB, 7, false, tc);
CmdRead(0, 0, 1, 100_MB, 1, false, tc);
PQTabletRestart(tc);
}
});
Expand Down Expand Up @@ -2074,14 +2078,14 @@ Y_UNIT_TEST(TestMaxTimeLagRewind) {
}
const auto ts = tc.Runtime->GetCurrentTime();

CmdRead(0, 0, 1, Max<i32>(), 7, false, tc, {0});
CmdRead(0, 0, 1, Max<i32>(), 7, false, tc, {21}, TDuration::Minutes(3).MilliSeconds());
CmdRead(0, 22, 1, Max<i32>(), 6, false, tc, {22}, TDuration::Minutes(3).MilliSeconds());
CmdRead(0, 0, 1, Max<i32>(), 1, false, tc, {0});
CmdRead(0, 0, 1, Max<i32>(), 1, false, tc, {21}, TDuration::Minutes(3).MilliSeconds());
CmdRead(0, 22, 1, Max<i32>(), 1, false, tc, {22}, TDuration::Minutes(3).MilliSeconds());
CmdRead(0, 4, 1, Max<i32>(), 1, false, tc, {34}, 1000);

CmdRead(0, 0, 1, Max<i32>(), 7, false, tc, {21}, 0,
CmdRead(0, 0, 1, Max<i32>(), 1, false, tc, {21}, 0,
(ts - TDuration::Minutes(3)).MilliSeconds());
CmdRead(0, 22, 1, Max<i32>(), 6, false, tc, {22}, 0,
CmdRead(0, 22, 1, Max<i32>(), 1, false, tc, {22}, 0,
(ts - TDuration::Minutes(3)).MilliSeconds());
CmdRead(0, 4, 1, Max<i32>(), 1, false, tc, {34}, 0,
(ts - TDuration::Seconds(1)).MilliSeconds());
Expand Down
60 changes: 60 additions & 0 deletions ydb/services/datastreams/datastreams_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2353,6 +2353,66 @@ Y_UNIT_TEST_SUITE(DataStreams) {

}

Y_UNIT_TEST(TestGetRecordsWithCount) {
TInsecureDatastreamsTestServer testServer;
const TString streamName = TStringBuilder() << "stream_" << Y_UNIT_TEST_NAME;
{
auto result = testServer.DataStreamsClient->CreateStream(streamName,
NYDS_V1::TCreateStreamSettings().ShardCount(1)).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

const ui32 recordsCount = 16;
std::vector<ui64> timestamps;
{
std::string data;
data.resize(1_MB); // big messages. compaction must will be completed.
std::iota(data.begin(), data.end(), 1);
std::random_device rd;
std::mt19937 generator{rd()};

for (ui32 i = 1; i <= recordsCount; ++i) {
std::shuffle(data.begin(), data.end(), generator);
{
TString id = Sprintf("%04u", i);
NYDS_V1::TDataRecord dataRecord{{data.begin(), data.end()}, id, ""};
//
// we make sure that the value of WriteTimestampMs is between neighboring timestamps
//
timestamps.push_back(TInstant::Now().MilliSeconds());
Sleep(TDuration::MilliSeconds(500));
auto result = testServer.DataStreamsClient->PutRecord(streamName, dataRecord).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}
Sleep(TDuration::MilliSeconds(500));
}
}

for (ui32 i = 0; i < recordsCount; ++i) {
TString shardIterator;

{
auto result = testServer.DataStreamsClient->GetShardIterator(streamName, "shard-000000",
YDS_V1::ShardIteratorType::AT_TIMESTAMP,
NYDS_V1::TGetShardIteratorSettings().Timestamp(timestamps[i])).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
shardIterator = result.GetResult().shard_iterator();
}

{
auto result = testServer.DataStreamsClient->GetRecords(shardIterator,
NYDS_V1::TGetRecordsSettings().Limit(1)).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(result.GetResult().records().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(std::stoi(result.GetResult().records().begin()->sequence_number()), i);
}
}
}

Y_UNIT_TEST(TestGetRecordsStreamWithMultipleShards) {
TInsecureDatastreamsTestServer testServer;
const TString streamName = TStringBuilder() << "stream_" << Y_UNIT_TEST_NAME;
Expand Down
49 changes: 9 additions & 40 deletions ydb/services/persqueue_v1/persqueue_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -880,12 +880,12 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
writer->Close();
}

void DoRead(ui64 assignId, ui64& nextReadId, ui32& currTotalMessages, ui32 messageLimit) {
void DoRead(ui64 assignId, ui64& nextReadId, ui32& currTotalMessages, const ui32 messageLimit) {
// Get DirectReadResponse messages, send DirectReadAck messages.

auto endTime = TInstant::Now() + TDuration::Seconds(10);
while (currTotalMessages < messageLimit && endTime > TInstant::Now()) {
Cerr << "Wait for direct read id: " << nextReadId << ", currently have " << currTotalMessages << " messages" << Endl;
Cerr << "Wait for direct read id: " << nextReadId << ", currently have " << currTotalMessages << " messages, limit is " << messageLimit << Endl;

Ydb::Topic::StreamDirectReadMessage::FromServer resp;
UNIT_ASSERT(DirectStream->Read(&resp));
Expand Down Expand Up @@ -1013,37 +1013,6 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}};
UNIT_ASSERT_VALUES_EQUAL(cachedData->Data.begin()->second.Reads.size(), 0);
}

Y_UNIT_TEST(DirectReadNotCached) {
TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}};
SET_LOCALS;
TDirectReadTestSetup setup{server};

setup.InitControlSession("acc/topic1");
auto [partitionId, assignId] = setup.GetNextAssign("acc/topic1");
UNIT_ASSERT_VALUES_EQUAL(partitionId, 0);
setup.InitDirectSession("acc/topic1");
setup.SendReadSessionAssign(assignId);

ui32 totalMsg = 0;
ui64 nextReadId = 1;
Sleep(TDuration::Seconds(3));
setup.DoWrite(pqClient->GetDriver(), "acc/topic1", 1_MB, 50);
setup.DoRead(assignId, nextReadId, totalMsg, 42);

Topic::StreamReadMessage::FromClient req;
req.mutable_read_request()->set_bytes_size(40_MB);
if (!setup.ControlStream->Write(req)) {
ythrow yexception() << "write fail";
}
setup.DoRead(assignId, nextReadId, totalMsg, 50);

Sleep(TDuration::Seconds(1));
auto cachedData = RequestCacheData(runtime, new TEvPQ::TEvGetFullDirectReadData());
UNIT_ASSERT_VALUES_EQUAL(cachedData->Data.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(cachedData->Data.begin()->second.StagedReads.size(), 0);
UNIT_ASSERT_VALUES_EQUAL(cachedData->Data.begin()->second.Reads.size(), 0);
}

Y_UNIT_TEST(DirectReadBadCases) {
TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}};
SET_LOCALS;
Expand Down Expand Up @@ -2552,7 +2521,7 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}};
for (ui32 i = 0; i < 32; ++i)
server.AnnoyingClient->WriteToPQ({DEFAULT_TOPIC_NAME, 0, "source1", i}, value);

auto info0 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 16, "user"}, 23);
auto info0 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 16, "user"}, 16);
auto info16 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 16, 16, "user"}, 16);

UNIT_ASSERT_VALUES_EQUAL(info0.BlobsFromCache, 3);
Expand All @@ -2562,8 +2531,8 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}};
for (ui32 i = 0; i < 8; ++i)
server.AnnoyingClient->WriteToPQ({DEFAULT_TOPIC_NAME, 0, "source1", 32+i}, value);

info0 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 16, "user"}, 23);
info16 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 16, 16, "user"}, 22);
info0 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 16, "user"}, 16);
info16 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 16, 16, "user"}, 16);

ui32 fromDisk = info0.BlobsFromDisk + info16.BlobsFromDisk;
ui32 fromCache = info0.BlobsFromCache + info16.BlobsFromCache;
Expand Down Expand Up @@ -2623,13 +2592,13 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}};
server.AnnoyingClient->WriteToPQ({secondTopic, 0, "source1", i}, mb);
}

auto info1 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 1, "user1"}, 7);
auto info2 = server.AnnoyingClient->ReadFromPQ({secondTopic, 0, 0, 1, "user1"}, 7);
auto info1 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 1, "user1"}, 1);
auto info2 = server.AnnoyingClient->ReadFromPQ({secondTopic, 0, 0, 1, "user1"}, 1);

UNIT_ASSERT_VALUES_EQUAL(info1.BlobsFromCache, 1);
UNIT_ASSERT_VALUES_EQUAL(info2.BlobsFromCache, 1);
UNIT_ASSERT_VALUES_EQUAL(info1.Values.size(), 7);
UNIT_ASSERT_VALUES_EQUAL(info2.Values.size(), 7);
UNIT_ASSERT_VALUES_EQUAL(info1.Values.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(info2.Values.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(info1.Values[0].size(), valueSize);
UNIT_ASSERT_VALUES_EQUAL(info2.Values[0].size(), valueSize);
UNIT_ASSERT(info1.Values[0] == value1);
Expand Down
Loading