Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 139 additions & 0 deletions ydb/core/tx/columnshard/normalizer/schema_version/version.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
#include "version.h"

namespace NKikimr::NOlap {

class TSchemaVersionNormalizer::TNormalizerResult : public INormalizerChanges {
private:
class TKey {
public:
ui64 Step;
ui64 TxId;
ui64 Version;
ui32 Id;

public:
TKey() = default;

TKey(ui32 id, ui64 step, ui64 txId, ui64 version)
: Step(step)
, TxId(txId)
, Version(version)
, Id(id)
{
}
};

std::vector<TKey> VersionsToRemove;

public:
TNormalizerResult(std::vector<TKey>&& versions)
: VersionsToRemove(versions)
{
}

bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController& /* normController */) const override {
using namespace NColumnShard;
NIceDb::TNiceDb db(txc.DB);
for (auto& key: VersionsToRemove) {
db.Table<Schema::SchemaPresetVersionInfo>().Key(key.Id, key.Step, key.TxId).Delete();
}
return true;
}

ui64 GetSize() const override {
return VersionsToRemove.size();
}

static std::optional<std::vector<INormalizerChanges::TPtr>> Init(NTabletFlatExecutor::TTransactionContext& txc) {
using namespace NColumnShard;
THashSet<ui64> usedSchemaVersions;
NIceDb::TNiceDb db(txc.DB);
{
auto rowset = db.Table<Schema::IndexPortions>().Select();
if (rowset.IsReady()) {
while (!rowset.EndOfSet()) {
usedSchemaVersions.insert(rowset.GetValue<Schema::IndexPortions::SchemaVersion>());
if (!rowset.Next()) {
return std::nullopt;
}
}
} else {
return std::nullopt;
}
}
{
auto rowset = db.Table<Schema::InsertTable>().Select();
if (rowset.IsReady()) {
while (!rowset.EndOfSet()) {
if (rowset.HaveValue<Schema::InsertTable::SchemaVersion>()) {
usedSchemaVersions.insert(rowset.GetValue<Schema::InsertTable::SchemaVersion>());
if (!rowset.Next()) {
return std::nullopt;
}
}
}
} else {
return std::nullopt;
}
}

std::vector<TKey> unusedSchemaIds;
std::optional<ui64> maxVersion;
std::vector<INormalizerChanges::TPtr> changes;

{
auto rowset = db.Table<Schema::SchemaPresetVersionInfo>().Select();
if (rowset.IsReady()) {
while (!rowset.EndOfSet()) {
const ui32 id = rowset.GetValue<Schema::SchemaPresetVersionInfo::Id>();
NKikimrTxColumnShard::TSchemaPresetVersionInfo info;
Y_ABORT_UNLESS(info.ParseFromString(rowset.GetValue<Schema::SchemaPresetVersionInfo::InfoProto>()));
if (info.HasSchema()) {
ui64 version = info.GetSchema().GetVersion();
if (!maxVersion.has_value() || (version > *maxVersion)) {
maxVersion = version;
}
if (!usedSchemaVersions.contains(version)) {
unusedSchemaIds.emplace_back(id, rowset.GetValue<Schema::SchemaPresetVersionInfo::SinceStep>(), rowset.GetValue<Schema::SchemaPresetVersionInfo::SinceTxId>(), version);
}
}

if (!rowset.Next()) {
return std::nullopt;
}
}
} else {
return std::nullopt;
}
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Вот тут уже известен maxVersion и его можно удалить из unused

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ну это же лишняя операция будет
Это же вектор, его нужно полностью обойти, а он и так будет обходиться в момент удаления

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

а что мы оптимизируем?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Время работы

std::vector<TKey> portion;
portion.reserve(10000);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

кажется, что это можно убрать

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Почему?
Там как раз до 10000 элементов может добавиться

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Или отсюда убрать или после move'a тоже добавить. Сейчас как-то несимметрично. Для первого батча делается reserve, для всех последующих - нет.

for (const auto& id: unusedSchemaIds) {
if (!maxVersion.has_value() || (id.Version != *maxVersion)) {
portion.push_back(id);
if (portion.size() >= 10000) {
changes.emplace_back(std::make_shared<TNormalizerResult>(std::move(portion)));
}
}
}
if (portion.size() > 0) {
changes.emplace_back(std::make_shared<TNormalizerResult>(std::move(portion)));
}
return changes;
}
};

TConclusion<std::vector<INormalizerTask::TPtr>> TSchemaVersionNormalizer::DoInit(const TNormalizationController&, NTabletFlatExecutor::TTransactionContext& txc) {
auto changes = TNormalizerResult::Init(txc);
if (!changes) {
return TConclusionStatus::Fail("Not ready");;
}
std::vector<INormalizerTask::TPtr> tasks;
for (auto&& c : *changes) {
tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(c));
}
return tasks;
}

}
42 changes: 42 additions & 0 deletions ydb/core/tx/columnshard/normalizer/schema_version/version.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#pragma once

