diff --git a/internal/core/src/common/Common.h b/internal/core/src/common/Common.h index 49fcbcb7c8592..7be8ccf8543f3 100644 --- a/internal/core/src/common/Common.h +++ b/internal/core/src/common/Common.h @@ -30,6 +30,32 @@ extern int64_t LOW_PRIORITY_THREAD_CORE_COEFFICIENT; extern int CPU_NUM; extern int64_t EXEC_EVAL_EXPR_BATCH_SIZE; +enum class Role { QueryNode, IndexNode }; + +const std::unordered_map RoleToStringMap = { + {Role::QueryNode, "querynode"}, {Role::IndexNode, "indexnode"}}; + +// convert role to string +inline std::string +ToString(Role role) { + auto it = RoleToStringMap.find(role); + if (it != RoleToStringMap.end()) { + return it->second; + } + PanicInfo(UnexpectedError, "role {} not found", int(role)); +} + +// convert string to role +inline Role +FromString(const std::string& role_str) { + for (const auto& pair : RoleToStringMap) { + if (pair.second == role_str) { + return pair.first; + } + } + PanicInfo(UnexpectedError, "role {} not found", role_str); +} + void SetIndexSliceSize(const int64_t size); diff --git a/internal/core/src/index/InvertedIndexTantivy.cpp b/internal/core/src/index/InvertedIndexTantivy.cpp index 0072c3de5d0c8..0cca570dd66dc 100644 --- a/internal/core/src/index/InvertedIndexTantivy.cpp +++ b/internal/core/src/index/InvertedIndexTantivy.cpp @@ -104,7 +104,7 @@ InvertedIndexTantivy::InvertedIndexTantivy( template InvertedIndexTantivy::~InvertedIndexTantivy() { auto local_chunk_manager = - storage::LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + storage::LocalChunkManagerFactory::GetInstance().GetChunkManager(); auto prefix = path_; LOG_INFO("inverted index remove path:{}", path_); local_chunk_manager->RemoveDir(prefix); diff --git a/internal/core/src/index/VectorDiskIndex.cpp b/internal/core/src/index/VectorDiskIndex.cpp index e6360eb199159..4e078cbb5db25 100644 --- a/internal/core/src/index/VectorDiskIndex.cpp +++ b/internal/core/src/index/VectorDiskIndex.cpp @@ -47,7 +47,7 @@ VectorDiskAnnIndex::VectorDiskAnnIndex( std::make_shared(file_manager_context); AssertInfo(file_manager_ != nullptr, "create file manager failed!"); auto local_chunk_manager = - storage::LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + storage::LocalChunkManagerFactory::GetInstance().GetChunkManager(); auto local_index_path_prefix = file_manager_->GetLocalIndexObjectPrefix(); // As we have guarded dup-load in QueryNode, @@ -135,7 +135,8 @@ template void VectorDiskAnnIndex::Build(const Config& config) { auto local_chunk_manager = - storage::LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + storage::LocalChunkManagerFactory::GetInstance().GetChunkManager( + milvus::Role::IndexNode); knowhere::Json build_config; build_config.update(config); @@ -185,7 +186,8 @@ void VectorDiskAnnIndex::BuildWithDataset(const DatasetPtr& dataset, const Config& config) { auto local_chunk_manager = - storage::LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + storage::LocalChunkManagerFactory::GetInstance().GetChunkManager( + milvus::Role::IndexNode); knowhere::Json build_config; build_config.update(config); // set data path @@ -376,7 +378,7 @@ template void VectorDiskAnnIndex::CleanLocalData() { auto local_chunk_manager = - storage::LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + storage::LocalChunkManagerFactory::GetInstance().GetChunkManager(); local_chunk_manager->RemoveDir(file_manager_->GetLocalIndexObjectPrefix()); local_chunk_manager->RemoveDir( file_manager_->GetLocalRawDataObjectPrefix()); diff --git a/internal/core/src/segcore/load_index_c.cpp b/internal/core/src/segcore/load_index_c.cpp index 16d7bcdd89344..984c264d81304 100644 --- a/internal/core/src/segcore/load_index_c.cpp +++ b/internal/core/src/segcore/load_index_c.cpp @@ -15,6 +15,7 @@ #include "common/EasyAssert.h" #include "common/Types.h" #include "common/type_c.h" +#include "common/Common.h" #include "index/Index.h" #include "index/IndexFactory.h" #include "index/Meta.h" @@ -424,8 +425,8 @@ CleanLoadedIndex(CLoadIndexInfo c_load_index_info) { auto load_index_info = (milvus::segcore::LoadIndexInfo*)c_load_index_info; auto local_chunk_manager = - milvus::storage::LocalChunkManagerSingleton::GetInstance() - .GetChunkManager(); + milvus::storage::LocalChunkManagerFactory::GetInstance() + .GetChunkManager(milvus::Role::QueryNode); auto index_file_path_prefix = milvus::storage::GenIndexPathPrefix(local_chunk_manager, load_index_info->index_build_id, diff --git a/internal/core/src/storage/DiskFileManagerImpl.cpp b/internal/core/src/storage/DiskFileManagerImpl.cpp index e54fa6d748825..7b4044c79b878 100644 --- a/internal/core/src/storage/DiskFileManagerImpl.cpp +++ b/internal/core/src/storage/DiskFileManagerImpl.cpp @@ -48,13 +48,18 @@ namespace milvus::storage { DiskFileManagerImpl::DiskFileManagerImpl( const FileManagerContext& fileManagerContext) : FileManagerImpl(fileManagerContext.fieldDataMeta, - fileManagerContext.indexMeta) { + fileManagerContext.indexMeta), + for_loading_index_(fileManagerContext.for_loading_index) { rcm_ = fileManagerContext.chunkManagerPtr; } DiskFileManagerImpl::~DiskFileManagerImpl() { auto local_chunk_manager = - LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + for_loading_index_ + ? LocalChunkManagerFactory::GetInstance().GetChunkManager( + milvus::Role::QueryNode) + : LocalChunkManagerFactory::GetInstance().GetChunkManager( + milvus::Role::IndexNode); local_chunk_manager->RemoveDir(GetIndexPathPrefixWithBuildID( local_chunk_manager, index_meta_.build_id)); } @@ -82,7 +87,7 @@ DiskFileManagerImpl::GetRemoteTextLogPath(const std::string& file_name, bool DiskFileManagerImpl::AddFile(const std::string& file) noexcept { auto local_chunk_manager = - LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + LocalChunkManagerFactory::GetInstance().GetChunkManager(); FILEMANAGER_TRY if (!local_chunk_manager->Exist(file)) { LOG_ERROR("local file {} not exists", file); @@ -134,7 +139,7 @@ DiskFileManagerImpl::AddFile(const std::string& file) noexcept { bool DiskFileManagerImpl::AddTextLog(const std::string& file) noexcept { auto local_chunk_manager = - LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + LocalChunkManagerFactory::GetInstance().GetChunkManager(); FILEMANAGER_TRY if (!local_chunk_manager->Exist(file)) { LOG_ERROR("local file {} not exists", file); @@ -190,7 +195,7 @@ DiskFileManagerImpl::AddBatchIndexFiles( const std::vector& remote_files, const std::vector& remote_file_sizes) { auto local_chunk_manager = - LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + LocalChunkManagerFactory::GetInstance().GetChunkManager(); auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::HIGH); std::vector>> futures; @@ -239,7 +244,7 @@ void DiskFileManagerImpl::CacheIndexToDisk( const std::vector& remote_files) { auto local_chunk_manager = - LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + LocalChunkManagerFactory::GetInstance().GetChunkManager(); std::map> index_slices; for (auto& file_path : remote_files) { @@ -307,7 +312,7 @@ void DiskFileManagerImpl::CacheTextLogToDisk( const std::vector& remote_files) { auto local_chunk_manager = - LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + LocalChunkManagerFactory::GetInstance().GetChunkManager(); std::map> index_slices; for (auto& file_path : remote_files) { @@ -376,7 +381,8 @@ DiskFileManagerImpl::CacheRawDataToDisk(std::vector remote_files) { auto field_id = GetFieldDataMeta().field_id; auto local_chunk_manager = - LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + LocalChunkManagerFactory::GetInstance().GetChunkManager( + milvus::Role::IndexNode); std::string local_data_path; bool file_created = false; @@ -619,7 +625,8 @@ DiskFileManagerImpl::CacheOptFieldToDisk(OptFieldT& fields_map) { auto segment_id = GetFieldDataMeta().segment_id; auto vec_field_id = GetFieldDataMeta().field_id; auto local_chunk_manager = - LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + LocalChunkManagerFactory::GetInstance().GetChunkManager( + milvus::Role::IndexNode); auto local_data_path = storage::GenFieldRawDataPathPrefix( local_chunk_manager, segment_id, vec_field_id) + std::string(VEC_OPT_FIELDS); @@ -696,7 +703,8 @@ DiskFileManagerImpl::GetFileName(const std::string& localfile) { std::string DiskFileManagerImpl::GetLocalIndexObjectPrefix() { auto local_chunk_manager = - LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + LocalChunkManagerFactory::GetInstance().GetChunkManager( + milvus::Role::QueryNode); return GenIndexPathPrefix( local_chunk_manager, index_meta_.build_id, index_meta_.index_version); } @@ -704,7 +712,8 @@ DiskFileManagerImpl::GetLocalIndexObjectPrefix() { std::string DiskFileManagerImpl::GetLocalTextIndexPrefix() { auto local_chunk_manager = - LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + LocalChunkManagerFactory::GetInstance().GetChunkManager( + milvus::Role::QueryNode); return GenTextIndexPathPrefix(local_chunk_manager, index_meta_.build_id, index_meta_.index_version, @@ -729,7 +738,8 @@ DiskFileManagerImpl::GetTextIndexIdentifier() { std::string DiskFileManagerImpl::GetLocalRawDataObjectPrefix() { auto local_chunk_manager = - LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + LocalChunkManagerFactory::GetInstance().GetChunkManager( + milvus::Role::IndexNode); return GenFieldRawDataPathPrefix( local_chunk_manager, field_meta_.segment_id, field_meta_.field_id); } @@ -744,7 +754,7 @@ std::optional DiskFileManagerImpl::IsExisted(const std::string& file) noexcept { bool isExist = false; auto local_chunk_manager = - LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + LocalChunkManagerFactory::GetInstance().GetChunkManager(); try { isExist = local_chunk_manager->Exist(file); } catch (std::exception& e) { diff --git a/internal/core/src/storage/DiskFileManagerImpl.h b/internal/core/src/storage/DiskFileManagerImpl.h index b2c87b1ff78db..35ed387515140 100644 --- a/internal/core/src/storage/DiskFileManagerImpl.h +++ b/internal/core/src/storage/DiskFileManagerImpl.h @@ -131,6 +131,9 @@ class DiskFileManagerImpl : public FileManagerImpl { // remote file path std::map remote_paths_to_size_; + + // indicate whether disk file manager is used for building index or load index + bool for_loading_index_; }; using DiskANNFileManagerImplPtr = std::shared_ptr; diff --git a/internal/core/src/storage/LocalChunkManagerSingleton.h b/internal/core/src/storage/LocalChunkManagerSingleton.h index 2715796d7b771..41959874b5b3a 100644 --- a/internal/core/src/storage/LocalChunkManagerSingleton.h +++ b/internal/core/src/storage/LocalChunkManagerSingleton.h @@ -20,41 +20,56 @@ #include #include +#include "common/Common.h" #include "storage/ChunkManager.h" #include "storage/LocalChunkManager.h" namespace milvus::storage { -class LocalChunkManagerSingleton { - private: - LocalChunkManagerSingleton() { - } - +class LocalChunkManagerFactory { public: - LocalChunkManagerSingleton(const LocalChunkManagerSingleton&) = delete; - LocalChunkManagerSingleton& - operator=(const LocalChunkManagerSingleton&) = delete; - - static LocalChunkManagerSingleton& + static LocalChunkManagerFactory& GetInstance() { - static LocalChunkManagerSingleton instance; + static LocalChunkManagerFactory instance; return instance; } void - Init(std::string root_path) { - if (lcm_ == nullptr) { - lcm_ = std::make_shared(root_path); + AddChunkManager(Role role, std::string root_path) { + std::unique_lock lock(mutex_); + if (chunk_managers_.find(role) != chunk_managers_.end()) { + PanicInfo(UnexpectedError, + "chunk manager for role {} already exists", + ToString(role)); + } + chunk_managers_[role] = std::make_shared(root_path); + } + + LocalChunkManagerSPtr + GetChunkManager(Role role) const { + std::shared_lock lock(mutex_); + auto it = chunk_managers_.find(role); + if (it == chunk_managers_.end()) { + PanicInfo(UnexpectedError, + "local chunk manager for role:{} not found", + ToString(role)); } + return it->second; } + // some situations not need to specify the role + // just randomly choose one chunk manager + // because local chunk manager no need root_path + // and interface use abs paths params LocalChunkManagerSPtr - GetChunkManager() { - return lcm_; + GetChunkManager() const { + std::shared_lock lock(mutex_); + Assert(chunk_managers_.size() != 0); + return chunk_managers_.begin()->second; } - private: - LocalChunkManagerSPtr lcm_ = nullptr; + mutable std::shared_mutex mutex_; + std::unordered_map chunk_managers_; }; } // namespace milvus::storage \ No newline at end of file diff --git a/internal/core/src/storage/MmapChunkManager.cpp b/internal/core/src/storage/MmapChunkManager.cpp index 935e9db2e8260..2bff3c0cc112c 100644 --- a/internal/core/src/storage/MmapChunkManager.cpp +++ b/internal/core/src/storage/MmapChunkManager.cpp @@ -228,7 +228,7 @@ MmapChunkManager::~MmapChunkManager() { } // clean the mmap dir auto cm = - storage::LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + storage::LocalChunkManagerFactory::GetInstance().GetChunkManager(); if (cm->Exist(mmap_file_prefix_)) { cm->RemoveDir(mmap_file_prefix_); } @@ -310,7 +310,7 @@ MmapChunkManager::MmapChunkManager(std::string root_path, std::make_unique(disk_limit, file_size, root_path); mmap_file_prefix_ = root_path; auto cm = - storage::LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + storage::LocalChunkManagerFactory::GetInstance().GetChunkManager(); AssertInfo(cm != nullptr, "Fail to get LocalChunkManager, LocalChunkManagerPtr is null"); if (cm->Exist(root_path)) { diff --git a/internal/core/src/storage/storage_c.cpp b/internal/core/src/storage/storage_c.cpp index d2305b9153bdd..00d278f3bd44f 100644 --- a/internal/core/src/storage/storage_c.cpp +++ b/internal/core/src/storage/storage_c.cpp @@ -15,6 +15,7 @@ // limitations under the License. #include "storage/storage_c.h" +#include "common/Common.h" #include "monitor/prometheus_client.h" #include "storage/RemoteChunkManagerSingleton.h" #include "storage/LocalChunkManagerSingleton.h" @@ -24,7 +25,7 @@ CStatus GetLocalUsedSize(const char* c_dir, int64_t* size) { try { auto local_chunk_manager = - milvus::storage::LocalChunkManagerSingleton::GetInstance() + milvus::storage::LocalChunkManagerFactory::GetInstance() .GetChunkManager(); std::string dir(c_dir); if (local_chunk_manager->DirExist(dir)) { @@ -39,10 +40,12 @@ GetLocalUsedSize(const char* c_dir, int64_t* size) { } CStatus -InitLocalChunkManagerSingleton(const char* c_path) { +InitLocalChunkManager(const char* c_role, const char* c_path) { try { + std::string role(c_role); std::string path(c_path); - milvus::storage::LocalChunkManagerSingleton::GetInstance().Init(path); + milvus::storage::LocalChunkManagerFactory::GetInstance() + .AddChunkManager(milvus::FromString(role), path); return milvus::SuccessCStatus(); } catch (std::exception& e) { diff --git a/internal/core/src/storage/storage_c.h b/internal/core/src/storage/storage_c.h index d6cac3b46fae9..bf64d507dc1d9 100644 --- a/internal/core/src/storage/storage_c.h +++ b/internal/core/src/storage/storage_c.h @@ -25,7 +25,7 @@ CStatus GetLocalUsedSize(const char* c_path, int64_t* size); CStatus -InitLocalChunkManagerSingleton(const char* path); +InitLocalChunkManager(const char* c_role, const char* path); CStatus InitRemoteChunkManagerSingleton(CStorageConfig c_storage_config); diff --git a/internal/core/unittest/init_gtest.cpp b/internal/core/unittest/init_gtest.cpp index adc1b3b683256..52633a3c52b3e 100644 --- a/internal/core/unittest/init_gtest.cpp +++ b/internal/core/unittest/init_gtest.cpp @@ -22,8 +22,10 @@ main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); folly::Init follyInit(&argc, &argv, false); - milvus::storage::LocalChunkManagerSingleton::GetInstance().Init( - TestLocalPath); + milvus::storage::LocalChunkManagerFactory::GetInstance().AddChunkManager( + milvus::Role::QueryNode, TestLocalPath); + milvus::storage::LocalChunkManagerFactory::GetInstance().AddChunkManager( + milvus::Role::IndexNode, TestLocalPath); milvus::storage::RemoteChunkManagerSingleton::GetInstance().Init( get_default_local_storage_config()); milvus::storage::MmapManager::GetInstance().Init(get_default_mmap_config()); diff --git a/internal/core/unittest/test_chunk_cache.cpp b/internal/core/unittest/test_chunk_cache.cpp index 9888097e59ac8..2644e7be11abf 100644 --- a/internal/core/unittest/test_chunk_cache.cpp +++ b/internal/core/unittest/test_chunk_cache.cpp @@ -81,7 +81,8 @@ class ChunkCacheTest sparse_metric_type, false); - lcm = milvus::storage::LocalChunkManagerSingleton::GetInstance() + lcm = milvus::storage::LocalChunkManagerFactory() + .GetInstance() .GetChunkManager(); dense_data = dataset.get_col(fake_dense_vec_id); sparse_data = dataset.get_col>( diff --git a/internal/core/unittest/test_disk_file_manager_test.cpp b/internal/core/unittest/test_disk_file_manager_test.cpp index 565e063b6d938..a63e8f2727095 100644 --- a/internal/core/unittest/test_disk_file_manager_test.cpp +++ b/internal/core/unittest/test_disk_file_manager_test.cpp @@ -64,7 +64,7 @@ class DiskAnnFileManagerTest : public testing::Test { }; TEST_F(DiskAnnFileManagerTest, AddFilePositiveParallel) { - auto lcm = LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + auto lcm = LocalChunkManagerFactory::GetInstance().GetChunkManager(); std::string indexFilePath = "/tmp/diskann/index_files/1000/index"; auto exist = lcm->Exist(indexFilePath); EXPECT_EQ(exist, false); diff --git a/internal/core/unittest/test_local_chunk_manager.cpp b/internal/core/unittest/test_local_chunk_manager.cpp index e2df1a9663800..e8819fa78b832 100644 --- a/internal/core/unittest/test_local_chunk_manager.cpp +++ b/internal/core/unittest/test_local_chunk_manager.cpp @@ -25,7 +25,7 @@ using namespace milvus::storage; class LocalChunkManagerTest : public testing::Test {}; TEST_F(LocalChunkManagerTest, DirPositive) { - auto lcm = LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + auto lcm = LocalChunkManagerFactory::GetInstance().GetChunkManager(); string test_dir = lcm->GetRootPath() + "/local-test-dir/"; lcm->RemoveDir(test_dir); lcm->CreateDir(test_dir); @@ -39,7 +39,7 @@ TEST_F(LocalChunkManagerTest, DirPositive) { } TEST_F(LocalChunkManagerTest, FilePositive) { - auto lcm = LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + auto lcm = LocalChunkManagerFactory::GetInstance().GetChunkManager(); string test_dir = lcm->GetRootPath() + "/local-test-dir"; string file = test_dir + "/test-file"; @@ -59,7 +59,7 @@ TEST_F(LocalChunkManagerTest, FilePositive) { } TEST_F(LocalChunkManagerTest, WritePositive) { - auto lcm = LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + auto lcm = LocalChunkManagerFactory::GetInstance().GetChunkManager(); string test_dir = lcm->GetRootPath() + "/local-test-dir"; string file = test_dir + "/test-write-positive"; @@ -92,7 +92,7 @@ TEST_F(LocalChunkManagerTest, WritePositive) { } TEST_F(LocalChunkManagerTest, ReadPositive) { - auto lcm = LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + auto lcm = LocalChunkManagerFactory::GetInstance().GetChunkManager(); string test_dir = lcm->GetRootPath() + "/local-test-dir"; uint8_t data[5] = {0x17, 0x32, 0x45, 0x34, 0x23}; @@ -139,7 +139,7 @@ TEST_F(LocalChunkManagerTest, ReadPositive) { } TEST_F(LocalChunkManagerTest, WriteOffset) { - auto lcm = LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + auto lcm = LocalChunkManagerFactory::GetInstance().GetChunkManager(); string test_dir = lcm->GetRootPath() + "/local-test-dir"; string file = test_dir + "/test-write-offset"; @@ -183,7 +183,7 @@ TEST_F(LocalChunkManagerTest, WriteOffset) { } TEST_F(LocalChunkManagerTest, ReadOffset) { - auto lcm = LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + auto lcm = LocalChunkManagerFactory::GetInstance().GetChunkManager(); string test_dir = lcm->GetRootPath() + "/local-test-dir"; string file = test_dir + "/test-read-offset"; @@ -219,7 +219,7 @@ TEST_F(LocalChunkManagerTest, ReadOffset) { } TEST_F(LocalChunkManagerTest, GetSizeOfDir) { - auto lcm = LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + auto lcm = LocalChunkManagerFactory::GetInstance().GetChunkManager(); auto test_dir = lcm->GetRootPath() + "/local-test-dir"; EXPECT_EQ(lcm->DirExist(test_dir), false); lcm->CreateDir(test_dir); diff --git a/internal/core/unittest/test_sealed.cpp b/internal/core/unittest/test_sealed.cpp index 15e7f50b8a90d..872993ca611fa 100644 --- a/internal/core/unittest/test_sealed.cpp +++ b/internal/core/unittest/test_sealed.cpp @@ -1538,7 +1538,7 @@ TEST(Sealed, GetSparseVectorFromChunkCache) { auto file_name = std::string( "sealed_test_get_vector_from_chunk_cache/insert_log/1/101/1000000"); - auto lcm = milvus::storage::LocalChunkManagerSingleton::GetInstance() + auto lcm = milvus::storage::LocalChunkManagerFactory::GetInstance() .GetChunkManager(); auto schema = std::make_shared(); diff --git a/internal/core/unittest/test_storage.cpp b/internal/core/unittest/test_storage.cpp index 40f80528e83cf..51491533bb8be 100644 --- a/internal/core/unittest/test_storage.cpp +++ b/internal/core/unittest/test_storage.cpp @@ -69,14 +69,9 @@ class StorageTest : public testing::Test { } }; -TEST_F(StorageTest, InitLocalChunkManagerSingleton) { - auto status = InitLocalChunkManagerSingleton("tmp"); - EXPECT_EQ(status.error_code, Success); -} - TEST_F(StorageTest, GetLocalUsedSize) { int64_t size = 0; - auto lcm = LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + auto lcm = LocalChunkManagerFactory::GetInstance().GetChunkManager(); EXPECT_EQ(lcm->GetRootPath(), "/tmp/milvus/local_data/"); string test_dir = lcm->GetRootPath() + "tmp"; string test_file = test_dir + "/test.txt"; diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go index 31617d18276ed..b67faa5071a1f 100644 --- a/internal/indexnode/indexnode.go +++ b/internal/indexnode/indexnode.go @@ -186,7 +186,7 @@ func (i *IndexNode) initSegcore() { C.SegcoreSetKnowhereBuildThreadPoolNum(cKnowhereThreadPoolSize) localDataRootPath := filepath.Join(Params.LocalStorageCfg.Path.GetValue(), typeutil.IndexNodeRole) - initcore.InitLocalChunkManager(localDataRootPath) + initcore.InitLocalChunkManager(typeutil.IndexNodeRole, localDataRootPath) cGpuMemoryPoolInitSize := C.uint32_t(paramtable.Get().GpuConfig.InitSize.GetAsUint32()) cGpuMemoryPoolMaxSize := C.uint32_t(paramtable.Get().GpuConfig.MaxSize.GetAsUint32()) C.SegcoreSetKnowhereGpuMemoryPoolSize(cGpuMemoryPoolInitSize, cGpuMemoryPoolMaxSize) diff --git a/internal/indexnode/task_index.go b/internal/indexnode/task_index.go index 87e0f44be8684..a33b2f9e3b7fa 100644 --- a/internal/indexnode/task_index.go +++ b/internal/indexnode/task_index.go @@ -19,6 +19,7 @@ package indexnode import ( "context" "fmt" + "path/filepath" "strconv" "strings" "time" @@ -42,6 +43,7 @@ import ( "github.com/milvus-io/milvus/pkg/util/metautil" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/timerecord" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) // IndexBuildTask is used to record the information of the index tasks. @@ -221,7 +223,8 @@ func (it *indexBuildTask) Execute(ctx context.Context) error { } // check load size and size of field data - localUsedSize, err := indexcgowrapper.GetLocalUsedSize(paramtable.Get().LocalStorageCfg.Path.GetValue()) + localInPath := filepath.Join(paramtable.Get().LocalStorageCfg.Path.GetValue(), typeutil.IndexNodeRole) + localUsedSize, err := indexcgowrapper.GetLocalUsedSize(localInPath) if err != nil { log.Warn("IndexNode get local used size failed") return err diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index 41188c1139bd5..85cf31d9b5c82 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -29,6 +29,7 @@ import "C" import ( "context" "fmt" + "path/filepath" "strings" "time" "unsafe" @@ -1274,7 +1275,8 @@ func (s *LocalSegment) Release(ctx context.Context, opts ...releaseOption) { GetDynamicPool().Submit(func() (any, error) { C.DeleteSegment(ptr) - localDiskUsage, err := segcore.GetLocalUsedSize(context.Background(), paramtable.Get().LocalStorageCfg.Path.GetValue()) + localQnPaths := filepath.Join(paramtable.Get().LocalStorageCfg.Path.GetValue(), typeutil.QueryNodeRole) + localDiskUsage, err := segcore.GetLocalUsedSize(context.Background(), localQnPaths) // ignore error here, shall not block releasing if err == nil { metrics.QueryNodeDiskUsedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(localDiskUsage) / 1024 / 1024) // in MB diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go index 6e883ab30781a..d9722c2456acb 100644 --- a/internal/querynodev2/segments/segment_loader.go +++ b/internal/querynodev2/segments/segment_loader.go @@ -28,6 +28,7 @@ import ( "fmt" "io" "path" + "path/filepath" "runtime/debug" "strconv" "sync" @@ -453,7 +454,8 @@ func (loader *segmentLoader) requestResource(ctx context.Context, infos ...*quer memoryUsage := hardware.GetUsedMemoryCount() totalMemory := hardware.GetMemoryCount() - diskUsage, err := segcore.GetLocalUsedSize(ctx, paramtable.Get().LocalStorageCfg.Path.GetValue()) + localQnPath := filepath.Join(paramtable.Get().LocalStorageCfg.Path.GetValue(), typeutil.QueryNodeRole) + diskUsage, err := segcore.GetLocalUsedSize(ctx, localQnPath) if err != nil { return result, errors.Wrap(err, "get local used size failed") } @@ -1366,7 +1368,8 @@ func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadIn return 0, 0, errors.New("get memory failed when checkSegmentSize") } - localDiskUsage, err := segcore.GetLocalUsedSize(ctx, paramtable.Get().LocalStorageCfg.Path.GetValue()) + localQnPaths := filepath.Join(paramtable.Get().LocalStorageCfg.Path.GetValue(), typeutil.QueryNodeRole) + localDiskUsage, err := segcore.GetLocalUsedSize(ctx, localQnPaths) if err != nil { return 0, 0, errors.Wrap(err, "get local used size failed") } diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go index fd331a6bdfa0b..b8bb818c9eedf 100644 --- a/internal/querynodev2/server.go +++ b/internal/querynodev2/server.go @@ -244,7 +244,7 @@ func (node *QueryNode) InitSegcore() error { C.SegcoreSetKnowhereGpuMemoryPoolSize(cGpuMemoryPoolInitSize, cGpuMemoryPoolMaxSize) localDataRootPath := filepath.Join(paramtable.Get().LocalStorageCfg.Path.GetValue(), typeutil.QueryNodeRole) - initcore.InitLocalChunkManager(localDataRootPath) + initcore.InitLocalChunkManager(typeutil.QueryNodeRole, localDataRootPath) err := initcore.InitRemoteChunkManager(paramtable.Get()) if err != nil { @@ -321,7 +321,7 @@ func (node *QueryNode) Init() error { node.factory.Init(paramtable.Get()) - localRootPath := paramtable.Get().LocalStorageCfg.Path.GetValue() + localRootPath := filepath.Join(paramtable.Get().LocalStorageCfg.Path.GetValue(), typeutil.QueryNodeRole) localUsedSize, err := segcore.GetLocalUsedSize(node.ctx, localRootPath) if err != nil { log.Warn("get local used size failed", zap.Error(err)) diff --git a/internal/util/initcore/init_core.go b/internal/util/initcore/init_core.go index 88a4f4ae5aa15..fcd5b0b08d990 100644 --- a/internal/util/initcore/init_core.go +++ b/internal/util/initcore/init_core.go @@ -40,10 +40,12 @@ import ( "github.com/milvus-io/milvus/pkg/util/paramtable" ) -func InitLocalChunkManager(path string) { +func InitLocalChunkManager(role string, path string) { + CRole := C.CString(role) + defer C.free(unsafe.Pointer(CRole)) CLocalRootPath := C.CString(path) defer C.free(unsafe.Pointer(CLocalRootPath)) - C.InitLocalChunkManagerSingleton(CLocalRootPath) + C.InitLocalChunkManager(CRole, CLocalRootPath) } func InitTraceConfig(params *paramtable.ComponentParam) {