-
Notifications
You must be signed in to change notification settings - Fork 735
Implemented schema versions normalizer #9627
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
d773615
b6d89d3
da40fec
c5f0105
45580f7
5a86c1d
a7d99c6
66c746c
56ba516
eb31a54
510cddc
552c730
47aebaa
3fe401c
cbfe631
a928a63
4d16a50
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,139 @@ | ||
| #include "version.h" | ||
|
|
||
| namespace NKikimr::NOlap { | ||
|
|
||
| /// One batch of changes produced by TSchemaVersionNormalizer: a list of | ||
| /// SchemaPresetVersionInfo rows that ApplyOnExecute() deletes. | ||
| class TSchemaVersionNormalizer::TNormalizerResult : public INormalizerChanges { | ||
| private: | ||
|     /// Upper bound on the number of rows deleted by a single change object. | ||
|     static constexpr size_t BatchSize = 10000; | ||
|
|
||
|     /// Key of one SchemaPresetVersionInfo row (Id, Step, TxId) together with | ||
|     /// the schema version parsed from that row. | ||
|     class TKey { | ||
|     public: | ||
|         // Default member initializers keep a default-constructed key well defined. | ||
|         ui64 Step = 0; | ||
|         ui64 TxId = 0; | ||
|         ui64 Version = 0; | ||
|         ui32 Id = 0; | ||
|
|
||
|     public: | ||
|         TKey() = default; | ||
|
|
||
|         TKey(ui32 id, ui64 step, ui64 txId, ui64 version) | ||
|             : Step(step) | ||
|             , TxId(txId) | ||
|             , Version(version) | ||
|             , Id(id) | ||
|         { | ||
|         } | ||
|     }; | ||
|
|
||
|     std::vector<TKey> VersionsToRemove; | ||
|
|
||
| public: | ||
|     /// Takes ownership of the keys to delete. | ||
|     explicit TNormalizerResult(std::vector<TKey>&& versions) | ||
|         // A named rvalue-reference parameter is an lvalue: without std::move this would copy. | ||
|         : VersionsToRemove(std::move(versions)) | ||
|     { | ||
|     } | ||
|
|
||
|     /// Deletes every stored key from SchemaPresetVersionInfo. Always returns true. | ||
|     bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController& /* normController */) const override { | ||
|         using namespace NColumnShard; | ||
|         NIceDb::TNiceDb db(txc.DB); | ||
|         for (const auto& key : VersionsToRemove) { | ||
|             db.Table<Schema::SchemaPresetVersionInfo>().Key(key.Id, key.Step, key.TxId).Delete(); | ||
|         } | ||
|         return true; | ||
|     } | ||
|
|
||
|     /// Number of rows this change deletes. | ||
|     ui64 GetSize() const override { | ||
|         return VersionsToRemove.size(); | ||
|     } | ||
|
|
||
|     /// Collects schema versions referenced by IndexPortions and InsertTable, then | ||
|     /// returns batched changes that delete every SchemaPresetVersionInfo row whose | ||
|     /// schema version is unreferenced, except the row(s) carrying the highest version. | ||
|     /// | ||
|     /// @return std::nullopt when any rowset is not ready or Next() fails; | ||
|     ///         otherwise the (possibly empty) list of changes. | ||
|     static std::optional<std::vector<INormalizerChanges::TPtr>> Init(NTabletFlatExecutor::TTransactionContext& txc) { | ||
|         using namespace NColumnShard; | ||
|         THashSet<ui64> usedSchemaVersions; | ||
|         NIceDb::TNiceDb db(txc.DB); | ||
|         { | ||
|             auto rowset = db.Table<Schema::IndexPortions>().Select(); | ||
|             if (!rowset.IsReady()) { | ||
|                 return std::nullopt; | ||
|             } | ||
|             while (!rowset.EndOfSet()) { | ||
|                 usedSchemaVersions.insert(rowset.GetValue<Schema::IndexPortions::SchemaVersion>()); | ||
|                 if (!rowset.Next()) { | ||
|                     return std::nullopt; | ||
|                 } | ||
|             } | ||
|         } | ||
|         { | ||
|             auto rowset = db.Table<Schema::InsertTable>().Select(); | ||
|             if (!rowset.IsReady()) { | ||
|                 return std::nullopt; | ||
|             } | ||
|             while (!rowset.EndOfSet()) { | ||
|                 if (rowset.HaveValue<Schema::InsertTable::SchemaVersion>()) { | ||
|                     usedSchemaVersions.insert(rowset.GetValue<Schema::InsertTable::SchemaVersion>()); | ||
|                 } | ||
|                 // Advance unconditionally: a row without SchemaVersion must not stall the scan. | ||
|                 if (!rowset.Next()) { | ||
|                     return std::nullopt; | ||
|                 } | ||
|             } | ||
|         } | ||
|
|
||
|         std::vector<TKey> unusedSchemaIds; | ||
|         std::optional<ui64> maxVersion; | ||
|         std::vector<INormalizerChanges::TPtr> changes; | ||
|
|
||
|         { | ||
|             auto rowset = db.Table<Schema::SchemaPresetVersionInfo>().Select(); | ||
|             if (!rowset.IsReady()) { | ||
|                 return std::nullopt; | ||
|             } | ||
|             while (!rowset.EndOfSet()) { | ||
|                 const ui32 id = rowset.GetValue<Schema::SchemaPresetVersionInfo::Id>(); | ||
|                 NKikimrTxColumnShard::TSchemaPresetVersionInfo info; | ||
|                 Y_ABORT_UNLESS(info.ParseFromString(rowset.GetValue<Schema::SchemaPresetVersionInfo::InfoProto>())); | ||
|                 if (info.HasSchema()) { | ||
|                     const ui64 version = info.GetSchema().GetVersion(); | ||
|                     if (!maxVersion.has_value() || (version > *maxVersion)) { | ||
|                         maxVersion = version; | ||
|                     } | ||
|                     if (!usedSchemaVersions.contains(version)) { | ||
|                         unusedSchemaIds.emplace_back(id, rowset.GetValue<Schema::SchemaPresetVersionInfo::SinceStep>(), rowset.GetValue<Schema::SchemaPresetVersionInfo::SinceTxId>(), version); | ||
|                     } | ||
|                 } | ||
|
|
||
|                 if (!rowset.Next()) { | ||
|                     return std::nullopt; | ||
|                 } | ||
|             } | ||
|         } | ||
|
|
||
|         std::vector<TKey> portion; | ||
|         portion.reserve(BatchSize); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. кажется, что это можно убрать
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Почему?
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Или отсюда убрать или после move'a тоже добавить. Сейчас как-то несимметрично. Для первого батча делается reserve, для всех последующих - нет. |
||
|         for (const auto& id : unusedSchemaIds) { | ||
|             // The highest schema version is kept even when nothing references it. | ||
|             if (maxVersion.has_value() && (id.Version == *maxVersion)) { | ||
|                 continue; | ||
|             } | ||
|             portion.push_back(id); | ||
|             if (portion.size() >= BatchSize) { | ||
|                 changes.emplace_back(std::make_shared<TNormalizerResult>(std::move(portion))); | ||
|                 // Start a fresh batch explicitly: a moved-from vector is in an unspecified state, | ||
|                 // and every batch gets the same reserve as the first one. | ||
|                 portion = {}; | ||
|                 portion.reserve(BatchSize); | ||
|             } | ||
|         } | ||
|         if (!portion.empty()) { | ||
|             changes.emplace_back(std::make_shared<TNormalizerResult>(std::move(portion))); | ||
|         } | ||
|         return changes; | ||
|     } | ||
| }; | ||
|
|
||
| /// Wraps every change returned by TNormalizerResult::Init() into a TTrivialNormalizerTask. | ||
| /// Returns a failed conclusion with the text "Not ready" when Init() yields std::nullopt. | ||
| TConclusion<std::vector<INormalizerTask::TPtr>> TSchemaVersionNormalizer::DoInit(const TNormalizationController&, NTabletFlatExecutor::TTransactionContext& txc) { | ||
|     auto changes = TNormalizerResult::Init(txc); | ||
|     if (!changes) { | ||
|         return TConclusionStatus::Fail("Not ready"); | ||
|     } | ||
|     std::vector<INormalizerTask::TPtr> tasks; | ||
|     // The number of tasks is known up front, so allocate once. | ||
|     tasks.reserve(changes->size()); | ||
|     for (auto& change : *changes) { | ||
|         tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(change)); | ||
|     } | ||
|     return tasks; | ||
| } | ||
|
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,42 @@ | ||
| #pragma once | ||
|
|
||
| #include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h> | ||
| #include <ydb/core/tx/columnshard/columnshard_schema.h> | ||
| #include <ydb/core/tx/columnshard/defs.h> | ||
|
|
||
|
|
||
| // Forward declaration of TTablesManager. | ||
| // NOTE(review): it is not referenced in the visible part of this header - confirm it is still needed. | ||
| namespace NKikimr::NColumnShard { | ||
| class TTablesManager; | ||
| } | ||
|
|
||
| namespace NKikimr::NOlap { | ||
|
|
||
| /// Normalizer component registered in TFactory under the name "SchemaVersionCleaner". | ||
| /// The tasks it runs are built by DoInit(), which is defined out of line. | ||
| class TSchemaVersionNormalizer : public TNormalizationController::INormalizerComponent { | ||
| public: | ||
|     class TNormalizerResult; | ||
|     class TTask; | ||
|
|
||
|     /// Name used for factory registration and returned by GetClassName(). | ||
|     static TString GetClassNameStatic() { | ||
|         return "SchemaVersionCleaner"; | ||
|     } | ||
|
|
||
|     /// The init context is accepted but not used. | ||
|     TSchemaVersionNormalizer(const TNormalizationController::TInitContext&) { | ||
|     } | ||
|
|
||
|     /// This normalizer reports no sequential id. | ||
|     std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override { | ||
|         return std::nullopt; | ||
|     } | ||
|
|
||
|     TString GetClassName() const override { | ||
|         return GetClassNameStatic(); | ||
|     } | ||
|
|
||
|     TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override; | ||
|
|
||
| private: | ||
|     // Declared after GetClassNameStatic() because the initializer calls it. | ||
|     static inline TFactory::TRegistrator<TSchemaVersionNormalizer> Registrator = TFactory::TRegistrator<TSchemaVersionNormalizer>( | ||
|         GetClassNameStatic()); | ||
| }; | ||
|
|
||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,11 @@ | ||
| # Library target built from version.cpp. | ||
| LIBRARY() | ||
|
|
||
| # NOTE(review): GLOBAL presumably keeps the object file (and the static factory registrator | ||
| # declared in version.h) linked even when nothing references it - confirm against ya.make docs. | ||
| SRCS( | ||
| GLOBAL version.cpp | ||
| ) | ||
|
|
||
| # NOTE(review): version.h also includes headers from ydb/core/tx/columnshard; presumably they | ||
| # are reachable transitively through this dependency - confirm. | ||
| PEERDIR( | ||
| ydb/core/tx/columnshard/normalizer/abstract | ||
| ) | ||
|
|
||
| END() |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -152,7 +152,36 @@ class TColumnChunksCleaner: public NYDBTest::ILocalDBModifier { | |
| } | ||
| }; | ||
|
|
||
| class TPortionsCleaner: public NYDBTest::ILocalDBModifier { | ||
| /// Local-DB modifier for the SchemaVersionsNormalizer test. It plants a schema version | ||
| /// below every version referenced by IndexPortions, so no portion uses it. | ||
| class TSchemaVersionsCleaner : public NYDBTest::ILocalDBModifier { | ||
| public: | ||
|     virtual void Apply(NTabletFlatExecutor::TTransactionContext& txc) const override { | ||
|         using namespace NColumnShard; | ||
|         NIceDb::TNiceDb db(txc.DB); | ||
|         auto rowset = db.Table<Schema::IndexPortions>().Select(); | ||
|         UNIT_ASSERT(rowset.IsReady()); | ||
|
|
||
|         // Find the smallest schema version referenced by any portion. | ||
|         bool hasPortions = false; | ||
|         ui64 minVersion = static_cast<ui64>(-1); | ||
|         while (!rowset.EndOfSet()) { | ||
|             hasPortions = true; | ||
|             const auto version = rowset.GetValue<Schema::IndexPortions::SchemaVersion>(); | ||
|             if (version < minVersion) { | ||
|                 minVersion = version; | ||
|             } | ||
|             UNIT_ASSERT(rowset.Next()); | ||
|         } | ||
|
|
||
|         // `minVersion - 1` is only a valid "unused and not the newest" version when at least | ||
|         // one portion exists and its version is non-zero; otherwise the value would be huge | ||
|         // and the planted schema would look like the newest version. | ||
|         UNIT_ASSERT(hasPortions); | ||
|         UNIT_ASSERT(minVersion > 0); | ||
|
|
||
|         // Add an orphan schema version; if SchemaVersionCleaner does not erase it, the test fails. | ||
|         TString serialized; | ||
|         NKikimrTxColumnShard::TSchemaPresetVersionInfo info; | ||
|         info.MutableSchema()->SetVersion(minVersion - 1); | ||
|         Y_ABORT_UNLESS(info.SerializeToString(&serialized)); | ||
|         db.Table<Schema::SchemaPresetVersionInfo>().Key(11, 1, 1).Update(NIceDb::TUpdate<Schema::SchemaPresetVersionInfo::InfoProto>(serialized)); | ||
|
|
||
|         db.Table<Schema::SchemaPresetInfo>().Key(10).Update(NIceDb::TUpdate<Schema::SchemaPresetInfo::Name>("default")); | ||
|     } | ||
| }; | ||
|
|
||
| class TPortionsCleaner : public NYDBTest::ILocalDBModifier { | ||
| public: | ||
| virtual void Apply(NTabletFlatExecutor::TTransactionContext& txc) const override { | ||
| using namespace NColumnShard; | ||
|
|
@@ -259,6 +288,10 @@ Y_UNIT_TEST_SUITE(Normalizers) { | |
| TTestBasicRuntime runtime; | ||
| TTester::Setup(runtime); | ||
|
|
||
| auto* repair = runtime.GetAppData().ColumnShardConfig.MutableRepairs()->Add(); | ||
| repair->SetClassName("SchemaVersionCleaner"); | ||
| repair->SetDescription("Removing unused schema versions"); | ||
|
|
||
| const ui64 tableId = 1; | ||
| const std::vector<NArrow::NTest::TTestColumn> schema = { NArrow::NTest::TTestColumn("key1", TTypeInfo(NTypeIds::Uint64)), | ||
| NArrow::NTest::TTestColumn("key2", TTypeInfo(NTypeIds::Uint64)), NArrow::NTest::TTestColumn("field", TTypeInfo(NTypeIds::Utf8)) }; | ||
|
|
@@ -308,10 +341,15 @@ Y_UNIT_TEST_SUITE(Normalizers) { | |
| TestNormalizerImpl<TPortionsCleaner>(); | ||
| } | ||
|
|
||
| // Runs the shared TestNormalizerImpl scenario with TSchemaVersionsCleaner as the local-DB modifier. | ||
| Y_UNIT_TEST(SchemaVersionsNormalizer) { | ||
| TestNormalizerImpl<TSchemaVersionsCleaner>(); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. локальные модификаторы были сделаны для моделирования ошибки в локальной базе. для случая, который у нас, можно, просто, поменять схему несколько раз. чтобы по ходу дела не удалялись схемы - через контроллер, просто, отключим, когда будет что отключать, и все.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Тогда как проверить что нормалайзер отработал? |
||
| } | ||
|
|
||
| // Runs the shared TestNormalizerImpl scenario with TEmptyPortionsCleaner as its template argument. | ||
| Y_UNIT_TEST(CleanEmptyPortionsNormalizer) { | ||
| TestNormalizerImpl<TEmptyPortionsCleaner>(); | ||
| } | ||
|
|
||
|
|
||
| Y_UNIT_TEST(EmptyTablesNormalizer) { | ||
| class TLocalNormalizerChecker: public TNormalizerChecker { | ||
| public: | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Вот тут уже известен maxVersion и его можно удалить из unused
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ну это же лишняя операция будет
Это же вектор, его нужно полностью обойти, а он и так будет обходиться в момент удаления
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
а что мы оптимизируем?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Время работы