Skip to content

Commit

Permalink
compression_per_level should be used for flush and changeable (#9658)
Browse files Browse the repository at this point in the history
Summary:
- Make `compression_per_level` dynamical changeable with `SetOptions`;
- Fix a bug that `compression_per_level` is not used for flush;

Pull Request resolved: facebook/rocksdb#9658

Test Plan: CI

Reviewed By: ajkr

Differential Revision: D34700749

Pulled By: jay-zhuang

fbshipit-source-id: a23b9dfa7ad03d393c1d71781d19e91de796f49c
  • Loading branch information
jay-zhuang authored and facebook-github-bot committed Mar 8, 2022
1 parent 9b8b8b1 commit 36aec94
Show file tree
Hide file tree
Showing 14 changed files with 74 additions and 62 deletions.
2 changes: 2 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
### Bug Fixes
* Fixed a data race on `versions_` between `DBImpl::ResumeImpl()` and threads waiting for recovery to complete (#9496)
* Fixed a bug caused by race among flush, incoming writes and taking snapshots. Queries to snapshots created with these race condition can return incorrect result, e.g. resurfacing deleted data.
* Fixed a bug that DB flush uses `options.compression` even `options.compression_per_level` is set.

### Public API changes
* Remove BlockBasedTableOptions.hash_index_allow_collision which already takes no effect.
* `options.compression_per_level` is dynamically changeable with `SetOptions()`.

### Bug Fixes
* Fix a race condition when cancel manual compaction with `DisableManualCompaction`. Also DB close can cancel the manual compaction thread.
Expand Down
6 changes: 3 additions & 3 deletions db/compaction/compaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -277,9 +277,9 @@ Compaction::~Compaction() {

bool Compaction::InputCompressionMatchesOutput() const {
int base_level = input_vstorage_->base_level();
bool matches = (GetCompressionType(immutable_options_, input_vstorage_,
mutable_cf_options_, start_level_,
base_level) == output_compression_);
bool matches =
(GetCompressionType(input_vstorage_, mutable_cf_options_, start_level_,
base_level) == output_compression_);
if (matches) {
TEST_SYNC_POINT("Compaction::InputCompressionMatchesOutput:Matches");
return true;
Expand Down
21 changes: 10 additions & 11 deletions db/compaction/compaction_picker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,7 @@ bool FindIntraL0Compaction(const std::vector<FileMetaData*>& level_files,
// If enable_compression is false, then compression is always disabled no
// matter what the values of the other two parameters are.
// Otherwise, the compression type is determined based on options and level.
CompressionType GetCompressionType(const ImmutableCFOptions& ioptions,
const VersionStorageInfo* vstorage,
CompressionType GetCompressionType(const VersionStorageInfo* vstorage,
const MutableCFOptions& mutable_cf_options,
int level, int base_level,
const bool enable_compression) {
Expand All @@ -118,17 +117,19 @@ CompressionType GetCompressionType(const ImmutableCFOptions& ioptions,
}
// If the user has specified a different compression level for each level,
// then pick the compression for that level.
if (!ioptions.compression_per_level.empty()) {
if (!mutable_cf_options.compression_per_level.empty()) {
assert(level == 0 || level >= base_level);
int idx = (level == 0) ? 0 : level - base_level + 1;

const int n = static_cast<int>(ioptions.compression_per_level.size()) - 1;
const int n =
static_cast<int>(mutable_cf_options.compression_per_level.size()) - 1;
// It is possible for level_ to be -1; in that case, we use level
// 0's compression. This occurs mostly in backwards compatibility
// situations when the builder doesn't know what level the file
// belongs to. Likewise, if level is beyond the end of the
// specified compression levels, use the last value.
return ioptions.compression_per_level[std::max(0, std::min(idx, n))];
return mutable_cf_options
.compression_per_level[std::max(0, std::min(idx, n))];
} else {
return mutable_cf_options.compression;
}
Expand Down Expand Up @@ -347,9 +348,8 @@ Compaction* CompactionPicker::CompactFiles(
} else {
base_level = 1;
}
compression_type =
GetCompressionType(ioptions_, vstorage, mutable_cf_options,
output_level, base_level);
compression_type = GetCompressionType(vstorage, mutable_cf_options,
output_level, base_level);
} else {
// TODO(ajkr): `CompactionOptions` offers configurable `CompressionType`
// without configurable `CompressionOptions`, which is inconsistent.
Expand Down Expand Up @@ -637,8 +637,7 @@ Compaction* CompactionPicker::CompactRange(
ioptions_.compaction_style),
/* max_compaction_bytes */ LLONG_MAX,
compact_range_options.target_path_id,
GetCompressionType(ioptions_, vstorage, mutable_cf_options,
output_level, 1),
GetCompressionType(vstorage, mutable_cf_options, output_level, 1),
GetCompressionOptions(mutable_cf_options, vstorage, output_level),
Temperature::kUnknown, compact_range_options.max_subcompactions,
/* grandparents */ {},
Expand Down Expand Up @@ -816,7 +815,7 @@ Compaction* CompactionPicker::CompactRange(
ioptions_.level_compaction_dynamic_level_bytes),
mutable_cf_options.max_compaction_bytes,
compact_range_options.target_path_id,
GetCompressionType(ioptions_, vstorage, mutable_cf_options, output_level,
GetCompressionType(vstorage, mutable_cf_options, output_level,
vstorage->base_level()),
GetCompressionOptions(mutable_cf_options, vstorage, output_level),
Temperature::kUnknown, compact_range_options.max_subcompactions,
Expand Down
3 changes: 1 addition & 2 deletions db/compaction/compaction_picker.h
Original file line number Diff line number Diff line change
Expand Up @@ -304,8 +304,7 @@ bool FindIntraL0Compaction(
CompactionInputFiles* comp_inputs,
SequenceNumber earliest_mem_seqno = kMaxSequenceNumber);

CompressionType GetCompressionType(const ImmutableCFOptions& ioptions,
const VersionStorageInfo* vstorage,
CompressionType GetCompressionType(const VersionStorageInfo* vstorage,
const MutableCFOptions& mutable_cf_options,
int level, int base_level,
const bool enable_compression = true);
Expand Down
4 changes: 2 additions & 2 deletions db/compaction/compaction_picker_level.cc
Original file line number Diff line number Diff line change
Expand Up @@ -343,8 +343,8 @@ Compaction* LevelCompactionBuilder::GetCompaction() {
ioptions_.level_compaction_dynamic_level_bytes),
mutable_cf_options_.max_compaction_bytes,
GetPathId(ioptions_, mutable_cf_options_, output_level_),
GetCompressionType(ioptions_, vstorage_, mutable_cf_options_,
output_level_, vstorage_->base_level()),
GetCompressionType(vstorage_, mutable_cf_options_, output_level_,
vstorage_->base_level()),
GetCompressionOptions(mutable_cf_options_, vstorage_, output_level_),
Temperature::kUnknown,
/* max_subcompactions */ 0, std::move(grandparents_), is_manual_,
Expand Down
37 changes: 18 additions & 19 deletions db/compaction/compaction_picker_universal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -746,19 +746,19 @@ Compaction* UniversalCompactionBuilder::PickCompactionToReduceSortedRuns(
} else {
compaction_reason = CompactionReason::kUniversalSortedRunNum;
}
return new Compaction(
vstorage_, ioptions_, mutable_cf_options_, mutable_db_options_,
std::move(inputs), output_level,
MaxFileSizeForLevel(mutable_cf_options_, output_level,
kCompactionStyleUniversal),
GetMaxOverlappingBytes(), path_id,
GetCompressionType(ioptions_, vstorage_, mutable_cf_options_, start_level,
1, enable_compression),
GetCompressionOptions(mutable_cf_options_, vstorage_, start_level,
enable_compression),
Temperature::kUnknown,
/* max_subcompactions */ 0, grandparents, /* is manual */ false, score_,
false /* deletion_compaction */, compaction_reason);
return new Compaction(vstorage_, ioptions_, mutable_cf_options_,
mutable_db_options_, std::move(inputs), output_level,
MaxFileSizeForLevel(mutable_cf_options_, output_level,
kCompactionStyleUniversal),
GetMaxOverlappingBytes(), path_id,
GetCompressionType(vstorage_, mutable_cf_options_,
start_level, 1, enable_compression),
GetCompressionOptions(mutable_cf_options_, vstorage_,
start_level, enable_compression),
Temperature::kUnknown,
/* max_subcompactions */ 0, grandparents,
/* is manual */ false, score_,
false /* deletion_compaction */, compaction_reason);
}

// Look at overall size amplification. If size amplification
Expand Down Expand Up @@ -1076,8 +1076,8 @@ Compaction* UniversalCompactionBuilder::PickIncrementalForReduceSizeAmp(
MaxFileSizeForLevel(mutable_cf_options_, output_level,
kCompactionStyleUniversal),
GetMaxOverlappingBytes(), path_id,
GetCompressionType(ioptions_, vstorage_, mutable_cf_options_,
output_level, 1, true /* enable_compression */),
GetCompressionType(vstorage_, mutable_cf_options_, output_level, 1,
true /* enable_compression */),
GetCompressionOptions(mutable_cf_options_, vstorage_, output_level,
true /* enable_compression */),
Temperature::kUnknown,
Expand Down Expand Up @@ -1220,8 +1220,7 @@ Compaction* UniversalCompactionBuilder::PickDeleteTriggeredCompaction() {
MaxFileSizeForLevel(mutable_cf_options_, output_level,
kCompactionStyleUniversal),
/* max_grandparent_overlap_bytes */ GetMaxOverlappingBytes(), path_id,
GetCompressionType(ioptions_, vstorage_, mutable_cf_options_,
output_level, 1),
GetCompressionType(vstorage_, mutable_cf_options_, output_level, 1),
GetCompressionOptions(mutable_cf_options_, vstorage_, output_level),
Temperature::kUnknown,
/* max_subcompactions */ 0, grandparents, /* is manual */ false, score_,
Expand Down Expand Up @@ -1294,8 +1293,8 @@ Compaction* UniversalCompactionBuilder::PickCompactionToOldest(
MaxFileSizeForLevel(mutable_cf_options_, output_level,
kCompactionStyleUniversal),
GetMaxOverlappingBytes(), path_id,
GetCompressionType(ioptions_, vstorage_, mutable_cf_options_,
output_level, 1, true /* enable_compression */),
GetCompressionType(vstorage_, mutable_cf_options_, output_level, 1,
true /* enable_compression */),
GetCompressionOptions(mutable_cf_options_, vstorage_, output_level,
true /* enable_compression */),
Temperature::kUnknown,
Expand Down
20 changes: 9 additions & 11 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -119,18 +119,16 @@ CompressionType GetCompressionFlush(
// Compressing memtable flushes might not help unless the sequential load
// optimization is used for leveled compaction. Otherwise the CPU and
// latency overhead is not offset by saving much space.
if (ioptions.compaction_style == kCompactionStyleUniversal) {
if (mutable_cf_options.compaction_options_universal
.compression_size_percent < 0) {
return mutable_cf_options.compression;
} else {
return kNoCompression;
}
} else if (!ioptions.compression_per_level.empty()) {
// For leveled compress when min_level_to_compress != 0.
return ioptions.compression_per_level[0];
} else {
if (ioptions.compaction_style == kCompactionStyleUniversal &&
mutable_cf_options.compaction_options_universal
.compression_size_percent >= 0) {
return kNoCompression;
}
if (mutable_cf_options.compression_per_level.empty()) {
return mutable_cf_options.compression;
} else {
// For leveled compress when min_level_to_compress != 0.
return mutable_cf_options.compression_per_level[0];
}
}

Expand Down
10 changes: 8 additions & 2 deletions db/db_properties_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1067,11 +1067,17 @@ TEST_F(DBPropertiesTest, EstimateCompressionRatio) {
const int kNumEntriesPerFile = 1000;

Options options = CurrentOptions();
options.compression_per_level = {kNoCompression, kSnappyCompression};
options.disable_auto_compactions = true;
options.num_levels = 2;
options.num_levels = 3;
Reopen(options);

ASSERT_OK(db_->SetOptions(
{{"compression_per_level", "kNoCompression:kSnappyCompression"}}));
auto opts = db_->GetOptions();
ASSERT_EQ(opts.compression_per_level.size(), 2);
ASSERT_EQ(opts.compression_per_level[0], kNoCompression);
ASSERT_EQ(opts.compression_per_level[1], kSnappyCompression);

// compression ratio is -1.0 when no open files at level
ASSERT_EQ(CompressionRatioAtLevel(0), -1.0);

Expand Down
8 changes: 8 additions & 0 deletions include/rocksdb/advanced_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,14 @@ struct AdvancedColumnFamilyOptions {
// according to compression_per_level[1], L3 using compression_per_level[2]
// and L4 using compression_per_level[3]. Compaction for each level can
// change when data grows.
//
// NOTE: if the vector size is smaller than the level number, the undefined
// lower level uses the last option in the vector, for example, for 3 level
// LSM tree the following settings are the same:
// {kNoCompression, kSnappyCompression}
// {kNoCompression, kSnappyCompression, kSnappyCompression}
//
// Dynamically changeable through SetOptions() API
std::vector<CompressionType> compression_per_level;

// Number of levels for this database
Expand Down
11 changes: 5 additions & 6 deletions options/cf_options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,11 @@ static std::unordered_map<std::string, OptionTypeInfo>
{offsetof(struct MutableCFOptions, bottommost_compression),
OptionType::kCompressionType, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"compression_per_level",
OptionTypeInfo::Vector<CompressionType>(
offsetof(struct MutableCFOptions, compression_per_level),
OptionVerificationType::kNormal, OptionTypeFlags::kMutable,
{0, OptionType::kCompressionType})},
{kOptNameCompOpts,
OptionTypeInfo::Struct(
kOptNameCompOpts, &compression_options_type_info,
Expand Down Expand Up @@ -548,11 +553,6 @@ static std::unordered_map<std::string, OptionTypeInfo>
{"rate_limit_delay_max_milliseconds",
{0, OptionType::kUInt, OptionVerificationType::kDeprecated,
OptionTypeFlags::kNone}},
{"compression_per_level",
OptionTypeInfo::Vector<CompressionType>(
offsetof(struct ImmutableCFOptions, compression_per_level),
OptionVerificationType::kNormal, OptionTypeFlags::kNone,
{0, OptionType::kCompressionType})},
{"comparator",
OptionTypeInfo::AsCustomRawPtr<const Comparator>(
offsetof(struct ImmutableCFOptions, user_comparator),
Expand Down Expand Up @@ -849,7 +849,6 @@ ImmutableCFOptions::ImmutableCFOptions(const ColumnFamilyOptions& cf_options)
table_properties_collector_factories(
cf_options.table_properties_collector_factories),
bloom_locality(cf_options.bloom_locality),
compression_per_level(cf_options.compression_per_level),
level_compaction_dynamic_level_bytes(
cf_options.level_compaction_dynamic_level_bytes),
num_levels(cf_options.num_levels),
Expand Down
6 changes: 3 additions & 3 deletions options/cf_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@ struct ImmutableCFOptions {
// to PlainTableOptions just like bloom_bits_per_key
uint32_t bloom_locality;

std::vector<CompressionType> compression_per_level;

bool level_compaction_dynamic_level_bytes;

int num_levels;
Expand Down Expand Up @@ -154,7 +152,8 @@ struct MutableCFOptions {
bottommost_compression_opts(options.bottommost_compression_opts),
bottommost_temperature(options.bottommost_temperature),
sample_for_compression(
options.sample_for_compression) { // TODO: is 0 fine here?
options.sample_for_compression), // TODO: is 0 fine here?
compression_per_level(options.compression_per_level) {
RefreshDerivedOptions(options.num_levels, options.compaction_style);
}

Expand Down Expand Up @@ -271,6 +270,7 @@ struct MutableCFOptions {
Temperature bottommost_temperature;

uint64_t sample_for_compression;
std::vector<CompressionType> compression_per_level;

// Derived options
// Per-level target file size.
Expand Down
2 changes: 1 addition & 1 deletion options/options_helper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ void UpdateColumnFamilyOptions(const MutableCFOptions& moptions,
cf_opts->bottommost_compression = moptions.bottommost_compression;
cf_opts->bottommost_compression_opts = moptions.bottommost_compression_opts;
cf_opts->sample_for_compression = moptions.sample_for_compression;
cf_opts->compression_per_level = moptions.compression_per_level;
cf_opts->bottommost_temperature = moptions.bottommost_temperature;
}

Expand All @@ -287,7 +288,6 @@ void UpdateColumnFamilyOptions(const ImmutableCFOptions& ioptions,
cf_opts->table_properties_collector_factories =
ioptions.table_properties_collector_factories;
cf_opts->bloom_locality = ioptions.bloom_locality;
cf_opts->compression_per_level = ioptions.compression_per_level;
cf_opts->level_compaction_dynamic_level_bytes =
ioptions.level_compaction_dynamic_level_bytes;
cf_opts->num_levels = ioptions.num_levels;
Expand Down
2 changes: 2 additions & 0 deletions options/options_settable_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,8 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
{offsetof(struct MutableCFOptions,
max_bytes_for_level_multiplier_additional),
sizeof(std::vector<int>)},
{offsetof(struct MutableCFOptions, compression_per_level),
sizeof(std::vector<CompressionType>)},
{offsetof(struct MutableCFOptions, max_file_size),
sizeof(std::vector<uint64_t>)},
};
Expand Down
4 changes: 2 additions & 2 deletions table/sst_file_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,9 @@ Status SstFileWriter::Open(const std::string& file_path) {
} else {
compression_opts = r->mutable_cf_options.compression_opts;
}
} else if (!r->ioptions.compression_per_level.empty()) {
} else if (!r->mutable_cf_options.compression_per_level.empty()) {
// Use the compression of the last level if we have per level compression
compression_type = *(r->ioptions.compression_per_level.rbegin());
compression_type = *(r->mutable_cf_options.compression_per_level.rbegin());
compression_opts = r->mutable_cf_options.compression_opts;
} else {
compression_type = r->mutable_cf_options.compression;
Expand Down

0 comments on commit 36aec94

Please sign in to comment.