Skip to content

Commit

Permalink
[fix](cooldown)Support change be.conf: max_sub_cache_file_size (apach…
Browse files Browse the repository at this point in the history
…e#17773)

* delete files when sub file cache size is changed.
  • Loading branch information
pengxiangyu authored Mar 15, 2023
1 parent bbf88ec commit a378a60
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 25 deletions.
63 changes: 40 additions & 23 deletions be/src/io/cache/sub_file_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ Status SubFileCache::read_at(size_t offset, Slice result, const IOContext& io_ct

Status SubFileCache::read_at_impl(size_t offset, Slice result, const IOContext& io_ctx,
size_t* bytes_read) {
_init();
RETURN_IF_ERROR(_init());
if (io_ctx.reader_type != READER_QUERY) {
return _remote_file_reader->read_at(offset, result, io_ctx, bytes_read);
}
Expand Down Expand Up @@ -225,7 +225,7 @@ Status SubFileCache::_get_need_cache_offsets(size_t offset, size_t req_size,
}

Status SubFileCache::clean_timeout_cache() {
_init();
RETURN_IF_ERROR(_init());
SubGcQueue gc_queue;
_gc_lru_queue.swap(gc_queue);
std::vector<size_t> timeout_keys;
Expand Down Expand Up @@ -301,31 +301,48 @@ Status SubFileCache::_clean_cache_internal(size_t offset, size_t* cleaned_size)
return _remove_cache_and_done(cache_file, done_file, cleaned_size);
}

void SubFileCache::_init() {
auto init = [this] {
std::vector<Path> cache_names;
Status SubFileCache::_init() {
if (_is_inited) {
return Status::OK();
}
std::vector<Path> cache_names;

std::unique_lock<std::shared_mutex> wrlock(_cache_map_lock);
if (!_get_dir_files_and_remove_unfinished(_cache_dir, cache_names).ok()) {
return;
}
for (const auto& file : cache_names) {
auto str_vec = split(file.native(), "_");
size_t offset = std::strtoul(str_vec[str_vec.size() - 1].c_str(), nullptr, 10);
std::unique_lock<std::shared_mutex> wrlock(_cache_map_lock);
size_t cache_file_size = 0;
RETURN_IF_ERROR(_get_dir_files_and_remove_unfinished(_cache_dir, cache_names));
std::map<int64_t, int64_t> expect_file_size_map;
RETURN_IF_ERROR(_get_all_sub_file_size(&expect_file_size_map));
for (const auto& file : cache_names) {
auto str_vec = split(file.native(), "_");
size_t offset = std::strtoul(str_vec[str_vec.size() - 1].c_str(), nullptr, 10);

size_t file_size = 0;
auto path = _cache_dir / file;
if (io::global_local_filesystem()->file_size(path, &file_size).ok()) {
_last_match_times[offset] = time(nullptr);
_cache_file_size += file_size;
} else {
LOG(WARNING) << "get local cache file size failed:" << path.native();
_clean_cache_internal(offset, nullptr);
}
size_t file_size = 0;
auto path = _cache_dir / file;
RETURN_IF_ERROR(io::global_local_filesystem()->file_size(path, &file_size));
if (expect_file_size_map.find(offset) == expect_file_size_map.end() ||
expect_file_size_map[offset] != file_size) {
LOG(INFO) << "Delete invalid cache file: " << path.native() << ", offset: " << offset
<< ", size: " << file_size;
_clean_cache_internal(offset, nullptr);
continue;
}
};
_last_match_times[offset] = time(nullptr);
cache_file_size += file_size;
}
_cache_file_size = cache_file_size;
_is_inited = true;
return Status::OK();
}

std::call_once(init_flag, init);
Status SubFileCache::_get_all_sub_file_size(std::map<int64_t, int64_t>* expect_file_size_map) {
std::vector<size_t> cache_offsets;
RETURN_IF_ERROR(_get_need_cache_offsets(0, _remote_file_reader->size(), &cache_offsets));
for (int i = 0; i < cache_offsets.size() - 1; ++i) {
expect_file_size_map->emplace(cache_offsets[i], config::max_sub_cache_file_size);
}
expect_file_size_map->emplace(cache_offsets[cache_offsets.size() - 1],
_remote_file_reader->size() % config::max_sub_cache_file_size);
return Status::OK();
}

} // namespace io
Expand Down
6 changes: 4 additions & 2 deletions be/src/io/cache/sub_file_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ class SubFileCache final : public FileCache {

std::pair<Path, Path> _cache_path(size_t offset);

void _init();
Status _init();

Status _get_all_sub_file_size(std::map<int64_t, int64_t>* expect_file_size_map);

private:
struct SubFileInfo {
Expand All @@ -97,7 +99,7 @@ class SubFileCache final : public FileCache {
// offset_begin -> local file reader
std::map<size_t, io::FileReaderSPtr> _cache_file_readers;

std::once_flag init_flag;
bool _is_inited = false;
};

} // namespace io
Expand Down

0 comments on commit a378a60

Please sign in to comment.