Fix delete obsolete files on recovery not rate limited (#12590)
Summary:
This PR fixes an issue where the deletion of obsolete files during DB::Open is not rate limited.

The root cause is that slow deletion is disabled if the trash/db size ratio exceeds the configured `max_trash_db_ratio` (https://github.com/facebook/rocksdb/blob/d610e14f9386bab7f1fa85cf34dcb5b465152699/include/rocksdb/sst_file_manager.h#L126). However, the current handling in DB::Open starts out tracking nothing but the obsolete files, which makes the ratio always look like it is 1.
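
To illustrate the ratio check described above, here is a minimal sketch. It is a simplified model and not the RocksDB code: `ShouldDeleteImmediately` and its parameters are hypothetical names, and the exact comparison is an assumption about how a trash/db ratio cap behaves.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical, simplified model of the ratio check (not the RocksDB code).
// Returns true when slow deletion is bypassed and the file would be deleted
// immediately, i.e. when the trash size exceeds max_trash_db_ratio times the
// size currently tracked for the DB.
bool ShouldDeleteImmediately(uint64_t trash_bytes, uint64_t tracked_db_bytes,
                             double max_trash_db_ratio) {
  assert(max_trash_db_ratio >= 0.0);
  return static_cast<double>(trash_bytes) >
         static_cast<double>(tracked_db_bytes) * max_trash_db_ratio;
}
```

With a cap of 0.25, 100 units of trash measured against 100 tracked units (only the obsolete files) bypasses rate limiting, while the same 100 units measured against a fully tracked DB of 10000 units does not.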

For the deletion rate limiting logic to work properly, we should only start deleting files after `SstFileManager` has finished tracking the whole DB. The main fix is therefore to move the two places that attempt to delete files so that they run after the tracking is done: 1) the `DeleteScheduler::CleanupDirectory` call in `SanitizeOptions`, 2) the `DB::DeleteObsoleteFiles` call.
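
The effect of the ordering can be sketched with a toy model. Everything below is hypothetical and for illustration only: `ToyFileManager` and `CountRateLimited` are not RocksDB classes or functions, and the model ignores details such as trash being drained over time.

```cpp
#include <cstdint>
#include <vector>

// Hypothetical stand-in for a file manager that tracks the total DB size and
// decides whether a deletion is rate limited. Not the RocksDB class.
struct ToyFileManager {
  uint64_t tracked_bytes = 0;
  uint64_t trash_bytes = 0;
  double max_trash_db_ratio = 0.25;

  void Track(uint64_t size) { tracked_bytes += size; }

  // Returns true if the deletion takes the rate-limited (trash) path.
  bool Delete(uint64_t size) {
    const bool over_cap =
        static_cast<double>(trash_bytes + size) >
        static_cast<double>(tracked_bytes) * max_trash_db_ratio;
    if (over_cap) {
      return false;  // deleted immediately, no rate limiting
    }
    trash_bytes += size;  // queued as trash for slow deletion
    return true;
  }
};

// Counts how many obsolete files are deleted through the rate-limited path.
// If track_all_before_delete is false, only the obsolete files are tracked
// when deletion starts (the behavior this commit fixes).
int CountRateLimited(const std::vector<uint64_t>& live_sizes,
                     const std::vector<uint64_t>& obsolete_sizes,
                     bool track_all_before_delete) {
  ToyFileManager sfm;
  for (uint64_t size : obsolete_sizes) {
    sfm.Track(size);
  }
  if (track_all_before_delete) {
    for (uint64_t size : live_sizes) {
      sfm.Track(size);
    }
  }
  int rate_limited = 0;
  for (uint64_t size : obsolete_sizes) {
    if (sfm.Delete(size)) {
      ++rate_limited;
    }
  }
  return rate_limited;
}
```

In this model, two obsolete files of size 10 next to a live file of size 1000 are both deleted immediately when only the obsolete files are tracked, and both are rate limited once the whole DB is tracked first.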

There are also some other aesthetic changes, such as refactoring the collection of all DB paths into a function and renaming `DBImpl::DeleteUnreferencedSstFiles` to `DBImpl::MaybeUpdateNextFileNumber`, as it doesn't actually delete the files.
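
The path-collection refactoring amounts to gathering every path into a `std::set`, which deduplicates on insert. A standalone sketch of the idea follows; `CollectPaths` and `StripTrailingSlashes` are hypothetical helpers written for this example, and the normalization here is an assumption that does not reproduce RocksDB's `NormalizePath`.

```cpp
#include <set>
#include <string>
#include <vector>

// Hypothetical normalization for illustration only: strips trailing '/'
// characters so that "/db" and "/db/" compare equal.
std::string StripTrailingSlashes(std::string path) {
  while (path.size() > 1 && path.back() == '/') {
    path.pop_back();
  }
  return path;
}

// Collects the DB name, the DB-level paths, and every column family's paths
// into one deduplicated, sorted set.
std::set<std::string> CollectPaths(
    const std::string& dbname, const std::vector<std::string>& db_paths,
    const std::vector<std::vector<std::string>>& cf_paths_per_cf) {
  std::set<std::string> all_paths;
  all_paths.insert(StripTrailingSlashes(dbname));
  for (const auto& path : db_paths) {
    all_paths.insert(StripTrailingSlashes(path));
  }
  for (const auto& cf_paths : cf_paths_per_cf) {
    for (const auto& path : cf_paths) {
      all_paths.insert(StripTrailingSlashes(path));
    }
  }
  return all_paths;
}
```

Using a set removes the need for a separate sort-and-unique pass, and callers can iterate the result directly.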

Pull Request resolved: #12590

Test Plan: Added unit test and verified with manual testing

Reviewed By: anand1976

Differential Revision: D56830519

Pulled By: jowlyzhang

fbshipit-source-id: 8a38a21b1ea11c5371924f2b88663648f7a17885
jowlyzhang authored and facebook-github-bot committed May 1, 2024
1 parent 8b3d9e6 commit 2412530
Showing 8 changed files with 170 additions and 161 deletions.
37 changes: 25 additions & 12 deletions db/db_impl/db_impl.h
@@ -1421,9 +1421,8 @@ class DBImpl : public DB {
autovector<ColumnFamilyData*> cfds_;
autovector<const MutableCFOptions*> mutable_cf_opts_;
autovector<autovector<VersionEdit*>> edit_lists_;
// Stale SST files to delete found upon recovery. This stores a mapping from
// such a file's absolute path to its parent directory.
std::unordered_map<std::string, std::string> files_to_delete_;
// All existing data files (SST files and Blob files) found during DB::Open.
std::vector<std::string> existing_data_files_;
bool is_new_db_ = false;
};

@@ -1561,22 +1560,36 @@ class DBImpl : public DB {
// Assign db_id_ and write DB ID to manifest if necessary.
void SetDBId(std::string&& id, bool read_only, RecoveryContext* recovery_ctx);

// Collect a deduplicated collection of paths used by this DB, including
// dbname_, DBOptions.db_paths, ColumnFamilyOptions.cf_paths.
std::set<std::string> CollectAllDBPaths();

// REQUIRES: db mutex held when calling this function, but the db mutex can
// be released and re-acquired. Db mutex will be held when the function
// returns.
// After recovery, there may be SST files in db/cf paths that are
// not referenced in the MANIFEST (e.g.
// It stores all existing data files (SST and Blob) in RecoveryContext. In
// the meantime, we find out the largest file number present in the paths, and
// bump up the version set's next_file_number_ to be 1 + largest_file_number.
// recovery_ctx stores the context about version edits. All those edits are
// persisted to new Manifest after successfully syncing the new WAL.
Status MaybeUpdateNextFileNumber(RecoveryContext* recovery_ctx);

// Track existing data files, including both referenced and unreferenced SST
// and Blob files in SstFileManager. This is only called during DB::Open and
// it's called before any file deletion start so that their deletion can be
// properly rate limited.
// Files may not be referenced in the MANIFEST because (e.g.
// 1. It's best effort recovery;
// 2. The VersionEdits referencing the SST files are appended to
// RecoveryContext, DB crashes when syncing the MANIFEST, the VersionEdits are
// still not synced to MANIFEST during recovery.)
// It stores the SST files to be deleted in RecoveryContext. In the
// meantime, we find out the largest file number present in the paths, and
// bump up the version set's next_file_number_ to be 1 + largest_file_number.
// recovery_ctx stores the context about version edits and files to be
// deleted. All those edits are persisted to new Manifest after successfully
// syncing the new WAL.
Status DeleteUnreferencedSstFiles(RecoveryContext* recovery_ctx);
//
// If the file is referenced in Manifest (typically that's the
// vast majority of all files), since it already has the file size
// on record, we don't need to query the file system. Otherwise, we query the
// file system for the size of an unreferenced file.
void TrackExistingDataFiles(
const std::vector<std::string>& existing_data_files);

// SetDbSessionId() should be called in the constuctor DBImpl()
// to ensure that db_session_id_ gets updated every time the DB is opened
57 changes: 15 additions & 42 deletions db/db_impl/db_impl_files.cc
@@ -177,31 +177,10 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
versions_->AddLiveFiles(&job_context->sst_live, &job_context->blob_live);
InfoLogPrefix info_log_prefix(!immutable_db_options_.db_log_dir.empty(),
dbname_);
std::set<std::string> paths;
for (size_t path_id = 0; path_id < immutable_db_options_.db_paths.size();
path_id++) {
paths.insert(immutable_db_options_.db_paths[path_id].path);
}

// Note that if cf_paths is not specified in the ColumnFamilyOptions
// of a particular column family, we use db_paths as the cf_paths
// setting. Hence, there can be multiple duplicates of files from db_paths
// in the following code. The duplicate are removed while identifying
// unique files in PurgeObsoleteFiles.
for (auto cfd : *versions_->GetColumnFamilySet()) {
for (size_t path_id = 0; path_id < cfd->ioptions()->cf_paths.size();
path_id++) {
auto& path = cfd->ioptions()->cf_paths[path_id].path;

if (paths.find(path) == paths.end()) {
paths.insert(path);
}
}
}

// PurgeObsoleteFiles will dedupe duplicate files.
IOOptions io_opts;
io_opts.do_not_recurse = true;
for (auto& path : paths) {
for (auto& path : CollectAllDBPaths()) {
// set of all files in the directory. We'll exclude files that are still
// alive in the subsequent processings.
std::vector<std::string> files;
@@ -966,28 +945,26 @@ Status DBImpl::SetupDBId(const WriteOptions& write_options, bool read_only,
return s;
}

Status DBImpl::DeleteUnreferencedSstFiles(RecoveryContext* recovery_ctx) {
mutex_.AssertHeld();
std::vector<std::string> paths;
paths.push_back(NormalizePath(dbname_ + std::string(1, kFilePathSeparator)));
std::set<std::string> DBImpl::CollectAllDBPaths() {
std::set<std::string> all_db_paths;
all_db_paths.insert(NormalizePath(dbname_));
for (const auto& db_path : immutable_db_options_.db_paths) {
paths.push_back(
NormalizePath(db_path.path + std::string(1, kFilePathSeparator)));
all_db_paths.insert(NormalizePath(db_path.path));
}
for (const auto* cfd : *versions_->GetColumnFamilySet()) {
for (const auto& cf_path : cfd->ioptions()->cf_paths) {
paths.push_back(
NormalizePath(cf_path.path + std::string(1, kFilePathSeparator)));
all_db_paths.insert(NormalizePath(cf_path.path));
}
}
// Dedup paths
std::sort(paths.begin(), paths.end());
paths.erase(std::unique(paths.begin(), paths.end()), paths.end());
return all_db_paths;
}

Status DBImpl::MaybeUpdateNextFileNumber(RecoveryContext* recovery_ctx) {
mutex_.AssertHeld();
uint64_t next_file_number = versions_->current_next_file_number();
uint64_t largest_file_number = next_file_number;
Status s;
for (const auto& path : paths) {
for (const auto& path : CollectAllDBPaths()) {
std::vector<std::string> files;
s = env_->GetChildren(path, &files);
if (!s.ok()) {
@@ -999,13 +976,10 @@ Status DBImpl::DeleteUnreferencedSstFiles(RecoveryContext* recovery_ctx) {
if (!ParseFileName(fname, &number, &type)) {
continue;
}
// path ends with '/' or '\\'
const std::string normalized_fpath = path + fname;
const std::string normalized_fpath = path + kFilePathSeparator + fname;
largest_file_number = std::max(largest_file_number, number);
if (type == kTableFile && number >= next_file_number &&
recovery_ctx->files_to_delete_.find(normalized_fpath) ==
recovery_ctx->files_to_delete_.end()) {
recovery_ctx->files_to_delete_.emplace(normalized_fpath, path);
if ((type == kTableFile || type == kBlobFile)) {
recovery_ctx->existing_data_files_.push_back(normalized_fpath);
}
}
}
@@ -1025,5 +999,4 @@ Status DBImpl::DeleteUnreferencedSstFiles(RecoveryContext* recovery_ctx) {
recovery_ctx->UpdateVersionEdits(default_cfd, edit);
return s;
}

} // namespace ROCKSDB_NAMESPACE
161 changes: 69 additions & 92 deletions db/db_impl/db_impl_open.cc
@@ -175,15 +175,6 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src,
}
}
}
// When the DB is stopped, it's possible that there are some .trash files that
// were not deleted yet, when we open the DB we will find these .trash files
// and schedule them to be deleted (or delete immediately if SstFileManager
// was not used)
auto sfm = static_cast<SstFileManagerImpl*>(result.sst_file_manager.get());
for (size_t i = 0; i < result.db_paths.size(); i++) {
DeleteScheduler::CleanupDirectory(result.env, sfm, result.db_paths[i].path)
.PermitUncheckedError();
}

// Create a default SstFileManager for purposes of tracking compaction size
// and facilitating recovery from out of space errors.
@@ -669,7 +660,7 @@ Status DBImpl::Recover(
s = SetupDBId(write_options, read_only, recovery_ctx);
ROCKS_LOG_INFO(immutable_db_options_.info_log, "DB ID: %s\n", db_id_.c_str());
if (s.ok() && !read_only) {
s = DeleteUnreferencedSstFiles(recovery_ctx);
s = MaybeUpdateNextFileNumber(recovery_ctx);
}

if (immutable_db_options_.paranoid_checks && s.ok()) {
@@ -971,19 +962,6 @@ Status DBImpl::LogAndApplyForRecovery(const RecoveryContext& recovery_ctx) {
recovery_ctx.mutable_cf_opts_, read_options,
write_options, recovery_ctx.edit_lists_,
&mutex_, directories_.GetDbDir());
if (s.ok() && !(recovery_ctx.files_to_delete_.empty())) {
mutex_.Unlock();
for (const auto& stale_sst_file : recovery_ctx.files_to_delete_) {
s = DeleteDBFile(&immutable_db_options_, stale_sst_file.first,
stale_sst_file.second,
/*force_bg=*/false,
/*force_fg=*/false);
if (!s.ok()) {
break;
}
}
mutex_.Lock();
}
return s;
}

@@ -1982,6 +1960,50 @@ IOStatus DBImpl::CreateWAL(const WriteOptions& write_options,
return io_s;
}

void DBImpl::TrackExistingDataFiles(
const std::vector<std::string>& existing_data_files) {
auto sfm = static_cast<SstFileManagerImpl*>(
immutable_db_options_.sst_file_manager.get());
assert(sfm);
std::vector<ColumnFamilyMetaData> metadata;
GetAllColumnFamilyMetaData(&metadata);

std::unordered_set<std::string> referenced_files;
for (const auto& md : metadata) {
for (const auto& lmd : md.levels) {
for (const auto& fmd : lmd.files) {
// We're assuming that each sst file name exists in at most one of
// the paths.
std::string file_path =
fmd.directory + kFilePathSeparator + fmd.relative_filename;
sfm->OnAddFile(file_path, fmd.size).PermitUncheckedError();
referenced_files.insert(file_path);
}
}
for (const auto& bmd : md.blob_files) {
std::string name = bmd.blob_file_name;
// The BlobMetaData.blob_file_name may start with "/".
if (!name.empty() && name[0] == kFilePathSeparator) {
name = name.substr(1);
}
// We're assuming that each blob file name exists in at most one of
// the paths.
std::string file_path = bmd.blob_file_path + kFilePathSeparator + name;
sfm->OnAddFile(file_path, bmd.blob_file_size).PermitUncheckedError();
referenced_files.insert(file_path);
}
}

for (const auto& file_path : existing_data_files) {
if (referenced_files.find(file_path) != referenced_files.end()) {
continue;
}
// There shouldn't be any duplicated files. In case there is, SstFileManager
// will take care of deduping it.
sfm->OnAddFile(file_path).PermitUncheckedError();
}
}

Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
@@ -2029,7 +2051,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
paths.emplace_back(cf_path.path);
}
}
for (auto& path : paths) {
for (const auto& path : paths) {
s = impl->env_->CreateDirIfMissing(path);
if (!s.ok()) {
break;
@@ -2200,9 +2222,6 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
impl->WriteOptionsFile(write_options, true /*db_mutex_already_held*/);
*dbptr = impl;
impl->opened_successfully_ = true;
impl->DeleteObsoleteFiles();
TEST_SYNC_POINT("DBImpl::Open:AfterDeleteFiles");
impl->MaybeScheduleFlushOrCompaction();
} else {
persist_options_status.PermitUncheckedError();
}
@@ -2217,80 +2236,38 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
ROCKS_LOG_INFO(impl->immutable_db_options_.info_log,
"SstFileManager instance %p", sfm);

// Notify SstFileManager about all sst files that already exist in
// db_paths[0] and cf_paths[0] when the DB is opened.

// SstFileManagerImpl needs to know sizes of the files. For files whose size
// we already know (sst files that appear in manifest - typically that's the
// vast majority of all files), we'll pass the size to SstFileManager.
// For all other files SstFileManager will query the size from filesystem.

std::vector<ColumnFamilyMetaData> metadata;
impl->GetAllColumnFamilyMetaData(&metadata);

std::unordered_map<std::string, uint64_t> known_file_sizes;
for (const auto& md : metadata) {
for (const auto& lmd : md.levels) {
for (const auto& fmd : lmd.files) {
known_file_sizes[fmd.relative_filename] = fmd.size;
}
}
for (const auto& bmd : md.blob_files) {
std::string name = bmd.blob_file_name;
// The BlobMetaData.blob_file_name may start with "/".
if (!name.empty() && name[0] == '/') {
name = name.substr(1);
}
known_file_sizes[name] = bmd.blob_file_size;
}
}

std::vector<std::string> paths;
paths.emplace_back(impl->immutable_db_options_.db_paths[0].path);
for (auto& cf : column_families) {
if (!cf.options.cf_paths.empty()) {
paths.emplace_back(cf.options.cf_paths[0].path);
}
}
// Remove duplicate paths.
std::sort(paths.begin(), paths.end());
paths.erase(std::unique(paths.begin(), paths.end()), paths.end());
IOOptions io_opts;
io_opts.do_not_recurse = true;
for (auto& path : paths) {
std::vector<std::string> existing_files;
impl->immutable_db_options_.fs
->GetChildren(path, io_opts, &existing_files,
/*IODebugContext*=*/nullptr)
.PermitUncheckedError(); //**TODO: What do to on error?
for (auto& file_name : existing_files) {
uint64_t file_number;
FileType file_type;
std::string file_path = path + "/" + file_name;
if (ParseFileName(file_name, &file_number, &file_type) &&
(file_type == kTableFile || file_type == kBlobFile)) {
// TODO: Check for errors from OnAddFile?
if (known_file_sizes.count(file_name)) {
// We're assuming that each sst file name exists in at most one of
// the paths.
sfm->OnAddFile(file_path, known_file_sizes.at(file_name))
.PermitUncheckedError();
} else {
sfm->OnAddFile(file_path).PermitUncheckedError();
}
}
}
}
impl->TrackExistingDataFiles(recovery_ctx.existing_data_files_);

// Reserve some disk buffer space. This is a heuristic - when we run out
// of disk space, this ensures that there is atleast write_buffer_size
// of disk space, this ensures that there is at least write_buffer_size
// amount of free space before we resume DB writes. In low disk space
// conditions, we want to avoid a lot of small L0 files due to frequent
// WAL write failures and resultant forced flushes
sfm->ReserveDiskBuffer(max_write_buffer_size,
impl->immutable_db_options_.db_paths[0].path);
}

if (s.ok()) {
// When the DB is stopped, it's possible that there are some .trash files
// that were not deleted yet, when we open the DB we will find these .trash
// files and schedule them to be deleted (or delete immediately if
// SstFileManager was not used).
// Note that we only start doing this and below delete obsolete file after
// `TrackExistingDataFiles` are called, the `max_trash_db_ratio` is
// ineffective otherwise and these files' deletion won't be rate limited
// which can cause discard stall.
for (const auto& path : impl->CollectAllDBPaths()) {
DeleteScheduler::CleanupDirectory(impl->immutable_db_options_.env, sfm,
path)
.PermitUncheckedError();
}
impl->mutex_.Lock();
// This will do a full scan.
impl->DeleteObsoleteFiles();
TEST_SYNC_POINT("DBImpl::Open:AfterDeleteFiles");
impl->MaybeScheduleFlushOrCompaction();
impl->mutex_.Unlock();
}

if (s.ok()) {
ROCKS_LOG_HEADER(impl->immutable_db_options_.info_log, "DB pointer %p",