Skip to content

Commit 6c24540

Browse files
fix normalizers for v1 migration chunks (#11308)
1 parent dbd5f95 commit 6c24540

File tree

12 files changed

+243
-40
lines changed

12 files changed

+243
-40
lines changed

ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,13 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx)
1616

1717
ACFL_DEBUG("event", "TTxWriteIndex::Execute")("change_type", changes->TypeString())("details", changes->DebugString());
1818
if (Ev->Get()->GetPutStatus() == NKikimrProto::OK) {
19-
NOlap::TSnapshot snapshot(Self->LastPlannedStep, Self->LastPlannedTxId);
20-
Y_ABORT_UNLESS(Ev->Get()->IndexInfo->GetLastSchema()->GetSnapshot() <= snapshot);
19+
AFL_VERIFY(Ev->Get()->IndexInfo->GetLastSchema()->GetSnapshot() <= Self->GetLastTxSnapshot())
20+
("schema_last", Ev->Get()->IndexInfo->GetLastSchema()->GetSnapshot().DebugString())(
21+
"planned_last", Self->GetLastTxSnapshot().DebugString());
2122

2223
TBlobGroupSelector dsGroupSelector(Self->Info());
2324
NOlap::TDbWrapper dbWrap(txc.DB, &dsGroupSelector);
24-
AFL_VERIFY(Self->TablesManager.MutablePrimaryIndex().ApplyChangesOnExecute(dbWrap, changes, snapshot));
25+
AFL_VERIFY(Self->TablesManager.MutablePrimaryIndex().ApplyChangesOnExecute(dbWrap, changes, Self->GetLastTxSnapshot()));
2526
LOG_S_DEBUG(TxPrefix() << "(" << changes->TypeString() << ") apply" << TxSuffix());
2627
NOlap::TWriteIndexContext context(&txc.DB, dbWrap, Self->MutableIndexAs<NOlap::TColumnEngineForLogs>(), CurrentSnapshot);
2728
changes->WriteIndexOnExecute(Self, context);

ydb/core/tx/columnshard/columnshard_schema.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -918,13 +918,19 @@ class TPortionLoadContext {
918918
YDB_READONLY(ui64, PathId, 0);
919919
YDB_READONLY(ui64, PortionId, 0);
920920
YDB_READONLY_DEF(NKikimrTxColumnShard::TIndexPortionMeta, MetaProto);
921+
YDB_READONLY_DEF(std::optional<NOlap::TSnapshot>, DeprecatedMinSnapshot);
921922

922923
public:
923924
template <class TSource>
924925
TPortionLoadContext(const TSource& rowset) {
925926
PathId = rowset.template GetValue<NColumnShard::Schema::IndexPortions::PathId>();
926927
PortionId = rowset.template GetValue<NColumnShard::Schema::IndexPortions::PortionId>();
927928
const TString metadata = rowset.template GetValue<NColumnShard::Schema::IndexPortions::Metadata>();
929+
AFL_VERIFY(rowset.template HaveValue<NColumnShard::Schema::IndexPortions::MinSnapshotPlanStep>() == rowset.template HaveValue<NColumnShard::Schema::IndexPortions::MinSnapshotTxId>());
930+
if (rowset.template HaveValue<NColumnShard::Schema::IndexPortions::MinSnapshotPlanStep>()) {
931+
DeprecatedMinSnapshot = NOlap::TSnapshot(rowset.template GetValue<NColumnShard::Schema::IndexPortions::MinSnapshotPlanStep>(),
932+
rowset.template GetValue<NColumnShard::Schema::IndexPortions::MinSnapshotTxId>());
933+
}
928934
AFL_VERIFY(MetaProto.ParseFromArray(metadata.data(), metadata.size()))("event", "cannot parse metadata as protobuf");
929935
}
930936
};

ydb/core/tx/columnshard/common/blob.cpp

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,16 @@ TString TBlobRange::GetData(const TString& blobData) const {
141141
return blobData.substr(Offset, Size);
142142
}
143143

144+
TBlobRange::TBlobRange(const TUnifiedBlobId& blobId /*= TUnifiedBlobId()*/, ui32 offset /*= 0*/, ui32 size /*= 0*/)
145+
: BlobId(blobId)
146+
, Offset(offset)
147+
, Size(size) {
148+
if (Size > 0) {
149+
AFL_VERIFY(Offset < BlobId.BlobSize())("offset", Offset)("size", Size)("blob", BlobId.ToStringNew());
150+
AFL_VERIFY(Offset + Size <= BlobId.BlobSize())("offset", Offset)("size", Size)("blob", BlobId.ToStringNew());
151+
}
152+
}
153+
144154
NKikimr::TConclusionStatus TBlobRangeLink16::DeserializeFromProto(const NKikimrColumnShardProto::TBlobRangeLink16& proto) {
145155
BlobIdx = proto.GetBlobIdx();
146156
Offset = proto.GetOffset();
@@ -171,8 +181,12 @@ ui16 TBlobRangeLink16::GetBlobIdxVerified() const {
171181
return *BlobIdx;
172182
}
173183

174-
NKikimr::NOlap::TBlobRange TBlobRangeLink16::RestoreRange(const TUnifiedBlobId& blobId) const {
184+
TBlobRange TBlobRangeLink16::RestoreRange(const TUnifiedBlobId& blobId) const {
175185
return TBlobRange(blobId, Offset, Size);
176186
}
177187

188+
bool TBlobRangeLink16::CheckBlob(const TUnifiedBlobId& blobId) const {
189+
return Offset + Size <= blobId.BlobSize();
178190
}
191+
192+
} // namespace NKikimr::NOlap

ydb/core/tx/columnshard/common/blob.h

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ class TBlobRangeLink16 {
189189
}
190190

191191
TBlobRange RestoreRange(const TUnifiedBlobId& blobId) const;
192+
bool CheckBlob(const TUnifiedBlobId& blobId) const;
192193
};
193194

194195
struct TBlobRange {
@@ -267,16 +268,7 @@ struct TBlobRange {
267268
return Size == BlobId.BlobSize();
268269
}
269270

270-
explicit TBlobRange(const TUnifiedBlobId& blobId = TUnifiedBlobId(), ui32 offset = 0, ui32 size = 0)
271-
: BlobId(blobId)
272-
, Offset(offset)
273-
, Size(size)
274-
{
275-
if (Size > 0) {
276-
Y_ABORT_UNLESS(Offset < BlobId.BlobSize());
277-
Y_ABORT_UNLESS(Offset + Size <= BlobId.BlobSize());
278-
}
279-
}
271+
explicit TBlobRange(const TUnifiedBlobId& blobId = TUnifiedBlobId(), ui32 offset = 0, ui32 size = 0);
280272

281273
static TBlobRange FromBlobId(const TUnifiedBlobId& blobId) {
282274
return TBlobRange(blobId, 0, blobId.BlobSize());

ydb/core/tx/columnshard/engines/portions/constructor.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,8 @@ ISnapshotSchema::TPtr TPortionInfoConstructor::GetSchema(const TVersionedIndex&
125125

126126
void TPortionInfoConstructor::LoadRecord(const TColumnChunkLoadContextV1& loadContext) {
127127
AFL_VERIFY(loadContext.GetBlobRange().GetBlobIdxVerified() < MetaConstructor.BlobIds.size());
128+
AFL_VERIFY(loadContext.GetBlobRange().CheckBlob(MetaConstructor.BlobIds[loadContext.GetBlobRange().GetBlobIdxVerified()]))(
129+
"blobs", JoinSeq(",", MetaConstructor.BlobIds))("range", loadContext.GetBlobRange().ToString());
128130
TColumnRecord rec(loadContext);
129131
Records.push_back(std::move(rec));
130132
}

ydb/core/tx/columnshard/normalizer/abstract/abstract.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,10 @@ enum class ENormalizerSequentialId: ui32 {
6161
GCCountersNormalizer,
6262
RestorePortionFromChunks,
6363
SyncPortionFromChunks,
64-
RestoreV1Chunks,
64+
DeprecatedRestoreV1Chunks,
65+
SyncMinSnapshotFromChunks,
66+
DeprecatedRestoreV1Chunks_V1,
67+
RestoreV1Chunks_V2,
6568

6669
MAX
6770
};

ydb/core/tx/columnshard/normalizer/portion/chunks_actualization.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ class TChanges: public INormalizerChanges {
4646
ui64 indexRawBytes = 0;
4747
ui32 columnBlobBytes = 0;
4848
ui32 indexBlobBytes = 0;
49-
49+
5050
for (auto&& c : i.GetRecords()) {
5151
columnRawBytes += c.GetMetaProto().GetRawBytes();
5252
columnBlobBytes += c.GetBlobRange().GetSize();

ydb/core/tx/columnshard/normalizer/portion/restore_v1_chunks.cpp

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,15 @@ class TChangesAddV1: public INormalizerChanges {
6161
using IndexColumnsV1 = NColumnShard::Schema::IndexColumnsV1;
6262
for (auto&& i : Patches) {
6363
auto metaProto = i.GetPortionInfo().GetMetaProto();
64+
metaProto.ClearBlobIds();
6465
AFL_VERIFY(!metaProto.GetBlobIds().size());
6566
for (auto&& b : i.GetBlobIds()) {
66-
*metaProto.AddBlobIds() = b.SerializeBinary();
67+
*metaProto.AddBlobIds() = b.GetLogoBlobId().AsBinaryString();
68+
}
69+
ui32 idx = 0;
70+
for (auto&& b : metaProto.GetBlobIds()) {
71+
auto logo = TLogoBlobID::FromBinary(b);
72+
AFL_VERIFY(i.GetBlobIds()[idx++].GetLogoBlobId() == logo);
6773
}
6874
db.Table<IndexPortions>()
6975
.Key(i.GetPortionInfo().GetPathId(), i.GetPortionInfo().GetPortionId())
@@ -154,9 +160,7 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TNormalizer::DoInit(
154160
while (!rowset.EndOfSet()) {
155161
TPortionLoadContext portion(rowset);
156162
existPortions0.emplace(portion.GetPortionId());
157-
if (!portion.GetMetaProto().GetBlobIds().size()) {
158-
AFL_VERIFY(portions0.emplace(portion.GetPortionId(), portion).second);
159-
}
163+
AFL_VERIFY(portions0.emplace(portion.GetPortionId(), portion).second);
160164

161165
if (!rowset.Next()) {
162166
return TConclusionStatus::Fail("Not ready");
@@ -190,10 +194,10 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TNormalizer::DoInit(
190194

191195
while (!rowset.EndOfSet()) {
192196
TColumnChunkLoadContextV1 chunk(rowset);
193-
AFL_VERIFY(!portions0.contains(chunk.GetPortionId()));
194-
if (!existPortions0.contains(chunk.GetPortionId())) {
197+
// AFL_VERIFY(!portions0.contains(chunk.GetPortionId()));
198+
// if (!existPortions0.contains(chunk.GetPortionId())) {
195199
AFL_VERIFY(columns1Remove.emplace(chunk.GetFullChunkAddress(), chunk).second);
196-
}
200+
// }
197201

198202
if (!rowset.Next()) {
199203
return TConclusionStatus::Fail("Not ready");
@@ -207,37 +211,37 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TNormalizer::DoInit(
207211
}
208212

209213
{
210-
std::vector<TPatchItemAddV1> package;
211-
for (auto&& [portionId, portionInfo] : portions0) {
212-
auto it = columns0.find(portionId);
213-
AFL_VERIFY(it != columns0.end());
214-
package.emplace_back(portionInfo, std::move(it->second));
215-
columns0.erase(it);
214+
std::vector<TPatchItemRemoveV1> package;
215+
for (auto&& [portionId, chunkInfo] : columns1Remove) {
216+
package.emplace_back(chunkInfo);
216217
if (package.size() == 100) {
217-
std::vector<TPatchItemAddV1> local;
218+
std::vector<TPatchItemRemoveV1> local;
218219
local.swap(package);
219-
tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TChangesAddV1>(std::move(local))));
220+
tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TChangesRemoveV1>(std::move(local))));
220221
}
221222
}
222223

223224
if (package.size() > 0) {
224-
tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TChangesAddV1>(std::move(package))));
225+
tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TChangesRemoveV1>(std::move(package))));
225226
}
226227
}
227228

228229
{
229-
std::vector<TPatchItemRemoveV1> package;
230-
for (auto&& [portionId, chunkInfo] : columns1Remove) {
231-
package.emplace_back(chunkInfo);
230+
std::vector<TPatchItemAddV1> package;
231+
for (auto&& [portionId, portionInfo] : portions0) {
232+
auto it = columns0.find(portionId);
233+
AFL_VERIFY(it != columns0.end());
234+
package.emplace_back(portionInfo, std::move(it->second));
235+
columns0.erase(it);
232236
if (package.size() == 100) {
233-
std::vector<TPatchItemRemoveV1> local;
237+
std::vector<TPatchItemAddV1> local;
234238
local.swap(package);
235-
tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TChangesRemoveV1>(std::move(local))));
239+
tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TChangesAddV1>(std::move(local))));
236240
}
237241
}
238242

239243
if (package.size() > 0) {
240-
tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TChangesRemoveV1>(std::move(package))));
244+
tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TChangesAddV1>(std::move(package))));
241245
}
242246
}
243247

