Skip to content

Commit

Permalink
[Feature] persistent index support compression (StarRocks#32112)
Browse files Browse the repository at this point in the history
Signed-off-by: luohaha <18810541851@163.com>
  • Loading branch information
luohaha authored Nov 21, 2023
1 parent 3f33fc7 commit d8545ee
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 45 deletions.
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1040,6 +1040,8 @@ CONF_mInt64(pindex_major_compaction_schedule_interval_seconds, "15");
CONF_mInt64(pindex_shard_data_gc_interval_seconds, "18000"); // 5 hour
// enable use bloom filter for pindex or not
CONF_mBool(enable_pindex_filter, "true");
// enable persistent index compression
CONF_mBool(enable_pindex_compression, "true");
// use bloom filter in pindex can reduce disk io, but in the following scenarios, we should skip the bloom filter
// 1. The records to be found are in the index, bloom filter is no usage
// 2. The records to be found is very small but bloom filter is very large, read bloom filter may cost a lot of disk io
Expand Down
120 changes: 101 additions & 19 deletions be/src/storage/persistent_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ struct ImmutableIndexShard {

Status write(WritableFile& wb) const;

Status compress_and_write(const CompressionTypePB& compression_type, WritableFile& wb,
size_t* uncompressed_size) const;

Status decompress_pages(const CompressionTypePB& compression_type, uint32_t npage, size_t uncompressed_size,
size_t compressed_size);

static StatusOr<std::unique_ptr<ImmutableIndexShard>> try_create(size_t key_size, size_t npage, size_t nbucket,
const std::vector<KVRef>& kv_refs);

Expand All @@ -173,6 +179,46 @@ Status ImmutableIndexShard::write(WritableFile& wb) const {
}
}

Status ImmutableIndexShard::compress_and_write(const CompressionTypePB& compression_type, WritableFile& wb,
size_t* uncompressed_size) const {
if (compression_type == CompressionTypePB::NO_COMPRESSION) {
return write(wb);
}
if (pages.size() > 0) {
const BlockCompressionCodec* codec = nullptr;
RETURN_IF_ERROR(get_block_compression_codec(compression_type, &codec));
Slice input((uint8_t*)pages.data(), kPageSize * pages.size());
*uncompressed_size = input.get_size();
faststring compressed_body;
compressed_body.resize(codec->max_compressed_len(*uncompressed_size));
Slice compressed_slice(compressed_body);
RETURN_IF_ERROR(codec->compress(input, &compressed_slice));
return wb.append(compressed_slice);
} else {
return Status::OK();
}
}

Status ImmutableIndexShard::decompress_pages(const CompressionTypePB& compression_type, uint32_t npage,
size_t uncompressed_size, size_t compressed_size) {
if (uncompressed_size == 0) {
// No compression
return Status::OK();
}
if (kPageSize * npage != uncompressed_size) {
return Status::Corruption(
fmt::format("invalid uncompressed shared size, {} / {}", kPageSize * npage, uncompressed_size));
}
const BlockCompressionCodec* codec = nullptr;
RETURN_IF_ERROR(get_block_compression_codec(compression_type, &codec));
Slice compressed_body((uint8_t*)pages.data(), compressed_size);
std::vector<IndexPage> uncompressed_pages(npage);
Slice decompressed_body((uint8_t*)uncompressed_pages.data(), uncompressed_size);
RETURN_IF_ERROR(codec->decompress(compressed_body, &decompressed_body));
pages.swap(uncompressed_pages);
return Status::OK();
}

inline size_t num_pack_for_bucket(size_t kv_size, size_t num_kv) {
return npad(num_kv, kPackSize) + npad(kv_size * num_kv, kPackSize);
}
Expand Down Expand Up @@ -496,6 +542,11 @@ Status ImmutableIndexWriter::init(const string& idx_file_path, const EditVersion

_bf_file_path = _idx_file_path + BloomFilterSuffix;
ASSIGN_OR_RETURN(_bf_wb, _fs->new_writable_file(wblock_opts, _bf_file_path));
if (config::enable_pindex_compression) {
_meta.set_compression_type(CompressionTypePB::LZ4_FRAME);
} else {
_meta.set_compression_type(CompressionTypePB::NO_COMPRESSION);
}
return Status::OK();
}

Expand Down Expand Up @@ -545,14 +596,17 @@ Status ImmutableIndexWriter::write_shard(size_t key_size, size_t npage_hint, siz
}
auto& shard = rs_create.value();
size_t pos_before = _idx_wb->size();
RETURN_IF_ERROR(shard->write(*_idx_wb));
size_t uncompressed_size = 0;
RETURN_IF_ERROR(shard->compress_and_write(static_cast<CompressionTypePB>(_meta.compression_type()), *_idx_wb,
&uncompressed_size));
size_t pos_after = _idx_wb->size();
auto shard_meta = _meta.add_shards();
shard_meta->set_size(kvs.size());
shard_meta->set_npage(shard->npage());
shard_meta->set_key_size(key_size);
shard_meta->set_value_size(kIndexValueSize);
shard_meta->set_nbucket(nbucket);
shard_meta->set_uncompressed_size(uncompressed_size);
auto ptr_meta = shard_meta->mutable_data();
ptr_meta->set_offset(pos_before);
ptr_meta->set_size(pos_after - pos_before);
Expand Down Expand Up @@ -631,12 +685,14 @@ Status ImmutableIndexWriter::finish() {
RETURN_IF_ERROR(write_bf());
}
LOG(INFO) << strings::Substitute(
"finish writing immutable index $0 #shard:$1 #kv:$2 #moved:$3($4) kv_bytes:$5 usage:$6 bf_bytes:$7",
"finish writing immutable index $0 #shard:$1 #kv:$2 #moved:$3($4) kv_bytes:$5 usage:$6 bf_bytes:$7 "
"compression_type:$8",
_idx_file_path_tmp, _nshard, _total, _total_moved, _total_moved * 1000 / std::max(_total, 1UL) / 1000.0,
_total_kv_bytes, _total_kv_size * 1000 / std::max(_total_kv_bytes, 1UL) / 1000.0, _total_bf_bytes);
_total_kv_bytes, _total_kv_size * 1000 / std::max(_total_kv_bytes, 1UL) / 1000.0, _total_bf_bytes,
_meta.compression_type());
_version.to_pb(_meta.mutable_version());
_meta.set_size(_total);
_meta.set_format_version(PERSISTENT_INDEX_VERSION_3);
_meta.set_format_version(PERSISTENT_INDEX_VERSION_4);
for (const auto& [key_size, shard_info] : _shard_info_by_length) {
const auto [shard_offset, shard_num] = shard_info;
auto info = _meta.add_shard_info();
Expand Down Expand Up @@ -1804,7 +1860,7 @@ Status ShardByLengthMutableIndex::commit(MutableIndexMetaPB* meta, const EditVer
// create a new empty _l0 file, set _offset to 0
data->set_offset(0);
data->set_size(0);
meta->set_format_version(PERSISTENT_INDEX_VERSION_3);
meta->set_format_version(PERSISTENT_INDEX_VERSION_4);
_offset = 0;
_page_size = 0;
break;
Expand Down Expand Up @@ -1839,7 +1895,7 @@ Status ShardByLengthMutableIndex::commit(MutableIndexMetaPB* meta, const EditVer
data->set_size(snapshot_size);
snapshot->clear_dumped_shard_idxes();
snapshot->mutable_dumped_shard_idxes()->Add(dumped_shard_idxes.begin(), dumped_shard_idxes.end());
meta->set_format_version(PERSISTENT_INDEX_VERSION_3);
meta->set_format_version(PERSISTENT_INDEX_VERSION_4);
_offset = snapshot_size;
_page_size = 0;
break;
Expand All @@ -1850,7 +1906,7 @@ Status ShardByLengthMutableIndex::commit(MutableIndexMetaPB* meta, const EditVer
PagePointerPB* data = wal_pb->mutable_data();
data->set_offset(_offset);
data->set_size(_page_size);
meta->set_format_version(PERSISTENT_INDEX_VERSION_3);
meta->set_format_version(PERSISTENT_INDEX_VERSION_4);
_offset += _page_size;
_page_size = 0;
break;
Expand All @@ -1864,9 +1920,10 @@ Status ShardByLengthMutableIndex::commit(MutableIndexMetaPB* meta, const EditVer

Status ShardByLengthMutableIndex::load(const MutableIndexMetaPB& meta) {
auto format_version = meta.format_version();
if (format_version != PERSISTENT_INDEX_VERSION_2 && format_version != PERSISTENT_INDEX_VERSION_3) {
if (format_version != PERSISTENT_INDEX_VERSION_2 && format_version != PERSISTENT_INDEX_VERSION_3 &&
format_version != PERSISTENT_INDEX_VERSION_4) {
std::string msg = strings::Substitute("different l0 format, should rebuid index. actual:$0, expect:$1",
format_version, PERSISTENT_INDEX_VERSION_3);
format_version, PERSISTENT_INDEX_VERSION_4);
LOG(WARNING) << msg;
return Status::InternalError(msg);
}
Expand Down Expand Up @@ -2069,6 +2126,8 @@ Status ImmutableIndex::_get_kvs_for_shard(std::vector<std::vector<KVRef>>& kvs_b
}
*shard = std::make_unique<ImmutableIndexShard>(shard_info.npage);
RETURN_IF_ERROR(_file->read_at_fully(shard_info.offset, (*shard)->pages.data(), shard_info.bytes));
RETURN_IF_ERROR((*shard)->decompress_pages(_compression_type, shard_info.npage, shard_info.uncompressed_size,
shard_info.bytes));
if (shard_info.key_size != 0) {
return _get_fixlen_kvs_for_shard(kvs_by_shard, shard_idx, shard_bits, shard);
} else {
Expand Down Expand Up @@ -2352,13 +2411,19 @@ Status ImmutableIndex::_get_in_shard(size_t shard_idx, size_t n, const Slice* ke
return st;
}

if (st.ok() && keys_info_by_page.size() == 1) {
if (st.ok() && keys_info_by_page.size() == 1 && shard_info.uncompressed_size == 0) {
return _get_in_shard_by_page(shard_idx, n, keys, values, found_keys_info, keys_info_by_page);
}

std::unique_ptr<ImmutableIndexShard> shard = std::make_unique<ImmutableIndexShard>(shard_info.npage);
CHECK(shard->pages.size() * kPageSize == shard_info.bytes) << "illegal shard size";
if (shard_info.uncompressed_size == 0) {
CHECK(shard->pages.size() * kPageSize == shard_info.bytes) << "illegal shard size";
} else {
CHECK(shard->pages.size() * kPageSize == shard_info.uncompressed_size) << "illegal shard size";
}
RETURN_IF_ERROR(_file->read_at_fully(shard_info.offset, shard->pages.data(), shard_info.bytes));
RETURN_IF_ERROR(shard->decompress_pages(_compression_type, shard_info.npage, shard_info.uncompressed_size,
shard_info.bytes));
if (stat != nullptr) {
stat->read_io_bytes += shard_info.bytes;
}
Expand Down Expand Up @@ -2434,8 +2499,14 @@ Status ImmutableIndex::_check_not_exist_in_shard(size_t shard_idx, size_t n, con
return Status::OK();
}
std::unique_ptr<ImmutableIndexShard> shard = std::make_unique<ImmutableIndexShard>(shard_info.npage);
CHECK(shard->pages.size() * kPageSize == shard_info.bytes) << "illegal shard size";
if (shard_info.uncompressed_size == 0) {
CHECK(shard->pages.size() * kPageSize == shard_info.bytes) << "illegal shard size";
} else {
CHECK(shard->pages.size() * kPageSize == shard_info.uncompressed_size) << "illegal shard size";
}
RETURN_IF_ERROR(_file->read_at_fully(shard_info.offset, shard->pages.data(), shard_info.bytes));
RETURN_IF_ERROR(shard->decompress_pages(_compression_type, shard_info.npage, shard_info.uncompressed_size,
shard_info.bytes));
if (shard_info.key_size != 0) {
return _check_not_exist_in_fixlen_shard(shard_idx, n, keys, keys_info, &shard);
} else {
Expand Down Expand Up @@ -2641,17 +2712,23 @@ StatusOr<std::unique_ptr<ImmutableIndex>> ImmutableIndex::load(std::unique_ptr<R
}

auto format_version = meta.format_version();
if (format_version != PERSISTENT_INDEX_VERSION_2 && format_version != PERSISTENT_INDEX_VERSION_3) {
if (format_version != PERSISTENT_INDEX_VERSION_2 && format_version != PERSISTENT_INDEX_VERSION_3 &&
format_version != PERSISTENT_INDEX_VERSION_4) {
std::string msg =
strings::Substitute("different immutable index format, should rebuid index. actual:$0, expect:$1",
format_version, PERSISTENT_INDEX_VERSION_3);
format_version, PERSISTENT_INDEX_VERSION_4);
LOG(WARNING) << msg;
return Status::InternalError(msg);
}

std::unique_ptr<ImmutableIndex> idx = std::make_unique<ImmutableIndex>();
idx->_version = EditVersion(meta.version());
idx->_size = meta.size();
if (meta.compression_type() > 0) {
idx->_compression_type = static_cast<CompressionTypePB>(meta.compression_type());
} else {
idx->_compression_type = CompressionTypePB::NO_COMPRESSION;
}
size_t nshard = meta.shards_size();
idx->_shards.resize(nshard);
for (size_t i = 0; i < nshard; i++) {
Expand All @@ -2664,6 +2741,11 @@ StatusOr<std::unique_ptr<ImmutableIndex>> ImmutableIndex::load(std::unique_ptr<R
dest.key_size = src.key_size();
dest.value_size = src.value_size();
dest.nbucket = src.nbucket();
dest.uncompressed_size = src.uncompressed_size();
if (idx->_compression_type == CompressionTypePB::NO_COMPRESSION) {
CHECK(dest.uncompressed_size == 0) << "compression type: " << idx->_compression_type
<< " uncompressed_size: " << dest.uncompressed_size;
}
// This is for compatibility, we don't add data_size in shard_info in the rc version
// And data_size is added to reslove some bug(https://github.com/StarRocks/starrocks/issues/11868)
// However, if we upgrade from rc version, the data_size will be used as default value(0) which will cause
Expand Down Expand Up @@ -3194,7 +3276,7 @@ Status PersistentIndex::load_from_tablet(Tablet* tablet) {
index_meta.clear_l1_version();
index_meta.clear_l2_versions();
index_meta.set_key_size(_key_size);
index_meta.set_format_version(PERSISTENT_INDEX_VERSION_3);
index_meta.set_format_version(PERSISTENT_INDEX_VERSION_4);
lastest_applied_version.to_pb(index_meta.mutable_version());
MutableIndexMetaPB* l0_meta = index_meta.mutable_l0_meta();
l0_meta->clear_wals();
Expand Down Expand Up @@ -3367,7 +3449,7 @@ Status PersistentIndex::commit(PersistentIndexMetaPB* index_meta, IOStat* stat)
// update PersistentIndexMetaPB
index_meta->set_size(_size);
index_meta->set_usage(_usage);
index_meta->set_format_version(PERSISTENT_INDEX_VERSION_3);
index_meta->set_format_version(PERSISTENT_INDEX_VERSION_4);
_version.to_pb(index_meta->mutable_version());
_version.to_pb(index_meta->mutable_l1_version());
MutableIndexMetaPB* l0_meta = index_meta->mutable_l0_meta();
Expand All @@ -3377,14 +3459,14 @@ Status PersistentIndex::commit(PersistentIndexMetaPB* index_meta, IOStat* stat)
} else if (_dump_snapshot) {
index_meta->set_size(_size);
index_meta->set_usage(_usage);
index_meta->set_format_version(PERSISTENT_INDEX_VERSION_3);
index_meta->set_format_version(PERSISTENT_INDEX_VERSION_4);
_version.to_pb(index_meta->mutable_version());
MutableIndexMetaPB* l0_meta = index_meta->mutable_l0_meta();
RETURN_IF_ERROR(_l0->commit(l0_meta, _version, kSnapshot));
} else {
index_meta->set_size(_size);
index_meta->set_usage(_usage);
index_meta->set_format_version(PERSISTENT_INDEX_VERSION_3);
index_meta->set_format_version(PERSISTENT_INDEX_VERSION_4);
_version.to_pb(index_meta->mutable_version());
MutableIndexMetaPB* l0_meta = index_meta->mutable_l0_meta();
RETURN_IF_ERROR(_l0->commit(l0_meta, _version, kAppendWAL));
Expand Down Expand Up @@ -4442,7 +4524,7 @@ Status PersistentIndex::_minor_compaction(PersistentIndexMetaPB* index_meta) {
// 3. modify meta
index_meta->set_size(_size);
index_meta->set_usage(_usage);
index_meta->set_format_version(PERSISTENT_INDEX_VERSION_3);
index_meta->set_format_version(PERSISTENT_INDEX_VERSION_4);
_version.to_pb(index_meta->mutable_version());
_version.to_pb(index_meta->mutable_l1_version());
MutableIndexMetaPB* l0_meta = index_meta->mutable_l0_meta();
Expand Down
5 changes: 4 additions & 1 deletion be/src/storage/persistent_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ enum PersistentIndexFileVersion {
PERSISTENT_INDEX_VERSION_UNKNOWN = 0,
PERSISTENT_INDEX_VERSION_1,
PERSISTENT_INDEX_VERSION_2,
PERSISTENT_INDEX_VERSION_3
PERSISTENT_INDEX_VERSION_3,
PERSISTENT_INDEX_VERSION_4
};

static constexpr uint64_t NullIndexValue = -1;
Expand Down Expand Up @@ -535,12 +536,14 @@ class ImmutableIndex {
uint32_t value_size;
uint32_t nbucket;
uint64_t data_size;
uint64_t uncompressed_size;
};

std::vector<ShardInfo> _shards;
std::map<size_t, std::pair<size_t, size_t>> _shard_info_by_length;
mutable std::vector<std::unique_ptr<BloomFilter>> _bf_vec;
std::vector<size_t> _bf_off;
CompressionTypePB _compression_type;
};

class ImmutableIndexWriter {
Expand Down
Loading

0 comments on commit d8545ee

Please sign in to comment.