Skip to content

Commit 5b1970a

Browse files
Merge aa8bba0 into 34e573c
2 parents 34e573c + aa8bba0 commit 5b1970a

File tree

3 files changed

+97
-26
lines changed

3 files changed

+97
-26
lines changed

ydb/core/tx/columnshard/engines/portions/constructor.cpp

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,45 @@ TPortionInfo TPortionInfoConstructor::Build(const bool needChunksNormalization)
3131
NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("portion_id", GetPortionIdVerified());
3232
FullValidation();
3333

34+
if (BlobIdxs.size()) {
35+
auto itRecord = Records.begin();
36+
auto itIndex = Indexes.begin();
37+
auto itBlobIdx = BlobIdxs.begin();
38+
while (itRecord != Records.end() && itIndex != Indexes.end() && itBlobIdx != BlobIdxs.end()) {
39+
if (itRecord->GetAddress() < itIndex->GetAddress()) {
40+
AFL_VERIFY(itRecord->GetAddress() == itBlobIdx->GetAddress());
41+
itRecord->RegisterBlobIdx(itBlobIdx->GetBlobIdx());
42+
++itRecord;
43+
++itBlobIdx;
44+
} else if (itIndex->GetAddress() < itRecord->GetAddress()) {
45+
if (itIndex->HasBlobData()) {
46+
++itIndex;
47+
continue;
48+
}
49+
AFL_VERIFY(itIndex->GetAddress() == itBlobIdx->GetAddress());
50+
itIndex->RegisterBlobIdx(itBlobIdx->GetBlobIdx());
51+
++itIndex;
52+
++itBlobIdx;
53+
} else {
54+
AFL_VERIFY(false);
55+
}
56+
}
57+
for (; itRecord != Records.end() && itBlobIdx != BlobIdxs.end(); ++itRecord, ++itBlobIdx) {
58+
AFL_VERIFY(itRecord->GetAddress() == itBlobIdx->GetAddress());
59+
itRecord->RegisterBlobIdx(itBlobIdx->GetBlobIdx());
60+
}
61+
for (; itIndex != Indexes.end() && itBlobIdx != BlobIdxs.end(); ++itIndex) {
62+
if (itIndex->HasBlobData()) {
63+
continue;
64+
}
65+
AFL_VERIFY(itIndex->GetAddress() == itBlobIdx->GetAddress());
66+
itIndex->RegisterBlobIdx(itBlobIdx->GetBlobIdx());
67+
++itBlobIdx;
68+
}
69+
AFL_VERIFY(itRecord == Records.end());
70+
AFL_VERIFY(itBlobIdx == BlobIdxs.end());
71+
}
72+
3473
result.Indexes = Indexes;
3574
result.Records = Records;
3675
result.BlobIds = BlobIds;

ydb/core/tx/columnshard/engines/portions/constructor.h

