Make it possible to enable blob files starting from a certain LSM tree level (#10077)

Summary:
Currently, if blob files are enabled (i.e. `enable_blob_files` is true), large values are extracted both during flush/recovery (when SST files are written into level 0 of the LSM tree) and during compaction into any LSM tree level. For certain use cases that have a mix of short-lived and long-lived values, it might make sense to support extracting large values only during compactions whose output level is greater than or equal to a specified LSM tree level (e.g. compactions into L1/L2/... or above). This could reduce the space amplification caused by large values that are turned into garbage shortly after being written at the price of some write amplification incurred by long-lived values whose extraction to blob files is delayed.

In order to achieve this, we would like to do the following:
- Add a new configuration option `blob_file_starting_level` (default: 0) to `AdvancedColumnFamilyOptions` and `MutableCFOptions`, and extend the related logic
- Instantiate `BlobFileBuilder` in `BuildTable` (used during flush and recovery, where the LSM tree level is L0) and `CompactionJob` iff `enable_blob_files` is set and the LSM tree level is `>= blob_file_starting_level`
- Add unit tests for the new functionality, and add the new option to our stress tests (`db_stress` and `db_crashtest.py`)
- Add the new option to our benchmarking tool `db_bench` and the BlobDB benchmark script `run_blob_bench.sh`
- Add the new option to the `ldb` tool (see https://github.com/facebook/rocksdb/wiki/Administration-and-Data-Access-Tool)
- Ideally extend the C and Java bindings with the new option
- Update the BlobDB wiki to document the new option.

Pull Request resolved: facebook/rocksdb#10077

Reviewed By: ltamasi

Differential Revision: D36884156

Pulled By: gangliao

fbshipit-source-id: 942bab025f04633edca8564ed64791cb5e31627d
gangliao authored and facebook-github-bot committed Jun 3, 2022
1 parent a020031 commit e6432df
Showing 32 changed files with 323 additions and 13 deletions.
3 changes: 3 additions & 0 deletions HISTORY.md
@@ -10,6 +10,8 @@
* Add transaction `get_pinned` and `multi_get` to C API.
* Add two-phase commit support to C API.
* Add `rocksdb_transaction_get_writebatch_wi` and `rocksdb_transaction_rebuild_from_writebatch` to C API.
* Add `rocksdb_options_get_blob_file_starting_level` and `rocksdb_options_set_blob_file_starting_level` to C API.
* Add `blobFileStartingLevel` and `setBlobFileStartingLevel` to Java API.
* Add SingleDelete for DB in C API
* Add User Defined Timestamp in C API.
* `rocksdb_comparator_with_ts_create` to create timestamp aware comparator
@@ -20,6 +22,7 @@
* Add FileSystem::ReadAsync API in io_tracing
* Add blob garbage collection parameters `blob_garbage_collection_policy` and `blob_garbage_collection_age_cutoff` to both force-enable and force-disable GC, as well as selectively override age cutoff when using CompactRange.
* Add an extra sanity check in `GetSortedWalFiles()` (also used by `GetLiveFilesStorageInfo()`, `BackupEngine`, and `Checkpoint`) to reduce risk of successfully created backup or checkpoint failing to open because of missing WAL file.
* Add a new column family option `blob_file_starting_level` to enable writing blob files during flushes and compactions starting from the specified LSM tree level.

### Behavior changes
* DB::Open(), DB::OpenAsSecondary() will fail if a Logger cannot be created (#9984)
76 changes: 76 additions & 0 deletions db/blob/db_blob_compaction_test.cc
@@ -243,6 +243,82 @@ TEST_F(DBBlobCompactionTest, FilterByKeyLength) {
  Close();
}

TEST_F(DBBlobCompactionTest, BlobCompactWithStartingLevel) {
  Options options = GetDefaultOptions();

  options.enable_blob_files = true;
  options.min_blob_size = 1000;
  options.blob_file_starting_level = 5;
  options.create_if_missing = true;

  // Open DB with fixed-prefix sst-partitioner so that compaction will cut a
  // new table file when encountering a new key whose 1-byte prefix changes.
  constexpr size_t key_len = 1;
  options.sst_partitioner_factory =
      NewSstPartitionerFixedPrefixFactory(key_len);

  ASSERT_OK(TryReopen(options));

  constexpr size_t blob_size = 3000;

  constexpr char first_key[] = "a";
  const std::string first_blob(blob_size, 'a');
  ASSERT_OK(Put(first_key, first_blob));

  constexpr char second_key[] = "b";
  const std::string second_blob(2 * blob_size, 'b');
  ASSERT_OK(Put(second_key, second_blob));

  constexpr char third_key[] = "d";
  const std::string third_blob(blob_size, 'd');
  ASSERT_OK(Put(third_key, third_blob));

  ASSERT_OK(Flush());

  constexpr char fourth_key[] = "c";
  const std::string fourth_blob(blob_size, 'c');
  ASSERT_OK(Put(fourth_key, fourth_blob));

  ASSERT_OK(Flush());

  ASSERT_EQ(0, GetBlobFileNumbers().size());
  ASSERT_EQ(2, NumTableFilesAtLevel(/*level=*/0));
  ASSERT_EQ(0, NumTableFilesAtLevel(/*level=*/1));

  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
                              /*end=*/nullptr));

  // No blob file should be created since blob_file_starting_level is 5.
  ASSERT_EQ(0, GetBlobFileNumbers().size());
  ASSERT_EQ(0, NumTableFilesAtLevel(/*level=*/0));
  ASSERT_EQ(4, NumTableFilesAtLevel(/*level=*/1));

  {
    options.blob_file_starting_level = 1;
    DestroyAndReopen(options);

    ASSERT_OK(Put(first_key, first_blob));
    ASSERT_OK(Put(second_key, second_blob));
    ASSERT_OK(Put(third_key, third_blob));
    ASSERT_OK(Flush());
    ASSERT_OK(Put(fourth_key, fourth_blob));
    ASSERT_OK(Flush());

    ASSERT_EQ(0, GetBlobFileNumbers().size());
    ASSERT_EQ(2, NumTableFilesAtLevel(/*level=*/0));
    ASSERT_EQ(0, NumTableFilesAtLevel(/*level=*/1));

    ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
                                /*end=*/nullptr));
    // A blob file is created since the compaction's output level (L1) equals
    // blob_file_starting_level.
    ASSERT_EQ(1, GetBlobFileNumbers().size());
    ASSERT_EQ(0, NumTableFilesAtLevel(/*level=*/0));
    ASSERT_EQ(4, NumTableFilesAtLevel(/*level=*/1));
  }

  Close();
}

