Skip to content

Commit

Permalink
Storages: support building vector index for ColumnFileTiny (Part 1)
Browse files Browse the repository at this point in the history
Signed-off-by: Lloyd-Pottiger <yan1579196623@gmail.com>
  • Loading branch information
Lloyd-Pottiger committed Oct 16, 2024
1 parent 4b56409 commit 7f095b7
Show file tree
Hide file tree
Showing 13 changed files with 317 additions and 89 deletions.
115 changes: 85 additions & 30 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,17 @@ void ColumnFileTiny::serializeMetadata(dtpb::ColumnFilePersisted * cf_pb, bool s
tiny_pb->set_id(data_page_id);
tiny_pb->set_rows(rows);
tiny_pb->set_bytes(bytes);

if (!index_infos)
return;

for (const auto & index_info : *index_infos)
{
auto * index_pb = tiny_pb->add_indexes();
index_pb->set_index_page_id(index_info.index_page_id);
if (index_info.vector_index.has_value())
index_pb->mutable_vector_index()->CopyFrom(*index_info.vector_index);
}
}

ColumnFilePersistedPtr ColumnFileTiny::deserializeMetadata(
Expand Down Expand Up @@ -223,8 +234,17 @@ ColumnFilePersistedPtr ColumnFileTiny::deserializeMetadata(
PageIdU64 data_page_id = cf_pb.id();
size_t rows = cf_pb.rows();
size_t bytes = cf_pb.bytes();
auto index_infos = std::make_shared<IndexInfos>();
index_infos->reserve(cf_pb.indexes().size());
for (const auto & index_pb : cf_pb.indexes())
{
if (index_pb.has_vector_index())
index_infos->emplace_back(index_pb.index_page_id(), index_pb.vector_index());
else
index_infos->emplace_back(index_pb.index_page_id(), std::nullopt);
}

return std::make_shared<ColumnFileTiny>(schema, rows, bytes, data_page_id, context);
return std::make_shared<ColumnFileTiny>(schema, rows, bytes, data_page_id, context, index_infos);
}

ColumnFilePersistedPtr ColumnFileTiny::restoreFromCheckpoint(
Expand All @@ -235,36 +255,55 @@ ColumnFilePersistedPtr ColumnFileTiny::restoreFromCheckpoint(
BlockPtr schema,
PageIdU64 data_page_id,
size_t rows,
size_t bytes)
size_t bytes,
IndexInfosPtr index_infos)
{
auto new_cf_id = context.storage_pool->newLogPageId();
/// Generate a new RemotePage with an entry with data location on S3
auto remote_page_id = UniversalPageIdFormat::toFullPageId(
UniversalPageIdFormat::toFullPrefix(context.keyspace_id, StorageType::Log, context.physical_table_id),
data_page_id);
// The `data_file_id` in temp_ps is lock key, we need convert it to data key before write to local ps
auto remote_data_location = temp_ps->getCheckpointLocation(remote_page_id);
RUNTIME_CHECK(remote_data_location.has_value());
auto remote_data_file_lock_key_view = S3::S3FilenameView::fromKey(*remote_data_location->data_file_id);
RUNTIME_CHECK(remote_data_file_lock_key_view.isLockFile());
auto remote_data_file_key = remote_data_file_lock_key_view.asDataFile().toFullKey();
PS::V3::CheckpointLocation new_remote_data_location{
.data_file_id = std::make_shared<String>(remote_data_file_key),
.offset_in_file = remote_data_location->offset_in_file,
.size_in_file = remote_data_location->size_in_file,
auto put_remote_page = [&](PageIdU64 page_id) {
auto new_cf_id = context.storage_pool->newLogPageId();
/// Generate a new RemotePage with an entry with data location on S3
auto remote_page_id = UniversalPageIdFormat::toFullPageId(
UniversalPageIdFormat::toFullPrefix(context.keyspace_id, StorageType::Log, context.physical_table_id),
page_id);
// The `data_file_id` in temp_ps is lock key, we need convert it to data key before write to local ps
auto remote_data_location = temp_ps->getCheckpointLocation(remote_page_id);
RUNTIME_CHECK(remote_data_location.has_value());
auto remote_data_file_lock_key_view = S3::S3FilenameView::fromKey(*remote_data_location->data_file_id);
RUNTIME_CHECK(remote_data_file_lock_key_view.isLockFile());
auto remote_data_file_key = remote_data_file_lock_key_view.asDataFile().toFullKey();
PS::V3::CheckpointLocation new_remote_data_location{
.data_file_id = std::make_shared<String>(remote_data_file_key),
.offset_in_file = remote_data_location->offset_in_file,
.size_in_file = remote_data_location->size_in_file,
};
// TODO: merge the `getEntry` and `getCheckpointLocation`
auto entry = temp_ps->getEntry(remote_page_id);
LOG_DEBUG(
parent_log,
"Write remote page to local, page_id={} remote_location={} remote_page_id={}",
new_cf_id,
new_remote_data_location.toDebugString(),
remote_page_id);
wbs.log.putRemotePage(new_cf_id, 0, entry.size, new_remote_data_location, std::move(entry.field_offsets));
return new_cf_id;
};
// TODO: merge the `getEntry` and `getCheckpointLocation`
auto entry = temp_ps->getEntry(remote_page_id);
LOG_DEBUG(
parent_log,
"Write remote page to local, page_id={} remote_location={} remote_page_id={}",
new_cf_id,
new_remote_data_location.toDebugString(),
remote_page_id);
wbs.log.putRemotePage(new_cf_id, 0, entry.size, new_remote_data_location, std::move(entry.field_offsets));

// Write column data page to local ps
auto new_cf_id = put_remote_page(data_page_id);
auto column_file_schema = std::make_shared<ColumnFileSchema>(*schema);
return std::make_shared<ColumnFileTiny>(column_file_schema, rows, bytes, new_cf_id, context);
if (!index_infos)
return std::make_shared<ColumnFileTiny>(column_file_schema, rows, bytes, new_cf_id, context);

// Write index data page to local ps
auto new_index_infos = std::make_shared<IndexInfos>();
for (const auto & index : *index_infos)
{
auto new_index_page_id = put_remote_page(index.index_page_id);
if (index.vector_index)
new_index_infos->emplace_back(new_index_page_id, index.vector_index);
else
new_index_infos->emplace_back(new_index_page_id, std::nullopt);
}
return std::make_shared<ColumnFileTiny>(column_file_schema, rows, bytes, new_cf_id, context, new_index_infos);
}

std::tuple<ColumnFilePersistedPtr, BlockPtr> ColumnFileTiny::createFromCheckpoint(
Expand All @@ -288,7 +327,7 @@ std::tuple<ColumnFilePersistedPtr, BlockPtr> ColumnFileTiny::createFromCheckpoin
readIntBinary(bytes, buf);

return {
restoreFromCheckpoint(parent_log, context, temp_ps, wbs, schema, data_page_id, rows, bytes),
restoreFromCheckpoint(parent_log, context, temp_ps, wbs, schema, data_page_id, rows, bytes, nullptr),
schema,
};
}
Expand All @@ -309,9 +348,18 @@ std::tuple<ColumnFilePersistedPtr, BlockPtr> ColumnFileTiny::createFromCheckpoin
PageIdU64 data_page_id = cf_pb.id();
size_t rows = cf_pb.rows();
size_t bytes = cf_pb.bytes();
auto index_infos = std::make_shared<IndexInfos>();
index_infos->reserve(cf_pb.indexes().size());
for (const auto & index_pb : cf_pb.indexes())
{
if (index_pb.has_vector_index())
index_infos->emplace_back(index_pb.index_page_id(), index_pb.vector_index());
else
index_infos->emplace_back(index_pb.index_page_id(), std::nullopt);
}

return {
restoreFromCheckpoint(parent_log, context, temp_ps, wbs, schema, data_page_id, rows, bytes),
restoreFromCheckpoint(parent_log, context, temp_ps, wbs, schema, data_page_id, rows, bytes, index_infos),
schema,
};
}
Expand Down Expand Up @@ -362,7 +410,7 @@ ColumnFileTinyPtr ColumnFileTiny::writeColumnFile(
auto schema = getSharedBlockSchemas(context)->getOrCreate(block);

auto bytes = block.bytes(offset, limit);
return std::make_shared<ColumnFileTiny>(schema, limit, bytes, page_id, context, cache);
return std::make_shared<ColumnFileTiny>(schema, limit, bytes, page_id, context, nullptr, cache);
}

PageIdU64 ColumnFileTiny::writeColumnFileData(
Expand Down Expand Up @@ -423,6 +471,11 @@ PageIdU64 ColumnFileTiny::writeColumnFileData(
void ColumnFileTiny::removeData(WriteBatches & wbs) const
{
wbs.removed_log.delPage(data_page_id);
if (index_infos)
{
for (const auto & index_info : *index_infos)
wbs.removed_log.delPage(index_info.index_page_id);
}
}

ColumnPtr ColumnFileTinyReader::getPKColumn()
Expand Down Expand Up @@ -483,11 +536,13 @@ ColumnFileTiny::ColumnFileTiny(
UInt64 bytes_,
PageIdU64 data_page_id_,
const DMContext & dm_context,
const IndexInfosPtr & index_infos_,
const CachePtr & cache_)
: schema(schema_)
, rows(rows_)
, bytes(bytes_)
, data_page_id(data_page_id_)
, index_infos(index_infos_)
, keyspace_id(dm_context.keyspace_id)
, file_provider(dm_context.global_context.getFileProvider())
, cache(cache_)
Expand Down
38 changes: 36 additions & 2 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <Storages/DeltaMerge/DMContext_fwd.h>
#include <Storages/DeltaMerge/Remote/Serializer_fwd.h>
#include <Storages/DeltaMerge/dtpb/column_file.pb.h>
#include <Storages/DeltaMerge/dtpb/vector_index.pb.h>
#include <Storages/Page/PageStorage_fwd.h>

namespace DB
Expand All @@ -36,9 +37,18 @@ using ColumnFileTinyPtr = std::shared_ptr<ColumnFileTiny>;
/// 2. created when flushed `ColumnFileInMemory` to disk
class ColumnFileTiny : public ColumnFilePersisted
{
public:
friend class ColumnFileTinyReader;
friend struct Remote::Serializer;

struct IndexInfo
{
PageIdU64 index_page_id{};
std::optional<dtpb::VectorIndexFileProps> vector_index = std::nullopt;
};
using IndexInfos = std::vector<IndexInfo>;
using IndexInfosPtr = std::shared_ptr<IndexInfos>;

private:
ColumnFileSchemaPtr schema;

Expand All @@ -53,6 +63,9 @@ class ColumnFileTiny : public ColumnFilePersisted
/// Maybe we should just drop this field, and store the data_page_size in somewhere else.
UInt64 data_page_size = 0;

/// The index information of this file.
IndexInfosPtr index_infos;

/// The id of the keyspace which this ColumnFileTiny belongs to.
KeyspaceID keyspace_id;
/// The global file_provider
Expand Down Expand Up @@ -89,17 +102,29 @@ class ColumnFileTiny : public ColumnFilePersisted
UInt64 bytes_,
PageIdU64 data_page_id_,
const DMContext & dm_context,
const IndexInfosPtr & index_infos_ = nullptr,
const CachePtr & cache_ = nullptr);

Type getType() const override { return Type::TINY_FILE; }

size_t getRows() const override { return rows; }
size_t getBytes() const override { return bytes; }

IndexInfosPtr getIndexInfos() const { return index_infos; }
bool hasIndex(Int64 index_id) const
{
if (!index_infos)
return false;
return std::any_of(index_infos->cbegin(), index_infos->cend(), [index_id](const auto & info) {
if (!info.vector_index)
return false;
return info.vector_index->index_id() == index_id;
});
}

auto getCache() const { return cache; }
void clearCache() { cache = {}; }

/// The schema of this pack. Could be empty, i.e. a DeleteRange does not have a schema.
ColumnFileSchemaPtr getSchema() const { return schema; }

ColumnFileTinyPtr cloneWith(PageIdU64 new_data_page_id)
Expand All @@ -109,6 +134,14 @@ class ColumnFileTiny : public ColumnFilePersisted
return new_tiny_file;
}

ColumnFileTinyPtr cloneWith(PageIdU64 new_data_page_id, const IndexInfosPtr & index_infos_) const
{
auto new_tiny_file = std::make_shared<ColumnFileTiny>(*this);
new_tiny_file->data_page_id = new_data_page_id;
new_tiny_file->index_infos = index_infos_;
return new_tiny_file;
}

ColumnFileReaderPtr getReader(
const DMContext &,
const IColumnFileDataProviderPtr & data_provider,
Expand Down Expand Up @@ -166,7 +199,8 @@ class ColumnFileTiny : public ColumnFilePersisted
BlockPtr schema,
PageIdU64 data_page_id,
size_t rows,
size_t bytes);
size_t bytes,
IndexInfosPtr index_infos);
static std::tuple<ColumnFilePersistedPtr, BlockPtr> createFromCheckpoint(
const LoggerPtr & parent_log,
const DMContext & context,
Expand Down
24 changes: 19 additions & 5 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,9 @@

#include <ext/scope_guard.h>

namespace DB
{
namespace DM
namespace DB::DM
{

// ================================================
// Public methods
// ================================================
Expand Down Expand Up @@ -158,6 +157,21 @@ std::vector<ColumnFilePtrT> CloneColumnFilesHelper<ColumnFilePtrT>::clone(
// Use a newly created page_id to reference the data page_id of current column file.
PageIdU64 new_data_page_id = dm_context.storage_pool->newLogPageId();
wbs.log.putRefPage(new_data_page_id, t->getDataPageId());
if (auto index_infos = t->getIndexInfos(); index_infos)
{
auto new_index_infos = std::make_shared<ColumnFileTiny::IndexInfos>();
new_index_infos->reserve(index_infos->size());
// Use a newly created page_id to reference the index page_id of current column file.
for (auto & index_info : *index_infos)
{
auto new_index_page_id = dm_context.storage_pool->newLogPageId();
wbs.log.putRefPage(new_index_page_id, index_info.index_page_id);
new_index_infos->emplace_back(new_index_page_id, index_info.vector_index);
}
auto new_column_file = t->cloneWith(new_data_page_id, new_index_infos);
cloned.push_back(new_column_file);
continue;
}
auto new_column_file = t->cloneWith(new_data_page_id);
cloned.push_back(new_column_file);
}
Expand Down Expand Up @@ -503,5 +517,5 @@ bool DeltaValueSpace::compact(DMContext & context)

return true;
}
} // namespace DM
} // namespace DB

} // namespace DB::DM
Loading

0 comments on commit 7f095b7

Please sign in to comment.