Skip to content

Commit

Permalink
Do not explicitly flush blob files when using the integrated BlobDB (#…
Browse files Browse the repository at this point in the history
…7892)

Summary:
In the original stacked BlobDB implementation, which writes blobs to blob files
immediately and treats blob files as logs, it makes sense to flush the file after
writing each blob to protect against process crashes; however, in the integrated
implementation, which builds blob files in background jobs, this unnecessarily
reduces performance. This patch fixes this by simply adding a `do_flush` flag to
`BlobLogWriter`, which is set to `true` by the stacked implementation and to `false`
by the new code. Note: the change itself is trivial, but the tests needed some work.
Since blobs are now buffered in the new implementation, adding a blob to
`BlobFileBuilder` is no longer guaranteed to result in actual I/O. Therefore, we can
no longer rely on `FaultInjectionTestEnv` when testing failure cases; instead, we
manipulate the return values of I/O methods directly using `SyncPoint`s.

Pull Request resolved: #7892

Test Plan: `make check`

Reviewed By: jay-zhuang

Differential Revision: D26022814

Pulled By: ltamasi

fbshipit-source-id: b3dce419f312137fa70d84cdd9b908fd5d60d8cd
  • Loading branch information
ltamasi authored and facebook-github-bot committed Jan 25, 2021
1 parent 19076c9 commit 431e8af
Show file tree
Hide file tree
Showing 10 changed files with 78 additions and 76 deletions.
34 changes: 20 additions & 14 deletions db/blob/blob_file_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,12 @@ Status BlobFileBuilder::OpenBlobFileIfNeeded() {
std::unique_ptr<FSWritableFile> file;

{
TEST_SYNC_POINT("BlobFileBuilder::OpenBlobFileIfNeeded:NewWritableFile");

assert(file_options_);
const Status s =
NewWritableFile(fs_, blob_file_path, &file, *file_options_);
Status s = NewWritableFile(fs_, blob_file_path, &file, *file_options_);

TEST_SYNC_POINT_CALLBACK(
"BlobFileBuilder::OpenBlobFileIfNeeded:NewWritableFile", &s);

if (!s.ok()) {
return s;
}
Expand All @@ -184,9 +185,11 @@ Status BlobFileBuilder::OpenBlobFileIfNeeded() {
nullptr /*IOTracer*/, statistics, immutable_cf_options_->listeners,
immutable_cf_options_->file_checksum_gen_factory));

std::unique_ptr<BlobLogWriter> blob_log_writer(
new BlobLogWriter(std::move(file_writer), env_, statistics,
blob_file_number, immutable_cf_options_->use_fsync));
constexpr bool do_flush = false;

std::unique_ptr<BlobLogWriter> blob_log_writer(new BlobLogWriter(
std::move(file_writer), env_, statistics, blob_file_number,
immutable_cf_options_->use_fsync, do_flush));

constexpr bool has_ttl = false;
constexpr ExpirationRange expiration_range;
Expand All @@ -195,9 +198,11 @@ Status BlobFileBuilder::OpenBlobFileIfNeeded() {
expiration_range);

{
TEST_SYNC_POINT("BlobFileBuilder::OpenBlobFileIfNeeded:WriteHeader");
Status s = blob_log_writer->WriteHeader(header);

TEST_SYNC_POINT_CALLBACK(
"BlobFileBuilder::OpenBlobFileIfNeeded:WriteHeader", &s);

const Status s = blob_log_writer->WriteHeader(header);
if (!s.ok()) {
return s;
}
Expand Down Expand Up @@ -247,9 +252,10 @@ Status BlobFileBuilder::WriteBlobToFile(const Slice& key, const Slice& blob,

uint64_t key_offset = 0;

TEST_SYNC_POINT("BlobFileBuilder::WriteBlobToFile:AddRecord");
Status s = writer_->AddRecord(key, blob, &key_offset, blob_offset);

TEST_SYNC_POINT_CALLBACK("BlobFileBuilder::WriteBlobToFile:AddRecord", &s);

const Status s = writer_->AddRecord(key, blob, &key_offset, blob_offset);
if (!s.ok()) {
return s;
}
Expand All @@ -271,10 +277,10 @@ Status BlobFileBuilder::CloseBlobFile() {
std::string checksum_method;
std::string checksum_value;

TEST_SYNC_POINT("BlobFileBuilder::WriteBlobToFile:AppendFooter");
Status s = writer_->AppendFooter(footer, &checksum_method, &checksum_value);

TEST_SYNC_POINT_CALLBACK("BlobFileBuilder::WriteBlobToFile:AppendFooter", &s);

const Status s =
writer_->AppendFooter(footer, &checksum_method, &checksum_value);
if (!s.ok()) {
return s;
}
Expand Down
41 changes: 20 additions & 21 deletions db/blob/blob_file_builder_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,8 @@ class TestFileNumberGenerator {

class BlobFileBuilderTest : public testing::Test {
protected:
BlobFileBuilderTest() : mock_env_(Env::Default()) {
fs_ = mock_env_.GetFileSystem();
}
BlobFileBuilderTest()
: mock_env_(Env::Default()), fs_(mock_env_.GetFileSystem().get()) {}

void VerifyBlobFile(uint64_t blob_file_number,
const std::string& blob_file_path,
Expand Down Expand Up @@ -109,7 +108,7 @@ class BlobFileBuilderTest : public testing::Test {
}

MockEnv mock_env_;
std::shared_ptr<FileSystem> fs_;
FileSystem* fs_;
FileOptions file_options_;
};

Expand Down Expand Up @@ -139,7 +138,7 @@ TEST_F(BlobFileBuilderTest, BuildAndCheckOneFile) {
std::vector<std::string> blob_file_paths;
std::vector<BlobFileAddition> blob_file_additions;

BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_.get(),
BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_,
&immutable_cf_options, &mutable_cf_options,
&file_options_, job_id, column_family_id,
column_family_name, io_priority, write_hint,
Expand Down Expand Up @@ -222,7 +221,7 @@ TEST_F(BlobFileBuilderTest, BuildAndCheckMultipleFiles) {
std::vector<std::string> blob_file_paths;
std::vector<BlobFileAddition> blob_file_additions;

BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_.get(),
BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_,
&immutable_cf_options, &mutable_cf_options,
&file_options_, job_id, column_family_id,
column_family_name, io_priority, write_hint,
Expand Down Expand Up @@ -307,7 +306,7 @@ TEST_F(BlobFileBuilderTest, InlinedValues) {
std::vector<std::string> blob_file_paths;
std::vector<BlobFileAddition> blob_file_additions;

BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_.get(),
BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_,
&immutable_cf_options, &mutable_cf_options,
&file_options_, job_id, column_family_id,
column_family_name, io_priority, write_hint,
Expand Down Expand Up @@ -359,7 +358,7 @@ TEST_F(BlobFileBuilderTest, Compression) {
std::vector<std::string> blob_file_paths;
std::vector<BlobFileAddition> blob_file_additions;

BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_.get(),
BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_,
&immutable_cf_options, &mutable_cf_options,
&file_options_, job_id, column_family_id,
column_family_name, io_priority, write_hint,
Expand Down Expand Up @@ -441,7 +440,7 @@ TEST_F(BlobFileBuilderTest, CompressionError) {
std::vector<std::string> blob_file_paths;
std::vector<BlobFileAddition> blob_file_additions;

BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_.get(),
BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_,
&immutable_cf_options, &mutable_cf_options,
&file_options_, job_id, column_family_id,
column_family_name, io_priority, write_hint,
Expand Down Expand Up @@ -518,7 +517,7 @@ TEST_F(BlobFileBuilderTest, Checksum) {
std::vector<std::string> blob_file_paths;
std::vector<BlobFileAddition> blob_file_additions;

BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_.get(),
BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_,
&immutable_cf_options, &mutable_cf_options,
&file_options_, job_id, column_family_id,
column_family_name, io_priority, write_hint,
Expand Down Expand Up @@ -571,13 +570,11 @@ class BlobFileBuilderIOErrorTest
protected:
BlobFileBuilderIOErrorTest()
: mock_env_(Env::Default()),
fault_injection_env_(&mock_env_),
fs_(fault_injection_env_.GetFileSystem()),
fs_(mock_env_.GetFileSystem().get()),
sync_point_(GetParam()) {}

MockEnv mock_env_;
FaultInjectionTestEnv fault_injection_env_;
std::shared_ptr<FileSystem> fs_;
FileSystem* fs_;
FileOptions file_options_;
std::string sync_point_;
};
Expand All @@ -598,11 +595,11 @@ TEST_P(BlobFileBuilderIOErrorTest, IOError) {

Options options;
options.cf_paths.emplace_back(
test::PerThreadDBPath(&fault_injection_env_,
"BlobFileBuilderIOErrorTest_IOError"),
test::PerThreadDBPath(&mock_env_, "BlobFileBuilderIOErrorTest_IOError"),
0);
options.enable_blob_files = true;
options.blob_file_size = value_size;
options.env = &mock_env_;

ImmutableCFOptions immutable_cf_options(options);
MutableCFOptions mutable_cf_options(options);
Expand All @@ -616,15 +613,17 @@ TEST_P(BlobFileBuilderIOErrorTest, IOError) {
std::vector<std::string> blob_file_paths;
std::vector<BlobFileAddition> blob_file_additions;

BlobFileBuilder builder(TestFileNumberGenerator(), &fault_injection_env_,
fs_.get(), &immutable_cf_options, &mutable_cf_options,
BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_,
&immutable_cf_options, &mutable_cf_options,
&file_options_, job_id, column_family_id,
column_family_name, io_priority, write_hint,
&blob_file_paths, &blob_file_additions);

SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) {
fault_injection_env_.SetFilesystemActive(false,
Status::IOError(sync_point_));
SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* arg) {
Status* const s = static_cast<Status*>(arg);
assert(s);

(*s) = Status::IOError(sync_point_);
});
SyncPoint::GetInstance()->EnableProcessing();

Expand Down
3 changes: 2 additions & 1 deletion db/blob/blob_file_cache_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,11 @@ void WriteBlobFile(uint32_t column_family_id,

constexpr Statistics* statistics = nullptr;
constexpr bool use_fsync = false;
constexpr bool do_flush = false;

BlobLogWriter blob_log_writer(std::move(file_writer),
immutable_cf_options.env, statistics,
blob_file_number, use_fsync);
blob_file_number, use_fsync, do_flush);

constexpr bool has_ttl = false;
constexpr ExpirationRange expiration_range;
Expand Down
6 changes: 4 additions & 2 deletions db/blob/blob_file_reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,11 @@ void WriteBlobFile(const ImmutableCFOptions& immutable_cf_options,

constexpr Statistics* statistics = nullptr;
constexpr bool use_fsync = false;
constexpr bool do_flush = false;

BlobLogWriter blob_log_writer(std::move(file_writer),
immutable_cf_options.env, statistics,
blob_file_number, use_fsync);
blob_file_number, use_fsync, do_flush);

BlobLogHeader header(column_family_id, compression_type, has_ttl,
expiration_range_header);
Expand Down Expand Up @@ -263,10 +264,11 @@ TEST_F(BlobFileReaderTest, Malformed) {

constexpr Statistics* statistics = nullptr;
constexpr bool use_fsync = false;
constexpr bool do_flush = false;

BlobLogWriter blob_log_writer(std::move(file_writer),
immutable_cf_options.env, statistics,
blob_file_number, use_fsync);
blob_file_number, use_fsync, do_flush);

BlobLogHeader header(column_family_id, kNoCompression, has_ttl,
expiration_range);
Expand Down
10 changes: 7 additions & 3 deletions db/blob/blob_log_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ namespace ROCKSDB_NAMESPACE {

BlobLogWriter::BlobLogWriter(std::unique_ptr<WritableFileWriter>&& dest,
Env* env, Statistics* statistics,
uint64_t log_number, bool use_fs, uint64_t boffset)
uint64_t log_number, bool use_fs, bool do_flush,
uint64_t boffset)
: dest_(std::move(dest)),
env_(env),
statistics_(statistics),
log_number_(log_number),
block_offset_(boffset),
use_fsync_(use_fs),
do_flush_(do_flush),
last_elem_type_(kEtNone) {}

BlobLogWriter::~BlobLogWriter() = default;
Expand All @@ -49,7 +51,9 @@ Status BlobLogWriter::WriteHeader(BlobLogHeader& header) {
Status s = dest_->Append(Slice(str));
if (s.ok()) {
block_offset_ += str.size();
s = dest_->Flush();
if (do_flush_) {
s = dest_->Flush();
}
}
last_elem_type_ = kEtFileHdr;
RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_WRITTEN,
Expand Down Expand Up @@ -152,7 +156,7 @@ Status BlobLogWriter::EmitPhysicalRecord(const std::string& headerbuf,
if (s.ok()) {
s = dest_->Append(val);
}
if (s.ok()) {
if (do_flush_ && s.ok()) {
s = dest_->Flush();
}

Expand Down
3 changes: 2 additions & 1 deletion db/blob/blob_log_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class BlobLogWriter {
// "*dest" must remain live while this BlobLogWriter is in use.
BlobLogWriter(std::unique_ptr<WritableFileWriter>&& dest, Env* env,
Statistics* statistics, uint64_t log_number, bool use_fsync,
uint64_t boffset = 0);
bool do_flush, uint64_t boffset = 0);
// No copying allowed
BlobLogWriter(const BlobLogWriter&) = delete;
BlobLogWriter& operator=(const BlobLogWriter&) = delete;
Expand Down Expand Up @@ -74,6 +74,7 @@ class BlobLogWriter {
uint64_t log_number_;
uint64_t block_offset_; // Current offset in block
bool use_fsync_;
bool do_flush_;

public:
enum ElemType { kEtNone, kEtFileHdr, kEtRecord, kEtFileFooter };
Expand Down
14 changes: 6 additions & 8 deletions db/db_compaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5957,11 +5957,8 @@ class DBCompactionTestBlobError
: public DBCompactionTest,
public testing::WithParamInterface<std::string> {
public:
DBCompactionTestBlobError()
: fault_injection_env_(env_), sync_point_(GetParam()) {}
~DBCompactionTestBlobError() { Close(); }
DBCompactionTestBlobError() : sync_point_(GetParam()) {}

FaultInjectionTestEnv fault_injection_env_;
std::string sync_point_;
};

Expand Down Expand Up @@ -5996,13 +5993,14 @@ TEST_P(DBCompactionTestBlobError, CompactionError) {
ASSERT_OK(Flush());

options.enable_blob_files = true;
options.env = &fault_injection_env_;

Reopen(options);

SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) {
fault_injection_env_.SetFilesystemActive(false,
Status::IOError(sync_point_));
SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* arg) {
Status* const s = static_cast<Status*>(arg);
assert(s);

(*s) = Status::IOError(sync_point_);
});
SyncPoint::GetInstance()->EnableProcessing();

Expand Down
20 changes: 7 additions & 13 deletions db/db_flush_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,6 @@ TEST_F(DBFlushTest, FlushWithBlob) {
constexpr uint64_t min_blob_size = 10;

Options options;
options.env = CurrentOptions().env;
options.enable_blob_files = true;
options.min_blob_size = min_blob_size;
options.disable_auto_compactions = true;
Expand Down Expand Up @@ -528,11 +527,8 @@ TEST_F(DBFlushTest, FlushWithBlob) {
class DBFlushTestBlobError : public DBFlushTest,
public testing::WithParamInterface<std::string> {
public:
DBFlushTestBlobError()
: fault_injection_env_(env_), sync_point_(GetParam()) {}
~DBFlushTestBlobError() { Close(); }
DBFlushTestBlobError() : sync_point_(GetParam()) {}

FaultInjectionTestEnv fault_injection_env_;
std::string sync_point_;
};

Expand All @@ -545,20 +541,18 @@ TEST_P(DBFlushTestBlobError, FlushError) {
Options options;
options.enable_blob_files = true;
options.disable_auto_compactions = true;
options.env = &fault_injection_env_;
options.env = env_;

Reopen(options);

ASSERT_OK(Put("key", "blob"));

SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) {
fault_injection_env_.SetFilesystemActive(false,
Status::IOError(sync_point_));
SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* arg) {
Status* const s = static_cast<Status*>(arg);
assert(s);

(*s) = Status::IOError(sync_point_);
});
SyncPoint::GetInstance()->SetCallBack(
"BuildTable:BeforeDeleteFile", [this](void* /* arg */) {
fault_injection_env_.SetFilesystemActive(true);
});
SyncPoint::GetInstance()->EnableProcessing();

ASSERT_NOK(Flush());
Expand Down
Loading

0 comments on commit 431e8af

Please sign in to comment.