Skip to content

Commit

Permalink
Enhancement: reduce the memory usage of segment (StarRocks#896)
Browse files Browse the repository at this point in the history
1. Does not keep `SegmentFooterPB` in memory anymore
2. Save index meta and index reader as a union in `ColumnReader`
3. Reorder class memory fields to reduce class size
4. Avoid creating unused fields
In my test environment (with 1 BE, 10,000 replicas, 300 columns per replica), memory consumption dropped from 15GB to 7.2GB
  • Loading branch information
sduzh authored Oct 27, 2021
1 parent 324146f commit 55b98fe
Show file tree
Hide file tree
Showing 7 changed files with 277 additions and 234 deletions.
265 changes: 144 additions & 121 deletions be/src/storage/rowset/segment_v2/column_reader.cpp

Large diffs are not rendered by default.

92 changes: 63 additions & 29 deletions be/src/storage/rowset/segment_v2/column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,21 @@ struct ColumnIteratorOptions {
// same information, such as OrdinalPageIndex and Page data.
// This will cache data shared by all readers
class ColumnReader {
struct private_type;

public:
// Create an initialized ColumnReader in *reader.
// This should be a lightweight operation without I/O.
static Status create(MemTracker* mem_tracker, const ColumnReaderOptions& opts, const ColumnMetaPB& meta,
uint64_t num_rows, const std::string& file_name, std::unique_ptr<ColumnReader>* reader);
// Create and initialize a ColumnReader.
// This method will not take the ownership of |meta|.
// Note that |meta| is mutable, this method may change its internal state.
//
// To developers: keep this method lightweight, should not incur any I/O.
static Status create(MemTracker* mem_tracker, const ColumnReaderOptions& opts, ColumnMetaPB* meta,
const std::string& file_name, std::unique_ptr<ColumnReader>* reader);

ColumnReader(const private_type&, MemTracker* mem_tracker, const ColumnReaderOptions& opts,
const std::string& file_name);

~ColumnReader() = default;
~ColumnReader();

// Create a new column iterator. The client should delete the returned iterator.
Status new_iterator(ColumnIterator** iterator);
Expand All @@ -136,13 +144,17 @@ class ColumnReader {
Status read_page(const ColumnIteratorOptions& iter_opts, const PagePointer& pp, PageHandle* handle,
Slice* page_body, PageFooterPB* footer);

bool is_nullable() const { return _is_nullable; }
bool is_nullable() const { return _flags[kIsNullablePos]; }

const EncodingInfo* encoding_info() const { return _encoding_info; }

bool has_zone_map() const { return _zone_map_index_meta != nullptr; }
bool has_bitmap_index() const { return _bitmap_index_meta != nullptr; }
bool has_bloom_filter_index() const { return _bf_index_meta != nullptr; }
bool has_zone_map() const { return _flags[kHasZoneMapIndexMetaPos] || _flags[kHasZoneMapIndexReaderPos]; }
bool has_bitmap_index() const { return _flags[kHasBitmapIndexMetaPos] || _flags[kHasBitmapIndexReaderPos]; }
bool has_bloom_filter_index() const {
return _flags[kHasBloomFilterIndexMetaPos] || _flags[kHasBloomFilterIndexReaderPos];
}

ZoneMapPB* segment_zone_map() const { return _segment_zone_map.get(); }

// Check if this column could match `cond' using segment zone map.
// Since segment zone map is stored in metadata, this function is fast without I/O.
Expand All @@ -161,8 +173,8 @@ class ColumnReader {

PagePointer get_dict_page_pointer() const { return _dict_page_pointer; }
FieldType column_type() const { return _column_type; }
bool has_all_dict_encoded() const { return _has_all_dict_encoded; }
bool all_dict_encoded() const { return _all_dict_encoded; }
bool has_all_dict_encoded() const { return _flags[kHasAllDictEncodedPos]; }
bool all_dict_encoded() const { return _flags[kAllDictEncodedPos]; }

size_t num_rows() const { return _num_rows; }

Expand Down Expand Up @@ -192,10 +204,36 @@ class ColumnReader {
Status ensure_index_loaded(ReaderType reader_type);

private:
ColumnReader(MemTracker* mem_tracker, const ColumnReaderOptions& opts, const ColumnMetaPB& meta, uint64_t num_rows,
const std::string& file_name);

Status init(const ColumnMetaPB& meta);
// Tag type taken as the first parameter of the public ColumnReader constructor.
// It carries no data; its only constructor takes (and ignores) an int.
// NOTE(review): because this struct is declared in the private section, it
// presumably limits who can call the public constructor to code with private
// access (such as create()) -- confirm against column_reader.cpp.
struct private_type {
private_type(int) {}
};

// Storage for one kind of column index: either a pointer to its metadata
// (Meta) or a pointer to its reader (Reader). As a union, both members share
// the same storage, so only one of them is meaningful at any time.
// NOTE(review): the union does not record which member is active, nor does it
// own or free the pointee; presumably the kHas*MetaPos / kHas*ReaderPos bits
// in _flags track the active member and ~ColumnReader() releases it -- confirm
// in column_reader.cpp.
template <typename Meta, typename Reader>
union ColumnIndex {
Meta* meta;
Reader* reader;
};

constexpr static size_t kHasZoneMapIndexMetaPos = 0;
constexpr static size_t kHasZoneMapIndexReaderPos = 1;
constexpr static size_t kHasOrdinalIndexMetaPos = 2;
constexpr static size_t kHasOrdinalIndexReaderPos = 3;
constexpr static size_t kHasBitmapIndexMetaPos = 4;
constexpr static size_t kHasBitmapIndexReaderPos = 5;
constexpr static size_t kHasBloomFilterIndexMetaPos = 6;
constexpr static size_t kHasBloomFilterIndexReaderPos = 7;
constexpr static size_t kIsNullablePos = 8;
constexpr static size_t kHasAllDictEncodedPos = 9;
constexpr static size_t kAllDictEncodedPos = 10;

// Disable copy and assignment
ColumnReader(const ColumnReader&) = delete;
void operator=(const ColumnReader&) = delete;
// Disable move construction and move assignment
ColumnReader(ColumnReader&&) = delete;
void operator=(ColumnReader&&) = delete;

Status init(ColumnMetaPB* meta);

Status _load_zone_map_index(bool use_page_cache, bool kept_in_memory);
Status _load_ordinal_index(bool use_page_cache, bool kept_in_memory);
Expand Down Expand Up @@ -229,10 +267,7 @@ class ColumnReader {
// and now the content that is not needed in Meta is not saved to ColumnReader
int32_t _column_length = 0;
FieldType _column_type = OLAP_FIELD_TYPE_UNKNOWN;
bool _is_nullable = false;
PagePointer _dict_page_pointer;
bool _has_all_dict_encoded = false;
bool _all_dict_encoded = false;
ColumnReaderOptions _opts;
uint64_t _num_rows;
const std::string& _file_name;
Expand All @@ -241,11 +276,15 @@ class ColumnReader {
const EncodingInfo* _encoding_info = nullptr;
const BlockCompressionCodec* _compress_codec = nullptr; // initialized in init()

// meta for various column indexes (null if the index is absent)
const ZoneMapIndexPB* _zone_map_index_meta = nullptr;
const OrdinalIndexPB* _ordinal_index_meta = nullptr;
const BitmapIndexPB* _bitmap_index_meta = nullptr;
const BloomFilterIndexPB* _bf_index_meta = nullptr;
ColumnIndex<ZoneMapIndexPB, ZoneMapIndexReader> _zone_map_index;
ColumnIndex<OrdinalIndexPB, OrdinalIndexReader> _ordinal_index;
ColumnIndex<BitmapIndexPB, BitmapIndexReader> _bitmap_index;
ColumnIndex<BloomFilterIndexPB, BloomFilterIndexReader> _bloom_filter_index;

std::unique_ptr<ZoneMapPB> _segment_zone_map;

using SubReaderList = std::vector<std::unique_ptr<ColumnReader>>;
std::unique_ptr<SubReaderList> _sub_readers;

// Read operations comprise compaction, query, checksum and so on.
// The ordinal index must be loaded before read operation.
Expand All @@ -254,12 +293,7 @@ class ColumnReader {
StarRocksCallOnce<Status> _load_ordinal_index_once;
StarRocksCallOnce<Status> _load_indices_once;

std::unique_ptr<ZoneMapIndexReader> _zone_map_index;
std::unique_ptr<OrdinalIndexReader> _ordinal_index;
std::unique_ptr<BitmapIndexReader> _bitmap_index;
std::unique_ptr<BloomFilterIndexReader> _bloom_filter_index;

std::vector<std::unique_ptr<ColumnReader>> _sub_readers;
std::bitset<16> _flags;
};

// Base iterator to read one column data
Expand Down
50 changes: 26 additions & 24 deletions be/src/storage/rowset/segment_v2/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,19 @@ Segment::Segment(const private_type&, MemTracker* mem_tracker, fs::BlockManager*
: _mem_tracker(mem_tracker),
_block_mgr(blk_mgr),
_fname(std::move(fname)),
_segment_id(segment_id),
_tablet_schema(tablet_schema) {
_tablet_schema(tablet_schema),
_segment_id(segment_id) {
_mem_tracker->consume(sizeof(Segment) + _fname.size());
}

Segment::~Segment() = default;

Status Segment::_open(size_t* footer_length_hint) {
RETURN_IF_ERROR(_parse_footer(footer_length_hint));
RETURN_IF_ERROR(_create_column_readers());
SegmentFooterPB footer;
RETURN_IF_ERROR(_parse_footer(footer_length_hint, &footer));
RETURN_IF_ERROR(_create_column_readers(&footer));
_num_rows = footer.num_rows();
_short_key_index_page = PagePointer(footer.short_key_index_page());
_prepare_adapter_info();
return Status::OK();
}
Expand Down Expand Up @@ -107,7 +110,7 @@ Status Segment::new_iterator(const Schema& schema, const StorageReadOptions& rea
// to the actual format. And create an AdaptSegmentIterator to wrap
if (_needs_block_adapter) {
std::unique_ptr<vectorized::SegmentV2IteratorAdapter> adapter(
new vectorized::SegmentV2IteratorAdapter(*_tablet_schema, _column_storage_types, schema));
new vectorized::SegmentV2IteratorAdapter(*_tablet_schema, *_column_storage_types, schema));

RETURN_IF_ERROR(adapter->init(read_options));

Expand Down Expand Up @@ -146,7 +149,7 @@ StatusOr<ChunkIteratorPtr> Segment::new_iterator(const vectorized::Schema& schem
// to the actual format. And create an AdaptSegmentIterator to wrap
if (_needs_chunk_adapter) {
std::unique_ptr<vectorized::SegmentChunkIteratorAdapter> adapter(new vectorized::SegmentChunkIteratorAdapter(
*_tablet_schema, _column_storage_types, schema, read_options.chunk_size));
*_tablet_schema, *_column_storage_types, schema, read_options.chunk_size));
RETURN_IF_ERROR(adapter->prepare(read_options));

auto result = _new_iterator(adapter->in_schema(), adapter->in_read_options());
Expand All @@ -160,7 +163,7 @@ StatusOr<ChunkIteratorPtr> Segment::new_iterator(const vectorized::Schema& schem
}
}

Status Segment::_parse_footer(size_t* footer_length_hint) {
Status Segment::_parse_footer(size_t* footer_length_hint, SegmentFooterPB* footer) {
// Footer := SegmentFooterPB, FooterPBSize(4), FooterPBChecksum(4), MagicNumber(4)
std::unique_ptr<fs::ReadableBlock> rblock;
RETURN_IF_ERROR(_block_mgr->open_block(_fname, &rblock));
Expand Down Expand Up @@ -202,9 +205,9 @@ Status Segment::_parse_footer(size_t* footer_length_hint) {

uint32_t actual_checksum = 0;
if (footer_length <= buff.size()) {
std::string_view footer(buff.data() + buff.size() - footer_length, footer_length);
actual_checksum = crc32c::Value(footer.data(), footer.size());
if (!_footer.ParseFromArray(footer.data(), footer.size())) {
std::string_view buf_footer(buff.data() + buff.size() - footer_length, footer_length);
actual_checksum = crc32c::Value(buf_footer.data(), buf_footer.size());
if (!footer->ParseFromArray(buf_footer.data(), buf_footer.size())) {
return Status::Corruption(strings::Substitute("Bad segment file $0: failed to parse footer", _fname));
}
} else { // Need read file again.
Expand All @@ -219,7 +222,7 @@ Status Segment::_parse_footer(size_t* footer_length_hint) {
::google::protobuf::io::ArrayInputStream stream2(buff.data(), buff.size());
::google::protobuf::io::ZeroCopyInputStream* streams[2] = {&stream1, &stream2};
::google::protobuf::io::ConcatenatingInputStream concatenating_stream(streams, 2);
if (!_footer.ParseFromZeroCopyStream(&concatenating_stream)) {
if (!footer->ParseFromZeroCopyStream(&concatenating_stream)) {
return Status::Corruption(strings::Substitute("Bad segment file $0: failed to parse footer", _fname));
}
}
Expand All @@ -230,10 +233,6 @@ Status Segment::_parse_footer(size_t* footer_length_hint) {
strings::Substitute("Bad segment file $0: footer checksum not match, actual=$1 vs expect=$2", _fname,
actual_checksum, checksum));
}

// The memory usage obtained through SpaceUsedLong() is an estimate
_mem_tracker->consume(static_cast<int64_t>(_footer.SpaceUsedLong()) -
static_cast<int64_t>(sizeof(SegmentFooterPB)));
return Status::OK();
}

Expand All @@ -246,7 +245,7 @@ Status Segment::_load_index() {
PageReadOptions opts;
opts.use_page_cache = !config::disable_storage_page_cache;
opts.rblock = rblock.get();
opts.page_pointer = PagePointer(_footer.short_key_index_page());
opts.page_pointer = _short_key_index_page;
opts.codec = nullptr; // short key index page uses NO_COMPRESSION for now
OlapReaderStatistics tmp_stats;
opts.stats = &tmp_stats;
Expand All @@ -267,10 +266,10 @@ Status Segment::_load_index() {
});
}

Status Segment::_create_column_readers() {
Status Segment::_create_column_readers(SegmentFooterPB* footer) {
std::unordered_map<uint32_t, uint32_t> column_id_to_footer_ordinal;
for (uint32_t ordinal = 0; ordinal < _footer.columns().size(); ++ordinal) {
const auto& column_pb = _footer.columns(ordinal);
for (uint32_t ordinal = 0; ordinal < footer->columns().size(); ++ordinal) {
const auto& column_pb = footer->columns(ordinal);
column_id_to_footer_ordinal.emplace(column_pb.unique_id(), ordinal);
}

Expand All @@ -284,11 +283,11 @@ Status Segment::_create_column_readers() {

ColumnReaderOptions opts;
opts.block_mgr = _block_mgr;
opts.storage_format_version = _footer.version();
opts.storage_format_version = footer->version();
opts.kept_in_memory = _tablet_schema->is_in_memory();
std::unique_ptr<ColumnReader> reader;
RETURN_IF_ERROR(ColumnReader::create(_mem_tracker, opts, _footer.columns(iter->second), _footer.num_rows(),
_fname, &reader));
RETURN_IF_ERROR(
ColumnReader::create(_mem_tracker, opts, footer->mutable_columns(iter->second), _fname, &reader));
_column_readers[ordinal] = std::move(reader);
}
return Status::OK();
Expand All @@ -298,7 +297,7 @@ void Segment::_prepare_adapter_info() {
ColumnId num_columns = _tablet_schema->num_columns();
_needs_block_adapter = false;
_needs_chunk_adapter = false;
_column_storage_types.resize(num_columns);
std::vector<FieldType> types(num_columns);
for (ColumnId cid = 0; cid < num_columns; ++cid) {
FieldType type;
if (_column_readers[cid] != nullptr) {
Expand All @@ -308,14 +307,17 @@ void Segment::_prepare_adapter_info() {
// And the type will be same with the tablet schema.
type = _tablet_schema->column(cid).type();
}
_column_storage_types[cid] = type;
types[cid] = type;
if (TypeUtils::specific_type_of_format_v1(type)) {
_needs_chunk_adapter = true;
}
if (type != _tablet_schema->column(cid).type()) {
_needs_block_adapter = true;
}
}
if (_needs_block_adapter || _needs_chunk_adapter) {
_column_storage_types = std::make_unique<std::vector<FieldType>>(std::move(types));
}
}

Status Segment::new_column_iterator(uint32_t cid, ColumnIterator** iter) {
Expand Down
23 changes: 12 additions & 11 deletions be/src/storage/rowset/segment_v2/segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "gutil/macros.h"
#include "storage/iterators.h"
#include "storage/rowset/segment_v2/page_handle.h"
#include "storage/rowset/segment_v2/page_pointer.h"
#include "storage/short_key_index.h"
#include "storage/tablet_schema.h"
#include "util/faststring.h"
Expand Down Expand Up @@ -96,7 +97,7 @@ class Segment : public std::enable_shared_from_this<Segment> {

uint64_t id() const { return _segment_id; }

uint32_t num_rows() const { return _footer.num_rows(); }
uint32_t num_rows() const { return _num_rows; }

Status new_column_iterator(uint32_t cid, ColumnIterator** iter);

Expand Down Expand Up @@ -128,11 +129,12 @@ class Segment : public std::enable_shared_from_this<Segment> {
return _sk_index_decoder->num_items() - 1;
}

// only used by UT
const SegmentFooterPB& footer() const { return _footer; }

const std::string& file_name() const { return _fname; }

size_t num_columns() const { return _column_readers.size(); }

const ColumnReader* column(size_t i) const { return _column_readers[i].get(); }

private:
Segment(const Segment&) = delete;
const Segment& operator=(const Segment&) = delete;
Expand All @@ -143,8 +145,8 @@ class Segment : public std::enable_shared_from_this<Segment> {

// open segment file and read the minimum amount of necessary information (footer)
Status _open(size_t* footer_length_hint);
Status _parse_footer(size_t* footer_length_hint);
Status _create_column_readers();
Status _parse_footer(size_t* footer_length_hint, SegmentFooterPB* footer);
Status _create_column_readers(SegmentFooterPB* footer);
// Load and decode short key index.
// May be called multiple times, subsequent calls will no op.
Status _load_index();
Expand All @@ -161,13 +163,12 @@ class Segment : public std::enable_shared_from_this<Segment> {
friend class vectorized::SegmentIterator;

MemTracker* _mem_tracker = nullptr;

fs::BlockManager* _block_mgr;
std::string _fname;
uint32_t _segment_id;
const TabletSchema* _tablet_schema;

SegmentFooterPB _footer;
uint32_t _segment_id = 0;
uint32_t _num_rows = 0;
PagePointer _short_key_index_page;

// ColumnReader for each column in TabletSchema. If ColumnReader is nullptr,
// This means that this segment has no data for that column, which may be added
Expand All @@ -182,7 +183,7 @@ class Segment : public std::enable_shared_from_this<Segment> {
std::unique_ptr<ShortKeyIndexDecoder> _sk_index_decoder;

// Actual storage type for each column, used to rewrite the input readoptions
std::vector<FieldType> _column_storage_types;
std::unique_ptr<std::vector<FieldType>> _column_storage_types;
// When reading old type format data this will be set to true.
bool _needs_chunk_adapter = false;
// When the storage types are different from TabletSchema
Expand Down
Loading

0 comments on commit 55b98fe

Please sign in to comment.