TEST_F(DBBlobCompactionTest, BlindWriteFilter) {
  Options options = GetDefaultOptions();
  options.enable_blob_files = true;
5 changes: 4 additions & 1 deletion db/builder.cc
@@ -181,7 +181,10 @@ Status BuildTable(
       snapshots.empty() ? 0 : snapshots.back(), snapshot_checker);

   std::unique_ptr<BlobFileBuilder> blob_file_builder(
-      (mutable_cf_options.enable_blob_files && blob_file_additions)
+      (mutable_cf_options.enable_blob_files &&
+       tboptions.level_at_creation >=
+           mutable_cf_options.blob_file_starting_level &&
+       blob_file_additions)
           ? new BlobFileBuilder(
               versions, fs, &ioptions, &mutable_cf_options, &file_options,
               job_id, tboptions.column_family_id,
9 changes: 9 additions & 0 deletions db/c.cc
@@ -3039,6 +3039,15 @@ uint64_t rocksdb_options_get_blob_compaction_readahead_size(
  return opt->rep.blob_compaction_readahead_size;
}

void rocksdb_options_set_blob_file_starting_level(rocksdb_options_t* opt,
                                                  int val) {
  opt->rep.blob_file_starting_level = val;
}

int rocksdb_options_get_blob_file_starting_level(rocksdb_options_t* opt) {
  return opt->rep.blob_file_starting_level;
}

void rocksdb_options_set_num_levels(rocksdb_options_t* opt, int n) {
  opt->rep.num_levels = n;
}
3 changes: 3 additions & 0 deletions db/c_test.c
@@ -1857,6 +1857,9 @@ int main(int argc, char** argv) {
  CheckCondition(262144 ==
                 rocksdb_options_get_blob_compaction_readahead_size(o));

  rocksdb_options_set_blob_file_starting_level(o, 5);
  CheckCondition(5 == rocksdb_options_get_blob_file_starting_level(o));

  // Create a copy that should be equal to the original.
  rocksdb_options_t* copy;
  copy = rocksdb_options_create_copy(o);
4 changes: 3 additions & 1 deletion db/compaction/compaction_job.cc
@@ -1454,7 +1454,9 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
   std::vector<std::string> blob_file_paths;

   std::unique_ptr<BlobFileBuilder> blob_file_builder(
-      mutable_cf_options->enable_blob_files
+      (mutable_cf_options->enable_blob_files &&
+       sub_compact->compaction->output_level() >=
+           mutable_cf_options->blob_file_starting_level)
           ? new BlobFileBuilder(
                 versions_, fs_.get(),
                 sub_compact->compaction->immutable_options(),
1 change: 1 addition & 0 deletions db_stress_tool/db_stress_common.h
@@ -266,6 +266,7 @@ DECLARE_bool(enable_blob_garbage_collection);
DECLARE_double(blob_garbage_collection_age_cutoff);
DECLARE_double(blob_garbage_collection_force_threshold);
DECLARE_uint64(blob_compaction_readahead_size);
DECLARE_int32(blob_file_starting_level);

DECLARE_int32(approximate_size_one_in);
DECLARE_bool(sync_fault_injection);
12 changes: 9 additions & 3 deletions db_stress_tool/db_stress_gflags.cc
@@ -39,8 +39,8 @@ DEFINE_string(key_len_percent_dist, "",
             "24 bytes. If not specified, it will be evenly distributed");

DEFINE_int32(key_window_scale_factor, 10,
             "This value will be multiplied by 100 to come up with a window "
             "size for varying the key length");

DEFINE_int32(column_families, 10, "Number of column families");

@@ -439,6 +439,12 @@ DEFINE_uint64(blob_compaction_readahead_size,
                 .blob_compaction_readahead_size,
             "[Integrated BlobDB] Compaction readahead for blob files.");

DEFINE_int32(
    blob_file_starting_level,
    ROCKSDB_NAMESPACE::AdvancedColumnFamilyOptions().blob_file_starting_level,
    "[Integrated BlobDB] Enable writing blob files during flushes and "
    "compactions starting from the specified level.");

static const bool FLAGS_subcompactions_dummy __attribute__((__unused__)) =
    RegisterFlagValidator(&FLAGS_subcompactions, &ValidateUint32Range);

@@ -857,7 +863,7 @@ DEFINE_int32(approximate_size_one_in, 64,
             " random key ranges.");

DEFINE_int32(read_fault_one_in, 1000,
             "On non-zero, enables fault injection on read");

DEFINE_int32(get_property_one_in, 1000,
             "If non-zero, then DB::GetProperty() will be called to get various"
9 changes: 7 additions & 2 deletions db_stress_tool/db_stress_test_base.cc
@@ -280,6 +280,8 @@ bool StressTest::BuildOptionsTable() {
                        std::vector<std::string>{"0.5", "0.75", "1.0"});
    options_tbl.emplace("blob_compaction_readahead_size",
                        std::vector<std::string>{"0", "1M", "4M"});
    options_tbl.emplace("blob_file_starting_level",
                        std::vector<std::string>{"0", "1", "2"});
  }

  options_table_ = std::move(options_tbl);
@@ -2384,14 +2386,16 @@ void StressTest::Open(SharedState* shared) {
       "Integrated BlobDB: blob files enabled %d, min blob size %" PRIu64
       ", blob file size %" PRIu64
       ", blob compression type %s, blob GC enabled %d, cutoff %f, force "
-      "threshold %f, blob compaction readahead size %" PRIu64 "\n",
+      "threshold %f, blob compaction readahead size %" PRIu64
+      ", blob file starting level %d\n",
       options_.enable_blob_files, options_.min_blob_size,
       options_.blob_file_size,
       CompressionTypeToString(options_.blob_compression_type).c_str(),
       options_.enable_blob_garbage_collection,
       options_.blob_garbage_collection_age_cutoff,
       options_.blob_garbage_collection_force_threshold,
-      options_.blob_compaction_readahead_size);
+      options_.blob_compaction_readahead_size,
+      options_.blob_file_starting_level);

fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str());

@@ -2987,6 +2991,7 @@ void InitializeOptionsFromFlags(
  options.blob_garbage_collection_force_threshold =
      FLAGS_blob_garbage_collection_force_threshold;
  options.blob_compaction_readahead_size = FLAGS_blob_compaction_readahead_size;
  options.blob_file_starting_level = FLAGS_blob_file_starting_level;

  options.wal_compression =
      StringToCompressionType(FLAGS_wal_compression.c_str());
15 changes: 15 additions & 0 deletions include/rocksdb/advanced_options.h
@@ -938,6 +938,21 @@ struct AdvancedColumnFamilyOptions {
  // Dynamically changeable through the SetOptions() API
  uint64_t blob_compaction_readahead_size = 0;

  // Enable blob files starting from a certain LSM tree level.
  //
  // For certain use cases that have a mix of short-lived and long-lived values,
  // it might make sense to support extracting large values only during
  // compactions whose output level is greater than or equal to a specified LSM
  // tree level (e.g. compactions into L1/L2/... or above). This could reduce
  // the space amplification caused by large values that are turned into garbage
  // shortly after being written at the price of some write amplification
  // incurred by long-lived values whose extraction to blob files is delayed.
  //
  // Default: 0
  //
  // Dynamically changeable through the SetOptions() API
  int blob_file_starting_level = 0;

  // Create ColumnFamilyOptions with default values for all fields
  AdvancedColumnFamilyOptions();
  // Create ColumnFamilyOptions from Options
5 changes: 5 additions & 0 deletions include/rocksdb/c.h
@@ -1259,6 +1259,11 @@ rocksdb_options_set_blob_compaction_readahead_size(rocksdb_options_t* opt,
extern ROCKSDB_LIBRARY_API uint64_t
rocksdb_options_get_blob_compaction_readahead_size(rocksdb_options_t* opt);

extern ROCKSDB_LIBRARY_API void rocksdb_options_set_blob_file_starting_level(
    rocksdb_options_t* opt, int val);
extern ROCKSDB_LIBRARY_API int rocksdb_options_get_blob_file_starting_level(
    rocksdb_options_t* opt);

/* returns a pointer to a malloc()-ed, null terminated string */
extern ROCKSDB_LIBRARY_API char* rocksdb_options_statistics_get_string(
    rocksdb_options_t* opt);
1 change: 1 addition & 0 deletions include/rocksdb/utilities/ldb_cmd.h
@@ -69,6 +69,7 @@ class LDBCommand {
  static const std::string ARG_BLOB_GARBAGE_COLLECTION_AGE_CUTOFF;
  static const std::string ARG_BLOB_GARBAGE_COLLECTION_FORCE_THRESHOLD;
  static const std::string ARG_BLOB_COMPACTION_READAHEAD_SIZE;
  static const std::string ARG_BLOB_FILE_STARTING_LEVEL;
  static const std::string ARG_DECODE_BLOB_INDEX;
  static const std::string ARG_DUMP_UNCOMPRESSED_BLOBS;

22 changes: 22 additions & 0 deletions java/rocksjni/options.cc
@@ -3840,6 +3840,28 @@ jlong Java_org_rocksdb_Options_blobCompactionReadaheadSize(JNIEnv*, jobject,
  return static_cast<jlong>(opts->blob_compaction_readahead_size);
}

/*
 * Class:     org_rocksdb_Options
 * Method:    setBlobFileStartingLevel
 * Signature: (JI)V
 */
void Java_org_rocksdb_Options_setBlobFileStartingLevel(
    JNIEnv*, jobject, jlong jhandle, jint jblob_file_starting_level) {
  auto* opts = reinterpret_cast<ROCKSDB_NAMESPACE::Options*>(jhandle);
  opts->blob_file_starting_level = jblob_file_starting_level;
}

/*
 * Class:     org_rocksdb_Options
 * Method:    blobFileStartingLevel
 * Signature: (J)I
 */
jint Java_org_rocksdb_Options_blobFileStartingLevel(JNIEnv*, jobject,
                                                    jlong jhandle) {
  auto* opts = reinterpret_cast<ROCKSDB_NAMESPACE::Options*>(jhandle);
  return static_cast<jint>(opts->blob_file_starting_level);
}

//////////////////////////////////////////////////////////////////////////////
// ROCKSDB_NAMESPACE::ColumnFamilyOptions

@@ -754,6 +754,29 @@ T setReportBgIoStats(
   */
  long blobCompactionReadaheadSize();

  /**
   * Set a certain LSM tree level to enable blob files.
   *
   * Default: 0
   *
   * Dynamically changeable through
   * {@link RocksDB#setOptions(ColumnFamilyHandle, MutableColumnFamilyOptions)}.
   *
   * @param blobFileStartingLevel the starting level to enable blob files
   *
   * @return the reference to the current options.
   */
  T setBlobFileStartingLevel(final int blobFileStartingLevel);

  /**
   * Get the starting LSM tree level to enable blob files.
   *
   * Default: 0
   *
   * @return the current LSM tree level to enable blob files.
   */
  int blobFileStartingLevel();

  //
  // END options for blobs (integrated BlobDB)
  //
33 changes: 33 additions & 0 deletions java/src/main/java/org/rocksdb/ColumnFamilyOptions.java
@@ -1238,6 +1238,36 @@ public long blobCompactionReadaheadSize() {
    return blobCompactionReadaheadSize(nativeHandle_);
  }

  /**
   * Set a certain LSM tree level to enable blob files.
   *
   * Default: 0
   *
   * Dynamically changeable through
   * {@link RocksDB#setOptions(ColumnFamilyHandle, MutableColumnFamilyOptions)}.
   *
   * @param blobFileStartingLevel the starting level to enable blob files
   *
   * @return the reference to the current options.
   */
  @Override
  public ColumnFamilyOptions setBlobFileStartingLevel(final int blobFileStartingLevel) {
    setBlobFileStartingLevel(nativeHandle_, blobFileStartingLevel);
    return this;
  }

  /**
   * Get the starting LSM tree level to enable blob files.
   *
   * Default: 0
   *
   * @return the current LSM tree level to enable blob files.
   */
  @Override
  public int blobFileStartingLevel() {
    return blobFileStartingLevel(nativeHandle_);
  }

  //
  // END options for blobs (integrated BlobDB)
  //
@@ -1440,6 +1470,9 @@ private native void setBlobGarbageCollectionForceThreshold(
  private native void setBlobCompactionReadaheadSize(
      final long nativeHandle_, final long blobCompactionReadaheadSize);
  private native long blobCompactionReadaheadSize(final long nativeHandle_);
  private native void setBlobFileStartingLevel(
      final long nativeHandle_, final int blobFileStartingLevel);
  private native int blobFileStartingLevel(final long nativeHandle_);

  // instance variables
  // NOTE: If you add new member variables, please update the copy constructor above!
Expand Down