Store DB identity and DB session ID in SST files #6983

Closed
7 changes: 6 additions & 1 deletion HISTORY.md
@@ -4,6 +4,12 @@
* Best-efforts recovery ignores CURRENT file completely. If CURRENT file is missing during recovery, best-efforts recovery still proceeds with MANIFEST file(s).
* In best-efforts recovery, an error that is not Corruption or IOError::kNotFound or IOError::kPathNotFound will be overwritten silently. Fix this by checking all non-ok cases and returning early.

### Public API Change
* `DB::GetDbSessionId(std::string& session_id)` is added. `session_id` stores a unique identifier that gets reset every time the DB is opened. This DB session ID should be unique among all open DB instances on all hosts, and should be unique among re-openings of the same or other DBs. This identifier is recorded in the LOG file on the line starting with "DB Session ID:".

### New Features
* DB identity (`db_id`) and DB session identity (`db_session_id`) are added to table properties and stored in SST files. SST files generated from SstFileWriter and Repairer have DB identity “SST Writer” and “DB Repairer”, respectively. Their DB session IDs are generated in the same way as `DB::GetDbSessionId`. The session ID for SstFileWriter (resp., Repairer) resets every time `SstFileWriter::Open` (resp., `Repairer::Run`) is called.
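
One way a tool might use the two new properties is to group SST files by the session that wrote them. The sketch below is self-contained and does not use RocksDB: `SstIdentity`, `GroupBySession`, and `WrittenBySession` are hypothetical names, and `SstIdentity` only mirrors the `db_id` and `db_session_id` fields described above. In a real program the values would come from `TableProperties`, for example via `GetPropertiesOfAllTables` as in the test in this change.

```cpp
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-in for the two fields this change adds to the table
// properties. This is NOT the real rocksdb::TableProperties struct.
struct SstIdentity {
  std::string db_id;          // empty if unset or unassigned
  std::string db_session_id;  // empty if unset or unassigned
};

// Groups SST file names by the session ID recorded in each file.
// Files with no recorded session ID end up under the empty-string key.
std::map<std::string, std::vector<std::string>> GroupBySession(
    const std::map<std::string, SstIdentity>& files) {
  std::map<std::string, std::vector<std::string>> by_session;
  for (const auto& entry : files) {
    by_session[entry.second.db_session_id].push_back(entry.first);
  }
  return by_session;
}

// Returns true when the file records exactly the given, non-empty session ID.
bool WrittenBySession(const SstIdentity& props,
                      const std::string& session_id) {
  return !session_id.empty() && props.db_session_id == session_id;
}
```

Treating an empty session ID as "unknown" rather than as a match is a design choice of this sketch; it follows the statement that an unset identity is stored as an empty string.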

## 6.11 (6/12/2020)
### Bug Fixes
* Fix consistency checking error swallowing in some cases when options.force_consistency_checks = true.
@@ -30,7 +36,6 @@
* `pin_l0_filter_and_index_blocks_in_cache` no longer applies to L0 files larger than `1.5 * write_buffer_size` to give more predictable memory usage. Such L0 files may exist due to intra-L0 compaction, external file ingestion, or user dynamically changing `write_buffer_size` (note, however, that files that are already pinned will continue being pinned, even after such a dynamic change).
* In point-in-time wal recovery mode, fail database recovery in case of IOError while reading the WAL to avoid data loss.
* A new method `Env::LowerThreadPoolCPUPriority(Priority, CpuPriority)` is added to `Env` to be able to lower to a specific priority such as `CpuPriority::kIdle`.
* `DB::GetDbSessionId(std::string& session_id)` is added. `session_id` stores a unique identifier that gets reset every time the DB is opened. This DB session ID should be unique among all open DB instances on all hosts, and should be unique among re-openings of the same or other DBs. This identifier is recorded in the `LOG` file on the line starting with `DB Session ID:`.

### New Features
* sst_dump to add a new --readahead_size argument. Users can specify read size when scanning the data. Sst_dump also tries to prefetch tail part of the SST files so usually some number of I/Os are saved there too.
10 changes: 6 additions & 4 deletions db/builder.cc
@@ -51,7 +51,8 @@ TableBuilder* NewTableBuilder(
uint64_t sample_for_compression, const CompressionOptions& compression_opts,
int level, const bool skip_filters, const uint64_t creation_time,
const uint64_t oldest_key_time, const uint64_t target_file_size,
const uint64_t file_creation_time) {
const uint64_t file_creation_time, const std::string& db_id,
const std::string& db_session_id) {
assert((column_family_id ==
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) ==
column_family_name.empty());
@@ -61,7 +62,7 @@ TableBuilder* NewTableBuilder(
sample_for_compression, compression_opts,
skip_filters, column_family_name, level,
creation_time, oldest_key_time, target_file_size,
file_creation_time),
file_creation_time, db_id, db_session_id),
column_family_id, file);
}

@@ -85,7 +86,8 @@ Status BuildTable(
EventLogger* event_logger, int job_id, const Env::IOPriority io_priority,
TableProperties* table_properties, int level, const uint64_t creation_time,
const uint64_t oldest_key_time, Env::WriteLifeTimeHint write_hint,
const uint64_t file_creation_time) {
const uint64_t file_creation_time, const std::string& db_id,
const std::string& db_session_id) {
assert((column_family_id ==
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) ==
column_family_name.empty());
@@ -142,7 +144,7 @@ Status BuildTable(
column_family_name, file_writer.get(), compression,
sample_for_compression, compression_opts_for_flush, level,
false /* skip_filters */, creation_time, oldest_key_time,
0 /*target_file_size*/, file_creation_time);
0 /*target_file_size*/, file_creation_time, db_id, db_session_id);
}

MergeHelper merge(env, internal_comparator.user_comparator(),
6 changes: 4 additions & 2 deletions db/builder.h
@@ -51,7 +51,8 @@ TableBuilder* NewTableBuilder(
const CompressionOptions& compression_opts, int level,
const bool skip_filters = false, const uint64_t creation_time = 0,
const uint64_t oldest_key_time = 0, const uint64_t target_file_size = 0,
const uint64_t file_creation_time = 0);
const uint64_t file_creation_time = 0, const std::string& db_id = "",
const std::string& db_session_id = "");

// Build a Table file from the contents of *iter. The generated file
// will be named according to number specified in meta. On success, the rest of
@@ -83,6 +84,7 @@ extern Status BuildTable(
TableProperties* table_properties = nullptr, int level = -1,
const uint64_t creation_time = 0, const uint64_t oldest_key_time = 0,
Env::WriteLifeTimeHint write_hint = Env::WLTH_NOT_SET,
const uint64_t file_creation_time = 0);
const uint64_t file_creation_time = 0, const std::string& db_id = "",
const std::string& db_session_id = "");

} // namespace ROCKSDB_NAMESPACE
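
The defaulted trailing parameters in these declarations explain why the recovery call sites in db/db_impl/db_impl_open.cc and db/repair.cc now spell out `0 /* oldest_key_time */` and `0 /* file_creation_time */`: C++ default arguments are positional, so reaching `db_id` requires passing every argument before it. The sketch below is a hypothetical miniature (`RecordArgs`, `BuildArgs`, and `WriteHint` are invented names, not RocksDB code). It shows how a shorter positional call can silently bind an unscoped enum to an earlier integer parameter, which appears to be what the removed `current_time, write_hint` calls did.

```cpp
#include <cstdint>
#include <string>

// Hypothetical unscoped enum, standing in for a write-lifetime hint.
enum WriteHint { kHintNotSet = 0, kHintShort = 2 };

// Records which value ended up in which parameter slot.
struct BuildArgs {
  uint64_t creation_time;
  uint64_t oldest_key_time;
  WriteHint write_hint;
  uint64_t file_creation_time;
  std::string db_id;
  std::string db_session_id;
};

// Hypothetical function with the same ordering of defaulted parameters as
// the BuildTable declaration above (trailing part only).
BuildArgs RecordArgs(uint64_t creation_time = 0, uint64_t oldest_key_time = 0,
                     WriteHint write_hint = kHintNotSet,
                     uint64_t file_creation_time = 0,
                     const std::string& db_id = "",
                     const std::string& db_session_id = "") {
  return BuildArgs{creation_time, oldest_key_time, write_hint,
                   file_creation_time, db_id, db_session_id};
}
```

With this shape, `RecordArgs(100, kHintShort)` compiles but stores the hint in `oldest_key_time`, because an unscoped enum converts implicitly to an integer. `RecordArgs(100, 0, kHintShort, 0, "id", "sid")` places every value in its intended slot.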
8 changes: 6 additions & 2 deletions db/compaction/compaction_job.cc
@@ -305,12 +305,15 @@ CompactionJob::CompactionJob(
const SnapshotChecker* snapshot_checker, std::shared_ptr<Cache> table_cache,
EventLogger* event_logger, bool paranoid_file_checks, bool measure_io_stats,
const std::string& dbname, CompactionJobStats* compaction_job_stats,
Env::Priority thread_pri, const std::atomic<bool>* manual_compaction_paused)
Env::Priority thread_pri, const std::atomic<bool>* manual_compaction_paused,
const std::string& db_id, const std::string& db_session_id)
: job_id_(job_id),
compact_(new CompactionState(compaction)),
compaction_job_stats_(compaction_job_stats),
compaction_stats_(compaction->compaction_reason(), 1),
dbname_(dbname),
db_id_(db_id),
db_session_id_(db_session_id),
db_options_(db_options),
file_options_(file_options),
env_(db_options.env),
@@ -1554,7 +1557,8 @@ Status CompactionJob::OpenCompactionOutputFile(
sub_compact->compaction->output_compression_opts(),
sub_compact->compaction->output_level(), skip_filters,
oldest_ancester_time, 0 /* oldest_key_time */,
sub_compact->compaction->max_output_file_size(), current_time));
sub_compact->compaction->max_output_file_size(), current_time, db_id_,
db_session_id_));
LogFlush(db_options_.info_log);
return s;
}
6 changes: 5 additions & 1 deletion db/compaction/compaction_job.h
@@ -78,7 +78,9 @@ class CompactionJob {
const std::string& dbname,
CompactionJobStats* compaction_job_stats,
Env::Priority thread_pri,
const std::atomic<bool>* manual_compaction_paused = nullptr);
const std::atomic<bool>* manual_compaction_paused = nullptr,
const std::string& db_id = "",
const std::string& db_session_id = "");

~CompactionJob();

@@ -152,6 +154,8 @@ class CompactionJob {

// DBImpl state
const std::string& dbname_;
const std::string db_id_;
const std::string db_session_id_;
const ImmutableDBOptions& db_options_;
const FileOptions file_options_;

11 changes: 7 additions & 4 deletions db/db_impl/db_impl_compaction_flush.cc
@@ -154,7 +154,8 @@ Status DBImpl::FlushMemTableToOutputFile(
GetDataDir(cfd, 0U),
GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
&event_logger_, mutable_cf_options.report_bg_io_stats,
true /* sync_output_directory */, true /* write_manifest */, thread_pri);
true /* sync_output_directory */, true /* write_manifest */, thread_pri,
db_id_, db_session_id_);
FileMetaData file_meta;

TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:BeforePickMemtables");
@@ -345,7 +346,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
data_dir, GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
stats_, &event_logger_, mutable_cf_options.report_bg_io_stats,
false /* sync_output_directory */, false /* write_manifest */,
thread_pri));
thread_pri, db_id_, db_session_id_));
jobs.back()->PickMemTable();
}

@@ -1031,7 +1032,8 @@ Status DBImpl::CompactFilesImpl(
snapshot_checker, table_cache_, &event_logger_,
c->mutable_cf_options()->paranoid_file_checks,
c->mutable_cf_options()->report_bg_io_stats, dbname_,
&compaction_job_stats, Env::Priority::USER, &manual_compaction_paused_);
&compaction_job_stats, Env::Priority::USER, &manual_compaction_paused_,
db_id_, db_session_id_);

// Creating a compaction influences the compaction score because the score
// takes running compactions into account (by skipping files that are already
@@ -2822,7 +2824,8 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
&event_logger_, c->mutable_cf_options()->paranoid_file_checks,
c->mutable_cf_options()->report_bg_io_stats, dbname_,
&compaction_job_stats, thread_pri,
is_manual ? &manual_compaction_paused_ : nullptr);
is_manual ? &manual_compaction_paused_ : nullptr, db_id_,
db_session_id_);
compaction_job.Prepare();

NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
3 changes: 2 additions & 1 deletion db/db_impl/db_impl_open.cc
@@ -1289,7 +1289,8 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
mutable_cf_options.compression_opts, paranoid_file_checks,
cfd->internal_stats(), TableFileCreationReason::kRecovery, &io_s,
&event_logger_, job_id, Env::IO_HIGH, nullptr /* table_properties */,
-1 /* level */, current_time, write_hint);
-1 /* level */, current_time, 0 /* oldest_key_time */, write_hint,
0 /* file_creation_time */, db_id_, db_session_id_);
LogFlush(immutable_db_options_.info_log);
ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
"[%s] [WriteLevel0TableForRecovery]"
20 changes: 20 additions & 0 deletions db/db_table_properties_test.cc
@@ -252,6 +252,26 @@ TEST_F(DBTablePropertiesTest, GetColumnFamilyNameProperty) {
}
}

TEST_F(DBTablePropertiesTest, GetDbIdentifiersProperty) {
CreateAndReopenWithCF({"goku"}, CurrentOptions());

for (uint32_t cf = 0; cf < 2; ++cf) {
Put(cf, "key", "val");
Put(cf, "foo", "bar");
Flush(cf);

TablePropertiesCollection fname_to_props;
ASSERT_OK(db_->GetPropertiesOfAllTables(handles_[cf], &fname_to_props));
ASSERT_EQ(1U, fname_to_props.size());

std::string id, sid;
db_->GetDbIdentity(id);
db_->GetDbSessionId(sid);
ASSERT_EQ(id, fname_to_props.begin()->second->db_id);
ASSERT_EQ(sid, fname_to_props.begin()->second->db_session_id);
}
}

class DeletionTriggeredCompactionTestListener : public EventListener {
public:
void OnCompactionBegin(DB* , const CompactionJobInfo& ci) override {
4 changes: 3 additions & 1 deletion db/event_helpers.cc
@@ -121,7 +121,9 @@ void EventHelpers::LogAndNotifyTableFileCreationFinished(
<< table_properties.compression_options << "creation_time"
<< table_properties.creation_time << "oldest_key_time"
<< table_properties.oldest_key_time << "file_creation_time"
<< table_properties.file_creation_time;
<< table_properties.file_creation_time << "db_id"
<< table_properties.db_id << "db_session_id"
<< table_properties.db_session_id;

// user collected properties
for (const auto& prop : table_properties.readable_properties) {
8 changes: 6 additions & 2 deletions db/flush_job.cc
@@ -97,8 +97,11 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
CompressionType output_compression, Statistics* stats,
EventLogger* event_logger, bool measure_io_stats,
const bool sync_output_directory, const bool write_manifest,
Env::Priority thread_pri)
Env::Priority thread_pri, const std::string& db_id,
const std::string& db_session_id)
: dbname_(dbname),
db_id_(db_id),
db_session_id_(db_session_id),
cfd_(cfd),
db_options_(db_options),
mutable_cf_options_(mutable_cf_options),
@@ -393,7 +396,8 @@ Status FlushJob::WriteLevel0Table() {
mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
TableFileCreationReason::kFlush, &io_s, event_logger_,
job_context_->job_id, Env::IO_HIGH, &table_properties_, 0 /* level */,
creation_time, oldest_key_time, write_hint, current_time);
creation_time, oldest_key_time, write_hint, current_time, db_id_,
db_session_id_);
if (!io_s.ok()) {
io_status_ = io_s;
}
5 changes: 4 additions & 1 deletion db/flush_job.h
@@ -72,7 +72,8 @@ class FlushJob {
CompressionType output_compression, Statistics* stats,
EventLogger* event_logger, bool measure_io_stats,
const bool sync_output_directory, const bool write_manifest,
Env::Priority thread_pri);
Env::Priority thread_pri, const std::string& db_id = "",
const std::string& db_session_id = "");

~FlushJob();

@@ -103,6 +104,8 @@ class FlushJob {
#endif // !ROCKSDB_LITE

const std::string& dbname_;
const std::string db_id_;
const std::string db_session_id_;
ColumnFamilyData* cfd_;
const ImmutableDBOptions& db_options_;
const MutableCFOptions& mutable_cf_options_;
1 change: 0 additions & 1 deletion db/flush_job_test.cc
@@ -282,7 +282,6 @@ TEST_F(FlushJobTest, FlushMemTablesSingleColumnFamily) {
assert(memtable_ids.size() == num_mems);
uint64_t smallest_memtable_id = memtable_ids.front();
uint64_t flush_memtable_id = smallest_memtable_id + num_mems_to_flush - 1;

FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
db_options_, *cfd->GetLatestMutableCFOptions(),
&flush_memtable_id, env_options_, versions_.get(), &mutex_,
7 changes: 6 additions & 1 deletion db/repair.cc
@@ -183,6 +183,8 @@ class Repairer {
}
// Just create a DBImpl temporarily so we can reuse NewDB()
DBImpl* db_impl = new DBImpl(db_options_, dbname_);
// Also use this temp DBImpl to get a session id
db_impl->GetDbSessionId(db_session_id_);
status = db_impl->NewDB();
delete db_impl;
}
@@ -229,6 +231,7 @@ class Repairer {
};

std::string const dbname_;
std::string db_session_id_;
Env* const env_;
const EnvOptions env_options_;
const DBOptions db_options_;
@@ -435,7 +438,9 @@ class Repairer {
CompressionOptions(), false, nullptr /* internal_stats */,
TableFileCreationReason::kRecovery, &io_s, nullptr /* event_logger */,
0 /* job_id */, Env::IO_HIGH, nullptr /* table_properties */,
-1 /* level */, current_time, write_hint);
-1 /* level */, current_time, 0 /* oldest_key_time */, write_hint,
0 /* file_creation_time */, "DB Repairer" /* db_id */,
db_session_id_);
ROCKS_LOG_INFO(db_options_.info_log,
"Log #%" PRIu64 ": %d ops saved to Table #%" PRIu64 " %s",
log, counter, meta.fd.GetNumber(),
13 changes: 13 additions & 0 deletions include/rocksdb/table_properties.h
@@ -30,6 +30,8 @@ typedef std::map<std::string, std::string> UserCollectedProperties;

// table properties' human-readable names in the property block.
struct TablePropertiesNames {
static const std::string kDbId;
static const std::string kDbSessionId;
static const std::string kDataSize;
static const std::string kIndexSize;
static const std::string kIndexPartitions;
@@ -193,6 +195,17 @@ struct TableProperties {
// Actual SST file creation time. 0 means unknown.
uint64_t file_creation_time = 0;

// DB identity

Review comment (Contributor): Add one or two sentences to explain the identity and session id.

Reply (Contributor Author): Sure!

// db_id is an identifier generated the first time the DB is created
// If DB identity is unset or unassigned, `db_id` will be an empty string.
std::string db_id;

// DB session identity
// db_session_id is an identifier that gets reset every time the DB is opened
// If DB session identity is unset or unassigned, `db_session_id` will be an
// empty string.
std::string db_session_id;

// Name of the column family with which this SST file is associated.
// If column family is unknown, `column_family_name` will be an empty string.
std::string column_family_name;
28 changes: 19 additions & 9 deletions table/block_based/block_based_table_builder.cc
@@ -374,6 +374,10 @@ struct BlockBasedTableBuilder::Rep {
const uint64_t target_file_size;
uint64_t file_creation_time = 0;

// DB IDs
const std::string db_id;
const std::string db_session_id;

std::vector<std::unique_ptr<IntTblPropCollector>> table_properties_collectors;

std::unique_ptr<ParallelCompressionRep> pc_rep;
@@ -447,7 +451,8 @@ struct BlockBasedTableBuilder::Rep {
const CompressionOptions& _compression_opts, const bool skip_filters,
const int _level_at_creation, const std::string& _column_family_name,
const uint64_t _creation_time, const uint64_t _oldest_key_time,
const uint64_t _target_file_size, const uint64_t _file_creation_time)
const uint64_t _target_file_size, const uint64_t _file_creation_time,
const std::string& _db_id, const std::string& _db_session_id)
: ioptions(_ioptions),
moptions(_moptions),
table_options(table_opt),
@@ -488,7 +493,9 @@ struct BlockBasedTableBuilder::Rep {
creation_time(_creation_time),
oldest_key_time(_oldest_key_time),
target_file_size(_target_file_size),
file_creation_time(_file_creation_time) {
file_creation_time(_file_creation_time),
db_id(_db_id),
db_session_id(_db_session_id) {
for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) {
compression_ctxs[i].reset(new CompressionContext(compression_type));
}
@@ -698,7 +705,8 @@ BlockBasedTableBuilder::BlockBasedTableBuilder(
const CompressionOptions& compression_opts, const bool skip_filters,
const std::string& column_family_name, const int level_at_creation,
const uint64_t creation_time, const uint64_t oldest_key_time,
const uint64_t target_file_size, const uint64_t file_creation_time) {
const uint64_t target_file_size, const uint64_t file_creation_time,
const std::string& db_id, const std::string& db_session_id) {
BlockBasedTableOptions sanitized_table_options(table_options);
if (sanitized_table_options.format_version == 0 &&
sanitized_table_options.checksum != kCRC32c) {
@@ -711,12 +719,12 @@ BlockBasedTableBuilder::BlockBasedTableBuilder(
sanitized_table_options.format_version = 1;
}

rep_ = new Rep(ioptions, moptions, sanitized_table_options,
internal_comparator, int_tbl_prop_collector_factories,
column_family_id, file, compression_type,
sample_for_compression, compression_opts, skip_filters,
level_at_creation, column_family_name, creation_time,
oldest_key_time, target_file_size, file_creation_time);
rep_ = new Rep(
ioptions, moptions, sanitized_table_options, internal_comparator,
int_tbl_prop_collector_factories, column_family_id, file,
compression_type, sample_for_compression, compression_opts, skip_filters,
level_at_creation, column_family_name, creation_time, oldest_key_time,
target_file_size, file_creation_time, db_id, db_session_id);

if (rep_->filter_builder != nullptr) {
rep_->filter_builder->StartBlock(0);
@@ -1445,6 +1453,8 @@ void BlockBasedTableBuilder::WritePropertiesBlock(
rep_->props.creation_time = rep_->creation_time;
rep_->props.oldest_key_time = rep_->oldest_key_time;
rep_->props.file_creation_time = rep_->file_creation_time;
rep_->props.db_id = rep_->db_id;
rep_->props.db_session_id = rep_->db_session_id;

// Add basic properties
property_block_builder.AddTableProperty(rep_->props);