Skip to content

Commit

Permalink
Use correct FileMeta for atomic flush result install (#4932)
Browse files Browse the repository at this point in the history
Summary:
1. this commit fixes our handling of a combination of two separate edge
cases. If a flush job does not pick any memtable to flush (because another
flush job has already picked the same memtables), and the column family
assigned to the flush job is dropped right before RocksDB calls
rocksdb::InstallMemtableAtomicFlushResults, our original code passes
a FileMetaData object whose file number is 0, failing the assertion in
rocksdb::InstallMemtableAtomicFlushResults (assert(m->GetFileNumber() > 0)).
2. Also piggyback a small change: since we already create a local copy of column family's mutable CF options to eliminate potential race condition with `SetOptions` call, we might as well use the local copy in other function calls in the same scope.
Pull Request resolved: facebook/rocksdb#4932

Differential Revision: D13901322

Pulled By: riversand963

fbshipit-source-id: b936580af7c127ea0c6c19ea10cd5fcede9fb0f9
  • Loading branch information
riversand963 committed Jan 31, 2019
1 parent acba14b commit 65b2298
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 21 deletions.
22 changes: 10 additions & 12 deletions db/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -310,21 +310,18 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
jobs.back().PickMemTable();
}

autovector<FileMetaData> file_meta;
std::vector<FileMetaData> file_meta(num_cfs);
Status s;
assert(num_cfs == static_cast<int>(jobs.size()));

for (int i = 0; i != num_cfs; ++i) {
file_meta.emplace_back();

#ifndef ROCKSDB_LITE
const MutableCFOptions& mutable_cf_options =
*cfds[i]->GetLatestMutableCFOptions();
for (int i = 0; i != num_cfs; ++i) {
const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.at(i);
// may temporarily unlock and lock the mutex.
NotifyOnFlushBegin(cfds[i], &file_meta[i], mutable_cf_options,
job_context->job_id, jobs[i].GetTableProperties());
#endif /* !ROCKSDB_LITE */
}
#endif /* !ROCKSDB_LITE */

if (logfile_number_ > 0) {
// TODO (yanqin) investigate whether we should sync the closed logs for
Expand Down Expand Up @@ -428,19 +425,21 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
autovector<ColumnFamilyData*> tmp_cfds;
autovector<const autovector<MemTable*>*> mems_list;
autovector<const MutableCFOptions*> mutable_cf_options_list;
autovector<FileMetaData*> tmp_file_meta;
for (int i = 0; i != num_cfs; ++i) {
const auto& mems = jobs[i].GetMemTables();
if (!cfds[i]->IsDropped() && !mems.empty()) {
tmp_cfds.emplace_back(cfds[i]);
mems_list.emplace_back(&mems);
mutable_cf_options_list.emplace_back(&all_mutable_cf_options[i]);
tmp_file_meta.emplace_back(&file_meta[i]);
}
}

s = InstallMemtableAtomicFlushResults(
nullptr /* imm_lists */, tmp_cfds, mutable_cf_options_list, mems_list,
versions_.get(), &mutex_, file_meta, &job_context->memtables_to_free,
directories_.GetDbDir(), log_buffer);
versions_.get(), &mutex_, tmp_file_meta,
&job_context->memtables_to_free, directories_.GetDbDir(), log_buffer);
}

