Skip to content

Commit 97ed61a

Browse files
authored
fix pk usage in writing v0 chunks (#17830)
1 parent f984b29 commit 97ed61a

File tree

8 files changed

+324
-11
lines changed

8 files changed

+324
-11
lines changed

ydb/core/tx/columnshard/blobs_action/transaction/tx_blobs_written.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ bool TTxBlobsWritingFinished::DoExecute(TTransactionContext& txc, const TActorCo
3131
}
3232
AFL_VERIFY(!!PackBehaviour);
3333
auto& granule = index.MutableGranuleVerified(Pack.GetPathId());
34+
const ui64 firstPKColumnId = Self->TablesManager.GetIndexInfo(*CommitSnapshot).GetPKFirstColumnId();
3435
for (auto&& portion : Pack.MutablePortions()) {
3536
AFL_VERIFY(portion.GetPortionInfoConstructor()->GetPortionConstructor().GetType() == NOlap::EPortionType::Written);
3637
auto* constructor =
@@ -44,9 +45,9 @@ bool TTxBlobsWritingFinished::DoExecute(TTransactionContext& txc, const TActorCo
4445
InsertWriteIds.emplace_back(constructor->GetInsertWriteIdVerified());
4546
portion.Finalize(Self, txc);
4647
if (PackBehaviour == EOperationBehaviour::NoTxWrite) {
47-
granule.CommitImmediateOnExecute(txc, *CommitSnapshot, portion.GetPortionInfo());
48+
granule.CommitImmediateOnExecute(txc, *CommitSnapshot, portion.GetPortionInfo(), firstPKColumnId);
4849
} else {
49-
granule.InsertPortionOnExecute(txc, portion.GetPortionInfo());
50+
granule.InsertPortionOnExecute(txc, portion.GetPortionInfo(), firstPKColumnId);
5051
}
5152
}
5253

ydb/core/tx/columnshard/engines/storage/granule/granule.cpp

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,8 @@ bool TGranuleMeta::TestingLoad(IDbWrapper& db, const TVersionedIndex& versionedI
271271
return true;
272272
}
273273

274-
void TGranuleMeta::InsertPortionOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TPortionDataAccessor& portion) const {
274+
void TGranuleMeta::InsertPortionOnExecute(
275+
NTabletFlatExecutor::TTransactionContext& txc, const TPortionDataAccessor& portion, const ui64 firstPKColumnId) const {
275276
auto portionImpl = portion.MutablePortionInfoPtr();
276277
if (portionImpl->GetPortionType() == EPortionType::Written) {
277278
auto writtenPortion = std::static_pointer_cast<TWrittenPortionInfo>(portionImpl);
@@ -280,7 +281,7 @@ void TGranuleMeta::InsertPortionOnExecute(NTabletFlatExecutor::TTransactionConte
280281
AFL_VERIFY(!InsertedPortions.contains((TInsertWriteId)0));
281282
}
282283
TDbWrapper wrapper(txc.DB, nullptr);
283-
portion.SaveToDatabase(wrapper, 0, false);
284+
portion.SaveToDatabase(wrapper, firstPKColumnId, false);
284285
}
285286

286287
void TGranuleMeta::InsertPortionOnComplete(const TPortionDataAccessor& portion, IColumnEngine& /*engine*/) {
@@ -314,16 +315,16 @@ void TGranuleMeta::CommitPortionOnComplete(const TInsertWriteId insertWriteId, I
314315
}
315316
}
316317

317-
void TGranuleMeta::CommitImmediateOnExecute(
318-
NTabletFlatExecutor::TTransactionContext& txc, const TSnapshot& snapshot, const TPortionDataAccessor& portion) const {
318+
void TGranuleMeta::CommitImmediateOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TSnapshot& snapshot,
319+
const TPortionDataAccessor& portion, const ui64 firstPKColumnId) const {
319320
auto portionImpl = portion.MutablePortionInfoPtr();
320321
AFL_VERIFY(portionImpl->GetPortionType() == EPortionType::Written);
321322
auto writtenPortion = std::static_pointer_cast<TWrittenPortionInfo>(portionImpl);
322323

323324
AFL_VERIFY(!InsertedPortions.contains(writtenPortion->GetInsertWriteId()));
324325
writtenPortion->SetCommitSnapshot(snapshot);
325326
TDbWrapper wrapper(txc.DB, nullptr);
326-
portion.SaveToDatabase(wrapper, 0, false);
327+
portion.SaveToDatabase(wrapper, firstPKColumnId, false);
327328
}
328329

329330
void TGranuleMeta::CommitImmediateOnComplete(const std::shared_ptr<TPortionInfo> /*portion*/, IColumnEngine& /*engine*/) {

ydb/core/tx/columnshard/engines/storage/granule/granule.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,8 @@ class TGranuleMeta: TNonCopyable {
212212
OnAfterChangePortion(innerPortion, nullptr);
213213
}
214214

215-
void InsertPortionOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TPortionDataAccessor& portion) const;
215+
void InsertPortionOnExecute(
216+
NTabletFlatExecutor::TTransactionContext& txc, const TPortionDataAccessor& portion, const ui64 firstPKColumnId) const;
216217
void InsertPortionOnComplete(const TPortionDataAccessor& portion, IColumnEngine& engine);
217218

218219
void CommitPortionOnExecute(
@@ -233,8 +234,8 @@ class TGranuleMeta: TNonCopyable {
233234
CommitPortionOnComplete(insertWriteId, engine);
234235
}
235236

236-
void CommitImmediateOnExecute(
237-
NTabletFlatExecutor::TTransactionContext& txc, const TSnapshot& snapshot, const TPortionDataAccessor& portion) const;
237+
void CommitImmediateOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TSnapshot& snapshot, const TPortionDataAccessor& portion,
238+
const ui64 firstPKColumnId) const;
238239
void CommitImmediateOnComplete(const std::shared_ptr<TPortionInfo> portion, IColumnEngine& engine);
239240

240241
std::vector<NStorageOptimizer::TTaskDescription> GetOptimizerTasksDescription() const {

ydb/core/tx/columnshard/normalizer/abstract/abstract.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ enum class ENormalizerSequentialId : ui32 {
7070
RestoreV1Chunks_V2,
7171
RestoreV2Chunks,
7272
CleanDeprecatedSnapshot,
73+
RestoreV0ChunksMeta,
7374

7475
MAX
7576
};
ydb/core/tx/columnshard/normalizer/portion/chunks_v0_meta.cpp

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
#include "chunks_v0_meta.h"
2+
#include "normalizer.h"
3+
4+
#include <ydb/core/formats/arrow/size_calcer.h>
5+
#include <ydb/core/tx/columnshard/counters/portion_index.h>
6+
#include <ydb/core/tx/columnshard/data_accessor/manager.h>
7+
#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>
8+
#include <ydb/core/tx/columnshard/engines/portions/data_accessor.h>
9+
#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
10+
#include <ydb/core/tx/columnshard/tables_manager.h>
11+
12+
namespace NKikimr::NOlap {
13+
14+
/// One batch of IndexColumns metadata rewrites.
///
/// For every stored chunk the column meta proto is copied, its PortionMeta sub-message is
/// replaced with the one carried by the chunk's TUpdate, and the result is written back to
/// the Schema::IndexColumns::Metadata column of the row identified by the chunk key.
class TChunksV0MetaNormalizer::TNormalizerResult: public INormalizerChanges {
    std::vector<TChunksV0MetaNormalizer::TChunkInfo> Chunks;

public:
    /// Takes ownership of the chunks that this batch has to rewrite.
    explicit TNormalizerResult(std::vector<TChunksV0MetaNormalizer::TChunkInfo>&& chunks)
        : Chunks(std::move(chunks)) {
    }

    /// Writes the patched metadata of every chunk in the batch through `txc.DB`.
    /// Always returns true.
    bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController& /* normController */) const override {
        using namespace NColumnShard;
        NIceDb::TNiceDb db(txc.DB);

        for (const auto& chunkInfo : Chunks) {
            // Work on a copy: the chunk keeps the meta exactly as it was loaded.
            NKikimrTxColumnShard::TIndexColumnMeta metaProto = chunkInfo.GetMetaProto();
            metaProto.MutablePortionMeta()->CopyFrom(chunkInfo.GetUpdate().GetPortionMeta());

            const auto& key = chunkInfo.GetKey();

            db.Table<Schema::IndexColumns>()
                .Key(key.GetIndex(), key.GetGranule(), key.GetColumnId(), key.GetPlanStep(), key.GetTxId(), key.GetPortion(), key.GetChunk())
                .Update(NIceDb::TUpdate<Schema::IndexColumns::Metadata>(metaProto.SerializeAsString()));
        }
        return true;
    }

    /// Number of chunks in this batch.
    ui64 GetSize() const override {
        return Chunks.size();
    }
};
44+
45+
/// Resolves the schema for this chunk: looks up, in the versioned index of the primary index
/// held by `tm`, the schema for the snapshot built from the key's (PlanStep, TxId).
void TChunksV0MetaNormalizer::TChunkInfo::InitSchema(const NColumnShard::TTablesManager& tm) {
    const NOlap::TSnapshot chunkSnapshot(Key.GetPlanStep(), Key.GetTxId());
    Schema = tm.GetPrimaryIndexSafe().GetVersionedIndex().GetSchemaVerified(chunkSnapshot);
}
48+
49+
/// Builds the normalization tasks for this component.
///
/// Scans Schema::IndexColumns, collects every row for which TChunkInfo::NormalizationRequired()
/// holds, attaches the portion meta obtained from GetPortionMeta() and packs the collected
/// chunks into TTrivialNormalizerTask batches.
///
/// Returns an empty task list when ColumnChunksV0Usage is disabled in the column shard config
/// or when nothing needs fixing; returns a failed conclusion when the tables are not ready,
/// the tables manager cannot be initialized, or a portion meta lookup fails.
TConclusion<std::vector<INormalizerTask::TPtr>> TChunksV0MetaNormalizer::DoInit(
    const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) {
    using namespace NColumnShard;
    // Maximum number of chunk rewrites handed to a single normalizer task.
    constexpr size_t chunksPerTask = 100;

    NIceDb::TNiceDb db(txc.DB);

    if (!AppDataVerified().ColumnShardConfig.GetColumnChunksV0Usage()) {
        return std::vector<INormalizerTask::TPtr>();
    }

    // Non-short-circuit `&` on purpose: both Precharge calls are issued even if the first
    // one reports that the data is not ready yet.
    bool ready = true;
    ready = ready & Schema::Precharge<Schema::IndexColumns>(db, txc.DB.GetScheme());
    ready = ready & Schema::Precharge<Schema::IndexPortions>(db, txc.DB.GetScheme());
    if (!ready) {
        return TConclusionStatus::Fail("Not ready");
    }

    TTablesManager tablesManager(controller.GetStoragesManager(), std::make_shared<NDataAccessorControl::TLocalManager>(nullptr),
        std::make_shared<TSchemaObjectsCache>(), std::make_shared<TPortionIndexStats>(), 0);
    if (!tablesManager.InitFromDB(db)) {
        ACFL_TRACE("normalizer", "TChunksV0MetaNormalizer")("error", "can't initialize tables manager");
        return TConclusionStatus::Fail("Can't load index");
    }

    std::vector<TChunkInfo> chunks;
    {
        auto rowset = db.Table<Schema::IndexColumns>().Select();
        if (!rowset.IsReady()) {
            return TConclusionStatus::Fail("Not ready");
        }

        while (!rowset.EndOfSet()) {
            TColumnKey key;
            key.Load(rowset);

            TChunkInfo chunkInfo(std::move(key), rowset, &*DsGroupSelector, tablesManager);
            if (chunkInfo.NormalizationRequired()) {
                // `key` has been moved into chunkInfo, so the identifiers are read back from
                // chunkInfo (its PathId was loaded from this same row by the constructor).
                auto metadata = GetPortionMeta(TPortionKey(chunkInfo.GetPathId(), chunkInfo.GetKey().GetPortion()), db);
                if (metadata.IsFail()) {
                    return metadata;
                }
                chunkInfo.SetUpdate(metadata.DetachResult());
                chunks.emplace_back(std::move(chunkInfo));
            }

            if (!rowset.Next()) {
                return TConclusionStatus::Fail("Not ready");
            }
        }
    }

    std::vector<INormalizerTask::TPtr> tasks;
    ACFL_INFO("normalizer", "TChunksV0MetaNormalizer")("message", TStringBuilder() << chunks.size() << " chunks found");
    if (chunks.empty()) {
        return tasks;
    }

    std::vector<TChunkInfo> package;
    package.reserve(chunksPerTask);

    // `chunks` is not used after this loop, so its elements are moved into the batches
    // instead of being deep-copied together with their protobuf payloads.
    for (auto&& chunk : chunks) {
        package.emplace_back(std::move(chunk));
        if (package.size() == chunksPerTask) {
            std::vector<TChunkInfo> local;
            local.swap(package);
            tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TNormalizerResult>(std::move(local))));
        }
    }

    // Flush the last, partially filled batch.
    if (package.size() > 0) {
        tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TNormalizerResult>(std::move(package))));
    }
    return tasks;
}
122+
123+
} // namespace NKikimr::NOlap
ydb/core/tx/columnshard/normalizer/portion/chunks_v0_meta.h

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
#pragma once
2+
3+
#include <ydb/core/tx/columnshard/columnshard_schema.h>
4+
#include <ydb/core/tx/columnshard/defs.h>
5+
#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>
6+
7+
namespace NKikimr::NColumnShard {
8+
class TTablesManager;
9+
}
10+
11+
namespace NKikimr::NOlap {
12+
13+
class TChunksV0MetaNormalizer: public TNormalizationController::INormalizerComponent {
14+
private:
15+
using TBase = TNormalizationController::INormalizerComponent;
16+
17+
class TPortionKey {
18+
YDB_READONLY(ui64, PathId, 0);
19+
YDB_READONLY(ui64, PortionId, 0);
20+
21+
public:
22+
TPortionKey(const ui64 pathId, const ui64 portionId)
23+
: PathId(pathId)
24+
, PortionId(portionId) {
25+
}
26+
27+
auto operator<=>(const TPortionKey& other) const {
28+
return std::tie(PathId, PortionId) <=> std::tie(other.PathId, other.PortionId);
29+
}
30+
operator size_t() const {
31+
return CombineHashes(PathId, PortionId);
32+
}
33+
};
34+
35+
TConclusion<NKikimrTxColumnShard::TIndexPortionMeta> GetPortionMeta(const TPortionKey& key, NIceDb::TNiceDb& db) {
36+
if (auto findMeta = PortionMetaCache.FindPtr(key)) {
37+
return *findMeta;
38+
}
39+
40+
auto rowset = db.Table<NColumnShard::Schema::IndexPortions>().Key(key.GetPathId(), key.GetPortionId()).Select();
41+
if (!rowset.IsReady()) {
42+
return TConclusionStatus::Fail("Not ready");
43+
}
44+
AFL_VERIFY(!rowset.EndOfSet());
45+
NKikimrTxColumnShard::TIndexColumnMeta metaProto;
46+
AFL_VERIFY(metaProto.ParseFromString(rowset.GetValue<NColumnShard::Schema::IndexPortions::Metadata>()));
47+
auto emplaced = PortionMetaCache.emplace(key, metaProto.GetPortionMeta());
48+
AFL_VERIFY(emplaced.second);
49+
return emplaced.first->second;
50+
}
51+
52+
public:
53+
static TString GetClassNameStatic() {
54+
return ::ToString(ENormalizerSequentialId::RestoreV0ChunksMeta);
55+
}
56+
57+
virtual std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override {
58+
return ENormalizerSequentialId::RestoreV0ChunksMeta;
59+
}
60+
61+
virtual TString GetClassName() const override {
62+
return GetClassNameStatic();
63+
}
64+
65+
class TNormalizerResult;
66+
67+
class TColumnKey {
68+
YDB_READONLY(ui64, Index, 0);
69+
YDB_READONLY(ui64, Granule, 0);
70+
YDB_READONLY(ui64, ColumnId, 0);
71+
YDB_READONLY(ui64, PlanStep, 0);
72+
YDB_READONLY(ui64, TxId, 0);
73+
YDB_READONLY(ui64, Portion, 0);
74+
YDB_READONLY(ui64, Chunk, 0);
75+
76+
public:
77+
template <class TRowset>
78+
void Load(TRowset& rowset) {
79+
using namespace NColumnShard;
80+
Index = rowset.template GetValue<Schema::IndexColumns::Index>();
81+
Granule = rowset.template GetValue<Schema::IndexColumns::Granule>();
82+
ColumnId = rowset.template GetValue<Schema::IndexColumns::ColumnIdx>();
83+
PlanStep = rowset.template GetValue<Schema::IndexColumns::PlanStep>();
84+
TxId = rowset.template GetValue<Schema::IndexColumns::TxId>();
85+
Portion = rowset.template GetValue<Schema::IndexColumns::Portion>();
86+
Chunk = rowset.template GetValue<Schema::IndexColumns::Chunk>();
87+
}
88+
89+
bool operator<(const TColumnKey& other) const {
90+
return std::make_tuple(Portion, Chunk, ColumnId) < std::make_tuple(other.Portion, other.Chunk, other.ColumnId);
91+
}
92+
};
93+
94+
class TUpdate {
95+
private:
96+
YDB_READONLY_DEF(NKikimrTxColumnShard::TIndexPortionMeta, PortionMeta);
97+
98+
public:
99+
TUpdate() = default;
100+
TUpdate(NKikimrTxColumnShard::TIndexPortionMeta&& portionMeta)
101+
: PortionMeta(std::move(portionMeta)) {
102+
}
103+
};
104+
105+
class TChunkInfo {
106+
YDB_READONLY_DEF(TColumnKey, Key);
107+
YDB_READONLY_DEF(ui64, PathId);
108+
TColumnChunkLoadContext CLContext;
109+
ISnapshotSchema::TPtr Schema;
110+
111+
YDB_ACCESSOR_DEF(TUpdate, Update);
112+
113+
void InitSchema(const NColumnShard::TTablesManager& tm);
114+
115+
public:
116+
template <class TSource>
117+
TChunkInfo(TColumnKey&& key, const TSource& rowset, const IBlobGroupSelector* dsGroupSelector,
118+
const NColumnShard::TTablesManager& tablesManager)
119+
: Key(std::move(key))
120+
, PathId(rowset.template GetValue<NColumnShard::Schema::IndexColumns::PathId>())
121+
, CLContext(rowset, dsGroupSelector) {
122+
InitSchema(tablesManager);
123+
}
124+
125+
const NKikimrTxColumnShard::TIndexColumnMeta& GetMetaProto() const {
126+
return CLContext.GetMetaProto();
127+
}
128+
129+
bool NormalizationRequired() const {
130+
return Key.GetColumnId() == Schema->GetIndexInfo().GetPKFirstColumnId() && Key.GetChunk() == 0 &&
131+
!CLContext.GetMetaProto().HasPortionMeta();
132+
}
133+
134+
bool operator<(const TChunkInfo& other) const {
135+
return Key < other.Key;
136+
}
137+
};
138+
139+
static inline INormalizerComponent::TFactory::TRegistrator<TChunksV0MetaNormalizer> Registrator =
140+
INormalizerComponent::TFactory::TRegistrator<TChunksV0MetaNormalizer>(GetClassNameStatic());
141+
142+
public:
143+
TChunksV0MetaNormalizer(const TNormalizationController::TInitContext& info)
144+
: TBase(info)
145+
, DsGroupSelector(std::make_shared<NColumnShard::TBlobGroupSelector>(info.GetStorageInfo())) {
146+
}
147+
148+
virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(
149+
const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
150+
151+
private:
152+
std::shared_ptr<NColumnShard::TBlobGroupSelector> DsGroupSelector;
153+
THashMap<TPortionKey, NKikimrTxColumnShard::TIndexPortionMeta> PortionMetaCache;
154+
};
155+
} // namespace NKikimr::NOlap

ydb/core/tx/columnshard/normalizer/portion/ya.make

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ SRCS(
1313
GLOBAL restore_v2_chunks.cpp
1414
GLOBAL leaked_blobs.cpp
1515
GLOBAL clean_deprecated_snapshot.cpp
16-
16+
GLOBAL chunks_v0_meta.cpp
1717
)
1818

1919
PEERDIR(

0 commit comments

Comments
 (0)