Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry pick2 before #11

Open
wants to merge 5 commits into
base: feature/vector-index
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion contrib/tipb
1 change: 1 addition & 0 deletions dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ namespace DB
M(proactive_flush_force_set_type) \
M(exception_when_fetch_disagg_pages) \
M(cop_send_failure) \
M(file_cache_fg_download_fail) \
M(force_set_parallel_prehandle_threshold) \
M(force_raise_prehandle_exception) \
M(force_agg_on_partial_block) \
Expand Down
8 changes: 8 additions & 0 deletions dbms/src/Common/LRUCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ class LRUCache
return res;
}

/// Returns whether a specific key is in the LRU cache
/// without updating the LRU order.
bool contains(const Key & key)
{
std::lock_guard cache_lock(mutex);
return cells.find(key) != cells.end();
}

void set(const Key & key, const MappedPtr & mapped)
{
std::scoped_lock cache_lock(mutex);
Expand Down
10 changes: 5 additions & 5 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
#include <Storages/DeltaMerge/ColumnFile/ColumnFileSchema.h>
#include <Storages/DeltaMerge/DeltaIndexManager.h>
#include <Storages/DeltaMerge/Index/MinMaxIndex.h>
#include <Storages/DeltaMerge/Index/VectorIndex.h>
#include <Storages/DeltaMerge/Index/VectorIndexCache.h>
#include <Storages/DeltaMerge/StoragePool/GlobalPageIdAllocator.h>
#include <Storages/DeltaMerge/StoragePool/GlobalStoragePool.h>
#include <Storages/DeltaMerge/StoragePool/StoragePool.h>
Expand Down Expand Up @@ -1385,16 +1385,16 @@ void Context::dropMinMaxIndexCache() const
{
auto lock = getLock();
if (shared->minmax_index_cache)
shared->minmax_index_cache->reset();
shared->minmax_index_cache.reset();
}

void Context::setVectorIndexCache(size_t cache_size_in_bytes)
void Context::setVectorIndexCache(size_t cache_entities)
{
auto lock = getLock();

RUNTIME_CHECK(!shared->vector_index_cache);

shared->vector_index_cache = std::make_shared<DM::VectorIndexCache>(cache_size_in_bytes);
shared->vector_index_cache = std::make_shared<DM::VectorIndexCache>(cache_entities);
}

DM::VectorIndexCachePtr Context::getVectorIndexCache() const
Expand All @@ -1407,7 +1407,7 @@ void Context::dropVectorIndexCache() const
{
auto lock = getLock();
if (shared->vector_index_cache)
shared->vector_index_cache->reset();
shared->vector_index_cache.reset();
}

bool Context::isDeltaIndexLimited() const
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ class Context
std::shared_ptr<DM::MinMaxIndexCache> getMinMaxIndexCache() const;
void dropMinMaxIndexCache() const;

void setVectorIndexCache(size_t cache_size_in_bytes);
void setVectorIndexCache(size_t cache_entities);
std::shared_ptr<DM::VectorIndexCache> getVectorIndexCache() const;
void dropVectorIndexCache() const;

Expand Down
7 changes: 3 additions & 4 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1439,10 +1439,9 @@ int Server::main(const std::vector<std::string> & /*args*/)
if (minmax_index_cache_size)
global_context->setMinMaxIndexCache(minmax_index_cache_size);

// 1GiB vector index cache.
size_t vec_index_cache_size = config().getUInt64("vec_index_cache_size", 1ULL * 1024 * 1024 * 1024);
if (vec_index_cache_size)
global_context->setVectorIndexCache(vec_index_cache_size);
size_t vec_index_cache_entities = config().getUInt64("vec_index_cache_entities", 1000);
if (vec_index_cache_entities)
global_context->setVectorIndexCache(vec_index_cache_entities);

/// Size of max memory usage of DeltaIndex, used by DeltaMerge engine.
/// - In non-disaggregated mode, its default value is 0, means unlimited, and it
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Storages/DeltaMerge/DMContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ struct DMContext : private boost::noncopyable
TableID physical_table_id_,
bool is_common_handle_,
size_t rowkey_column_size_,
const DB::Settings & settings)
const DB::Settings & settings,
const ScanContextPtr & scan_context = nullptr)
{
return std::unique_ptr<DMContext>(new DMContext(
session_context_,
Expand All @@ -145,7 +146,7 @@ struct DMContext : private boost::noncopyable
is_common_handle_,
rowkey_column_size_,
settings,
nullptr,
scan_context,
""));
}

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,14 @@ struct ColumnDefine
/// Note: ColumnDefine is used in both Write path and Read path.
/// In the read path, vector_index is usually not available. Use AnnQueryInfo for
/// read related vector index information.
TiDB::VectorIndexInfoPtr vector_index;
TiDB::VectorIndexDefinitionPtr vector_index;

explicit ColumnDefine(
ColId id_ = 0,
String name_ = "",
DataTypePtr type_ = nullptr,
Field default_value_ = Field{},
TiDB::VectorIndexInfoPtr vector_index_ = nullptr)
TiDB::VectorIndexDefinitionPtr vector_index_ = nullptr)
: id(id_)
, name(std::move(name_))
, type(std::move(type_))
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/File/ColumnStat.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ struct ColumnStat
size_t array_sizes_bytes = 0;
size_t array_sizes_mark_bytes = 0;

std::optional<dtpb::ColumnVectorIndexInfo> vector_index = std::nullopt;
std::optional<dtpb::VectorIndexFileProps> vector_index = std::nullopt;

dtpb::ColumnStat toProto() const
{
Expand Down
Loading