if (s.ok() || s.IsShutdownInProgress()) {
Expand All @@ -452,7 +451,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
}
InstallSuperVersionAndScheduleWork(cfds[i],
&job_context->superversion_contexts[i],
*cfds[i]->GetLatestMutableCFOptions());
all_mutable_cf_options[i]);
VersionStorageInfo::LevelSummaryStorage tmp;
ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n",
cfds[i]->GetName().c_str(),
Expand All @@ -468,8 +467,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
if (cfds[i]->IsDropped()) {
continue;
}
NotifyOnFlushCompleted(cfds[i], &file_meta[i],
*cfds[i]->GetLatestMutableCFOptions(),
NotifyOnFlushCompleted(cfds[i], &file_meta[i], all_mutable_cf_options[i],
job_context->job_id, jobs[i].GetTableProperties());
if (sfm) {
std::string file_path = MakeTableFileName(
Expand Down
10 changes: 8 additions & 2 deletions db/flush_job_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,9 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) {
k++;
}
HistogramData hist;
autovector<FileMetaData> file_metas;
std::vector<FileMetaData> file_metas;
// Call reserve to avoid auto-resizing
file_metas.reserve(flush_jobs.size());
mutex_.Lock();
for (auto& job : flush_jobs) {
job.PickMemTable();
Expand All @@ -319,6 +321,10 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) {
ASSERT_OK(job.Run(nullptr /**/, &meta));
file_metas.emplace_back(meta);
}
autovector<FileMetaData*> file_meta_ptrs;
for (auto& meta : file_metas) {
file_meta_ptrs.push_back(&meta);
}
autovector<const autovector<MemTable*>*> mems_list;
for (size_t i = 0; i != all_cfds.size(); ++i) {
const auto& mems = flush_jobs[i].GetMemTables();
Expand All @@ -331,7 +337,7 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) {

Status s = InstallMemtableAtomicFlushResults(
nullptr /* imm_lists */, all_cfds, mutable_cf_options_list, mems_list,
versions_.get(), &mutex_, file_metas, &job_context.memtables_to_free,
versions_.get(), &mutex_, file_meta_ptrs, &job_context.memtables_to_free,
nullptr /* db_directory */, nullptr /* log_buffer */);
ASSERT_OK(s);

Expand Down
5 changes: 3 additions & 2 deletions db/memtable_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@ Status InstallMemtableAtomicFlushResults(
const autovector<ColumnFamilyData*>& cfds,
const autovector<const MutableCFOptions*>& mutable_cf_options_list,
const autovector<const autovector<MemTable*>*>& mems_list, VersionSet* vset,
InstrumentedMutex* mu, const autovector<FileMetaData>& file_metas,
InstrumentedMutex* mu, const autovector<FileMetaData*>& file_metas,
autovector<MemTable*>* to_delete, Directory* db_directory,
LogBuffer* log_buffer) {
AutoThreadOperationStageUpdater stage_updater(
Expand All @@ -553,10 +553,11 @@ Status InstallMemtableAtomicFlushResults(
assert((*mems_list[k])[0]->GetID() == imm->GetEarliestMemTableID());
}
#endif
assert(nullptr != file_metas[k]);
for (size_t i = 0; i != mems_list[k]->size(); ++i) {
assert(i == 0 || (*mems_list[k])[i]->GetEdits()->NumEntries() == 0);
(*mems_list[k])[i]->SetFlushCompleted(true);
(*mems_list[k])[i]->SetFileNumber(file_metas[k].fd.GetNumber());
(*mems_list[k])[i]->SetFileNumber(file_metas[k]->fd.GetNumber());
}
}

Expand Down
6 changes: 3 additions & 3 deletions db/memtable_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ class MemTableListVersion {
const autovector<const MutableCFOptions*>& mutable_cf_options_list,
const autovector<const autovector<MemTable*>*>& mems_list,
VersionSet* vset, InstrumentedMutex* mu,
const autovector<FileMetaData>& file_meta,
const autovector<FileMetaData*>& file_meta,
autovector<MemTable*>* to_delete, Directory* db_directory,
LogBuffer* log_buffer);

Expand Down Expand Up @@ -301,7 +301,7 @@ class MemTableList {
const autovector<const MutableCFOptions*>& mutable_cf_options_list,
const autovector<const autovector<MemTable*>*>& mems_list,
VersionSet* vset, InstrumentedMutex* mu,
const autovector<FileMetaData>& file_meta,
const autovector<FileMetaData*>& file_meta,
autovector<MemTable*>* to_delete, Directory* db_directory,
LogBuffer* log_buffer);

Expand Down Expand Up @@ -337,7 +337,7 @@ extern Status InstallMemtableAtomicFlushResults(
const autovector<ColumnFamilyData*>& cfds,
const autovector<const MutableCFOptions*>& mutable_cf_options_list,
const autovector<const autovector<MemTable*>*>& mems_list, VersionSet* vset,
InstrumentedMutex* mu, const autovector<FileMetaData>& file_meta,
InstrumentedMutex* mu, const autovector<FileMetaData*>& file_meta,
autovector<MemTable*>* to_delete, Directory* db_directory,
LogBuffer* log_buffer);
} // namespace rocksdb
9 changes: 7 additions & 2 deletions db/memtable_list_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -161,18 +161,23 @@ class MemTableListTest : public testing::Test {
cfds.emplace_back(column_family_set->GetColumnFamily(cf_ids[i]));
EXPECT_NE(nullptr, cfds[i]);
}
autovector<FileMetaData> file_metas;
std::vector<FileMetaData> file_metas;
file_metas.reserve(cf_ids.size());
for (size_t i = 0; i != cf_ids.size(); ++i) {
FileMetaData meta;
uint64_t file_num = file_number.fetch_add(1);
meta.fd = FileDescriptor(file_num, 0, 0);
file_metas.emplace_back(meta);
}
autovector<FileMetaData*> file_meta_ptrs;
for (auto& meta : file_metas) {
file_meta_ptrs.push_back(&meta);
}
InstrumentedMutex mutex;
InstrumentedMutexLock l(&mutex);
return InstallMemtableAtomicFlushResults(
&lists, cfds, mutable_cf_options_list, mems_list, &versions, &mutex,
file_metas, to_delete, nullptr, &log_buffer);
file_meta_ptrs, to_delete, nullptr, &log_buffer);
}
};

Expand Down

0 comments on commit 65b2298

Please sign in to comment.