Lines changed: 57 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,26 @@ class TPortionInfoConstructor {
2929
YDB_ACCESSOR_DEF(std::vector<TColumnRecord>, Records);
3030
std::vector<TUnifiedBlobId> BlobIds;
3131

32+
class TAddressBlobId {
33+
private:
34+
TChunkAddress Address;
35+
YDB_READONLY(TBlobRangeLink16::TLinkId, BlobIdx, 0);
36+
37+
public:
38+
const TChunkAddress& GetAddress() const {
39+
return Address;
40+
}
41+
42+
TAddressBlobId(const TChunkAddress& address, const TBlobRangeLink16::TLinkId blobIdx)
43+
: Address(address)
44+
, BlobIdx(blobIdx)
45+
{
46+
47+
}
48+
};
49+
std::vector<TAddressBlobId> BlobIdxs;
50+
bool NeedBlobIdxsSort = false;
51+
3252
public:
3353
void SetPortionId(const ui64 value) {
3454
AFL_VERIFY(value);
@@ -216,6 +236,16 @@ class TPortionInfoConstructor {
216236
return linkRange.RestoreRange(GetBlobId(linkRange.GetBlobIdxVerified()));
217237
}
218238

239+
const TBlobRange RestoreBlobRangeSlow(const TBlobRangeLink16& linkRange, const TChunkAddress& address) const {
240+
for (auto&& i : BlobIdxs) {
241+
if (i.GetAddress() == address) {
242+
return linkRange.RestoreRange(GetBlobId(i.GetBlobIdx()));
243+
}
244+
}
245+
AFL_VERIFY(false);
246+
return TBlobRange();
247+
}
248+
219249
const TUnifiedBlobId& GetBlobId(const TBlobRangeLink16::TLinkId linkId) const {
220250
AFL_VERIFY(linkId < BlobIds.size());
221251
return BlobIds[linkId];
@@ -226,19 +256,10 @@ class TPortionInfoConstructor {
226256
}
227257

228258
void RegisterBlobIdx(const TChunkAddress& address, const TBlobRangeLink16::TLinkId blobIdx) {
229-
for (auto&& i : Records) {
230-
if (i.GetColumnId() == address.GetEntityId() && i.GetChunkIdx() == address.GetChunkIdx()) {
231-
i.RegisterBlobIdx(blobIdx);
232-
return;
233-
}
259+
if (BlobIdxs.size() && address < BlobIdxs.back().GetAddress()) {
260+
NeedBlobIdxsSort = true;
234261
}
235-
for (auto&& i : Indexes) {
236-
if (i.GetIndexId() == address.GetEntityId() && i.GetChunkIdx() == address.GetChunkIdx()) {
237-
i.RegisterBlobIdx(blobIdx);
238-
return;
239-
}
240-
}
241-
AFL_VERIFY(false)("problem", "portion haven't address for blob registration")("address", address.DebugString());
262+
BlobIdxs.emplace_back(address, blobIdx);
242263
}
243264

244265
TString DebugString() const {
@@ -265,26 +286,37 @@ class TPortionInfoConstructor {
265286
std::sort(Indexes.begin(), Indexes.end(), pred);
266287
CheckChunksOrder(Indexes);
267288
}
289+
if (NeedBlobIdxsSort) {
290+
auto pred = [](const TAddressBlobId& l, const TAddressBlobId& r) {
291+
return l.GetAddress() < r.GetAddress();
292+
};
293+
std::sort(BlobIdxs.begin(), BlobIdxs.end(), pred);
294+
}
268295
}
269296

270297
void FullValidation() const {
271298
AFL_VERIFY(Records.size());
272299
CheckChunksOrder(Records);
273300
CheckChunksOrder(Indexes);
274-
std::set<ui32> blobIdxs;
275-
for (auto&& i : Records) {
276-
blobIdxs.emplace(i.GetBlobRange().GetBlobIdxVerified());
277-
}
278-
for (auto&& i : Indexes) {
279-
if (i.HasBlobRange()) {
280-
blobIdxs.emplace(i.GetBlobRangeVerified().GetBlobIdxVerified());
281-
}
282-
}
283-
if (BlobIds.size()) {
284-
AFL_VERIFY(BlobIds.size() == blobIdxs.size());
285-
AFL_VERIFY(BlobIds.size() == *blobIdxs.rbegin() + 1);
301+
if (BlobIdxs.size()) {
302+
AFL_VERIFY(BlobIdxs.size() <= Records.size() + Indexes.size())("blobs", BlobIdxs.size())("records", Records.size())(
303+
"indexes", Indexes.size());
286304
} else {
287-
AFL_VERIFY(blobIdxs.empty());
305+
std::set<ui32> blobIdxs;
306+
for (auto&& i : Records) {
307+
blobIdxs.emplace(i.GetBlobRange().GetBlobIdxVerified());
308+
}
309+
for (auto&& i : Indexes) {
310+
if (i.HasBlobRange()) {
311+
blobIdxs.emplace(i.GetBlobRangeVerified().GetBlobIdxVerified());
312+
}
313+
}
314+
if (BlobIds.size()) {
315+
AFL_VERIFY(BlobIds.size() == blobIdxs.size());
316+
AFL_VERIFY(BlobIds.size() == *blobIdxs.rbegin() + 1);
317+
} else {
318+
AFL_VERIFY(blobIdxs.empty());
319+
}
288320
}
289321
}
290322

ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@ void AddIdsToBlobs(std::vector<TWritePortionInfoWithBlobsResult>& portions, NBlo
284284
blobsData.emplace(blobId, b.GetResultBlob());
285285
}
286286
for (auto&& rec : portion.GetPortionConstructor().GetRecords()) {
287-
auto range = portion.GetPortionConstructor().RestoreBlobRange(rec.BlobRange);
287+
auto range = portion.GetPortionConstructor().RestoreBlobRangeSlow(rec.BlobRange, rec.GetAddress());
288288
auto it = blobsData.find(range.BlobId);
289289
AFL_VERIFY(it != blobsData.end());
290290
const TString& data = it->second;

0 commit comments

Comments
 (0)