Skip to content

Commit

Permalink
Track SST unique id in MANIFEST and verify (#9990)
Browse files Browse the repository at this point in the history
Summary:
Start tracking SST unique id in MANIFEST, which is used to verify with
SST properties to make sure the SST file is not overwritten or
misplaced. A DB option `try_verify_sst_unique_id` is introduced to
enable/disable the verification, if enabled, it opens all SST files
during DB-open to read the unique_id from table properties (default is
false), so it's recommended to use it with `max_open_files = -1` to
pre-open the files.

Pull Request resolved: facebook/rocksdb#9990

Test Plan: unittests, format-compatible test, mini-crash

Reviewed By: anand1976

Differential Revision: D36381863

Pulled By: jay-zhuang

fbshipit-source-id: 89ea2eb6b35ed3e80ead9c724eb096083eaba63f
  • Loading branch information
jay-zhuang authored and facebook-github-bot committed May 19, 2022
1 parent dde774d commit c6d326d
Show file tree
Hide file tree
Showing 45 changed files with 950 additions and 199 deletions.
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
* DB::GetLiveFilesStorageInfo is ready for production use.
* Add new stats PREFETCHED_BYTES_DISCARDED which records number of prefetched bytes discarded by RocksDB FilePrefetchBuffer on destruction and POLL_WAIT_MICROS records wait time for FS::Poll API completion.
* RemoteCompaction supports table_properties_collector_factories override on compaction worker.
* Start tracking SST unique id in MANIFEST, which will be used to verify with SST properties during DB open to make sure the SST file is not overwritten or misplaced. A db option `verify_sst_unique_id_in_manifest` is introduced to enable/disable the verification, if enabled all SST files will be opened during DB-open to verify the unique id (default is false), so it's recommended to use it with `max_open_files = -1` to pre-open the files.

### Public API changes
* Add rollback_deletion_type_callback to TransactionDBOptions so that write-prepared transactions know whether to issue a Delete or SingleDelete to cancel a previous key written during prior prepare phase. The PR aims to prevent mixing SingleDeletes and Deletes for the same key that can lead to undefined behaviors for write-prepared transactions.
Expand Down
10 changes: 10 additions & 0 deletions db/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "table/block_based/block_based_table_builder.h"
#include "table/format.h"
#include "table/internal_iterator.h"
#include "table/unique_id_impl.h"
#include "test_util/sync_point.h"
#include "util/stop_watch.h"

Expand Down Expand Up @@ -310,6 +311,15 @@ Status BuildTable(
meta->file_checksum_func_name = file_writer->GetFileChecksumFuncName();
file_checksum = meta->file_checksum;
file_checksum_func_name = meta->file_checksum_func_name;
// Set unique_id only if db_id and db_session_id exist
if (!tboptions.db_id.empty() && !tboptions.db_session_id.empty()) {
if (!GetSstInternalUniqueId(tboptions.db_id, tboptions.db_session_id,
meta->fd.GetNumber(), &(meta->unique_id))
.ok()) {
// if failed to get unique id, just set it Null
meta->unique_id = kNullUniqueId64x2;
}
}
}

if (s.ok()) {
Expand Down
25 changes: 24 additions & 1 deletion db/compaction/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
#include "table/block_based/block_based_table_factory.h"
#include "table/merging_iterator.h"
#include "table/table_builder.h"
#include "table/unique_id_impl.h"
#include "test_util/sync_point.h"
#include "util/coding.h"
#include "util/hash.h"
Expand Down Expand Up @@ -1047,6 +1048,7 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
const Compaction* compaction = sub_compact->compaction;
CompactionServiceInput compaction_input;
compaction_input.output_level = compaction->output_level();
compaction_input.db_id = db_id_;

const std::vector<CompactionInputFiles>& inputs =
*(compact_->compaction->inputs());
Expand Down Expand Up @@ -1208,6 +1210,7 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
meta.oldest_ancester_time = file.oldest_ancester_time;
meta.file_creation_time = file.file_creation_time;
meta.marked_for_compaction = file.marked_for_compaction;
meta.unique_id = file.unique_id;

auto cfd = compaction->column_family_data();
sub_compact->outputs.emplace_back(std::move(meta),
Expand Down Expand Up @@ -2277,6 +2280,18 @@ Status CompactionJob::OpenCompactionOutputFile(
meta.oldest_ancester_time = oldest_ancester_time;
meta.file_creation_time = current_time;
meta.temperature = temperature;
assert(!db_id_.empty());
assert(!db_session_id_.empty());
s = GetSstInternalUniqueId(db_id_, db_session_id_, meta.fd.GetNumber(),
&meta.unique_id);
if (!s.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log,
"[%s] [JOB %d] file #%" PRIu64
" failed to generate unique id: %s.",
cfd->GetName().c_str(), job_id_, meta.fd.GetNumber(),
s.ToString().c_str());
return s;
}
sub_compact->outputs.emplace_back(
std::move(meta), cfd->internal_comparator(),
/*enable_order_check=*/
Expand Down Expand Up @@ -2609,7 +2624,7 @@ Status CompactionServiceCompactionJob::Run() {
meta.fd.largest_seqno, meta.smallest.Encode().ToString(),
meta.largest.Encode().ToString(), meta.oldest_ancester_time,
meta.file_creation_time, output_file.validator.GetHash(),
meta.marked_for_compaction);
meta.marked_for_compaction, meta.unique_id);
}
compaction_result_->num_output_records = sub_compact->num_output_records;
compaction_result_->total_bytes = sub_compact->total_bytes;
Expand Down Expand Up @@ -2713,6 +2728,9 @@ static std::unordered_map<std::string, OptionTypeInfo> cs_input_type_info = {
{"output_level",
{offsetof(struct CompactionServiceInput, output_level), OptionType::kInt,
OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
{"db_id",
{offsetof(struct CompactionServiceInput, db_id),
OptionType::kEncodedString}},
{"has_begin",
{offsetof(struct CompactionServiceInput, has_begin), OptionType::kBoolean,
OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
Expand Down Expand Up @@ -2770,6 +2788,11 @@ static std::unordered_map<std::string, OptionTypeInfo>
{offsetof(struct CompactionServiceOutputFile, marked_for_compaction),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"unique_id",
OptionTypeInfo::Array<uint64_t, 2>(
offsetof(struct CompactionServiceOutputFile, unique_id),
OptionVerificationType::kNormal, OptionTypeFlags::kNone,
{0, OptionType::kUInt64T})},
};

static std::unordered_map<std::string, OptionTypeInfo>
Expand Down
10 changes: 8 additions & 2 deletions db/compaction/compaction_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,9 @@ struct CompactionServiceInput {
std::vector<std::string> input_files;
int output_level;

// db_id is used to generate unique id of sst on the remote compactor
std::string db_id;

// information for subcompaction
bool has_begin = false;
std::string begin;
Expand Down Expand Up @@ -290,13 +293,15 @@ struct CompactionServiceOutputFile {
uint64_t file_creation_time;
uint64_t paranoid_hash;
bool marked_for_compaction;
UniqueId64x2 unique_id;

CompactionServiceOutputFile() = default;
CompactionServiceOutputFile(
const std::string& name, SequenceNumber smallest, SequenceNumber largest,
std::string _smallest_internal_key, std::string _largest_internal_key,
uint64_t _oldest_ancester_time, uint64_t _file_creation_time,
uint64_t _paranoid_hash, bool _marked_for_compaction)
uint64_t _paranoid_hash, bool _marked_for_compaction,
UniqueId64x2 _unique_id)
: file_name(name),
smallest_seqno(smallest),
largest_seqno(largest),
Expand All @@ -305,7 +310,8 @@ struct CompactionServiceOutputFile {
oldest_ancester_time(_oldest_ancester_time),
file_creation_time(_file_creation_time),
paranoid_hash(_paranoid_hash),
marked_for_compaction(_marked_for_compaction) {}
marked_for_compaction(_marked_for_compaction),
unique_id(std::move(_unique_id)) {}
};

// CompactionServiceResult contains the compaction result from a different db
Expand Down
20 changes: 16 additions & 4 deletions db/compaction/compaction_job_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "rocksdb/options.h"
#include "rocksdb/write_buffer_manager.h"
#include "table/mock_table.h"
#include "table/unique_id_impl.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "util/string_util.h"
Expand Down Expand Up @@ -206,7 +207,7 @@ class CompactionJobTestBase : public testing::Test {
oldest_blob_file_number, kUnknownOldestAncesterTime,
kUnknownFileCreationTime, kUnknownFileChecksum,
kUnknownFileChecksumFuncName, kDisableUserTimestamp,
kDisableUserTimestamp);
kDisableUserTimestamp, kNullUniqueId64x2);

mutex_.Lock();
EXPECT_OK(
Expand Down Expand Up @@ -360,8 +361,8 @@ class CompactionJobTestBase : public testing::Test {
table_cache_, &event_logger, false, false, dbname_,
&compaction_job_stats_, Env::Priority::USER, nullptr /* IOTracer */,
/*manual_compaction_paused=*/nullptr,
/*manual_compaction_canceled=*/nullptr, /*db_id=*/"",
/*db_session_id=*/"", full_history_ts_low_);
/*manual_compaction_canceled=*/nullptr, env_->GenerateUniqueId(),
DBImpl::GenerateDbSessionId(nullptr), full_history_ts_low_);
VerifyInitializationOfCompactionJobStats(compaction_job_stats_);

compaction_job.Prepare();
Expand Down Expand Up @@ -1254,13 +1255,14 @@ TEST_F(CompactionJobTest, ResultSerialization) {
result.status =
status_list.at(rnd.Uniform(static_cast<int>(status_list.size())));
while (!rnd.OneIn(10)) {
UniqueId64x2 id{rnd64.Uniform(UINT64_MAX), rnd64.Uniform(UINT64_MAX)};
result.output_files.emplace_back(
rnd.RandomString(rnd.Uniform(kStrMaxLen)), rnd64.Uniform(UINT64_MAX),
rnd64.Uniform(UINT64_MAX),
rnd.RandomBinaryString(rnd.Uniform(kStrMaxLen)),
rnd.RandomBinaryString(rnd.Uniform(kStrMaxLen)),
rnd64.Uniform(UINT64_MAX), rnd64.Uniform(UINT64_MAX),
rnd64.Uniform(UINT64_MAX), rnd.OneIn(2));
rnd64.Uniform(UINT64_MAX), rnd.OneIn(2), id);
}
result.output_level = rnd.Uniform(10);
result.output_path = rnd.RandomString(rnd.Uniform(kStrMaxLen));
Expand Down Expand Up @@ -1288,6 +1290,16 @@ TEST_F(CompactionJobTest, ResultSerialization) {
ASSERT_FALSE(deserialized1.TEST_Equals(&result, &mismatch));
ASSERT_EQ(mismatch, "stats.num_input_files");

// Test unique id mismatch
if (!result.output_files.empty()) {
CompactionServiceResult deserialized_tmp;
ASSERT_OK(CompactionServiceResult::Read(output, &deserialized_tmp));
deserialized_tmp.output_files[0].unique_id[0] += 1;
ASSERT_FALSE(deserialized_tmp.TEST_Equals(&result, &mismatch));
ASSERT_EQ(mismatch, "output_files.unique_id");
deserialized_tmp.status.PermitUncheckedError();
}

// Test unknown field
CompactionServiceResult deserialized2;
output.clear();
Expand Down
3 changes: 2 additions & 1 deletion db/compaction/compaction_picker_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "db/compaction/compaction_picker_level.h"
#include "db/compaction/compaction_picker_universal.h"
#include "db/compaction/file_pri.h"
#include "table/unique_id_impl.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "util/string_util.h"
Expand Down Expand Up @@ -115,7 +116,7 @@ class CompactionPickerTest : public testing::Test {
largest_seq, marked_for_compact, temperature, kInvalidBlobFileNumber,
kUnknownOldestAncesterTime, kUnknownFileCreationTime,
kUnknownFileChecksum, kUnknownFileChecksumFuncName,
kDisableUserTimestamp, kDisableUserTimestamp);
kDisableUserTimestamp, kDisableUserTimestamp, kNullUniqueId64x2);
f->compensated_file_size =
(compensated_file_size != 0) ? compensated_file_size : file_size;
f->oldest_ancester_time = oldest_ancestor_time;
Expand Down
16 changes: 16 additions & 0 deletions db/compaction/compaction_service_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,16 @@ TEST_F(CompactionServiceTest, BasicCompactions) {
auto s = static_cast<Status*>(status);
*s = Status::Aborted("MyTestCompactionService failed to compact!");
});

// tracking success unique id verification
std::atomic_int verify_passed{0};
SyncPoint::GetInstance()->SetCallBack(
"Version::VerifySstUniqueIds::Passed", [&](void* arg) {
// override job status
auto id = static_cast<std::string*>(arg);
assert(!id->empty());
verify_passed++;
});
SyncPoint::GetInstance()->EnableProcessing();

Status s;
Expand All @@ -312,6 +322,12 @@ TEST_F(CompactionServiceTest, BasicCompactions) {
}
}
ASSERT_TRUE(s.IsAborted());

// Test verification
ASSERT_EQ(verify_passed, 0);
options.verify_sst_unique_id_in_manifest = true;
Reopen(options);
ASSERT_GT(verify_passed, 0);
}

TEST_F(CompactionServiceTest, ManualCompaction) {
Expand Down
5 changes: 5 additions & 0 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1974,6 +1974,11 @@ class DBImpl : public DB {
IOStatus CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number,
size_t preallocate_block_size, log::Writer** new_log);

// Verify SST file unique id between Manifest and table properties to make
// sure they're the same. Currently only used during DB open when
// `verify_sst_unique_id_in_manifest = true`.
Status VerifySstUniqueIdInManifest();

// Validate self-consistency of DB options
static Status ValidateOptions(const DBOptions& db_options);
// Validate self-consistency of DB options and its consistency with cf options
Expand Down
5 changes: 3 additions & 2 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1651,7 +1651,8 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
f->smallest, f->largest, f->fd.smallest_seqno, f->fd.largest_seqno,
f->marked_for_compaction, f->temperature, f->oldest_blob_file_number,
f->oldest_ancester_time, f->file_creation_time, f->file_checksum,
f->file_checksum_func_name, f->min_timestamp, f->max_timestamp);
f->file_checksum_func_name, f->min_timestamp, f->max_timestamp,
f->unique_id);
}
ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
"[%s] Apply version edit:\n%s", cfd->GetName().c_str(),
Expand Down Expand Up @@ -3276,7 +3277,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
f->fd.largest_seqno, f->marked_for_compaction, f->temperature,
f->oldest_blob_file_number, f->oldest_ancester_time,
f->file_creation_time, f->file_checksum, f->file_checksum_func_name,
f->min_timestamp, f->max_timestamp);
f->min_timestamp, f->max_timestamp, f->unique_id);

