Skip to content

Commit 48b4846

Browse files
Merge f4812ef into 26c9911
2 parents 26c9911 + f4812ef commit 48b4846

File tree

6 files changed

+223
-36
lines changed

6 files changed

+223
-36
lines changed

ydb/core/tx/columnshard/columnshard_schema.cpp

Lines changed: 9 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -10,44 +10,18 @@ bool Schema::InsertTable_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsG
1010
}
1111

1212
while (!rowset.EndOfSet()) {
13-
EInsertTableIds recType = (EInsertTableIds)rowset.GetValue<InsertTable::Committed>();
14-
const ui64 planStep = rowset.GetValue<InsertTable::PlanStep>();
15-
const ui64 writeTxId = rowset.GetValueOrDefault<InsertTable::WriteTxId>();
16-
const ui64 pathId = rowset.GetValue<InsertTable::PathId>();
17-
const TString dedupId = rowset.GetValue<InsertTable::DedupId>();
18-
const ui64 schemaVersion = rowset.HaveValue<InsertTable::SchemaVersion>() ? rowset.GetValue<InsertTable::SchemaVersion>() : 0;
13+
NOlap::TInsertTableRecordLoadContext constructor;
14+
constructor.ParseFromDatabase(rowset);
1915

20-
TString error;
21-
NOlap::TUnifiedBlobId blobId = NOlap::TUnifiedBlobId::ParseFromString(rowset.GetValue<InsertTable::BlobId>(), dsGroupSelector, error);
22-
Y_ABORT_UNLESS(blobId.IsValid(), "Failied to parse blob id: %s", error.c_str());
23-
24-
NKikimrTxColumnShard::TLogicalMetadata meta;
25-
if (auto metaStr = rowset.GetValue<InsertTable::Meta>()) {
26-
Y_ABORT_UNLESS(meta.ParseFromString(metaStr));
27-
}
28-
29-
std::optional<ui64> rangeOffset;
30-
if (rowset.HaveValue<InsertTable::BlobRangeOffset>()) {
31-
rangeOffset = rowset.GetValue<InsertTable::BlobRangeOffset>();
32-
}
33-
std::optional<ui64> rangeSize;
34-
if (rowset.HaveValue<InsertTable::BlobRangeSize>()) {
35-
rangeSize = rowset.GetValue<InsertTable::BlobRangeSize>();
36-
}
37-
AFL_VERIFY(!!rangeOffset == !!rangeSize);
38-
39-
auto userData = std::make_shared<NOlap::TUserData>(pathId,
40-
NOlap::TBlobRange(blobId, rangeOffset.value_or(0), rangeSize.value_or(blobId.BlobSize())), meta, schemaVersion, std::nullopt);
41-
42-
switch (recType) {
43-
case EInsertTableIds::Inserted:
44-
insertTable.AddInserted(NOlap::TInsertedData((TInsertWriteId)writeTxId, userData), true);
16+
switch (constructor.GetRecType()) {
17+
case Schema::EInsertTableIds::Inserted:
18+
insertTable.AddInserted(constructor.BuildInsertedOrAborted(dsGroupSelector), true);
4519
break;
46-
case EInsertTableIds::Committed:
47-
insertTable.AddCommitted(NOlap::TCommittedData(userData, planStep, writeTxId, dedupId), true);
20+
case Schema::EInsertTableIds::Committed:
21+
insertTable.AddCommitted(constructor.BuildCommitted(dsGroupSelector), true);
4822
break;
49-
case EInsertTableIds::Aborted:
50-
insertTable.AddAborted(NOlap::TInsertedData((TInsertWriteId)writeTxId, userData), true);
23+
case Schema::EInsertTableIds::Aborted:
24+
insertTable.AddAborted(constructor.BuildInsertedOrAborted(dsGroupSelector), true);
5125
break;
5226
}
5327
if (!rowset.Next()) {

ydb/core/tx/columnshard/columnshard_schema.h

Lines changed: 121 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -977,4 +977,124 @@ class TIndexChunkLoadContext {
977977
}
978978
};
979979

980-
}
980+
class TInsertTableRecordLoadContext {
981+
private:
982+
NColumnShard::Schema::EInsertTableIds RecType;
983+
ui64 PlanStep;
984+
ui64 WriteTxId;
985+
ui64 PathId;
986+
YDB_ACCESSOR_DEF(TString, DedupId);
987+
ui64 SchemaVersion;
988+
TString BlobIdString;
989+
std::optional<NOlap::TUnifiedBlobId> BlobId;
990+
TString MetadataString;
991+
std::optional<NKikimrTxColumnShard::TLogicalMetadata> Metadata;
992+
std::optional<ui64> RangeOffset;
993+
std::optional<ui64> RangeSize;
994+
995+
void Prepare(const IBlobGroupSelector* dsGroupSelector) {
996+
AFL_VERIFY(!PreparedFlag);
997+
PreparedFlag = true;
998+
TString error;
999+
NOlap::TUnifiedBlobId blobId = NOlap::TUnifiedBlobId::ParseFromString(BlobIdString, dsGroupSelector, error);
1000+
Y_ABORT_UNLESS(blobId.IsValid(), "Failied to parse blob id: %s", error.c_str());
1001+
BlobId = blobId;
1002+
1003+
NKikimrTxColumnShard::TLogicalMetadata meta;
1004+
AFL_VERIFY(MetadataString);
1005+
Y_ABORT_UNLESS(meta.ParseFromString(MetadataString));
1006+
Metadata = std::move(meta);
1007+
AFL_VERIFY(!!RangeOffset == !!RangeSize);
1008+
}
1009+
1010+
bool PreparedFlag = false;
1011+
bool ParsedFlag = false;
1012+
1013+
public:
1014+
TInsertWriteId GetInsertWriteId() const {
1015+
AFL_VERIFY(ParsedFlag);
1016+
AFL_VERIFY(RecType != NColumnShard::Schema::EInsertTableIds::Committed);
1017+
return (TInsertWriteId)WriteTxId;
1018+
}
1019+
1020+
NColumnShard::Schema::EInsertTableIds GetRecType() const {
1021+
AFL_VERIFY(ParsedFlag);
1022+
return RecType;
1023+
}
1024+
1025+
ui64 GetPlanStep() const {
1026+
AFL_VERIFY(ParsedFlag);
1027+
return PlanStep;
1028+
}
1029+
1030+
void Remove(NIceDb::TNiceDb& db) {
1031+
AFL_VERIFY(ParsedFlag);
1032+
db.Table<NColumnShard::Schema::InsertTable>().Key((ui8)RecType, PlanStep, WriteTxId, PathId, DedupId).Delete();
1033+
}
1034+
1035+
void Upsert(NIceDb::TNiceDb& db) {
1036+
AFL_VERIFY(ParsedFlag);
1037+
using namespace NColumnShard;
1038+
if (RangeOffset) {
1039+
db.Table<Schema::InsertTable>()
1040+
.Key((ui8)RecType, PlanStep, WriteTxId, PathId, DedupId)
1041+
.Update(NIceDb::TUpdate<Schema::InsertTable::BlobId>(BlobIdString),
1042+
NIceDb::TUpdate<Schema::InsertTable::BlobRangeOffset>(*RangeOffset),
1043+
NIceDb::TUpdate<Schema::InsertTable::BlobRangeSize>(*RangeSize), NIceDb::TUpdate<Schema::InsertTable::Meta>(MetadataString),
1044+
NIceDb::TUpdate<Schema::InsertTable::SchemaVersion>(SchemaVersion));
1045+
} else {
1046+
db.Table<Schema::InsertTable>()
1047+
.Key((ui8)RecType, PlanStep, WriteTxId, PathId, DedupId)
1048+
.Update(NIceDb::TUpdate<Schema::InsertTable::BlobId>(BlobIdString), NIceDb::TUpdate<Schema::InsertTable::Meta>(MetadataString),
1049+
NIceDb::TUpdate<Schema::InsertTable::SchemaVersion>(SchemaVersion));
1050+
}
1051+
}
1052+
1053+
template <class TRowset>
1054+
void ParseFromDatabase(TRowset& rowset) {
1055+
AFL_VERIFY(!ParsedFlag)("problem", "duplication parsing");
1056+
ParsedFlag = true;
1057+
using namespace NColumnShard;
1058+
RecType = (Schema::EInsertTableIds)rowset.template GetValue<Schema::InsertTable::Committed>();
1059+
PlanStep = rowset.template GetValue<Schema::InsertTable::PlanStep>();
1060+
WriteTxId = rowset.template GetValueOrDefault<Schema::InsertTable::WriteTxId>();
1061+
AFL_VERIFY(WriteTxId);
1062+
1063+
PathId = rowset.template GetValue<Schema::InsertTable::PathId>();
1064+
DedupId = rowset.template GetValue<Schema::InsertTable::DedupId>();
1065+
SchemaVersion =
1066+
rowset.template HaveValue<Schema::InsertTable::SchemaVersion>() ? rowset.template GetValue<Schema::InsertTable::SchemaVersion>() : 0;
1067+
BlobIdString = rowset.template GetValue<Schema::InsertTable::BlobId>();
1068+
MetadataString = rowset.template GetValue<Schema::InsertTable::Meta>();
1069+
if (rowset.template HaveValue<Schema::InsertTable::BlobRangeOffset>()) {
1070+
RangeOffset = rowset.template GetValue<Schema::InsertTable::BlobRangeOffset>();
1071+
}
1072+
if (rowset.template HaveValue<Schema::InsertTable::BlobRangeSize>()) {
1073+
RangeSize = rowset.template GetValue<Schema::InsertTable::BlobRangeSize>();
1074+
}
1075+
}
1076+
1077+
NOlap::TCommittedData BuildCommitted(const IBlobGroupSelector* dsGroupSelector) {
1078+
Prepare(dsGroupSelector);
1079+
using namespace NColumnShard;
1080+
AFL_VERIFY(RecType == Schema::EInsertTableIds::Committed);
1081+
auto userData = std::make_shared<NOlap::TUserData>(PathId,
1082+
NOlap::TBlobRange(*BlobId, RangeOffset.value_or(0), RangeSize.value_or(BlobId->BlobSize())), *Metadata, SchemaVersion, std::nullopt);
1083+
AFL_VERIFY(!!DedupId);
1084+
AFL_VERIFY(PlanStep);
1085+
return NOlap::TCommittedData(userData, PlanStep, WriteTxId, DedupId);
1086+
}
1087+
1088+
NOlap::TInsertedData BuildInsertedOrAborted(const IBlobGroupSelector* dsGroupSelector) {
1089+
Prepare(dsGroupSelector);
1090+
using namespace NColumnShard;
1091+
AFL_VERIFY(RecType != Schema::EInsertTableIds::Committed);
1092+
auto userData = std::make_shared<NOlap::TUserData>(PathId,
1093+
NOlap::TBlobRange(*BlobId, RangeOffset.value_or(0), RangeSize.value_or(BlobId->BlobSize())), *Metadata, SchemaVersion, std::nullopt);
1094+
AFL_VERIFY(!DedupId);
1095+
AFL_VERIFY(!PlanStep);
1096+
return NOlap::TInsertedData((TInsertWriteId)WriteTxId, userData);
1097+
}
1098+
};
1099+
1100+
} // namespace NKikimr::NOlap