#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>
#include <ydb/core/tx/columnshard/columnshard_schema.h>
#include <ydb/core/tx/columnshard/defs.h>


namespace NKikimr::NColumnShard {
class TTablesManager;
}

namespace NKikimr::NOlap {

class TSchemaVersionNormalizer : public TNormalizationController::INormalizerComponent {
public:
static TString GetClassNameStatic() {
return "SchemaVersionCleaner";
}

private:
static inline TFactory::TRegistrator<TSchemaVersionNormalizer> Registrator = TFactory::TRegistrator<TSchemaVersionNormalizer>(
GetClassNameStatic());
public:
class TNormalizerResult;
class TTask;

public:
virtual std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override {
return std::nullopt;
}

virtual TString GetClassName() const override {
return GetClassNameStatic();
}

TSchemaVersionNormalizer(const TNormalizationController::TInitContext&) {
}

virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
};

}
11 changes: 11 additions & 0 deletions ydb/core/tx/columnshard/normalizer/schema_version/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
LIBRARY()

SRCS(
GLOBAL version.cpp
)

PEERDIR(
ydb/core/tx/columnshard/normalizer/abstract
)

END()
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/normalizer/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ PEERDIR(
ydb/core/tx/columnshard/normalizer/tables
ydb/core/tx/columnshard/normalizer/portion
ydb/core/tx/columnshard/normalizer/insert_table
ydb/core/tx/columnshard/normalizer/schema_version
)

END()
40 changes: 39 additions & 1 deletion ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,36 @@ class TColumnChunksCleaner: public NYDBTest::ILocalDBModifier {
}
};

class TPortionsCleaner: public NYDBTest::ILocalDBModifier {
class TSchemaVersionsCleaner : public NYDBTest::ILocalDBModifier {
public:
virtual void Apply(NTabletFlatExecutor::TTransactionContext& txc) const override {
using namespace NColumnShard;
NIceDb::TNiceDb db(txc.DB);
auto rowset = db.Table<Schema::IndexPortions>().Select();
UNIT_ASSERT(rowset.IsReady());

ui64 minVersion = (ui64)-1;
while (!rowset.EndOfSet()) {
auto version = rowset.GetValue<Schema::IndexPortions::SchemaVersion>();
if (version < minVersion) {
minVersion = version;
}
UNIT_ASSERT(rowset.Next());
}

// Add invalid widow schema, if SchemaVersionCleaner will not erase it, then test will fail
TString serialized;
NKikimrTxColumnShard::TSchemaPresetVersionInfo info;
info.MutableSchema()->SetVersion(minVersion - 1);
Y_ABORT_UNLESS(info.SerializeToString(&serialized));
db.Table<Schema::SchemaPresetVersionInfo>().Key(11, 1, 1).Update(NIceDb::TUpdate<Schema::SchemaPresetVersionInfo::InfoProto>(serialized));

db.Table<Schema::SchemaPresetInfo>().Key(10).Update(NIceDb::TUpdate<Schema::SchemaPresetInfo::Name>("default"));

}
};

class TPortionsCleaner : public NYDBTest::ILocalDBModifier {
public:
virtual void Apply(NTabletFlatExecutor::TTransactionContext& txc) const override {
using namespace NColumnShard;
Expand Down Expand Up @@ -259,6 +288,10 @@ Y_UNIT_TEST_SUITE(Normalizers) {
TTestBasicRuntime runtime;
TTester::Setup(runtime);

auto* repair = runtime.GetAppData().ColumnShardConfig.MutableRepairs()->Add();
repair->SetClassName("SchemaVersionCleaner");
repair->SetDescription("Removing unused schema versions");

const ui64 tableId = 1;
const std::vector<NArrow::NTest::TTestColumn> schema = { NArrow::NTest::TTestColumn("key1", TTypeInfo(NTypeIds::Uint64)),
NArrow::NTest::TTestColumn("key2", TTypeInfo(NTypeIds::Uint64)), NArrow::NTest::TTestColumn("field", TTypeInfo(NTypeIds::Utf8)) };
Expand Down Expand Up @@ -308,10 +341,15 @@ Y_UNIT_TEST_SUITE(Normalizers) {
TestNormalizerImpl<TPortionsCleaner>();
}

Y_UNIT_TEST(SchemaVersionsNormalizer) {
TestNormalizerImpl<TSchemaVersionsCleaner>();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

локальные модификаторы были сделаны для моделирования ошибки в локальной базе. для случая, который у нас, можно, просто, поменять схему несколько раз. чтобы по ходу дела не удалялись схемы - через контроллер, просто, отключим, когда будет что отключать, и все.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Тогда как проверить что нормалайзер отработал?
Сейчас я добавляю невалидную схему и если нормалайзер ее не удалит, то тест свалится с ошибкой

}

Y_UNIT_TEST(CleanEmptyPortionsNormalizer) {
TestNormalizerImpl<TEmptyPortionsCleaner>();
}


Y_UNIT_TEST(EmptyTablesNormalizer) {
class TLocalNormalizerChecker: public TNormalizerChecker {
public:
Expand Down