Skip to content

Commit

Permalink
[CH] Added cleanup logic for expiration mergetree part cache
Browse files Browse the repository at this point in the history
What changes were proposed in this pull request?
Added cleanup logic for expiration mergetree part cache

How was this patch tested?
unit tests, manual tests

(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
  • Loading branch information
liuneng1994 authored Aug 21, 2024
1 parent be35d57 commit 371be6f
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 16 deletions.
8 changes: 4 additions & 4 deletions cpp-ch/local-engine/Common/GlutenConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,14 +172,14 @@ struct MergeTreeConfig
inline static const String TABLE_PART_METADATA_CACHE_MAX_COUNT = "table_part_metadata_cache_max_count";
inline static const String TABLE_METADATA_CACHE_MAX_COUNT = "table_metadata_cache_max_count";

size_t table_part_metadata_cache_max_count = 1000;
size_t table_metadata_cache_max_count = 100;
size_t table_part_metadata_cache_max_count = 5000;
size_t table_metadata_cache_max_count = 500;

static MergeTreeConfig loadFromContext(DB::ContextPtr context)
{
MergeTreeConfig config;
config.table_part_metadata_cache_max_count = context->getConfigRef().getUInt64(TABLE_PART_METADATA_CACHE_MAX_COUNT, 1000);
config.table_metadata_cache_max_count = context->getConfigRef().getUInt64(TABLE_METADATA_CACHE_MAX_COUNT, 100);
config.table_part_metadata_cache_max_count = context->getConfigRef().getUInt64(TABLE_PART_METADATA_CACHE_MAX_COUNT, 5000);
config.table_metadata_cache_max_count = context->getConfigRef().getUInt64(TABLE_METADATA_CACHE_MAX_COUNT, 500);
return config;
}
};
Expand Down
6 changes: 3 additions & 3 deletions cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -262,16 +262,16 @@ MergeTreeData::LoadPartResult CustomStorageMergeTree::loadDataPart(
return res;
}

void CustomStorageMergeTree::removePartFromMemory(const MergeTreeData::DataPartPtr & part_to_detach)
void CustomStorageMergeTree::removePartFromMemory(const MergeTreeData::DataPart & part_to_detach)
{
auto lock = lockParts();
bool removed_active_part = false;
bool restored_active_part = false;

auto it_part = data_parts_by_info.find(part_to_detach->info);
auto it_part = data_parts_by_info.find(part_to_detach.info);
if (it_part == data_parts_by_info.end())
{
LOG_DEBUG(log, "No such data part {}", part_to_detach->getNameWithState());
LOG_DEBUG(log, "No such data part {}", part_to_detach.getNameWithState());
return;
}

Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Storages/CustomStorageMergeTree.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class CustomStorageMergeTree final : public MergeTreeData
bool scheduleDataProcessingJob(BackgroundJobsAssignee & executor) override;
std::map<std::string, MutationCommands> getUnfinishedMutationCommands() const override;
std::vector<MergeTreeDataPartPtr> loadDataPartsWithNames(std::unordered_set<std::string> parts);
void removePartFromMemory(const MergeTreeData::DataPartPtr & part_to_detach);
void removePartFromMemory(const MergeTreeData::DataPart & part_to_detach);

MergeTreeDataWriter writer;
MergeTreeDataSelectExecutor reader;
Expand Down
12 changes: 6 additions & 6 deletions cpp-ch/local-engine/Storages/StorageMergeTreeFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,20 +87,20 @@ DataPartsVector StorageMergeTreeFactory::getDataPartsByNames(const StorageID & i
std::unordered_set<String> missing_names;
if (!datapart_map->has(table_name)) [[unlikely]]
{
auto cache = std::make_shared<Poco::LRUCache<std::string, DataPartPtr>>(config.table_part_metadata_cache_max_count);
auto cache = std::make_shared<Poco::LRUCache<std::string, DataPartStorageHolderPtr>>(config.table_part_metadata_cache_max_count);
datapart_map->add(table_name, cache);
}

// find the missing cache part name
for (const auto & name : part_name)
{
if (!(*(datapart_map->get(table_name)))->has(name))
if (!(*datapart_map->get(table_name))->has(name))
{
missing_names.emplace(name);
}
else
{
res.emplace_back((*((*(datapart_map->get(table_name)))->get(name))));
res.emplace_back((*datapart_map->get(table_name))->get(name)->get()->dataPart());
}
}

Expand All @@ -112,17 +112,17 @@ DataPartsVector StorageMergeTreeFactory::getDataPartsByNames(const StorageID & i
storage_merge_tree = storage_map->get(table_name)->first;
}
auto missing_parts = storage_merge_tree->loadDataPartsWithNames(missing_names);
for (const auto & part : missing_parts)
for (auto & part : missing_parts)
{
res.emplace_back(part);
(*(datapart_map->get(table_name)))->add(part->name, part);
(*datapart_map->get(table_name))->add(part->name, std::make_shared<DataPartStorageHolder>(part, storage_merge_tree));
}
}
return res;
}
// will be inited in native init phase
std::unique_ptr<Poco::LRUCache<std::string, std::pair<CustomStorageMergeTreePtr, MergeTreeTable>>> StorageMergeTreeFactory::storage_map = nullptr;
std::unique_ptr<Poco::LRUCache<std::string, std::shared_ptr<Poco::LRUCache<std::string, DataPartPtr>>>> StorageMergeTreeFactory::datapart_map = nullptr;
std::unique_ptr<Poco::LRUCache<std::string, std::shared_ptr<Poco::LRUCache<std::string, DataPartStorageHolderPtr>>>> StorageMergeTreeFactory::datapart_map = nullptr;
std::recursive_mutex StorageMergeTreeFactory::storage_map_mutex;
std::recursive_mutex StorageMergeTreeFactory::datapart_mutex;