ydb/core/tx/columnshard/normalizer/abstract/abstract.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ enum class ENormalizerSequentialId: ui32 {
5757
PortionsMetadata,
5858
CleanGranuleId,
5959
EmptyPortionsCleaner,
60+
CleanInsertionDedup,
6061

6162
MAX
6263
};
ydb/core/tx/columnshard/normalizer/tablet/broken_insertion_dedup.cpp

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
#include "broken_insertion_dedup.h"
2+
3+
#include <ydb/core/tx/columnshard/columnshard_private_events.h>
4+
#include <ydb/core/tx/columnshard/columnshard_schema.h>
5+
6+
namespace NKikimr::NOlap {
7+
8+
// Scans the whole InsertTable and repairs it in place within the given transaction:
//  * non-committed rows that carry a dedup id are rewritten with an empty dedup id;
//  * inserted rows whose write id also occurs among aborted rows are removed.
// Returns an empty task list on success, or a failure status when the table cannot be read.
TConclusion<std::vector<INormalizerTask::TPtr>> TInsertionsDedupNormalizer::DoInit(
    const TNormalizationController& /*controller*/, NTabletFlatExecutor::TTransactionContext& txc) {
    using namespace NColumnShard;
    using ERecType = NColumnShard::Schema::EInsertTableIds;

    NIceDb::TNiceDb db(txc.DB);
    auto rowset = db.Table<NColumnShard::Schema::InsertTable>().Select();
    if (!rowset.IsReady()) {
        return TConclusionStatus::Fail("cannot read insertion info");
    }

    THashMap<TInsertWriteId, TInsertTableRecordLoadContext> abortedByWriteId;
    THashMap<TInsertWriteId, TInsertTableRecordLoadContext> insertedByWriteId;
    while (!rowset.EndOfSet()) {
        TInsertTableRecordLoadContext record;
        record.ParseFromDatabase(rowset);
        const ERecType recType = record.GetRecType();

        if (recType != ERecType::Committed) {
            // Inserted / aborted rows must have neither a plan step nor a dedup id.
            AFL_VERIFY(!record.GetPlanStep());
            if (record.GetDedupId()) {
                AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "correct_record")("dedup", record.GetDedupId());
                // The dedup id is part of the key, so the row is deleted under the old key
                // and written again under the corrected one.
                record.Remove(db);
                record.SetDedupId("");
                record.Upsert(db);
            }
            if (recType == ERecType::Aborted) {
                abortedByWriteId.emplace(record.GetInsertWriteId(), record);
            } else if (recType == ERecType::Inserted) {
                insertedByWriteId.emplace(record.GetInsertWriteId(), record);
            } else {
                AFL_VERIFY(false);
            }
        } else {
            AFL_VERIFY(record.GetPlanStep());
        }

        if (!rowset.Next()) {
            return TConclusionStatus::Fail("cannot read insertion info");
        }
    }

