Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storages: support building vector index for ColumnFileTiny (Part 1) #9534

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 85 additions & 30 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,17 @@ void ColumnFileTiny::serializeMetadata(dtpb::ColumnFilePersisted * cf_pb, bool s
tiny_pb->set_id(data_page_id);
tiny_pb->set_rows(rows);
tiny_pb->set_bytes(bytes);

if (!index_infos)
return;

for (const auto & index_info : *index_infos)
{
auto * index_pb = tiny_pb->add_indexes();
index_pb->set_index_page_id(index_info.index_page_id);
if (index_info.vector_index.has_value())
index_pb->mutable_vector_index()->CopyFrom(*index_info.vector_index);
}
}

ColumnFilePersistedPtr ColumnFileTiny::deserializeMetadata(
Expand Down Expand Up @@ -224,8 +235,17 @@ ColumnFilePersistedPtr ColumnFileTiny::deserializeMetadata(
PageIdU64 data_page_id = cf_pb.id();
size_t rows = cf_pb.rows();
size_t bytes = cf_pb.bytes();
auto index_infos = std::make_shared<IndexInfos>();
index_infos->reserve(cf_pb.indexes().size());
for (const auto & index_pb : cf_pb.indexes())
{
if (index_pb.has_vector_index())
index_infos->emplace_back(index_pb.index_page_id(), index_pb.vector_index());
else
index_infos->emplace_back(index_pb.index_page_id(), std::nullopt);
}

return std::make_shared<ColumnFileTiny>(schema, rows, bytes, data_page_id, context);
return std::make_shared<ColumnFileTiny>(schema, rows, bytes, data_page_id, context, index_infos);
}

ColumnFilePersistedPtr ColumnFileTiny::restoreFromCheckpoint(
Expand All @@ -236,36 +256,55 @@ ColumnFilePersistedPtr ColumnFileTiny::restoreFromCheckpoint(
BlockPtr schema,
PageIdU64 data_page_id,
size_t rows,
size_t bytes)
size_t bytes,
IndexInfosPtr index_infos)
{
auto new_cf_id = context.storage_pool->newLogPageId();
/// Generate a new RemotePage with an entry with data location on S3
auto remote_page_id = UniversalPageIdFormat::toFullPageId(
UniversalPageIdFormat::toFullPrefix(context.keyspace_id, StorageType::Log, context.physical_table_id),
data_page_id);
// The `data_file_id` in temp_ps is lock key, we need convert it to data key before write to local ps
auto remote_data_location = temp_ps->getCheckpointLocation(remote_page_id);
RUNTIME_CHECK(remote_data_location.has_value());
auto remote_data_file_lock_key_view = S3::S3FilenameView::fromKey(*remote_data_location->data_file_id);
RUNTIME_CHECK(remote_data_file_lock_key_view.isLockFile());
auto remote_data_file_key = remote_data_file_lock_key_view.asDataFile().toFullKey();
PS::V3::CheckpointLocation new_remote_data_location{
.data_file_id = std::make_shared<String>(remote_data_file_key),
.offset_in_file = remote_data_location->offset_in_file,
.size_in_file = remote_data_location->size_in_file,
auto put_remote_page = [&](PageIdU64 page_id) {
auto new_cf_id = context.storage_pool->newLogPageId();
/// Generate a new RemotePage with an entry with data location on S3
auto remote_page_id = UniversalPageIdFormat::toFullPageId(
UniversalPageIdFormat::toFullPrefix(context.keyspace_id, StorageType::Log, context.physical_table_id),
page_id);
// The `data_file_id` in temp_ps is lock key, we need convert it to data key before write to local ps
auto remote_data_location = temp_ps->getCheckpointLocation(remote_page_id);
RUNTIME_CHECK(remote_data_location.has_value());
auto remote_data_file_lock_key_view = S3::S3FilenameView::fromKey(*remote_data_location->data_file_id);
RUNTIME_CHECK(remote_data_file_lock_key_view.isLockFile());
auto remote_data_file_key = remote_data_file_lock_key_view.asDataFile().toFullKey();
PS::V3::CheckpointLocation new_remote_data_location{
.data_file_id = std::make_shared<String>(remote_data_file_key),
.offset_in_file = remote_data_location->offset_in_file,
.size_in_file = remote_data_location->size_in_file,
};
// TODO: merge the `getEntry` and `getCheckpointLocation`
auto entry = temp_ps->getEntry(remote_page_id);
LOG_DEBUG(
parent_log,
"Write remote page to local, page_id={} remote_location={} remote_page_id={}",
new_cf_id,
new_remote_data_location.toDebugString(),
remote_page_id);
wbs.log.putRemotePage(new_cf_id, 0, entry.size, new_remote_data_location, std::move(entry.field_offsets));
return new_cf_id;
};
// TODO: merge the `getEntry` and `getCheckpointLocation`
auto entry = temp_ps->getEntry(remote_page_id);
LOG_DEBUG(
parent_log,
"Write remote page to local, page_id={} remote_location={} remote_page_id={}",
new_cf_id,
new_remote_data_location.toDebugString(),
remote_page_id);
wbs.log.putRemotePage(new_cf_id, 0, entry.size, new_remote_data_location, std::move(entry.field_offsets));

// Write column data page to local ps
auto new_cf_id = put_remote_page(data_page_id);
auto column_file_schema = std::make_shared<ColumnFileSchema>(*schema);
return std::make_shared<ColumnFileTiny>(column_file_schema, rows, bytes, new_cf_id, context);
if (!index_infos)
return std::make_shared<ColumnFileTiny>(column_file_schema, rows, bytes, new_cf_id, context);

// Write index data page to local ps
auto new_index_infos = std::make_shared<IndexInfos>();
for (const auto & index : *index_infos)
{
auto new_index_page_id = put_remote_page(index.index_page_id);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How to ensure that the page must exist?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The workflow is

  1. write vector index (new page in log)
  2. delta update meta (new page in meta)
  3. page storage dump incremental checkpoint

So if new meta page was uploaded, the new log page must be uploaded.

if (index.vector_index)
new_index_infos->emplace_back(new_index_page_id, index.vector_index);
else
new_index_infos->emplace_back(new_index_page_id, std::nullopt);
}
return std::make_shared<ColumnFileTiny>(column_file_schema, rows, bytes, new_cf_id, context, new_index_infos);
}

std::tuple<ColumnFilePersistedPtr, BlockPtr> ColumnFileTiny::createFromCheckpoint(
Expand All @@ -289,7 +328,7 @@ std::tuple<ColumnFilePersistedPtr, BlockPtr> ColumnFileTiny::createFromCheckpoin
readIntBinary(bytes, buf);

return {
restoreFromCheckpoint(parent_log, context, temp_ps, wbs, schema, data_page_id, rows, bytes),
restoreFromCheckpoint(parent_log, context, temp_ps, wbs, schema, data_page_id, rows, bytes, nullptr),
schema,
};
}
Expand All @@ -310,9 +349,18 @@ std::tuple<ColumnFilePersistedPtr, BlockPtr> ColumnFileTiny::createFromCheckpoin
PageIdU64 data_page_id = cf_pb.id();
size_t rows = cf_pb.rows();
size_t bytes = cf_pb.bytes();
auto index_infos = std::make_shared<IndexInfos>();
index_infos->reserve(cf_pb.indexes().size());
for (const auto & index_pb : cf_pb.indexes())
{
if (index_pb.has_vector_index())
index_infos->emplace_back(index_pb.index_page_id(), index_pb.vector_index());
else
index_infos->emplace_back(index_pb.index_page_id(), std::nullopt);
}

return {
restoreFromCheckpoint(parent_log, context, temp_ps, wbs, schema, data_page_id, rows, bytes),
restoreFromCheckpoint(parent_log, context, temp_ps, wbs, schema, data_page_id, rows, bytes, index_infos),
schema,
};
}
Expand Down Expand Up @@ -363,7 +411,7 @@ ColumnFileTinyPtr ColumnFileTiny::writeColumnFile(
auto schema = getSharedBlockSchemas(context)->getOrCreate(block);

auto bytes = block.bytes(offset, limit);
return std::make_shared<ColumnFileTiny>(schema, limit, bytes, page_id, context, cache);
return std::make_shared<ColumnFileTiny>(schema, limit, bytes, page_id, context, nullptr, cache);
}

PageIdU64 ColumnFileTiny::writeColumnFileData(
Expand Down Expand Up @@ -424,6 +472,11 @@ PageIdU64 ColumnFileTiny::writeColumnFileData(
void ColumnFileTiny::removeData(WriteBatches & wbs) const
{
wbs.removed_log.delPage(data_page_id);
if (index_infos)
{
for (const auto & index_info : *index_infos)
wbs.removed_log.delPage(index_info.index_page_id);
}
}

ColumnFileTiny::ColumnFileTiny(
Expand All @@ -432,11 +485,13 @@ ColumnFileTiny::ColumnFileTiny(
UInt64 bytes_,
PageIdU64 data_page_id_,
const DMContext & dm_context,
const IndexInfosPtr & index_infos_,
const CachePtr & cache_)
: schema(schema_)
, rows(rows_)
, bytes(bytes_)
, data_page_id(data_page_id_)
, index_infos(index_infos_)
, keyspace_id(dm_context.keyspace_id)
, file_provider(dm_context.global_context.getFileProvider())
, cache(cache_)
Expand Down
38 changes: 36 additions & 2 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <Storages/DeltaMerge/DMContext_fwd.h>
#include <Storages/DeltaMerge/Remote/Serializer_fwd.h>
#include <Storages/DeltaMerge/dtpb/column_file.pb.h>
#include <Storages/DeltaMerge/dtpb/vector_index.pb.h>
#include <Storages/Page/PageStorage_fwd.h>

namespace DB::DM
Expand All @@ -34,9 +35,18 @@ using ColumnFileTinyPtr = std::shared_ptr<ColumnFileTiny>;
/// 2. created when flushed `ColumnFileInMemory` to disk
class ColumnFileTiny : public ColumnFilePersisted
{
public:
friend class ColumnFileTinyReader;
friend struct Remote::Serializer;

struct IndexInfo
{
PageIdU64 index_page_id{};
std::optional<dtpb::VectorIndexFileProps> vector_index = std::nullopt;
breezewish marked this conversation as resolved.
Show resolved Hide resolved
};
using IndexInfos = std::vector<IndexInfo>;
using IndexInfosPtr = std::shared_ptr<IndexInfos>;

private:
ColumnFileSchemaPtr schema;

Expand All @@ -51,6 +61,9 @@ class ColumnFileTiny : public ColumnFilePersisted
/// Maybe we should just drop this field, and store the data_page_size in somewhere else.
UInt64 data_page_size = 0;

/// The index information of this file.
IndexInfosPtr index_infos;

/// The id of the keyspace which this ColumnFileTiny belongs to.
KeyspaceID keyspace_id;
/// The global file_provider
Expand Down Expand Up @@ -87,17 +100,29 @@ class ColumnFileTiny : public ColumnFilePersisted
UInt64 bytes_,
PageIdU64 data_page_id_,
const DMContext & dm_context,
const IndexInfosPtr & index_infos_ = nullptr,
const CachePtr & cache_ = nullptr);

Type getType() const override { return Type::TINY_FILE; }

size_t getRows() const override { return rows; }
size_t getBytes() const override { return bytes; }

IndexInfosPtr getIndexInfos() const { return index_infos; }
bool hasIndex(Int64 index_id) const
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hasVectorIndex?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hasIndex is better since we will support other types of index later.

{
if (!index_infos)
return false;
return std::any_of(index_infos->cbegin(), index_infos->cend(), [index_id](const auto & info) {
if (!info.vector_index)
return false;
return info.vector_index->index_id() == index_id;
});
}

auto getCache() const { return cache; }
void clearCache() { cache = {}; }

/// The schema of this pack. Could be empty, i.e. a DeleteRange does not have a schema.
ColumnFileSchemaPtr getSchema() const { return schema; }

ColumnFileTinyPtr cloneWith(PageIdU64 new_data_page_id)
Expand All @@ -107,6 +132,14 @@ class ColumnFileTiny : public ColumnFilePersisted
return new_tiny_file;
}

ColumnFileTinyPtr cloneWith(PageIdU64 new_data_page_id, const IndexInfosPtr & index_infos_) const
{
auto new_tiny_file = std::make_shared<ColumnFileTiny>(*this);
new_tiny_file->data_page_id = new_data_page_id;
new_tiny_file->index_infos = index_infos_;
return new_tiny_file;
}

ColumnFileReaderPtr getReader(
const DMContext &,
const IColumnFileDataProviderPtr & data_provider,
Expand Down Expand Up @@ -165,7 +198,8 @@ class ColumnFileTiny : public ColumnFilePersisted
BlockPtr schema,
PageIdU64 data_page_id,
size_t rows,
size_t bytes);
size_t bytes,
IndexInfosPtr index_infos);
static std::tuple<ColumnFilePersistedPtr, BlockPtr> createFromCheckpoint(
const LoggerPtr & parent_log,
const DMContext & context,
Expand Down
34 changes: 19 additions & 15 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,9 @@

#include <ext/scope_guard.h>

namespace DB
{
namespace DM
namespace DB::DM
{

// ================================================
// Public methods
// ================================================
Expand Down Expand Up @@ -108,16 +107,6 @@ std::string DeltaValueSpace::serializeMeta() const
return wb.releaseStr();
}

template <class ColumnFileT>
struct CloneColumnFilesHelper
{
static std::vector<ColumnFileT> clone(
DMContext & dm_context,
const std::vector<ColumnFileT> & src,
const RowKeyRange & target_range,
WriteBatches & wbs);
};

template <class ColumnFilePtrT>
std::vector<ColumnFilePtrT> CloneColumnFilesHelper<ColumnFilePtrT>::clone(
DMContext & dm_context,
Expand Down Expand Up @@ -158,6 +147,21 @@ std::vector<ColumnFilePtrT> CloneColumnFilesHelper<ColumnFilePtrT>::clone(
// Use a newly created page_id to reference the data page_id of current column file.
PageIdU64 new_data_page_id = dm_context.storage_pool->newLogPageId();
wbs.log.putRefPage(new_data_page_id, t->getDataPageId());
if (auto index_infos = t->getIndexInfos(); index_infos)
{
auto new_index_infos = std::make_shared<ColumnFileTiny::IndexInfos>();
new_index_infos->reserve(index_infos->size());
// Use a newly created page_id to reference the index page_id of current column file.
for (auto & index_info : *index_infos)
{
auto new_index_page_id = dm_context.storage_pool->newLogPageId();
wbs.log.putRefPage(new_index_page_id, index_info.index_page_id);
new_index_infos->emplace_back(new_index_page_id, index_info.vector_index);
}
auto new_column_file = t->cloneWith(new_data_page_id, new_index_infos);
cloned.push_back(new_column_file);
continue;
}
auto new_column_file = t->cloneWith(new_data_page_id);
cloned.push_back(new_column_file);
}
Expand Down Expand Up @@ -503,5 +507,5 @@ bool DeltaValueSpace::compact(DMContext & context)

return true;
}
} // namespace DM
} // namespace DB

} // namespace DB::DM
10 changes: 10 additions & 0 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,16 @@ class DeltaValueSpace
DeltaSnapshotPtr createSnapshot(const DMContext & context, bool for_update, CurrentMetrics::Metric type);
};

template <class ColumnFileT>
struct CloneColumnFilesHelper
{
static std::vector<ColumnFileT> clone(
DMContext & dm_context,
const std::vector<ColumnFileT> & src,
const RowKeyRange & target_range,
WriteBatches & wbs);
};

class DeltaValueSnapshot
: public std::enable_shared_from_this<DeltaValueSnapshot>
, private boost::noncopyable
Expand Down
Loading