Expand Down
36 changes: 34 additions & 2 deletions cpp-ch/local-engine/Storages/StorageMergeTreeFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,37 @@ namespace local_engine
{
using CustomStorageMergeTreePtr = std::shared_ptr<CustomStorageMergeTree>;

class DataPartStorageHolder
{
public:
DataPartStorageHolder(const DataPartPtr& data_part, const CustomStorageMergeTreePtr& storage)
: data_part_(data_part),
storage_(storage)
{
}

[[nodiscard]] DataPartPtr dataPart() const
{
return data_part_;
}

[[nodiscard]] CustomStorageMergeTreePtr storage() const
{
return storage_;
}

~DataPartStorageHolder()
{
storage_->removePartFromMemory(*data_part_);
std::cerr << fmt::format("clean part {}", data_part_->name) << std::endl;
}

private:
DataPartPtr data_part_;
CustomStorageMergeTreePtr storage_;
};
using DataPartStorageHolderPtr = std::shared_ptr<DataPartStorageHolder>;

class StorageMergeTreeFactory
{
public:
Expand All @@ -50,7 +81,7 @@ class StorageMergeTreeFactory
auto & datapart_map_v = datapart_map;
if (!datapart_map_v)
{
datapart_map_v = std::make_unique<Poco::LRUCache<std::string, std::shared_ptr<Poco::LRUCache<std::string, DataPartPtr>>>>(
datapart_map_v = std::make_unique<Poco::LRUCache<std::string, std::shared_ptr<Poco::LRUCache<std::string, DataPartStorageHolderPtr>>>>(
config.table_metadata_cache_max_count);
}
else
Expand All @@ -68,7 +99,8 @@ class StorageMergeTreeFactory

private:
static std::unique_ptr<Poco::LRUCache<std::string, std::pair<CustomStorageMergeTreePtr, MergeTreeTable>>> storage_map;
static std::unique_ptr<Poco::LRUCache<std::string, std::shared_ptr<Poco::LRUCache<std::string, DataPartPtr>>>> datapart_map;
static std::unique_ptr<Poco::LRUCache<std::string, std::shared_ptr<Poco::LRUCache<std::string, DataPartStorageHolderPtr>>>> datapart_map;

static std::recursive_mutex storage_map_mutex;
static std::recursive_mutex datapart_mutex;
};
Expand Down

0 comments on commit 371be6f

Please sign in to comment.