    // Drop inserted rows that duplicate an aborted write.
    for (auto&& entry : insertedByWriteId) {
        if (!abortedByWriteId.contains(entry.first)) {
            continue;
        }
        AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "remove_aborted_record")("write_id", entry.first);
        entry.second.Remove(db);
    }

    return std::vector<INormalizerTask::TPtr>();
}
55+
56+
} // namespace NKikimr::NOlap
ydb/core/tx/columnshard/normalizer/tablet/broken_insertion_dedup.h

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
#pragma once
2+
3+
#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>
4+
#include <ydb/core/tx/columnshard/columnshard_schema.h>
5+
6+
7+
namespace NKikimr::NOlap {
8+
9+
class TInsertionsDedupNormalizer: public TNormalizationController::INormalizerComponent {
10+
public:
11+
static TString GetClassNameStatic() {
12+
return "CleanInsertionDedup";
13+
}
14+
private:
15+
class TNormalizerResult;
16+
17+
static const inline INormalizerComponent::TFactory::TRegistrator<TInsertionsDedupNormalizer> Registrator =
18+
INormalizerComponent::TFactory::TRegistrator<TInsertionsDedupNormalizer>(GetClassNameStatic());
19+
20+
public:
21+
TInsertionsDedupNormalizer(const TNormalizationController::TInitContext&) {
22+
}
23+
24+
virtual std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override {
25+
return ENormalizerSequentialId::CleanInsertionDedup;
26+
}
27+
28+
virtual TString GetClassName() const override {
29+
return GetClassNameStatic();
30+
}
31+
32+
virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
33+
};
34+
35+
}

ydb/core/tx/columnshard/normalizer/tablet/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ LIBRARY()
33
SRCS(
44
GLOBAL gc_counters.cpp
55
GLOBAL broken_txs.cpp
6+
GLOBAL broken_insertion_dedup.cpp
67
)
78

89
PEERDIR(

0 commit comments

Comments
 (0)