ROCKS_LOG_BUFFER(
log_buffer,
Expand Down
2 changes: 1 addition & 1 deletion db/db_impl/db_impl_experimental.cc
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ Status DBImpl::PromoteL0(ColumnFamilyHandle* column_family, int target_level) {
f->fd.largest_seqno, f->marked_for_compaction, f->temperature,
f->oldest_blob_file_number, f->oldest_ancester_time,
f->file_creation_time, f->file_checksum, f->file_checksum_func_name,
f->min_timestamp, f->max_timestamp);
f->min_timestamp, f->max_timestamp, f->unique_id);
}

status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
Expand Down
40 changes: 33 additions & 7 deletions db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,12 @@ Status DBImpl::Recover(
if (!s.ok()) {
return s;
}
if (immutable_db_options_.verify_sst_unique_id_in_manifest) {
s = VerifySstUniqueIdInManifest();
if (!s.ok()) {
return s;
}
}
s = SetDBId(read_only);
if (s.ok() && !read_only) {
s = DeleteUnreferencedSstFiles();
Expand Down Expand Up @@ -698,6 +704,25 @@ Status DBImpl::Recover(
return s;
}

Status DBImpl::VerifySstUniqueIdInManifest() {
mutex_.AssertHeld();
ROCKS_LOG_INFO(
immutable_db_options_.info_log,
"Verifying SST unique id between MANIFEST and SST file table properties");
Status status;
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (!cfd->IsDropped()) {
auto version = cfd->current();
version->Ref();
mutex_.Unlock();
status = version->VerifySstUniqueIds();
mutex_.Lock();
version->Unref();
}
}
return status;
}

Status DBImpl::PersistentStatsProcessFormatVersion() {
mutex_.AssertHeld();
Status s;
Expand Down Expand Up @@ -1498,13 +1523,14 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
constexpr int level = 0;

if (s.ok() && has_output) {
edit->AddFile(
level, meta.fd.GetNumber(), meta.fd.GetPathId(), meta.fd.GetFileSize(),
meta.smallest, meta.largest, meta.fd.smallest_seqno,
meta.fd.largest_seqno, meta.marked_for_compaction, meta.temperature,
meta.oldest_blob_file_number, meta.oldest_ancester_time,
meta.file_creation_time, meta.file_checksum,
meta.file_checksum_func_name, meta.min_timestamp, meta.max_timestamp);
edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(),
meta.fd.GetFileSize(), meta.smallest, meta.largest,
meta.fd.smallest_seqno, meta.fd.largest_seqno,
meta.marked_for_compaction, meta.temperature,
meta.oldest_blob_file_number, meta.oldest_ancester_time,
meta.file_creation_time, meta.file_checksum,
meta.file_checksum_func_name, meta.min_timestamp,
meta.max_timestamp, meta.unique_id);

for (const auto& blob : blob_file_additions) {
edit->AddBlobFile(blob);
Expand Down
9 changes: 8 additions & 1 deletion db/db_impl/db_impl_secondary.cc
Original file line number Diff line number Diff line change
Expand Up @@ -772,12 +772,19 @@ Status DBImplSecondary::CompactWithoutInstallation(

const int job_id = next_job_id_.fetch_add(1);

// use primary host's db_id for running the compaction, but db_session_id is
// using the local one, which is to make sure the unique id is unique from
// the remote compactors. Because the id is generated from db_id,
// db_session_id and orig_file_number, unlike the local compaction, remote
// compaction cannot guarantee the uniqueness of orig_file_number, the file
// number is only assigned when compaction is done.
CompactionServiceCompactionJob compaction_job(
job_id, c.get(), immutable_db_options_, mutable_db_options_,
file_options_for_compaction_, versions_.get(), &shutting_down_,
&log_buffer, output_dir.get(), stats_, &mutex_, &error_handler_,
input.snapshots, table_cache_, &event_logger_, dbname_, io_tracer_,
options.canceled, db_id_, db_session_id_, secondary_path_, input, result);
options.canceled, input.db_id, db_session_id_, secondary_path_, input,
result);

mutex_.Unlock();
s = compaction_job.Run();
Expand Down
16 changes: 16 additions & 0 deletions db/db_options_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,22 @@ TEST_F(DBOptionsTest, ImmutableTrackAndVerifyWalsInManifest) {
ASSERT_FALSE(s.ok());
}

TEST_F(DBOptionsTest, ImmutableVerifySstUniqueIdInManifest) {
Options options;
options.env = env_;
options.verify_sst_unique_id_in_manifest = true;

ImmutableDBOptions db_options(options);
ASSERT_TRUE(db_options.verify_sst_unique_id_in_manifest);

Reopen(options);
ASSERT_TRUE(dbfull()->GetDBOptions().verify_sst_unique_id_in_manifest);

Status s =
dbfull()->SetDBOptions({{"verify_sst_unique_id_in_manifest", "false"}});
ASSERT_FALSE(s.ok());
}

// RocksDB lite don't support dynamic options.
#ifndef ROCKSDB_LITE

Expand Down
Loading

0 comments on commit c6d326d

Please sign in to comment.