Skip to content

Commit 3be78ef

Browse files
add v2 chunks normalizer and usage for restore accessors
1 parent c2b8031 commit 3be78ef

File tree

9 files changed

+283
-4
lines changed

9 files changed

+283
-4
lines changed

ydb/core/tx/columnshard/columnshard_impl.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1280,7 +1280,7 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
12801280
for (auto&& i : PortionsByPath) {
12811281
for (auto&& p : i.second) {
12821282
{
1283-
auto rowset = db.Table<NColumnShard::Schema::IndexColumnsV1>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
1283+
auto rowset = db.Table<NColumnShard::Schema::IndexColumnsV2>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
12841284
if (!rowset.IsReady()) {
12851285
reask = true;
12861286
}
@@ -1303,12 +1303,12 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
13031303
std::vector<NOlap::TColumnChunkLoadContextV1> records;
13041304
std::vector<NOlap::TIndexChunkLoadContext> indexes;
13051305
{
1306-
auto rowset = db.Table<NColumnShard::Schema::IndexColumnsV1>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
1306+
auto rowset = db.Table<NColumnShard::Schema::IndexColumnsV2>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
13071307
if (!rowset.IsReady()) {
13081308
return false;
13091309
}
13101310
while (!rowset.EndOfSet()) {
1311-
records.emplace_back(NOlap::TColumnChunkLoadContextV1(rowset));
1311+
NOlap::TColumnChunkLoadContextV1::BuildFromDBV2(rowset, records);
13121312
if (!rowset.Next()) {
13131313
return false;
13141314
}

ydb/core/tx/columnshard/columnshard_schema.h

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -569,6 +569,15 @@ struct Schema : NIceDb::Schema {
569569
using TColumns = TableColumns<PathId, PortionId, SSColumnId, ChunkIdx, Metadata, BlobIdx, Offset, Size>;
570570
};
571571

572+
// Schema of the V2 chunk index table: a single row per portion, keyed by (PathId, PortionId).
// NOTE(review): the table id used here is named ColumnsV1TableId. If IndexColumnsV1 is bound to
// the same id (its declaration is not fully visible in this diff), both schemas would describe
// one physical table with different columns — presumably a dedicated V2 table id is needed; confirm.
struct IndexColumnsV2: Table<ColumnsV1TableId> {
573+
struct PathId: Column<1, NScheme::NTypeIds::Uint64> {};
574+
struct PortionId: Column<2, NScheme::NTypeIds::Uint64> {};
575+
// Serialized NKikimrTxColumnShard::TIndexPortionAccessor holding every chunk of the portion.
struct Metadata: Column<3, NScheme::NTypeIds::String> {};
576+
577+
using TKey = TableKey<PathId, PortionId>;
578+
using TColumns = TableColumns<PathId, PortionId, Metadata>;
579+
};
580+
572581
using TTables = SchemaTables<
573582
Value,
574583
TxInfo,
@@ -607,7 +616,8 @@ struct Schema : NIceDb::Schema {
607616
TxDependencies,
608617
TxStates,
609618
TxEvents,
610-
IndexColumnsV1
619+
IndexColumnsV1,
620+
IndexColumnsV2
611621
>;
612622

613623
//
@@ -997,6 +1007,29 @@ class TColumnChunkLoadContextV1 {
9971007
YDB_READONLY_DEF(NKikimrTxColumnShard::TIndexColumnMeta, MetaProto);
9981008

9991009
public:
1010+
template <class TSource>
1011+
static void BuildFromDBV2(const TSource& rowset, std::vector<TColumnChunkLoadContextV1>& records) {
1012+
const ui64 pathId = rowset.template GetValue<NColumnShard::Schema::IndexColumnsV2::PathId>();
1013+
const ui64 portionId = rowset.template GetValue<NColumnShard::Schema::IndexColumnsV2::PortionId>();
1014+
const TString metadata = rowset.template GetValue<NColumnShard::Schema::IndexColumnsV2::Metadata>();
1015+
NKikimrTxColumnShard::TIndexPortionAccessor metaProto;
1016+
AFL_VERIFY(metaProto.ParseFromArray(metadata.data(), metadata.size()))("event", "cannot parse metadata as protobuf");
1017+
for (auto&& i : metaProto.GetChunks()) {
1018+
TColumnChunkLoadContextV1 result(pathId, portionId, TChunkAddress(i.GetSSColumnId(), i.GetChunkIdx()),
1019+
TBlobRangeLink16::BuildFromProto(i.GetBlobRangeLink()).DetachResult(), i.GetMetadata());
1020+
records.emplace_back(std::move(result));
1021+
}
1022+
}
1023+
1024+
/// Packs this chunk (column id, chunk index, blob range link and column metadata)
/// into a TColumnChunkInfo protobuf message.
NKikimrTxColumnShard::TColumnChunkInfo SerializeToDBProto() const {
    NKikimrTxColumnShard::TColumnChunkInfo chunkInfo;
    chunkInfo.SetSSColumnId(Address.GetColumnId());
    chunkInfo.SetChunkIdx(Address.GetChunkIdx());
    chunkInfo.MutableMetadata()->CopyFrom(MetaProto);
    chunkInfo.MutableBlobRangeLink()->CopyFrom(BlobRange.SerializeToProto());
    return chunkInfo;
}
1032+
10001033
/// Returns the chunk address qualified with the owning path and portion ids.
TFullChunkAddress GetFullChunkAddress() const {
    const auto entityId = Address.GetEntityId();
    const auto chunkIdx = Address.GetChunkIdx();
    return TFullChunkAddress(PathId, PortionId, entityId, chunkIdx);
}

ydb/core/tx/columnshard/engines/portions/column_record.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,14 @@ class TColumnRecord {
121121
return BlobRange;
122122
}
123123

124+
/// Builds the TColumnChunkInfo protobuf entry for this record: entity (column) id,
/// chunk index, serialized chunk metadata and the blob range link.
NKikimrTxColumnShard::TColumnChunkInfo SerializeToDBProto() const {
    NKikimrTxColumnShard::TColumnChunkInfo result;
    result.SetSSColumnId(GetEntityId());
    result.SetChunkIdx(GetChunkIdx());
    // Metadata is a nested message field: generated protobuf code exposes a mutable
    // accessor for it, not a value setter, so assign through MutableMetadata().
    *result.MutableMetadata() = Meta.SerializeToProto();
    *result.MutableBlobRangeLink() = BlobRange.SerializeToProto();
    return result;
}
124132
NKikimrColumnShardDataSharingProto::TColumnRecord SerializeToProto() const;
125133
static TConclusion<TColumnRecord> BuildFromProto(const NKikimrColumnShardDataSharingProto::TColumnRecord& proto) {
126134
TColumnRecord result;

ydb/core/tx/columnshard/engines/portions/data_accessor.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -501,6 +501,12 @@ void TPortionDataAccessor::SaveToDatabase(IDbWrapper& db, const ui32 firstPKColu
501501
FullValidation();
502502
db.WritePortion(*PortionInfo);
503503
if (!saveOnlyMeta) {
504+
NKikimrTxColumnShard::TIndexPortionAccessor protoData;
505+
for (auto& record : GetRecordsVerified()) {
506+
*protoData.AddChunks() = record.SerializeToDBProto();
507+
}
508+
db.WriteColumns(*PortionInfo, std::move(protoData));
509+
504510
for (auto& record : GetRecordsVerified()) {
505511
db.WriteColumn(*PortionInfo, record, firstPKColumnId);
506512
}

ydb/core/tx/columnshard/engines/protos/portion_info.proto

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,3 +35,14 @@ message TIndexColumnMeta {
3535
optional NKikimrSSA.TProgram.TConstant MaxValue = 4;
3636
optional TIndexPortionMeta PortionMeta = 5[deprecated = true]; // First PK column could contain portion info
3737
}
38+
39+
// Description of a single column chunk as stored inside TIndexPortionAccessor.
message TColumnChunkInfo {
40+
// Id of the column (entity) the chunk belongs to.
optional uint32 SSColumnId = 1;
41+
// Index of the chunk within its column.
optional uint32 ChunkIdx = 2;
42+
// Per-chunk column metadata.
optional TIndexColumnMeta Metadata = 3;
43+
// Link to the blob range that holds the chunk data.
optional TBlobRangeLink16 BlobRangeLink = 4;
44+
}
45+
46+
// All column chunks of one portion; serialized as a whole into the Metadata column
// of the IndexColumnsV2 table.
message TIndexPortionAccessor {
47+
repeated TColumnChunkInfo Chunks = 1;
48+
}

ydb/core/tx/columnshard/normalizer/abstract/abstract.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ enum class ENormalizerSequentialId: ui32 {
6565
SyncMinSnapshotFromChunks,
6666
DeprecatedRestoreV1Chunks_V1,
6767
RestoreV1Chunks_V2,
68+
RestoreV2Chunks,
6869

6970
MAX
7071
};
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
#include "normalizer.h"
2+
#include "restore_v1_chunks.h"
3+
4+
#include <ydb/core/formats/arrow/size_calcer.h>
5+
#include <ydb/core/tx/columnshard/engines/portions/data_accessor.h>
6+
#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
7+
#include <ydb/core/tx/columnshard/tables_manager.h>
8+
9+
namespace NKikimr::NOlap::NRestoreV2Chunks {
10+
11+
class TV2BuildTask {
12+
private:
13+
TPortionAddress PortionAddress;
14+
std::vector<TColumnChunkLoadContextV1> Chunks;
15+
16+
public:
17+
void AddChunk(const TColumnChunkLoadContextV1& chunk) {
18+
Chunks.emplace_back(chunk);
19+
}
20+
21+
NKikimrTxColumnShard::TIndexPortionAccessor BuildProto() const {
22+
const auto pred = [](const TColumnChunkLoadContextV1& l, const TColumnChunkLoadContextV1& r) {
23+
return l.GetAddress() < r.GetAddress();
24+
};
25+
std::sort(Chunks.begin(), Chunks.end(), pred);
26+
NKikimrTxColumnShard::TIndexPortionAccessor result;
27+
for (auto&& c : Chunks) {
28+
*result.AddChunks() = c.SerializeToDBProto();
29+
}
30+
return result;
31+
}
32+
33+
TV2BuildTask(const TPortionAddress& address)
34+
: PortionAddress(address) {
35+
}
36+
};
37+
38+
class TChangesAddV2: public INormalizerChanges {
39+
private:
40+
std::vector<TV2BuildTask> Patches;
41+
42+
public:
43+
TChangesAddV2(std::vector<TV2BuildTask>&& patches)
44+
: Patches(std::move(patches)) {
45+
}
46+
virtual bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController&) const override {
47+
using namespace NColumnShard;
48+
NIceDb::TNiceDb db(txc.DB);
49+
using IndexColumnsV2 = NColumnShard::Schema::IndexColumnsV2;
50+
for (auto&& i : Patches) {
51+
auto metaProto = i.BuildProto();
52+
db.Table<IndexColumnsV2>()
53+
.Key(i.GetPathId(), i.GetPortionId())
54+
.Update(NIceDb::TUpdate<IndexColumnsV2::Metadata>(metaProto.SerializeAsString()));
55+
}
56+
57+
return true;
58+
}
59+
60+
virtual ui64 GetSize() const override {
61+
return Patches.size();
62+
}
63+
};
64+
65+
class TPatchItemRemoveV1 {
66+
private:
67+
TColumnChunkLoadContextV1 ChunkInfo;
68+
69+
public:
70+
const TColumnChunkLoadContextV1& GetChunkInfo() const {
71+
return ChunkInfo;
72+
}
73+
74+
TPatchItemRemoveV1(const TColumnChunkLoadContextV1& chunkInfo)
75+
: ChunkInfo(chunkInfo) {
76+
}
77+
};
78+
79+
class TChangesRemoveV1: public INormalizerChanges {
80+
private:
81+
std::vector<TPatchItemRemoveV1> Patches;
82+
83+
public:
84+
TChangesRemoveV1(std::vector<TPatchItemRemoveV1>&& patches)
85+
: Patches(std::move(patches)) {
86+
}
87+
virtual bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController&) const override {
88+
using namespace NColumnShard;
89+
NIceDb::TNiceDb db(txc.DB);
90+
using IndexColumnsV1 = NColumnShard::Schema::IndexColumnsV1;
91+
for (auto&& i : Patches) {
92+
db.Table<IndexColumnsV1>()
93+
.Key(i.GetChunkInfo().GetPathId(), i.GetChunkInfo().GetPortionId(), i.GetChunkInfo().GetAddress().GetEntityId(),
94+
i.GetChunkInfo().GetAddress().GetChunkIdx())
95+
.Delete();
96+
}
97+
98+
return true;
99+
}
100+
101+
virtual ui64 GetSize() const override {
102+
return Patches.size();
103+
}
104+
};
105+
106+
/// Plans the conversion of chunk records from IndexColumnsV1 (one row per chunk)
/// to IndexColumnsV2 (one row per portion).
///
/// Steps:
///  1. precharge both tables; fail with "Not ready" if data is not loaded yet;
///  2. collect addresses of portions that already have a V2 row;
///  3. scan V1 and group chunks of all other portions into per-portion build tasks;
///  4. pack build tasks into normalizer tasks of at most `packageLimit` portions each.
///
/// Returns the list of normalizer tasks, or a failed conclusion when any table
/// read has to be retried.
TConclusion<std::vector<INormalizerTask::TPtr>> TNormalizer::DoInit(
    const TNormalizationController& /*controller*/, NTabletFlatExecutor::TTransactionContext& txc) {
    using namespace NColumnShard;
    NIceDb::TNiceDb db(txc.DB);

    // '&' (not '&&') keeps both Precharge calls unconditional.
    bool ready = true;
    ready = ready & Schema::Precharge<Schema::IndexColumnsV1>(db, txc.DB.GetScheme());
    ready = ready & Schema::Precharge<Schema::IndexColumnsV2>(db, txc.DB.GetScheme());
    if (!ready) {
        return TConclusionStatus::Fail("Not ready");
    }
    AFL_VERIFY(AppDataVerified().ColumnShardConfig.GetColumnChunksV1Usage());

    // Portions that already have a V2 row and must not be rebuilt.
    THashSet<TPortionAddress> readyPortions;
    // Portions present only in V1, with their chunks accumulated for conversion.
    THashMap<TPortionAddress, TV2BuildTask> buildPortions;
    {
        auto rowset = db.Table<Schema::IndexColumnsV2>().Select();
        if (!rowset.IsReady()) {
            return TConclusionStatus::Fail("Not ready");
        }

        while (!rowset.EndOfSet()) {
            const TPortionAddress address(
                rowset.GetValue<Schema::IndexColumnsV2::PathId>(), rowset.GetValue<Schema::IndexColumnsV2::PortionId>());
            // Every (path id, portion id) pair must occur exactly once in V2.
            AFL_VERIFY(readyPortions.emplace(address).second);
            if (!rowset.Next()) {
                return TConclusionStatus::Fail("Not ready");
            }
        }
    }

    {
        auto rowset = db.Table<Schema::IndexColumnsV1>().Select();
        if (!rowset.IsReady()) {
            return TConclusionStatus::Fail("Not ready");
        }

        while (!rowset.EndOfSet()) {
            TColumnChunkLoadContextV1 chunk(rowset);
            // Compare full portion addresses: the set is keyed by TPortionAddress, not by a bare portion id.
            if (!readyPortions.contains(chunk.GetPortionAddress())) {
                auto it = buildPortions.find(chunk.GetPortionAddress());
                if (it == buildPortions.end()) {
                    it = buildPortions.emplace(chunk.GetPortionAddress(), TV2BuildTask(chunk.GetPortionAddress())).first;
                }
                it->second.AddChunk(chunk);
            }

            if (!rowset.Next()) {
                return TConclusionStatus::Fail("Not ready");
            }
        }
    }

    std::vector<INormalizerTask::TPtr> tasks;
    {
        // Upper bound on the number of portions written by one normalizer task.
        const size_t packageLimit = 100;
        std::vector<TV2BuildTask> package;
        for (auto&& [portionAddress, buildTask] : buildPortions) {
            package.emplace_back(std::move(buildTask));
            if (package.size() == packageLimit) {
                std::vector<TV2BuildTask> local;
                local.swap(package);
                tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TChangesAddV2>(std::move(local))));
            }
        }

        if (package.size() > 0) {
            tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TChangesAddV2>(std::move(package))));
        }
    }

    return tasks;
}
175+
176+
}   // namespace NKikimr::NOlap::NRestoreV2Chunks
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
#pragma once
2+
3+
#include <ydb/core/tx/columnshard/columnshard_schema.h>
4+
#include <ydb/core/tx/columnshard/defs.h>
5+
#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>
6+
7+
namespace NKikimr::NColumnShard {
8+
class TTablesManager;
9+
}
10+
11+
namespace NKikimr::NOlap::NRestoreV2Chunks {
12+
13+
class TNormalizer: public TNormalizationController::INormalizerComponent {
14+
public:
15+
static TString GetClassNameStatic() {
16+
return ::ToString(ENormalizerSequentialId::RestoreV2Chunks);
17+
}
18+
19+
virtual std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override {
20+
return ENormalizerSequentialId::RestoreV2Chunks;
21+
}
22+
23+
virtual TString GetClassName() const override {
24+
return GetClassNameStatic();
25+
}
26+
27+
class TNormalizerResult;
28+
29+
static inline INormalizerComponent::TFactory::TRegistrator<TNormalizer> Registrator =
30+
INormalizerComponent::TFactory::TRegistrator<TNormalizer>(GetClassNameStatic());
31+
32+
public:
33+
TNormalizer(const TNormalizationController::TInitContext& info)
34+
: DsGroupSelector(info.GetStorageInfo()) {
35+
}
36+
37+
virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(
38+
const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
39+
40+
private:
41+
NColumnShard::TBlobGroupSelector DsGroupSelector;
42+
};
43+
}   // namespace NKikimr::NOlap::NRestoreV2Chunks

ydb/core/tx/columnshard/normalizer/portion/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ SRCS(
1111
GLOBAL chunks_actualization.cpp
1212
GLOBAL restore_portion_from_chunks.cpp
1313
GLOBAL restore_v1_chunks.cpp
14+
GLOBAL restore_v2_chunks.cpp
1415
GLOBAL snapshot_from_chunks.cpp
1516
)
1617

0 commit comments

Comments
 (0)