Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion ydb/core/tx/schemeshard/schemeshard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4558,8 +4558,12 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
auto& buildInfo = *buildInfoPtr->Get();
buildInfo.KMeans.Set(
rowset.GetValue<Schema::KMeansTreeProgress::Level>(),
rowset.GetValue<Schema::KMeansTreeProgress::ParentBegin>(),
rowset.GetValue<Schema::KMeansTreeProgress::Parent>(),
rowset.GetValue<Schema::KMeansTreeProgress::State>()
rowset.GetValue<Schema::KMeansTreeProgress::ChildBegin>(),
rowset.GetValue<Schema::KMeansTreeProgress::Child>(),
rowset.GetValue<Schema::KMeansTreeProgress::State>(),
rowset.GetValue<Schema::KMeansTreeProgress::TableSize>()
);
buildInfo.Sample.Rows.reserve(buildInfo.KMeans.K * 2);

Expand Down
20 changes: 12 additions & 8 deletions ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@
namespace NKikimr {
namespace NSchemeShard {

// TODO(mbkkt) get table rows count (but even better to have unique prefixes count)
static constexpr ui64 TableSize = 1'000;

static constexpr const char* Name(TIndexBuildInfo::EState state) noexcept {
switch (state) {
case TIndexBuildInfo::EState::Invalid:
Expand Down Expand Up @@ -681,7 +678,8 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
ev->Record.SetNeedsRounds(3); // TODO(mbkkt) should be configurable

const auto shardIndex = buildInfo.Shards.at(shardIdx).Index;
ev->Record.SetChild(buildInfo.KMeans.ChildBegin + (1 + TableSize) * shardIndex);
// each shard gets a range of 2 * TableSize ids; see the comment in PrefixIndexDone for why
ev->Record.SetChild(buildInfo.KMeans.ChildBegin + (2 * buildInfo.KMeans.TableSize) * shardIndex);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Это чтобы номера не пересекались нужно? Выглядит не очень надёжно :(

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Да, всё так.
Ну, не очень очевидно, как по-другому сделать.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Не очень понимаю что тут происходит

Вот мы хотим построить индекс, запускаем на каждом шарде prefix kmeans

Запускаем последовательно и TableSize это число строк в шардах слева?

Что значит Y_ASSERT(buildInfo.KMeans.Level == 2), почему уровень второй?

Copy link
Contributor Author

@MBkkt MBkkt Mar 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Не очень понимаю что тут происходит
Вот мы хотим построить индекс, запускаем на каждом шарде prefix kmeans
Запускаем последовательно и TableSize это число строк в шардах слева?

Комментарий в другом месте подробный

Что значит Y_ASSERT(buildInfo.KMeans.Level == 2), почему уровень второй?

У нас построение выглядит так:
1-й уровень: построили вторичный индекс для префикса (в covered column запихнули embedding + vector index covered column).
2-й уровень: запустили n prefix_kmeans, читающих из этого индекса; они будут работать параллельно, и важно, чтобы их id не пересеклись. К сожалению, мы заранее не знаем, сколько всего уникальных префиксов, и не знаем, сколько их на конкретном шарде.


ev->Record.SetPostingName(path.Dive(buildInfo.KMeans.WriteTo()).PathString());
path.Rise().Dive(NTableIndex::NTableVectorKmeansTreeIndex::LevelTable);
Expand Down Expand Up @@ -931,7 +929,11 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
db.Table<Schema::KMeansTreeProgress>().Key(buildInfo.Id).Update(
NIceDb::TUpdate<Schema::KMeansTreeProgress::Level>(buildInfo.KMeans.Level),
NIceDb::TUpdate<Schema::KMeansTreeProgress::State>(buildInfo.KMeans.State),
NIceDb::TUpdate<Schema::KMeansTreeProgress::Parent>(buildInfo.KMeans.Parent)
NIceDb::TUpdate<Schema::KMeansTreeProgress::Parent>(buildInfo.KMeans.Parent),
NIceDb::TUpdate<Schema::KMeansTreeProgress::ParentBegin>(buildInfo.KMeans.ParentBegin),
NIceDb::TUpdate<Schema::KMeansTreeProgress::Child>(buildInfo.KMeans.Child),
NIceDb::TUpdate<Schema::KMeansTreeProgress::ChildBegin>(buildInfo.KMeans.ChildBegin),
NIceDb::TUpdate<Schema::KMeansTreeProgress::TableSize>(buildInfo.KMeans.TableSize)
);
}

Expand All @@ -944,7 +946,9 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
const ui64 doneShards = buildInfo.DoneShards.size();

ClearDoneShards(txc, buildInfo);
Y_ABORT_UNLESS(buildInfo.KMeans.PrefixTableDone(TableSize, doneShards));
// it's approximate, but it is an upper bound, so it's ok
buildInfo.KMeans.TableSize = std::max<ui64>(1, buildInfo.Processed.GetUploadRows());
buildInfo.KMeans.PrefixIndexDone(doneShards);
PersistKMeansState(txc, buildInfo);
NIceDb::TNiceDb db{txc.DB};
Self->PersistBuildIndexUploadReset(db, buildInfo);
Expand Down Expand Up @@ -1316,14 +1320,14 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
auto tableColumns = NTableIndex::ExtractInfo(table); // skip dropped columns
TSerializedTableRange shardRange = InfiniteRange(tableColumns.Keys.size());
static constexpr std::string_view LogPrefix = "";
LOG_D("infinite range " << buildInfo.KMeans.RangeToDebugStr(shardRange));
LOG_D("infinite range " << buildInfo.KMeans.RangeToDebugStr(shardRange, buildInfo.IsBuildPrefixedVectorIndex() ? 2 : 1));

buildInfo.Cluster2Shards.clear();
for (const auto& x: table->GetPartitions()) {
Y_ABORT_UNLESS(Self->ShardInfos.contains(x.ShardIdx));
TSerializedCellVec bound{x.EndOfRange};
shardRange.To = bound;
LOG_D("shard " << x.ShardIdx << " range " << buildInfo.KMeans.RangeToDebugStr(shardRange));
LOG_D("shard " << x.ShardIdx << " range " << buildInfo.KMeans.RangeToDebugStr(shardRange, buildInfo.IsBuildPrefixedVectorIndex() ? 2 : 1));
buildInfo.AddParent(shardRange, x.ShardIdx);
auto [it, emplaced] = buildInfo.Shards.emplace(x.ShardIdx, TIndexBuildInfo::TShardStatus{std::move(shardRange), "", buildInfo.Shards.size()});
Y_ASSERT(emplaced);
Expand Down
42 changes: 24 additions & 18 deletions ydb/core/tx/schemeshard/schemeshard_info_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -3113,6 +3113,9 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> {
NTableIndex::TClusterId ChildBegin = 1; // included
NTableIndex::TClusterId Child = ChildBegin;

ui64 TableSize = 0;


// Returns the last parent cluster id (inclusive bound): the id immediately before ChildBegin.
ui64 ParentEnd() const noexcept { // included
return ChildBegin - 1;
}
Expand Down Expand Up @@ -3173,25 +3176,28 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> {
return true;
}

bool PrefixTableDone(ui64 tableSize, ui64 shards) {
if (!NeedsAnotherLevel()) {
return false;
}
void PrefixIndexDone(ui64 shards) {
Y_ABORT_UNLESS(NeedsAnotherLevel());
State = MultiLocal;
NextLevel((1 + tableSize) * shards);
// There are two worst cases; in both of them one shard contains TableSize rows:
// 1. all rows have a unique prefix (*); in this case we need 1 id for each row (parent, id in prefix table)
// 2. every unique prefix has exactly K rows, so we have TableSize/K parents + TableSize children
// (*) this doesn't work yet, because currently a prefix must have at least K embeddings, but that's a bug
NextLevel((2 * TableSize) * shards);
Parent = ParentEnd();
return true;
}

void Set(ui32 level, NTableIndex::TClusterId parent, ui32 state) {
// TODO(mbkkt) make it without cycles
while (Level < level) {
NextLevel();
}
while (Parent < parent) {
NextParent();
}
void Set(ui32 level,
NTableIndex::TClusterId parentBegin, NTableIndex::TClusterId parent,
NTableIndex::TClusterId childBegin, NTableIndex::TClusterId child,
ui32 state, ui64 tableSize) {
Level = level;
ParentBegin = parentBegin;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Могу только догадываться, что эти числа используются для нумерации кластеров, но явно не хватает какого-то комментария с картинкой о том, как они используются и что значат.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ну они не в первый раз возникли)
Вообще идея такая:
У нас есть процесс построения; по сути, это цикл в цикле в цикле, развёрнутый в конечный автомат.

циклы выглядят так

for (;level < levels; ++level) {
  for (parent = parentBegin; parent < parentEnd; ++parent) {
    do something
  }
}

parentEnd ~ childBegin (там есть функции, по ним видно).
child/childBegin нужны для удобства; в целом их можно посчитать из Parent + ParentEnd.

Кстати,
настройка levels для prefixed vector index транслируется в levels + 1 (собственно, + 1 уровень на вторичный индекс — безусловно).

Parent = parent;
ChildBegin = childBegin;
Child = child;
State = static_cast<EState>(state);
TableSize = tableSize;
}

NKikimrTxDataShard::TEvLocalKMeansRequest::EState GetUpload() const {
Expand Down Expand Up @@ -3251,7 +3257,7 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> {
return {parentFrom, parentTo};
}

TString RangeToDebugStr(const TSerializedTableRange& range) const {
TString RangeToDebugStr(const TSerializedTableRange& range, ui32 rootLevel) const {
auto toStr = [&](const TSerializedCellVec& v) -> TString {
const auto cells = v.GetCells();
if (cells.empty()) {
Expand All @@ -3261,8 +3267,7 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> {
return "-inf";
}
auto str = TStringBuilder{} << "{ count: " << cells.size();
if (Parent != 0) {
Y_ASSERT(Level != 0);
if (Level > rootLevel) {
str << ", parent: " << cells[0].AsValue<NTableIndex::TClusterId>();
if (cells.size() != 1 && cells[1].IsNull()) {
str << ", pk: null";
Expand Down Expand Up @@ -3652,7 +3657,8 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> {

TSerializedTableRange bound{range};
LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::BUILD_INDEX,
"AddShardStatus id# " << Id << " shard " << shardIdx << " range " << KMeans.RangeToDebugStr(bound));
"AddShardStatus id# " << Id << " shard " << shardIdx <<
" range " << KMeans.RangeToDebugStr(bound, IsBuildPrefixedVectorIndex() ? 2 : 1));
AddParent(bound, shardIdx);
Shards.emplace(
shardIdx, TIndexBuildInfo::TShardStatus(std::move(bound), std::move(lastKeyAck), Shards.size()));
Expand Down
13 changes: 12 additions & 1 deletion ydb/core/tx/schemeshard/schemeshard_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -1919,13 +1919,24 @@ struct Schema : NIceDb::Schema {
struct Level : Column<2, NScheme::NTypeIds::Uint32> {};
struct State : Column<3, NScheme::NTypeIds::Uint32> {};
struct Parent : Column<4, ClusterIdTypeId> {};
struct ParentBegin : Column<5, ClusterIdTypeId> {};
struct Child : Column<6, ClusterIdTypeId> {};
struct ChildBegin : Column<7, ClusterIdTypeId> {};
struct TableSize : Column<8, NScheme::NTypeIds::Uint64> {};
// TableSize is required for the prefixed kmeans tree,
// but it can also be filled in and used by other kmeans trees to choose "auto" settings.
// Supporting "auto" settings will also require saving K.

using TKey = TableKey<Id>;
using TColumns = TableColumns<
Id,
Level,
State,
Parent
Parent,
ParentBegin,
Child,
ChildBegin,
TableSize
>;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8148,6 +8148,26 @@
"ColumnId": 4,
"ColumnName": "Parent",
"ColumnType": "Uint64"
},
{
"ColumnId": 5,
"ColumnName": "ParentBegin",
"ColumnType": "Uint64"
},
{
"ColumnId": 6,
"ColumnName": "Child",
"ColumnType": "Uint64"
},
{
"ColumnId": 7,
"ColumnName": "ChildBegin",
"ColumnType": "Uint64"
},
{
"ColumnId": 8,
"ColumnName": "TableSize",
"ColumnType": "Uint64"
}
],
"ColumnsDropped": [],
Expand All @@ -8157,7 +8177,11 @@
1,
2,
3,
4
4,
5,
6,
7,
8
],
"RoomID": 0,
"Codec": 0,
Expand Down
Loading