ydb/core/tx/columnshard/normalizer/portion/restore_v1_chunks.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,11 @@ namespace NKikimr::NOlap::NRestoreV1Chunks {
1313
class TNormalizer: public TNormalizationController::INormalizerComponent {
1414
public:
1515
static TString GetClassNameStatic() {
16-
return ::ToString(ENormalizerSequentialId::RestoreV1Chunks);
16+
return ::ToString(ENormalizerSequentialId::RestoreV1Chunks_V2);
1717
}
1818

1919
virtual std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override {
20-
return ENormalizerSequentialId::RestoreV1Chunks;
20+
return ENormalizerSequentialId::RestoreV1Chunks_V2;
2121
}
2222

2323
virtual TString GetClassName() const override {
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
#include "snapshot_from_chunks.h"
2+
#include "normalizer.h"
3+
4+
#include <ydb/core/formats/arrow/size_calcer.h>
5+
#include <ydb/core/tx/columnshard/engines/portions/data_accessor.h>
6+
#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
7+
#include <ydb/core/tx/columnshard/tables_manager.h>
8+
9+
namespace NKikimr::NOlap::NSyncMinSnapshotFromChunks {
10+
11+
class TPatchItem {
12+
private:
13+
TPortionLoadContext PortionInfo;
14+
YDB_READONLY(NOlap::TSnapshot, Snapshot, NOlap::TSnapshot::Zero());
15+
16+
public:
17+
const TPortionLoadContext& GetPortionInfo() const {
18+
return PortionInfo;
19+
}
20+
21+
TPatchItem(TPortionLoadContext&& portion, const NOlap::TSnapshot& snapshot)
22+
: PortionInfo(std::move(portion))
23+
, Snapshot(snapshot) {
24+
}
25+
};
26+
27+
class TChanges: public INormalizerChanges {
28+
private:
29+
std::vector<TPatchItem> Patches;
30+
31+
public:
32+
TChanges(std::vector<TPatchItem>&& patches)
33+
: Patches(std::move(patches)) {
34+
}
35+
virtual bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController&) const override {
36+
using namespace NColumnShard;
37+
NIceDb::TNiceDb db(txc.DB);
38+
for (auto&& i : Patches) {
39+
db.Table<Schema::IndexPortions>()
40+
.Key(i.GetPortionInfo().GetPathId(), i.GetPortionInfo().GetPortionId())
41+
.Update(NIceDb::TUpdate<Schema::IndexPortions::MinSnapshotPlanStep>(i.GetSnapshot().GetPlanStep()),
42+
NIceDb::TUpdate<Schema::IndexPortions::MinSnapshotTxId>(i.GetSnapshot().GetTxId())
43+
);
44+
}
45+
46+
return true;
47+
}
48+
49+
virtual ui64 GetSize() const override {
50+
return Patches.size();
51+
}
52+
53+
};
54+
55+
TConclusion<std::vector<INormalizerTask::TPtr>> TNormalizer::DoInit(
56+
const TNormalizationController& /*controller*/, NTabletFlatExecutor::TTransactionContext& txc) {
57+
using namespace NColumnShard;
58+
NIceDb::TNiceDb db(txc.DB);
59+
60+
bool ready = true;
61+
ready = ready & Schema::Precharge<Schema::IndexPortions>(db, txc.DB.GetScheme());
62+
ready = ready & Schema::Precharge<Schema::IndexColumns>(db, txc.DB.GetScheme());
63+
if (!ready) {
64+
return TConclusionStatus::Fail("Not ready");
65+
}
66+
67+
THashMap<ui64, TPortionLoadContext> dbPortions;
68+
THashMap<ui64, NOlap::TSnapshot> initSnapshot;
69+
70+
{
71+
auto rowset = db.Table<Schema::IndexPortions>().Select();
72+
if (!rowset.IsReady()) {
73+
return TConclusionStatus::Fail("Not ready");
74+
}
75+
76+
while (!rowset.EndOfSet()) {
77+
TPortionLoadContext portion(rowset);
78+
if (!portion.GetDeprecatedMinSnapshot()) {
79+
AFL_VERIFY(dbPortions.emplace(portion.GetPortionId(), portion).second);
80+
}
81+
82+
if (!rowset.Next()) {
83+
return TConclusionStatus::Fail("Not ready");
84+
}
85+
}
86+
}
87+
88+
{
89+
auto rowset = db.Table<Schema::IndexColumns>().Select();
90+
if (!rowset.IsReady()) {
91+
return TConclusionStatus::Fail("Not ready");
92+
}
93+
94+
while (!rowset.EndOfSet()) {
95+
TColumnChunkLoadContext chunk(rowset, &DsGroupSelector);
96+
const ui64 portionId = chunk.GetPortionId();
97+
if (dbPortions.contains(portionId)) {
98+
auto it = initSnapshot.find(portionId);
99+
if (it == initSnapshot.end()) {
100+
initSnapshot.emplace(portionId, chunk.GetMinSnapshotDeprecated());
101+
} else {
102+
AFL_VERIFY(it->second == chunk.GetMinSnapshotDeprecated());
103+
}
104+
}
105+
106+
if (!rowset.Next()) {
107+
return TConclusionStatus::Fail("Not ready");
108+
}
109+
}
110+
}
111+
AFL_VERIFY(dbPortions.size() == initSnapshot.size())("portions", dbPortions.size())("records", initSnapshot.size());
112+
113+
std::vector<INormalizerTask::TPtr> tasks;
114+
if (dbPortions.empty()) {
115+
return tasks;
116+
}
117+
118+
std::vector<TPatchItem> package;
119+
120+
for (auto&& [portionId, portion] : dbPortions) {
121+
auto it = initSnapshot.find(portionId);
122+
AFL_VERIFY(it != initSnapshot.end());
123+
package.emplace_back(std::move(portion), it->second);
124+
if (package.size() == 100) {
125+
std::vector<TPatchItem> local;
126+
local.swap(package);
127+
tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TChanges>(std::move(local))));
128+
}
129+
}
130+
131+
if (package.size() > 0) {
132+
tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TChanges>(std::move(package))));
133+
}
134+
return tasks;
135+
}
136+
137+
} // namespace NKikimr::NOlap::NSyncMinSnapshotFromChunks

0 commit comments

Comments
 (0)