Introduce a new storage-specific Env API (facebook#5761)
Summary:
The current Env API encompasses both storage/file operations and OS-related operations. Most of the APIs return a Status, which does not carry enough metadata about an error (such as whether it is retryable, or the scope, i.e. fault domain, of the error) to properly handle a storage error. The file APIs also do not provide enough control over the IO SLA, such as timeout, prioritization, and hints about placement and redundancy.

This PR separates out the file/storage APIs from Env into a new FileSystem class. The APIs are updated to return an IOStatus with metadata about the error, as well as to take an IOOptions structure as input in order to allow more control over the IO.
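
As a rough illustration, here is a minimal sketch of how a caller might use the new interface; the canonical signatures live in include/rocksdb/file_system.h, and the helper name and buffer size below are made up for the example:

```cpp
#include <memory>
#include <string>

#include "rocksdb/file_system.h"

using rocksdb::FileOptions;
using rocksdb::FileSystem;
using rocksdb::FSSequentialFile;
using rocksdb::IOOptions;
using rocksdb::IOStatus;
using rocksdb::Slice;

// Hypothetical helper: read the first few KB of a file through the new API.
IOStatus ReadHead(FileSystem* fs, const std::string& fname) {
  std::unique_ptr<FSSequentialFile> file;
  IOStatus ios = fs->NewSequentialFile(fname, FileOptions(), &file,
                                       /*dbg=*/nullptr);
  if (!ios.ok()) {
    // Unlike a plain Status, an IOStatus can tell the caller whether the
    // error is retryable and what its scope (fault domain) is.
    return ios;
  }
  char scratch[4096];
  Slice result;
  // IOOptions is the per-request hook for SLA controls such as timeouts.
  return file->Read(sizeof(scratch), IOOptions(), &result, scratch,
                    /*dbg=*/nullptr);
}
```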

The user can set both ```options.env``` and ```options.file_system``` to specify that RocksDB should use the former for OS related operations and the latter for storage operations. Internally, a ```CompositeEnvWrapper``` has been introduced that inherits from ```Env``` and redirects individual methods to either an ```Env``` implementation or the ```FileSystem``` as appropriate. When options are sanitized during ```DB::Open```, ```options.env``` is replaced with a newly allocated ```CompositeEnvWrapper``` instance if both env and file_system have been specified. This way, the rest of the RocksDB code can continue to function as before.
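
A minimal usage sketch of that configuration (the database path is a placeholder, and ```my_fs``` stands in for whatever storage-specific FileSystem implementation the user provides):

```cpp
#include <memory>

#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/file_system.h"

rocksdb::Status OpenWithSplitEnv(std::shared_ptr<rocksdb::FileSystem> my_fs,
                                 rocksdb::DB** db) {
  rocksdb::Options options;
  options.create_if_missing = true;
  // OS-related work (threads, clock, etc.) keeps going through Env.
  options.env = rocksdb::Env::Default();
  // Storage/file work is routed to the user-supplied FileSystem.
  options.file_system = my_fs;
  // During option sanitization in DB::Open, the two are combined into a
  // CompositeEnvWrapper so the rest of RocksDB still sees a single Env.
  return rocksdb::DB::Open(options, "/tmp/composite_env_demo", db);
}
```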

This PR also ports PosixEnv to the new API by splitting it into two classes: PosixEnv and PosixFileSystem. PosixEnv is defined as a sub-class of CompositeEnvWrapper, and threading/time functions are overridden with Posix-specific implementations in order to avoid an extra level of indirection.
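
The forwarding pattern looks roughly like the following. This is an illustrative sketch only, not the actual CompositeEnvWrapper or PosixEnv code; the class name, constructor, and set of overrides are simplified:

```cpp
#include <memory>
#include <string>
#include <utility>

#include "rocksdb/env.h"
#include "rocksdb/file_system.h"

// Illustrative only: file operations are forwarded to the FileSystem, while
// anything not overridden falls through to the wrapped Env (threads, time).
class CompositeEnvSketch : public rocksdb::EnvWrapper {
 public:
  CompositeEnvSketch(rocksdb::Env* base,
                     std::shared_ptr<rocksdb::FileSystem> fs)
      : rocksdb::EnvWrapper(base), fs_(std::move(fs)) {}

  rocksdb::Status DeleteFile(const std::string& fname) override {
    rocksdb::IOStatus ios =
        fs_->DeleteFile(fname, rocksdb::IOOptions(), /*dbg=*/nullptr);
    return ios.ok() ? rocksdb::Status::OK()
                    : rocksdb::Status::IOError(ios.ToString());
  }
  // A Posix specialization would additionally override Schedule(), NowMicros()
  // and friends with direct Posix implementations to skip the extra hop.

 private:
  std::shared_ptr<rocksdb::FileSystem> fs_;
};
```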

The ```CompositeEnvWrapper``` translates the ```IOStatus``` return code to a ```Status```, and sets the severity to ```kSoftError``` if the io_status is retryable. The error handling code in RocksDB can then recover the DB automatically.
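
A simplified sketch of that mapping (not the exact wrapper code; error handling is reduced to the retryable case):

```cpp
#include "rocksdb/file_system.h"
#include "rocksdb/status.h"

// Sketch: map an IOStatus back to a Status, promoting retryable I/O errors
// to kSoftError so the error handler can attempt automatic recovery.
rocksdb::Status ToStatus(const rocksdb::IOStatus& ios) {
  if (ios.ok()) {
    return rocksdb::Status::OK();
  }
  rocksdb::Status s = rocksdb::Status::IOError(ios.ToString());
  if (ios.GetRetryable()) {
    // Status(status, severity) attaches a severity to an existing status.
    s = rocksdb::Status(s, rocksdb::Status::Severity::kSoftError);
  }
  return s;
}
```
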
Pull Request resolved: facebook#5761

Differential Revision: D18868376

Pulled By: anand1976

fbshipit-source-id: 39efe18a162ea746fabac6360ff529baba48486f
anand76 authored and facebook-github-bot committed Dec 13, 2019
1 parent 58d46d1 commit afa2420
Showing 114 changed files with 4,880 additions and 1,484 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt
@@ -559,6 +559,7 @@ set(SOURCES
env/env_chroot.cc
env/env_encryption.cc
env/env_hdfs.cc
env/file_system.cc
env/mock_env.cc
file/delete_scheduler.cc
file/file_prefetch_buffer.cc
@@ -766,6 +767,7 @@ else()
list(APPEND SOURCES
port/port_posix.cc
env/env_posix.cc
env/fs_posix.cc
env/io_posix.cc)
endif()

4 changes: 4 additions & 0 deletions HISTORY.md
@@ -1,5 +1,9 @@
# Rocksdb Change Log
## Unreleased
### Public API Change
* Added a rocksdb::FileSystem class in include/rocksdb/file_system.h to encapsulate file creation/read/write operations, and an option DBOptions::file_system to allow a user to pass in an instance of rocksdb::FileSystem. If it is a non-null value, this will take precedence over DBOptions::env for file operations. A new API rocksdb::FileSystem::Default() returns a platform default object. The DBOptions::env option and Env::Default() API will continue to be used for threading and other OS related functions, and, where DBOptions::file_system is not specified, for file operations. For storage developers who are accustomed to rocksdb::Env, the interface in rocksdb::FileSystem is new and will probably undergo some changes as more storage systems are ported to it from rocksdb::Env. As of now, no env other than Posix has been ported to the new interface.
* A new rocksdb::NewSstFileManager() API that allows the caller to pass in separate Env and FileSystem objects.

### Bug Fixes
* Fix a bug that can cause unnecessary bg thread to be scheduled(#6104).
* Fix a bug in which a snapshot read could be affected by a DeleteRange after the snapshot (#6062).
2 changes: 2 additions & 0 deletions TARGETS
@@ -181,6 +181,8 @@ cpp_library(
"env/env_encryption.cc",
"env/env_hdfs.cc",
"env/env_posix.cc",
"env/file_system.cc",
"env/fs_posix.cc",
"env/io_posix.cc",
"env/mock_env.cc",
"file/delete_scheduler.cc",
17 changes: 9 additions & 8 deletions db/builder.cc
@@ -66,8 +66,9 @@ TableBuilder* NewTableBuilder(
}

Status BuildTable(
const std::string& dbname, Env* env, const ImmutableCFOptions& ioptions,
const MutableCFOptions& mutable_cf_options, const EnvOptions& env_options,
const std::string& dbname, Env* env, FileSystem* fs,
const ImmutableCFOptions& ioptions,
const MutableCFOptions& mutable_cf_options, const FileOptions& file_options,
TableCache* table_cache, InternalIterator* iter,
std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
range_del_iters,
@@ -115,12 +116,12 @@ Status BuildTable(
compression_opts_for_flush.max_dict_bytes = 0;
compression_opts_for_flush.zstd_max_train_bytes = 0;
{
std::unique_ptr<WritableFile> file;
std::unique_ptr<FSWritableFile> file;
#ifndef NDEBUG
bool use_direct_writes = env_options.use_direct_writes;
bool use_direct_writes = file_options.use_direct_writes;
TEST_SYNC_POINT_CALLBACK("BuildTable:create_file", &use_direct_writes);
#endif // !NDEBUG
s = NewWritableFile(env, fname, &file, env_options);
s = NewWritableFile(fs, fname, &file, file_options);
if (!s.ok()) {
EventHelpers::LogAndNotifyTableFileCreationFinished(
event_logger, ioptions.listeners, dbname, column_family_name, fname,
@@ -131,7 +132,7 @@
file->SetWriteLifeTimeHint(write_hint);

file_writer.reset(
new WritableFileWriter(std::move(file), fname, env_options, env,
new WritableFileWriter(std::move(file), fname, file_options, env,
ioptions.statistics, ioptions.listeners));
builder = NewTableBuilder(
ioptions, mutable_cf_options, internal_comparator,
@@ -218,7 +219,7 @@ Status BuildTable(
// we will regard this verification as user reads since the goal is
// to cache it here for further user reads
std::unique_ptr<InternalIterator> it(table_cache->NewIterator(
ReadOptions(), env_options, internal_comparator, *meta,
ReadOptions(), file_options, internal_comparator, *meta,
nullptr /* range_del_agg */,
mutable_cf_options.prefix_extractor.get(), nullptr,
(internal_stats == nullptr) ? nullptr
@@ -241,7 +242,7 @@
}

if (!s.ok() || meta->fd.GetFileSize() == 0) {
env->DeleteFile(fname);
fs->DeleteFile(fname, IOOptions(), nullptr);
}

if (meta->fd.GetFileSize() == 0) {
5 changes: 3 additions & 2 deletions db/builder.h
@@ -62,8 +62,9 @@ TableBuilder* NewTableBuilder(
// @param column_family_name Name of the column family that is also identified
// by column_family_id, or empty string if unknown.
extern Status BuildTable(
const std::string& dbname, Env* env, const ImmutableCFOptions& options,
const MutableCFOptions& mutable_cf_options, const EnvOptions& env_options,
const std::string& dbname, Env* env, FileSystem* fs,
const ImmutableCFOptions& options,
const MutableCFOptions& mutable_cf_options, const FileOptions& file_options,
TableCache* table_cache, InternalIterator* iter,
std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
range_del_iters,
16 changes: 8 additions & 8 deletions db/column_family.cc
@@ -475,7 +475,7 @@ ColumnFamilyData::ColumnFamilyData(
uint32_t id, const std::string& name, Version* _dummy_versions,
Cache* _table_cache, WriteBufferManager* write_buffer_manager,
const ColumnFamilyOptions& cf_options, const ImmutableDBOptions& db_options,
const EnvOptions& env_options, ColumnFamilySet* column_family_set,
const FileOptions& file_options, ColumnFamilySet* column_family_set,
BlockCacheTracer* const block_cache_tracer)
: id_(id),
name_(name),
@@ -517,7 +517,7 @@ ColumnFamilyData::ColumnFamilyData(
if (_dummy_versions != nullptr) {
internal_stats_.reset(
new InternalStats(ioptions_.num_levels, db_options.env, this));
table_cache_.reset(new TableCache(ioptions_, env_options, _table_cache,
table_cache_.reset(new TableCache(ioptions_, file_options, _table_cache,
block_cache_tracer));
if (ioptions_.compaction_style == kCompactionStyleLevel) {
compaction_picker_.reset(
@@ -967,8 +967,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
return write_stall_condition;
}

const EnvOptions* ColumnFamilyData::soptions() const {
return &(column_family_set_->env_options_);
const FileOptions* ColumnFamilyData::soptions() const {
return &(column_family_set_->file_options_);
}

void ColumnFamilyData::SetCurrent(Version* current_version) {
@@ -1335,19 +1335,19 @@ Directory* ColumnFamilyData::GetDataDir(size_t path_id) const {

ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
const ImmutableDBOptions* db_options,
const EnvOptions& env_options,
const FileOptions& file_options,
Cache* table_cache,
WriteBufferManager* write_buffer_manager,
WriteController* write_controller,
BlockCacheTracer* const block_cache_tracer)
: max_column_family_(0),
dummy_cfd_(new ColumnFamilyData(
0, "", nullptr, nullptr, nullptr, ColumnFamilyOptions(), *db_options,
env_options, nullptr, block_cache_tracer)),
file_options, nullptr, block_cache_tracer)),
default_cfd_cache_(nullptr),
db_name_(dbname),
db_options_(db_options),
env_options_(env_options),
file_options_(file_options),
table_cache_(table_cache),
write_buffer_manager_(write_buffer_manager),
write_controller_(write_controller),
@@ -1417,7 +1417,7 @@ ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
assert(column_families_.find(name) == column_families_.end());
ColumnFamilyData* new_cfd = new ColumnFamilyData(
id, name, dummy_versions, table_cache_, write_buffer_manager_, options,
*db_options_, env_options_, this, block_cache_tracer_);
*db_options_, file_options_, this, block_cache_tracer_);
column_families_.insert({name, id});
column_family_data_.insert({id, new_cfd});
max_column_family_ = std::max(max_column_family_, id);
8 changes: 4 additions & 4 deletions db/column_family.h
@@ -324,7 +324,7 @@ class ColumnFamilyData {
}
FlushReason GetFlushReason() const { return flush_reason_; }
// thread-safe
const EnvOptions* soptions() const;
const FileOptions* soptions() const;
const ImmutableCFOptions* ioptions() const { return &ioptions_; }
// REQUIRES: DB mutex held
// This returns the MutableCFOptions used by current SuperVersion
@@ -510,7 +510,7 @@ class ColumnFamilyData {
WriteBufferManager* write_buffer_manager,
const ColumnFamilyOptions& options,
const ImmutableDBOptions& db_options,
const EnvOptions& env_options,
const FileOptions& file_options,
ColumnFamilySet* column_family_set,
BlockCacheTracer* const block_cache_tracer);

@@ -638,7 +638,7 @@ class ColumnFamilySet {

ColumnFamilySet(const std::string& dbname,
const ImmutableDBOptions* db_options,
const EnvOptions& env_options, Cache* table_cache,
const FileOptions& file_options, Cache* table_cache,
WriteBufferManager* write_buffer_manager,
WriteController* write_controller,
BlockCacheTracer* const block_cache_tracer);
@@ -696,7 +696,7 @@ class ColumnFamilySet {

const std::string db_name_;
const ImmutableDBOptions* const db_options_;
const EnvOptions env_options_;
const FileOptions file_options_;
Cache* table_cache_;
WriteBufferManager* write_buffer_manager_;
WriteController* write_controller_;
23 changes: 12 additions & 11 deletions db/compaction/compaction_job.cc
@@ -301,7 +301,7 @@ void CompactionJob::AggregateStatistics() {

CompactionJob::CompactionJob(
int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
const EnvOptions env_options, VersionSet* versions,
const FileOptions& file_options, VersionSet* versions,
const std::atomic<bool>* shutting_down,
const SequenceNumber preserve_deletes_seqnum, LogBuffer* log_buffer,
Directory* db_directory, Directory* output_directory, Statistics* stats,
@@ -318,10 +318,11 @@ CompactionJob::CompactionJob(
compaction_stats_(compaction->compaction_reason(), 1),
dbname_(dbname),
db_options_(db_options),
env_options_(env_options),
file_options_(file_options),
env_(db_options.env),
env_options_for_read_(
env_->OptimizeForCompactionTableRead(env_options, db_options_)),
fs_(db_options.fs.get()),
file_options_for_read_(
fs_->OptimizeForCompactionTableRead(file_options, db_options_)),
versions_(versions),
shutting_down_(shutting_down),
manual_compaction_paused_(manual_compaction_paused),
@@ -647,7 +648,7 @@ Status CompactionJob::Run() {
// we will regard this verification as user reads since the goal is
// to cache it here for further user reads
InternalIterator* iter = cfd->table_cache()->NewIterator(
ReadOptions(), env_options_, cfd->internal_comparator(),
ReadOptions(), file_options_, cfd->internal_comparator(),
*files_meta[file_idx], /*range_del_agg=*/nullptr, prefix_extractor,
/*table_reader_ptr=*/nullptr,
cfd->internal_stats()->GetFileReadHist(
@@ -836,7 +837,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
// Although the v2 aggregator is what the level iterator(s) know about,
// the AddTombstones calls will be propagated down to the v1 aggregator.
std::unique_ptr<InternalIterator> input(versions_->MakeInputIterator(
sub_compact->compaction, &range_del_agg, env_options_for_read_));
sub_compact->compaction, &range_del_agg, file_options_for_read_));

AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_COMPACTION_PROCESS_KV);
@@ -1457,13 +1458,13 @@ Status CompactionJob::OpenCompactionOutputFile(
TableFileCreationReason::kCompaction);
#endif // !ROCKSDB_LITE
// Make the output file
std::unique_ptr<WritableFile> writable_file;
std::unique_ptr<FSWritableFile> writable_file;
#ifndef NDEBUG
bool syncpoint_arg = env_options_.use_direct_writes;
bool syncpoint_arg = file_options_.use_direct_writes;
TEST_SYNC_POINT_CALLBACK("CompactionJob::OpenCompactionOutputFile",
&syncpoint_arg);
#endif
Status s = NewWritableFile(env_, fname, &writable_file, env_options_);
Status s = NewWritableFile(fs_, fname, &writable_file, file_options_);
if (!s.ok()) {
ROCKS_LOG_ERROR(
db_options_.info_log,
@@ -1506,14 +1507,14 @@ Status CompactionJob::OpenCompactionOutputFile(
sub_compact->outputs.push_back(out);
}

writable_file->SetIOPriority(Env::IO_LOW);
writable_file->SetIOPriority(Env::IOPriority::IO_LOW);
writable_file->SetWriteLifeTimeHint(write_hint_);
writable_file->SetPreallocationBlockSize(static_cast<size_t>(
sub_compact->compaction->OutputFilePreallocationSize()));
const auto& listeners =
sub_compact->compaction->immutable_cf_options()->listeners;
sub_compact->outfile.reset(
new WritableFileWriter(std::move(writable_file), fname, env_options_,
new WritableFileWriter(std::move(writable_file), fname, file_options_,
env_, db_options_.statistics.get(), listeners));

// If the Column family flag is to only optimize filters for hits,
7 changes: 4 additions & 3 deletions db/compaction/compaction_job.h
@@ -64,7 +64,7 @@ class CompactionJob {
public:
CompactionJob(int job_id, Compaction* compaction,
const ImmutableDBOptions& db_options,
const EnvOptions env_options, VersionSet* versions,
const FileOptions& file_options, VersionSet* versions,
const std::atomic<bool>* shutting_down,
const SequenceNumber preserve_deletes_seqnum,
LogBuffer* log_buffer, Directory* db_directory,
@@ -150,11 +150,12 @@ class CompactionJob {
// DBImpl state
const std::string& dbname_;
const ImmutableDBOptions& db_options_;
const EnvOptions env_options_;
const FileOptions file_options_;

Env* env_;
FileSystem* fs_;
// env_option optimized for compaction table reads
EnvOptions env_options_for_read_;
FileOptions file_options_for_read_;
VersionSet* versions_;
const std::atomic<bool>* shutting_down_;
const std::atomic<bool>* manual_compaction_paused_;
8 changes: 6 additions & 2 deletions db/compaction/compaction_job_test.cc
@@ -72,6 +72,7 @@ class CompactionJobTest : public testing::Test {
public:
CompactionJobTest()
: env_(Env::Default()),
fs_(std::make_shared<LegacyFileSystemWrapper>(env_)),
dbname_(test::PerThreadDBPath("compaction_job_test")),
db_options_(),
mutable_cf_options_(cf_options_),
@@ -86,6 +87,8 @@
mock_table_factory_(new mock::MockTableFactory()),
error_handler_(nullptr, db_options_, &mutex_) {
EXPECT_OK(env_->CreateDirIfMissing(dbname_));
db_options_.env = env_;
db_options_.fs = fs_;
db_options_.db_paths.emplace_back(dbname_,
std::numeric_limits<uint64_t>::max());
}
@@ -267,8 +270,8 @@ class CompactionJobTest : public testing::Test {
Status s = env_->NewWritableFile(
manifest, &file, env_->OptimizeForManifestWrite(env_options_));
ASSERT_OK(s);
std::unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(file), manifest, env_options_));
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
NewLegacyWritableFileWrapper(std::move(file)), manifest, env_options_));
{
log::Writer log(std::move(file_writer), 0, false);
std::string record;
@@ -360,6 +363,7 @@ class CompactionJobTest : public testing::Test {
}

Env* env_;
std::shared_ptr<FileSystem> fs_;
std::string dbname_;
EnvOptions env_options_;
ImmutableDBOptions db_options_;
8 changes: 5 additions & 3 deletions db/convenience.cc
@@ -41,14 +41,16 @@ Status VerifySstFileChecksum(const Options& options,
const EnvOptions& env_options,
const ReadOptions& read_options,
const std::string& file_path) {
std::unique_ptr<RandomAccessFile> file;
std::unique_ptr<FSRandomAccessFile> file;
uint64_t file_size;
InternalKeyComparator internal_comparator(options.comparator);
ImmutableCFOptions ioptions(options);

Status s = ioptions.env->NewRandomAccessFile(file_path, &file, env_options);
Status s = ioptions.fs->NewRandomAccessFile(file_path,
FileOptions(env_options),
&file, nullptr);
if (s.ok()) {
s = ioptions.env->GetFileSize(file_path, &file_size);
s = ioptions.fs->GetFileSize(file_path, IOOptions(), &file_size, nullptr);
} else {
return s;
}
5 changes: 4 additions & 1 deletion db/corruption_test.cc
@@ -20,6 +20,7 @@
#include "db/db_test_util.h"
#include "db/log_format.h"
#include "db/version_set.h"
#include "env/composite_env_wrapper.h"
#include "file/filename.h"
#include "rocksdb/cache.h"
#include "rocksdb/convenience.h"
@@ -189,6 +190,7 @@ class CorruptionTest : public testing::Test {
ASSERT_TRUE(s.ok()) << s.ToString();
Options options;
EnvOptions env_options;
options.file_system.reset(new LegacyFileSystemWrapper(options.env));
ASSERT_NOK(VerifySstFileChecksum(options, env_options, fname));
}

@@ -539,7 +541,8 @@ TEST_F(CorruptionTest, RangeDeletionCorrupted) {
std::unique_ptr<RandomAccessFile> file;
ASSERT_OK(options_.env->NewRandomAccessFile(filename, &file, EnvOptions()));
std::unique_ptr<RandomAccessFileReader> file_reader(
new RandomAccessFileReader(std::move(file), filename));
new RandomAccessFileReader(NewLegacyRandomAccessFileWrapper(file),
filename));

uint64_t file_size;
ASSERT_OK(options_.env->GetFileSize(filename, &file_size));