Commit 35148ac

pdillinger authored and meta-codesync[bot] committed
Improve distinct compression for index and data blocks (facebook#14140)
Summary: This change enables a custom CompressionManager / Compressor to adopt custom handling for data and index blocks. In particular, index blocks for format_version >= 4 use a distinct variant of the block format, so a potentially format-aware compression algorithm such as OpenZL should be told which kind of block it is compressing. (Previously I avoided passing the block type in CompressBlock, both for efficient handling of things like dictionaries and to avoid checks on every CompressBlock call.)

Most of the change is in BlockBasedTableBuilder, to call MaybeCloneSpecialized for both kDataBlock and kIndexBlock. But I also needed some small tweaks/additions to the public API:
* Require a Clone() function from Compressors, to support proper implementations of MaybeCloneSpecialized() in wrapper Compressors.
* Assert that the default implementation of CompressorWrapper::MaybeCloneSpecialized() is only used in allowable cases.
* Add a convenience function Compressor::CloneMaybeSpecialized().

This also fixes a serious bug/oversight in ManagedPtr (used for ManagedWorkingArea) that somehow wasn't showing up before. It probably doesn't need a release note because the CompressionManager stuff is still considered experimental.

Pull Request resolved: facebook#14140

Test Plan: Greatly expanded DBCompressionTest.CompressionManagerWrapper to make sure the distinction between data blocks and index blocks is properly communicated to a custom CompressionManager/Compressor. The test includes processing the expected structure of data and index blocks, to serve as a tested example for structure-aware compressors.

Reviewed By: hx235

Differential Revision: D87600019

Pulled By: pdillinger

fbshipit-source-id: 252ef78910073a0e45f2c81dd45ac87ff8a41fc6
1 parent 8c7c8b8 commit 35148ac
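For context on how a plugin would use the new hooks, here is a minimal, hedged sketch of a block-type-aware wrapper in the spirit of the test's CompressorCustomAlg. It assumes only the API visible in this diff (Compressor::Clone(), MaybeCloneSpecialized(), CloneMaybeSpecialized(), and a CompressorWrapper constructor taking a std::unique_ptr<Compressor>); the class name MyFormatAwareCompressor, the stored block_type_ field, and the kMisc default are illustrative, not part of this commit.

```cpp
#include <memory>
#include <utility>

#include "rocksdb/advanced_compression.h"

using ROCKSDB_NAMESPACE::CacheEntryRole;
using ROCKSDB_NAMESPACE::Compressor;
using ROCKSDB_NAMESPACE::CompressorWrapper;

// Hypothetical format-aware wrapper (e.g. around an OpenZL-style compressor).
class MyFormatAwareCompressor : public CompressorWrapper {
 public:
  explicit MyFormatAwareCompressor(
      std::unique_ptr<Compressor> wrapped,
      CacheEntryRole block_type = CacheEntryRole::kMisc)
      : CompressorWrapper(std::move(wrapped)), block_type_(block_type) {}

  const char* Name() const override { return "MyFormatAwareCompressor"; }

  // Now required of every Compressor: return a functionally equivalent copy,
  // so wrappers can implement MaybeCloneSpecialized() properly.
  std::unique_ptr<Compressor> Clone() const override {
    return std::make_unique<MyFormatAwareCompressor>(wrapped_->Clone(),
                                                     block_type_);
  }

  // Record whether we are compressing data blocks or index blocks, so
  // CompressBlock() (not shown) can pick a format-appropriate strategy.
  std::unique_ptr<Compressor> MaybeCloneSpecialized(
      CacheEntryRole block_type, DictSampleArgs&& dict_samples) override {
    auto inner =
        wrapped_->CloneMaybeSpecialized(block_type, std::move(dict_samples));
    return std::make_unique<MyFormatAwareCompressor>(std::move(inner),
                                                     block_type);
  }

 private:
  CacheEntryRole block_type_;
};
```

With this in place, the block-based table builder hands the kDataBlock and kIndexBlock specializations their respective blocks, which is exactly what the expanded DBCompressionTest.CompressionManagerWrapper exercises.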

File tree

10 files changed, +631 -285 lines

include/rocksdb/advanced_compression.h

Lines changed: 27 additions & 1 deletion
@@ -90,6 +90,10 @@ class Compressor {
     return CompressionType::kDisableCompressionOption;
   }
 
+  // Return a distinct but functionally equivalent Compressor. This is often
+  // needed to implement MaybeCloneSpecialized() in wrapper compressors.
+  virtual std::unique_ptr<Compressor> Clone() const = 0;
+
   // Utility struct for providing sample data for the compression dictionary.
   // Potentially extensible by callers of Compressor (but not recommended)
   struct DictSampleArgs {
@@ -131,6 +135,18 @@
     return nullptr;
   }
 
+  // A convenience function when a clone is needed and may or may not be
+  // specialized.
+  std::unique_ptr<Compressor> CloneMaybeSpecialized(
+      CacheEntryRole block_type, DictSampleArgs&& dict_samples) {
+    auto clone = MaybeCloneSpecialized(block_type, std::move(dict_samples));
+    if (clone == nullptr) {
+      clone = Clone();
+      assert(clone != nullptr);
+    }
+    return clone;
+  }
+
   // A WorkingArea is an optional structure (both for callers and
   // implementations) that can enable optimizing repeated compressions by
   // reusing working space or thread-local tracking of statistics or trends.
@@ -473,9 +489,19 @@ class CompressorWrapper : public Compressor {
     return wrapped_->GetPreferredCompressionType();
   }
 
+  // NOTE: Clone() not implemented here because it needs to be in the derived
+  // class
+
+  // NOTE: MaybeCloneSpecialized() is only implemented here for convenience
+  // when the wrapped Compressor uses the default implementation of
+  // MaybeCloneSpecialized(). This needs to be overridden if not.
   std::unique_ptr<Compressor> MaybeCloneSpecialized(
       CacheEntryRole block_type, DictSampleArgs&& dict_samples) override {
-    return wrapped_->MaybeCloneSpecialized(block_type, std::move(dict_samples));
+    auto clone =
+        wrapped_->MaybeCloneSpecialized(block_type, std::move(dict_samples));
+    // Assert default no-op MaybeCloneSpecialized()
+    assert(clone == nullptr);
+    return clone;
   }
 
   ManagedWorkingArea ObtainWorkingArea() override {

include/rocksdb/data_structure.h

Lines changed: 15 additions & 9 deletions
@@ -247,15 +247,7 @@ class ManagedPtr {
  public:
   ManagedPtr() = default;
   ManagedPtr(T* ptr, Owner* owner) : ptr_(ptr), owner_(owner) {}
-  ~ManagedPtr() {
-    if (ptr_ && owner_) {
-      if constexpr (std::is_member_function_pointer_v<decltype(Fn)>) {
-        (owner_->*Fn)(ptr_);
-      } else {
-        Fn(owner_, ptr_);
-      }
-    }
-  }
+  ~ManagedPtr() { Free(); }
   // No copies
   ManagedPtr(const ManagedPtr&) = delete;
   ManagedPtr& operator=(const ManagedPtr&) = delete;
@@ -267,6 +259,10 @@
     other.owner_ = nullptr;
   }
   ManagedPtr& operator=(ManagedPtr&& other) noexcept {
+    if (this == &other) {
+      return *this;
+    }
+    Free();
     ptr_ = other.ptr_;
     owner_ = other.owner_;
     other.ptr_ = nullptr;
@@ -284,6 +280,16 @@
  private:
   T* ptr_ = nullptr;
   Owner* owner_ = nullptr;
+
+  void Free() {
+    if (ptr_ && owner_) {
+      if constexpr (std::is_member_function_pointer_v<decltype(Fn)>) {
+        (owner_->*Fn)(ptr_);
+      } else {
+        Fn(owner_, ptr_);
+      }
+    }
+  }
 };
 
 template <typename T, typename comp>
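The ManagedPtr change above fixes move assignment: previously the destination's currently held resource was overwritten without running the owner callback, and self-move was not guarded. Here is a standalone sketch of the same rule (a toy OwningHandle standing in for ManagedPtr; this is not RocksDB code):

```cpp
#include <utility>

// Sketch of the move-assignment rule the fix applies: release what we already
// hold before taking over the other object's resource, and make self-move a
// no-op.
class OwningHandle {
 public:
  OwningHandle() = default;
  explicit OwningHandle(int* ptr) : ptr_(ptr) {}
  ~OwningHandle() { Free(); }

  OwningHandle(const OwningHandle&) = delete;
  OwningHandle& operator=(const OwningHandle&) = delete;

  OwningHandle(OwningHandle&& other) noexcept : ptr_(other.ptr_) {
    other.ptr_ = nullptr;
  }
  OwningHandle& operator=(OwningHandle&& other) noexcept {
    if (this == &other) {
      return *this;  // guard self-move, as the fix does
    }
    Free();  // the previously missing step: drop the resource we already hold
    ptr_ = other.ptr_;
    other.ptr_ = nullptr;
    return *this;
  }

 private:
  void Free() { delete ptr_; }
  int* ptr_ = nullptr;
};

int main() {
  OwningHandle a(new int(1));
  OwningHandle b(new int(2));
  b = std::move(a);  // without Free(), the int(2) would leak here
  return 0;
}
```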

table/block_based/block_based_table_builder.cc

Lines changed: 60 additions & 28 deletions
@@ -110,6 +110,19 @@ FilterBlockBuilder* CreateFilterBlockBuilder(
   }
 }
 
+// A convenience function for populating the Compressor* fields; see ~Rep()
+Compressor* MaybeCloneSpecialized(
+    Compressor* compressor, CacheEntryRole block_type,
+    Compressor::DictSampleArgs&& dict_samples = {}) {
+  auto specialized =
+      compressor->MaybeCloneSpecialized(block_type, std::move(dict_samples));
+  if (specialized) {
+    // Caller is responsible for freeing when distinct
+    return specialized.release();
+  } else {
+    return compressor;
+  }
+}
 }  // namespace
 
 // kBlockBasedTableMagicNumber was picked by running
@@ -824,15 +837,17 @@ struct BlockBasedTableBuilder::Rep {
 
   // *** Compressors & decompressors - Yes, it seems like a lot here but ***
   // *** these are distinct fields to minimize extra conditionals and ***
-  // *** field reads on hot code paths. ***
+  // *** field reads on hot code paths. And to avoid interlocked ***
+  // *** instructions associated with shared_ptr. ***
 
   // A compressor for blocks in general, without dictionary compression
   std::unique_ptr<Compressor> basic_compressor;
-  // A compressor using dictionary compression (when applicable)
-  std::unique_ptr<Compressor> compressor_with_dict;
-  // Once configured/determined, points to one of the above Compressors to
-  // use on data blocks.
-  Compressor* data_block_compressor = nullptr;
+  // A compressor for data blocks, which might be tuned differently and might
+  // use dictionary compression (when applicable). See ~Rep() for some details.
+  UnownedPtr<Compressor> data_block_compressor = nullptr;
+  // A compressor for index blocks, which might be tuned differently from
+  // basic_compressor. See ~Rep() for some details.
+  UnownedPtr<Compressor> index_block_compressor = nullptr;
   // A decompressor corresponding to basic_compressor (when non-nullptr).
   // Used for verification and cache warming.
   std::shared_ptr<Decompressor> basic_decompressor;
@@ -853,7 +868,7 @@ struct BlockBasedTableBuilder::Rep {
       compression_types_used;
 
   // Working area for basic_compressor when compression_parallel_threads==1
-  WorkingAreaPair basic_working_area;
+  WorkingAreaPair index_block_working_area;
   // Working area for data_block_compressor, for emit/compaction thread
   WorkingAreaPair data_block_working_area;
 
@@ -1099,7 +1114,10 @@ struct BlockBasedTableBuilder::Rep {
         filter_context, tbo.compression_opts, tbo.compression_type);
     if (basic_compressor) {
       if (table_options.enable_index_compression) {
-        basic_working_area.compress = basic_compressor->ObtainWorkingArea();
+        index_block_compressor = MaybeCloneSpecialized(
+            basic_compressor.get(), CacheEntryRole::kIndexBlock);
+        index_block_working_area.compress =
+            index_block_compressor->ObtainWorkingArea();
       }
       max_dict_sample_bytes = basic_compressor->GetMaxSampleSizeIfWantDict(
           CacheEntryRole::kDataBlock);
@@ -1114,8 +1132,10 @@ struct BlockBasedTableBuilder::Rep {
             tbo.compression_opts.max_dict_buffer_bytes);
       }
     } else {
-      // No distinct data block compressor using dictionary
-      data_block_compressor = basic_compressor.get();
+      // No distinct data block compressor using dictionary, but
+      // implementation might still want to specialize for data blocks
+      data_block_compressor = MaybeCloneSpecialized(
+          basic_compressor.get(), CacheEntryRole::kDataBlock);
       data_block_working_area.compress =
           data_block_compressor->ObtainWorkingArea();
     }
@@ -1129,8 +1149,9 @@ struct BlockBasedTableBuilder::Rep {
     if (table_options.verify_compression) {
       verify_decompressor = basic_decompressor.get();
       if (table_options.enable_index_compression) {
-        basic_working_area.verify = verify_decompressor->ObtainWorkingArea(
-            basic_compressor->GetPreferredCompressionType());
+        index_block_working_area.verify =
+            verify_decompressor->ObtainWorkingArea(
+                index_block_compressor->GetPreferredCompressionType());
       }
       if (state == State::kUnbuffered) {
         assert(data_block_compressor);
@@ -1295,8 +1316,19 @@ struct BlockBasedTableBuilder::Rep {
   }
 
   ~Rep() {
+    // Delete working areas before their compressors.
+    index_block_working_area = {};
+    data_block_working_area = {};
    // Must have been cleaned up by StopParallelCompression
    assert(pc_rep == nullptr);
+    // Delete specialized compressors if they were distinct (avoiding extra
+    // fields and interlocked instructions with shared_ptr)
+    if (data_block_compressor.get() != basic_compressor.get()) {
+      delete data_block_compressor.get();
+    }
+    if (index_block_compressor.get() != basic_compressor.get()) {
+      delete index_block_compressor.get();
+    }
   }
 
   Rep(const Rep&) = delete;
@@ -1729,9 +1761,11 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& uncompressed_block_data,
   assert(!r->IsParallelCompressionActive());
   CompressionType type;
   bool is_data_block = block_type == BlockType::kData;
+  // NOTE: only index and data blocks are currently compressed
+  assert(is_data_block || block_type == BlockType::kIndex);
   Status compress_status = CompressAndVerifyBlock(
       uncompressed_block_data, is_data_block,
-      is_data_block ? r->data_block_working_area : r->basic_working_area,
+      is_data_block ? r->data_block_working_area : r->index_block_working_area,
       &r->single_threaded_compressed_output, &type);
   r->SetStatus(compress_status);
   if (UNLIKELY(!ok())) {
@@ -1845,13 +1879,13 @@ Status BlockBasedTableBuilder::CompressAndVerifyBlock(
   Rep* r = rep_.get();
   Status status;
 
-  Compressor* compressor = nullptr;
+  UnownedPtr<Compressor> compressor = nullptr;
   Decompressor* verify_decomp = nullptr;
   if (is_data_block) {
     compressor = r->data_block_compressor;
     verify_decomp = r->data_block_verify_decompressor.get();
   } else {
-    compressor = r->basic_compressor.get();
+    compressor = r->index_block_compressor;
     verify_decomp = r->verify_decompressor.get();
   }
 
@@ -2116,7 +2150,7 @@ void BlockBasedTableBuilder::MaybeStartParallelCompression() {
   // that latency. So even with some optimizations, turning on the parallel
   // framework when compression is disabled just eats more CPU with little-to-no
   // improvement in throughput.
-  if (rep_->data_block_compressor == nullptr) {
+  if (!rep_->data_block_compressor) {
     // Force the generally best configuration for no compression: no parallelism
     return;
   }
@@ -2463,8 +2497,8 @@ void BlockBasedTableBuilder::WritePropertiesBlock(
 void BlockBasedTableBuilder::WriteCompressionDictBlock(
     MetaIndexBuilder* meta_index_builder) {
   Slice compression_dict;
-  if (rep_->compressor_with_dict) {
-    compression_dict = rep_->compressor_with_dict->GetSerializedDict();
+  if (rep_->data_block_compressor) {
+    compression_dict = rep_->data_block_compressor->GetSerializedDict();
   }
   if (!compression_dict.empty()) {
     BlockHandle compression_dict_block_handle;
@@ -2559,6 +2593,7 @@ void BlockBasedTableBuilder::MaybeEnterUnbuffered(
     // The below code is neither safe nor necessary for handling zero data
    // blocks.
    // For PostPopulateCompressionProperties()
+    assert(!r->data_block_compressor);
    r->data_block_compressor = r->basic_compressor.get();
    return;
   }
@@ -2600,15 +2635,12 @@ void BlockBasedTableBuilder::MaybeEnterUnbuffered(
 
   assert(samples.sample_data.size() > 0);
 
-  // final sample data block flushed, now we can generate dictionary
-  r->compressor_with_dict = r->basic_compressor->MaybeCloneSpecialized(
-      CacheEntryRole::kDataBlock, std::move(samples));
+  // final sample data block flushed, now we can generate dictionary (or it
+  // might opt not to use a dictionary and that's ok)
+  r->data_block_compressor =
+      MaybeCloneSpecialized(r->basic_compressor.get(),
+                            CacheEntryRole::kDataBlock, std::move(samples));
 
-  // The compressor might opt not to use a dictionary, in which case we
-  // can use the same compressor as for e.g. index blocks.
-  r->data_block_compressor = r->compressor_with_dict
-                                 ? r->compressor_with_dict.get()
-                                 : r->basic_compressor.get();
   Slice serialized_dict = r->data_block_compressor->GetSerializedDict();
   if (r->verify_decompressor) {
     if (serialized_dict.empty()) {
@@ -2831,8 +2863,8 @@ uint64_t BlockBasedTableBuilder::EstimatedTailSize() const {
   }
 
   // 3. Estimate compression dictionary size
-  if (rep_->compressor_with_dict) {
-    Slice dict = rep_->compressor_with_dict->GetSerializedDict();
+  if (rep_->data_block_compressor) {
+    Slice dict = rep_->data_block_compressor->GetSerializedDict();
     if (!dict.empty()) {
       estimated_tail_size += dict.size();
     }
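In the builder, Rep keeps raw (UnownedPtr) per-block-type pointers and deletes a specialized clone only when it is distinct from basic_compressor (see ~Rep() above), trading a little manual lifetime management for fewer fields and no shared_ptr overhead. For illustration, the same clone-or-alias decision can be written with unique_ptr ownership; the helper name and out-parameter below are hypothetical, not part of this commit.

```cpp
#include <memory>

#include "rocksdb/advanced_compression.h"

using ROCKSDB_NAMESPACE::CacheEntryRole;
using ROCKSDB_NAMESPACE::Compressor;

// Hypothetical helper: ask the compressor whether it wants an index-block
// specialization; fall back to the shared instance when it declines.
std::unique_ptr<Compressor> SpecializeForIndexBlocksOrAlias(
    const std::unique_ptr<Compressor>& basic_compressor,
    Compressor** index_block_compressor /* out, non-owning */) {
  auto clone = basic_compressor->MaybeCloneSpecialized(
      CacheEntryRole::kIndexBlock, Compressor::DictSampleArgs{});
  // nullptr from MaybeCloneSpecialized() means "use me as-is".
  *index_block_compressor = clone ? clone.get() : basic_compressor.get();
  return clone;  // owns the specialized clone, if any; empty when aliasing
}
```

The actual Rep instead stores the released clone behind an UnownedPtr and frees it in ~Rep() only when it differs from basic_compressor, after resetting the working areas that reference it.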

test_util/testutil.h

Lines changed: 5 additions & 1 deletion
@@ -766,6 +766,10 @@ struct CompressorCustomAlg : public CompressorWrapper {
     return kCompression;
   }
 
+  std::unique_ptr<Compressor> Clone() const override {
+    return std::make_unique<CompressorCustomAlg>(wrapped_->Clone());
+  }
+
   Status CompressBlock(Slice uncompressed_data, char* compressed_output,
                        size_t* compressed_output_size,
                        CompressionType* out_compression_type,
@@ -794,7 +798,7 @@
   std::unique_ptr<Compressor> MaybeCloneSpecialized(
       CacheEntryRole block_type, DictSampleArgs&& dict_samples) override {
     auto clone =
-        wrapped_->MaybeCloneSpecialized(block_type, std::move(dict_samples));
+        wrapped_->CloneMaybeSpecialized(block_type, std::move(dict_samples));
     return std::make_unique<CompressorCustomAlg>(std::move(clone));
   }

util/auto_tune_compressor.cc

Lines changed: 8 additions & 0 deletions
@@ -59,6 +59,10 @@ const char* AutoSkipCompressorWrapper::Name() const {
   return "AutoSkipCompressorWrapper";
 }
 
+std::unique_ptr<Compressor> AutoSkipCompressorWrapper::Clone() const {
+  return std::make_unique<AutoSkipCompressorWrapper>(wrapped_->Clone(), opts_);
+}
+
 Status AutoSkipCompressorWrapper::CompressBlock(
     Slice uncompressed_data, char* compressed_output,
     size_t* compressed_output_size, CompressionType* out_compression_type,
@@ -174,6 +178,10 @@ CostAwareCompressor::CostAwareCompressor(const CompressionOptions& opts)
 }
 
 const char* CostAwareCompressor::Name() const { return "CostAwareCompressor"; }
+
+std::unique_ptr<Compressor> CostAwareCompressor::Clone() const {
+  return std::make_unique<CostAwareCompressor>(opts_);
+}
 size_t CostAwareCompressor::GetMaxSampleSizeIfWantDict(
     CacheEntryRole block_type) const {
   auto idx = allcompressors_index_.back();

util/auto_tune_compressor.h

Lines changed: 2 additions & 0 deletions
@@ -64,6 +64,7 @@ class AutoSkipCompressorWrapper : public CompressorWrapper {
   explicit AutoSkipCompressorWrapper(std::unique_ptr<Compressor> compressor,
                                      const CompressionOptions& opts);
 
+  std::unique_ptr<Compressor> Clone() const override;
   Status CompressBlock(Slice uncompressed_data, char* compressed_output,
                        size_t* compressed_output_size,
                        CompressionType* out_compression_type,
@@ -149,6 +150,7 @@ class CostAwareCompressor : public Compressor {
  public:
   explicit CostAwareCompressor(const CompressionOptions& opts);
   const char* Name() const override;
+  std::unique_ptr<Compressor> Clone() const override;
   size_t GetMaxSampleSizeIfWantDict(CacheEntryRole block_type) const override;
   Slice GetSerializedDict() const override;
   CompressionType GetPreferredCompressionType() const override;
