Skip to content

Index build: do not lose the requested partitioning info of indexImplTables in case of SchemeShard reboots #10579

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 23 additions & 3 deletions ydb/core/tx/schemeshard/schemeshard_build_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ void TSchemeShard::Handle(TEvPrivate::TEvIndexBuildingMakeABill::TPtr& ev, const

void TSchemeShard::PersistCreateBuildIndex(NIceDb::TNiceDb& db, const TIndexBuildInfo& info) {
Y_ABORT_UNLESS(info.BuildKind != TIndexBuildInfo::EBuildKind::BuildKindUnspecified);
db.Table<Schema::IndexBuild>().Key(info.Id).Update(
auto persistedBuildIndex = db.Table<Schema::IndexBuild>().Key(info.Id);
persistedBuildIndex.Update(
NIceDb::TUpdate<Schema::IndexBuild::Uid>(info.Uid),
NIceDb::TUpdate<Schema::IndexBuild::DomainOwnerId>(info.DomainPathId.OwnerId),
NIceDb::TUpdate<Schema::IndexBuild::DomainLocalId>(info.DomainPathId.LocalPathId),
Expand All @@ -59,9 +60,28 @@ void TSchemeShard::PersistCreateBuildIndex(NIceDb::TNiceDb& db, const TIndexBuil
NIceDb::TUpdate<Schema::IndexBuild::MaxShards>(info.Limits.MaxShards),
NIceDb::TUpdate<Schema::IndexBuild::MaxRetries>(info.Limits.MaxRetries),
NIceDb::TUpdate<Schema::IndexBuild::BuildKind>(ui32(info.BuildKind))

// TODO save info.ImplTableDescriptions
);
// Persist details of the index build operation: ImplTableDescriptions and SpecializedIndexDescription.
// We have chosen TIndexCreationConfig's string representation as the serialization format.
if (bool hasSpecializedDescription = !std::holds_alternative<std::monostate>(info.SpecializedIndexDescription);
info.ImplTableDescriptions || hasSpecializedDescription
) {
NKikimrSchemeOp::TIndexCreationConfig serializableRepresentation;

for (const auto& description : info.ImplTableDescriptions) {
*serializableRepresentation.AddIndexImplTableDescriptions() = description;
}

std::visit([&]<typename T>(const T& specializedDescription) {
if constexpr (std::is_same_v<T, NKikimrSchemeOp::TVectorIndexKmeansTreeDescription>) {
*serializableRepresentation.MutableVectorIndexKmeansTreeDescription() = specializedDescription;
}
}, info.SpecializedIndexDescription);

persistedBuildIndex.Update(
NIceDb::TUpdate<Schema::IndexBuild::CreationConfig>(serializableRepresentation.SerializeAsString())
);
}

ui32 columnNo = 0;
for (ui32 i = 0; i < info.IndexColumns.size(); ++i, ++columnNo) {
Expand Down
25 changes: 22 additions & 3 deletions ydb/core/tx/schemeshard/schemeshard_info_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -2991,7 +2991,7 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> {
// TODO(mbkkt) move to TVectorIndexKmeansTreeDescription
ui32 K = 4;
ui32 Levels = 5;

// progress
enum EState : ui32 {
Sample = 0,
Expand All @@ -3007,7 +3007,7 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> {
EState State = Sample;

ui32 ChildBegin = 1; // included

static ui32 BinPow(ui32 k, ui32 l) {
ui32 r = 1;
while (l != 0) {
Expand Down Expand Up @@ -3282,7 +3282,26 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> {
indexInfo->IndexName = row.template GetValue<Schema::IndexBuild::IndexName>();
indexInfo->IndexType = row.template GetValue<Schema::IndexBuild::IndexType>();

// TODO load indexInfo->ImplTableDescriptions
// Restore the operation details: ImplTableDescriptions and SpecializedIndexDescription.
if (row.template HaveValue<Schema::IndexBuild::CreationConfig>()) {
NKikimrSchemeOp::TIndexCreationConfig creationConfig;
Y_ABORT_UNLESS(creationConfig.ParseFromString(row.template GetValue<Schema::IndexBuild::CreationConfig>()));

auto& descriptions = *creationConfig.MutableIndexImplTableDescriptions();
indexInfo->ImplTableDescriptions.reserve(descriptions.size());
for (auto& description : descriptions) {
indexInfo->ImplTableDescriptions.emplace_back(std::move(description));
}

switch (creationConfig.GetSpecializedIndexDescriptionCase()) {
case NKikimrSchemeOp::TIndexCreationConfig::kVectorIndexKmeansTreeDescription:
indexInfo->SpecializedIndexDescription = std::move(*creationConfig.MutableVectorIndexKmeansTreeDescription());
break;
case NKikimrSchemeOp::TIndexCreationConfig::SPECIALIZEDINDEXDESCRIPTION_NOT_SET:
/* do nothing */
break;
}
}

indexInfo->State = TIndexBuildInfo::EState(
row.template GetValue<Schema::IndexBuild::State>());
Expand Down
6 changes: 5 additions & 1 deletion ydb/core/tx/schemeshard/schemeshard_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -1325,6 +1325,9 @@ struct Schema : NIceDb::Schema {
struct AlterMainTableTxStatus : Column<32, NScheme::NTypeIds::Uint32> { using Type = NKikimrScheme::EStatus; };
struct AlterMainTableTxDone : Column<33, NScheme::NTypeIds::Bool> {};

// NKikimrSchemeOp::TIndexCreationConfig protobuf serialized to a string.
// Carries the index build details that must survive a SchemeShard reboot:
// the index impl table descriptions and the specialized index description.
// The column is written only when there is something to store, so readers must check for its presence.
struct CreationConfig : Column<34, NScheme::NTypeIds::String> { using Type = TString; };

using TKey = TableKey<Id>;
using TColumns = TableColumns<
Id,
Expand Down Expand Up @@ -1359,7 +1362,8 @@ struct Schema : NIceDb::Schema {
BuildKind,
AlterMainTableTxId,
AlterMainTableTxStatus,
AlterMainTableTxDone
AlterMainTableTxDone,
CreationConfig
>;
};

Expand Down
22 changes: 17 additions & 5 deletions ydb/core/tx/schemeshard/ut_helpers/helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1699,11 +1699,23 @@ namespace NSchemeShardUT_Private {
} break;
case NKikimrSchemeOp::EIndexTypeGlobalVectorKmeansTree: {
auto& settings = *index.mutable_global_vector_kmeans_tree_index();
settings = Ydb::Table::GlobalVectorKMeansTreeIndex();
// some random valid settings
settings.mutable_vector_settings()->mutable_settings()->set_vector_type(Ydb::Table::VectorIndexSettings::VECTOR_TYPE_FLOAT);
settings.mutable_vector_settings()->mutable_settings()->set_vector_dimension(42);
settings.mutable_vector_settings()->mutable_settings()->set_metric(Ydb::Table::VectorIndexSettings::DISTANCE_COSINE);

auto& vectorIndexSettings = *settings.mutable_vector_settings()->mutable_settings();
if (cfg.VectorIndexSettings) {
cfg.VectorIndexSettings->SerializeTo(vectorIndexSettings);
} else {
// some random valid settings
vectorIndexSettings.set_vector_type(Ydb::Table::VectorIndexSettings::VECTOR_TYPE_FLOAT);
vectorIndexSettings.set_vector_dimension(42);
vectorIndexSettings.set_metric(Ydb::Table::VectorIndexSettings::DISTANCE_COSINE);
}

if (cfg.GlobalIndexSettings) {
cfg.GlobalIndexSettings[0].SerializeTo(*settings.mutable_level_table_settings());
if (cfg.GlobalIndexSettings.size() > 1) {
cfg.GlobalIndexSettings[1].SerializeTo(*settings.mutable_posting_table_settings());
}
}
} break;
default:
UNIT_ASSERT_C(false, "Unknown index type: " << static_cast<ui32>(cfg.IndexType));
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/tx/schemeshard/ut_helpers/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@

namespace NYdb::NTable {
struct TGlobalIndexSettings;
struct TVectorIndexSettings;
}

namespace NSchemeShardUT_Private {
Expand Down Expand Up @@ -371,6 +372,8 @@ namespace NSchemeShardUT_Private {
TVector<TString> IndexColumns;
TVector<TString> DataColumns;
TVector<NYdb::NTable::TGlobalIndexSettings> GlobalIndexSettings = {};
// implementation note: it was made a pointer, not optional, to enable forward declaration
std::unique_ptr<NYdb::NTable::TVectorIndexSettings> VectorIndexSettings = {};
};

std::unique_ptr<TEvIndexBuilder::TEvCreateRequest> CreateBuildColumnRequest(ui64 id, const TString& dbName, const TString& src, const TString& columnName, const Ydb::TypedValue& literal);
Expand Down
56 changes: 54 additions & 2 deletions ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -853,7 +853,7 @@ TCheckFunc IndexDataColumns(const TVector<TString>& dataColumnNames) {
};
}

TCheckFunc VectorIndexDescription(Ydb::Table::VectorIndexSettings_Metric metric,
TCheckFunc VectorIndexDescription(Ydb::Table::VectorIndexSettings_Metric metric,
Ydb::Table::VectorIndexSettings_VectorType vectorType,
ui32 vectorDimension
) {
Expand Down Expand Up @@ -1309,11 +1309,63 @@ TCheckFunc PartitionKeys(TVector<TString> lastShardKeys) {
const auto& pathDescr = record.GetPathDescription();
UNIT_ASSERT_VALUES_EQUAL(lastShardKeys.size(), pathDescr.TablePartitionsSize());
for (size_t i = 0; i < lastShardKeys.size(); ++i) {
UNIT_ASSERT_STRING_CONTAINS(pathDescr.GetTablePartitions(i).GetEndOfRangeKeyPrefix(), lastShardKeys[i]);
const auto& partition = pathDescr.GetTablePartitions(i);
UNIT_ASSERT_STRING_CONTAINS_C(
partition.GetEndOfRangeKeyPrefix(), lastShardKeys[i],
"partition index: " << i << '\n'
<< "actual key prefix: " << partition.GetEndOfRangeKeyPrefix().Quote() << '\n'
<< "expected key prefix: " << lastShardKeys[i].Quote() << '\n'
);
}
};
}

namespace {

// Converts split boundary values of type T to and from the serialized cell vector string form
// (the form found in a partition's end-of-range key prefix).
template <typename T>
struct TSplitBoundarySerializer {
    // Packs a single boundary value into a one-cell serialized cell vector and returns its buffer.
    static TString Serialize(T splitBoundary) {
        const TCell boundaryCell = TCell::Make(splitBoundary);
        const TArrayRef<const TCell> singleCell(&boundaryCell, 1);
        TSerializedCellVec packed(singleCell);
        return packed.ReleaseBuffer();
    }

    // Decodes the leading non-null cells of a serialized cell vector as values of type T.
    // A null cell is treated as the end marker: decoding stops there.
    static TVector<T> Deserialize(const TString& serializedCells) {
        TVector<T> decoded;
        TSerializedCellVec parsed(serializedCells);
        const auto parsedCells = parsed.GetCells();
        for (auto it = parsedCells.begin(); it != parsedCells.end() && !it->IsNull(); ++it) {
            decoded.emplace_back(it->AsValue<T>());
        }
        return decoded;
    }
};

}

// Builds a describe-result check that verifies the split boundaries of a table.
//
// The check requires the table to have exactly expectedBoundaries.size() + 1 partitions and,
// for every partition except the last one, compares the first cell of its end-of-range key prefix
// (decoded as T) with the expected boundary at the same index.
//
// If a partition's end-of-range key prefix decodes to no values (its first cell is null or absent),
// the check fails with an explicit assertion that names the partition, rather than failing inside
// an element access on an empty vector.
template <typename T>
TCheckFunc SplitBoundaries(TVector<T>&& expectedBoundaries) {
    return [expectedBoundaries = std::move(expectedBoundaries)] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
        const auto& pathDescr = record.GetPathDescription();
        // N split boundaries always produce N + 1 partitions.
        UNIT_ASSERT_VALUES_EQUAL(pathDescr.TablePartitionsSize(), expectedBoundaries.size() + 1);
        for (size_t i = 0; i < expectedBoundaries.size(); ++i) {
            const auto& partition = pathDescr.GetTablePartitions(i);
            const auto actualCells = TSplitBoundarySerializer<T>::Deserialize(partition.GetEndOfRangeKeyPrefix());
            // Guard the element access below: report which partition has no decodable boundary.
            UNIT_ASSERT_C(
                !actualCells.empty(),
                "partition index: " << i << '\n'
                << "actual key prefix: " << partition.GetEndOfRangeKeyPrefix().Quote() << '\n'
                << "the key prefix does not start with a non-null cell" << '\n'
            );
            // Only the first key cell is compared; the remaining cells of the prefix are ignored.
            UNIT_ASSERT_VALUES_EQUAL_C(
                actualCells.front(), expectedBoundaries[i],
                "partition index: " << i << '\n'
                << "actual key prefix: " << partition.GetEndOfRangeKeyPrefix().Quote() << '\n'
            );
        }
    };
}

template TCheckFunc SplitBoundaries<ui32>(TVector<ui32>&&);

TCheckFunc ServerlessComputeResourcesMode(NKikimrSubDomains::EServerlessComputeResourcesMode serverlessComputeResourcesMode) {
return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
UNIT_ASSERT_C(IsGoodDomainStatus(record.GetStatus()), "Unexpected status: " << record.GetStatus());
Expand Down
6 changes: 5 additions & 1 deletion ydb/core/tx/schemeshard/ut_helpers/ls_checks.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ namespace NLs {
void CheckBoundaries(const NKikimrScheme::TEvDescribeSchemeResult& record);
TCheckFunc PartitionCount(ui32 count);
TCheckFunc PartitionKeys(TVector<TString> lastShardKeys);
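// Checks that the table has one more partition than there are expected boundaries and that the first key cell
// of each partition's end-of-range key (except the last partition's) equals the corresponding expected boundary.
// Similar to the PartitionKeys check, but does not require you to pass split boundaries in a serialized form.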
template <typename T>
TCheckFunc SplitBoundaries(TVector<T>&& expectedBoundaries);
TCheckFunc FollowerCount(ui32 count);
TCheckFunc CrossDataCenterFollowerCount(ui32 count);
TCheckFunc AllowFollowerPromotion(bool val);
Expand Down Expand Up @@ -141,7 +145,7 @@ namespace NLs {
TCheckFunc IndexState(NKikimrSchemeOp::EIndexState state);
TCheckFunc IndexKeys(const TVector<TString>& keyNames);
TCheckFunc IndexDataColumns(const TVector<TString>& dataColumnNames);

TCheckFunc VectorIndexDescription(Ydb::Table::VectorIndexSettings_Metric metric,
Ydb::Table::VectorIndexSettings_VectorType vectorType,
ui32 vectorDimension
Expand Down
72 changes: 72 additions & 0 deletions ydb/core/tx/schemeshard/ut_index_build/ut_index_build.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -934,6 +934,78 @@ Y_UNIT_TEST_SUITE(IndexBuildTest) {
}
}

// Verifies that the partitioning requested for the index implementation table (explicit split points and
// min / max partition counts) is still applied when the SchemeShard is rebooted after the index build
// has been requested, while the ESchemeOpCreateIndexBuild transaction is being held back by the test.
Y_UNIT_TEST(IndexPartitioningIsPersisted) {
TTestBasicRuntime runtime;
TTestEnv env(runtime);
ui64 txId = 100;

// Main table to build the index on: Uint64 key, Utf8 value.
TestCreateTable(runtime, ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Uint64" }
Columns { Name: "value" Type: "Utf8" }
KeyColumnNames: [ "key" ]
)");
env.TestWaitNotification(runtime, txId);

// Requested partitioning of the index impl table: two split points ("alice", "bob"),
// i.e. three partitions, with the partition count pinned to exactly 3.
Ydb::Table::GlobalIndexSettings settings;
UNIT_ASSERT(google::protobuf::TextFormat::ParseFromString(R"(
partition_at_keys {
split_points {
type { tuple_type { elements { optional_type { item { type_id: UTF8 } } } } }
value { items { text_value: "alice" } }
}
split_points {
type { tuple_type { elements { optional_type { item { type_id: UTF8 } } } } }
value { items { text_value: "bob" } }
}
}
partitioning_settings {
min_partitions_count: 3
max_partitions_count: 3
}
)", &settings));

// Hold back the ModifyScheme transaction whose first operation is ESchemeOpCreateIndexBuild,
// so that the reboot below happens before the index is actually created.
// NOTE(review): relies on TBlockEvents keeping matching events until Unblock() is called - confirm.
TBlockEvents<TEvSchemeShard::TEvModifySchemeTransaction> indexCreationBlocker(runtime, [](const auto& ev) {
const auto& modifyScheme = ev->Get()->Record.GetTransaction(0);
return modifyScheme.GetOperationType() == NKikimrSchemeOp::ESchemeOpCreateIndexBuild;
});

// Request a global index named "Index" on the "value" column, without data columns,
// passing the partitioning settings parsed above.
const ui64 buildIndexTx = ++txId;
TestBuildIndex(runtime, buildIndexTx, TTestTxConfig::SchemeShard, "/MyRoot", "/MyRoot/Table", TBuildIndexConfig{
"Index", NKikimrSchemeOp::EIndexTypeGlobal, { "value" }, {},
{ NYdb::NTable::TGlobalIndexSettings::FromProto(settings) }
});

// Reboot the SchemeShard while the index creation transaction is still blocked:
// after the restart the build can only rely on what was persisted.
RebootTablet(runtime, TTestTxConfig::SchemeShard, runtime.AllocateEdgeActor());

// Stop blocking, release the held transaction and wait for the index build to finish.
indexCreationBlocker.Stop().Unblock();
env.TestWaitNotification(runtime, buildIndexTx);

// The build operation itself must have completed successfully.
auto buildIndexOperation = TestGetBuildIndex(runtime, TTestTxConfig::SchemeShard, "/MyRoot", buildIndexTx);
UNIT_ASSERT_VALUES_EQUAL_C(
buildIndexOperation.GetIndexBuild().GetState(), Ydb::Table::IndexBuildState::STATE_DONE,
buildIndexOperation.DebugString()
);

// The main table must now have exactly one index...
TestDescribeResult(DescribePath(runtime, "/MyRoot/Table"), {
NLs::IsTable,
NLs::IndexesCount(1)
});

// ...which exists and is in the ready state.
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Index"), {
NLs::PathExist,
NLs::IndexState(NKikimrSchemeOp::EIndexState::EIndexStateReady)
});

// The actual point of the test: the impl table has the requested partitioning despite the reboot.
// The last expected key is empty because the last partition has no split point of its own.
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Index/indexImplTable", true, true), {
NLs::IsTable,
NLs::PartitionCount(3),
NLs::MinPartitionsCountEqual(3),
NLs::MaxPartitionsCountEqual(3),
NLs::PartitionKeys({"alice", "bob", ""})
});
}

Y_UNIT_TEST(DropIndex) {
TTestBasicRuntime runtime;
TTestEnv env(runtime);
Expand Down
Loading
Loading