Skip to content

Commit

Permalink
[Enhancement][FlatJson] optimize flat json compaction performance (St…
Browse files Browse the repository at this point in the history
…arRocks#49411)

Signed-off-by: Seaven <seaven_7@qq.com>
  • Loading branch information
Seaven authored Aug 15, 2024
1 parent 37ccfca commit f6290fd
Show file tree
Hide file tree
Showing 27 changed files with 1,042 additions and 280 deletions.
28 changes: 19 additions & 9 deletions be/src/column/column_access_path.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,22 +193,23 @@ StatusOr<std::unique_ptr<ColumnAccessPath>> ColumnAccessPath::create(const TColu
}

StatusOr<std::unique_ptr<ColumnAccessPath>> ColumnAccessPath::create(const TAccessPathType::type& type,
const std::string& path, uint32_t index) {
const std::string& path, uint32_t index,
const std::string& prefix) {
auto p = std::make_unique<ColumnAccessPath>();
p->_type = type;
p->_path = path;
p->_column_index = index;
p->_absolute_path = path;
if (prefix != "") {
p->_absolute_path = prefix + "." + path;
} else {
p->_absolute_path = path;
}
p->_value_type = TypeDescriptor(LogicalType::TYPE_JSON);
p->_children.clear();
return std::move(p);
}

ColumnAccessPath* insert_json_path_impl(const std::string& path, ColumnAccessPath* root) {
if (path.empty()) {
return root;
}

std::pair<std::string, std::string> _split_path(const std::string& path) {
size_t pos = 0;
if (path.starts_with("\"")) {
pos = path.find('\"', 1);
Expand All @@ -224,9 +225,18 @@ ColumnAccessPath* insert_json_path_impl(const std::string& path, ColumnAccessPat
next = path.substr(pos + 1);
}

return {key, next};
}

ColumnAccessPath* insert_json_path_impl(const std::string& path, ColumnAccessPath* root) {
if (path.empty()) {
return root;
}

auto [key, next] = _split_path(path);
auto child = root->get_child(key);
if (child == nullptr) {
auto n = ColumnAccessPath::create(TAccessPathType::FIELD, key, 0);
auto n = ColumnAccessPath::create(TAccessPathType::FIELD, key, 0, root->absolute_path());
DCHECK(n.ok());
root->children().emplace_back(std::move(n.value()));
child = root->children().back().get();
Expand All @@ -238,7 +248,7 @@ void ColumnAccessPath::insert_json_path(ColumnAccessPath* root, LogicalType type
auto leaf = insert_json_path_impl(path, root);
leaf->_type = TAccessPathType::type::FIELD;
leaf->_column_index = 0;
leaf->_absolute_path = path;
leaf->_absolute_path = root->absolute_path() + "." + path;
leaf->_value_type = TypeDescriptor(type);
}

Expand Down
12 changes: 9 additions & 3 deletions be/src/column/column_access_path.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ class ColumnAccessPath {
Status init(const std::string& parent_path, const TColumnAccessPath& column_path, RuntimeState* state,
ObjectPool* pool);

// for test
static StatusOr<std::unique_ptr<ColumnAccessPath>> create(const TAccessPathType::type& type,
const std::string& path, uint32_t index);
const std::string& path, uint32_t index,
const std::string& prefix = "");
// the path doesn't contain the root
static void insert_json_path(ColumnAccessPath* root, LogicalType type, const std::string& path);
// end test

const std::string& path() const { return _path; }

Expand All @@ -60,6 +60,10 @@ class ColumnAccessPath {

std::vector<std::unique_ptr<ColumnAccessPath>>& children() { return _children; }

void set_from_compaction(bool from_compaction) { _from_compaction = from_compaction; }

bool is_from_compaction() const { return _from_compaction; }

bool is_key() const { return _type == TAccessPathType::type::KEY; }

bool is_offset() const { return _type == TAccessPathType::type::OFFSET; }
Expand Down Expand Up @@ -105,6 +109,8 @@ class ColumnAccessPath {

bool _from_predicate;

bool _from_compaction = false;

// the data type of the subfield
TypeDescriptor _value_type;

Expand Down
8 changes: 5 additions & 3 deletions be/src/column/json_column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -476,10 +476,12 @@ std::string JsonColumn::debug_flat_paths() const {
}
std::ostringstream ss;
ss << "[";
for (size_t i = 0; i < _flat_column_paths.size() - 1; i++) {
ss << _flat_column_paths[i] << ", ";
size_t i = 0;
for (; i < _flat_column_paths.size() - 1; i++) {
ss << _flat_column_paths[i] << "(" << type_to_string(_flat_column_types[i]) << "), ";
}
ss << _flat_column_paths.back() << "]";
ss << _flat_column_paths[i] << "(" << type_to_string(_flat_column_types[i]) << ")";
ss << (has_remain() ? "]" : "}");
return ss.str();
}
} // namespace starrocks
7 changes: 5 additions & 2 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1327,8 +1327,11 @@ CONF_mInt32(dictionary_cache_refresh_threadpool_size, "8");
// json flat flag
CONF_mBool(enable_json_flat, "true");

// extract flat json column when row_num * null_factor < null_row_num
CONF_mDouble(json_flat_null_factor, "0.4");
// when enabled, compaction is based on flat json, not the whole json
CONF_mBool(enable_compaction_flat_json, "true");

// extract flat json column when row_num * null_factor > null_row_num
CONF_mDouble(json_flat_null_factor, "0.3");

// extract flat json column when row_num * sparsity_factor < hit_row_num
CONF_mDouble(json_flat_sparsity_factor, "0.9");
Expand Down
2 changes: 2 additions & 0 deletions be/src/storage/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ Status Compaction::_merge_rowsets_horizontally(size_t segment_iterator_num, Stat
TabletReaderParams reader_params;
reader_params.reader_type = compaction_type();
reader_params.profile = _runtime_profile.create_child("merge_rowsets");
reader_params.column_access_paths = &_column_access_paths;

int64_t total_num_rows = 0;
int64_t total_mem_footprint = 0;
Expand Down Expand Up @@ -253,6 +254,7 @@ Status Compaction::_merge_rowsets_vertically(size_t segment_iterator_num, Statis
TabletReaderParams reader_params;
reader_params.reader_type = compaction_type();
reader_params.profile = _runtime_profile.create_child("merge_rowsets");
reader_params.column_access_paths = &_column_access_paths;

int64_t total_num_rows = 0;
int64_t total_mem_footprint = 0;
Expand Down
2 changes: 2 additions & 0 deletions be/src/storage/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ class Compaction {
Version _output_version;

RuntimeProfile _runtime_profile;
// used for flat json
std::vector<std::unique_ptr<ColumnAccessPath>> _column_access_paths;

private:
// merge rows from vectorized reader and write into `_output_rs_writer`.
Expand Down
4 changes: 4 additions & 0 deletions be/src/storage/compaction_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@

#pragma once

#include <memory>
#include <mutex>
#include <sstream>
#include <vector>

#include "column/column_access_path.h"
#include "storage/background_task.h"
#include "storage/compaction_utils.h"
#include "storage/olap_common.h"
Expand Down Expand Up @@ -309,6 +311,8 @@ class CompactionTask : public BackgroundTask {
std::shared_lock<std::shared_mutex> _compaction_lock;
MonotonicStopWatch _watch;
MemTracker* _mem_tracker{nullptr};
// used for flat json
std::vector<std::unique_ptr<ColumnAccessPath>> _column_access_paths;
};

} // namespace starrocks
1 change: 1 addition & 0 deletions be/src/storage/horizontal_compaction_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ Status HorizontalCompactionTask::_horizontal_compact_data(Statistics* statistics
reader_params.reader_type =
compaction_type() == BASE_COMPACTION ? READER_BASE_COMPACTION : READER_CUMULATIVE_COMPACTION;
reader_params.profile = _runtime_profile.create_child("merge_rowsets");
reader_params.column_access_paths = &_column_access_paths;

int32_t chunk_size = CompactionUtils::get_read_chunk_size(
config::compaction_memory_limit_per_worker, config::vector_chunk_size, _task_info.input_rows_num,
Expand Down
3 changes: 3 additions & 0 deletions be/src/storage/lake/compaction_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <functional>
#include <ostream>

#include "column/column_access_path.h"
#include "common/status.h"
#include "compaction_task_context.h"
#include "runtime/mem_tracker.h"
Expand Down Expand Up @@ -55,6 +56,8 @@ class CompactionTask {
std::unique_ptr<MemTracker> _mem_tracker = nullptr;
CompactionTaskContext* _context;
std::shared_ptr<const TabletSchema> _tablet_schema;
// used for flat json
std::vector<std::unique_ptr<ColumnAccessPath>> _column_access_paths;
};

} // namespace starrocks::lake
1 change: 1 addition & 0 deletions be/src/storage/lake/horizontal_compaction_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ Status HorizontalCompactionTask::execute(CancelFunc cancel_func, ThreadPool* flu
reader_params.profile = nullptr;
reader_params.use_page_cache = false;
reader_params.lake_io_opts = {false, config::lake_compaction_stream_buffer_size_bytes};
reader_params.column_access_paths = &_column_access_paths;
RETURN_IF_ERROR(reader.open(reader_params));

ASSIGN_OR_RETURN(auto writer,
Expand Down
60 changes: 60 additions & 0 deletions be/src/storage/lake/tablet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "storage/tablet_schema_map.h"
#include "storage/types.h"
#include "storage/union_iterator.h"
#include "util/json_flattener.h"

namespace starrocks::lake {

Expand Down Expand Up @@ -106,6 +107,7 @@ Status TabletReader::open(const TabletReaderParams& read_params) {
read_params.reader_type != ReaderType::READER_ALTER_TABLE && !is_compaction(read_params.reader_type)) {
return Status::NotSupported("reader type not supported now");
}
RETURN_IF_ERROR(init_compaction_column_paths(read_params));

if (_need_split) {
std::vector<BaseTabletSharedPtr> tablets;
Expand Down Expand Up @@ -178,6 +180,64 @@ Status TabletReader::open(const TabletReaderParams& read_params) {
return Status::OK();
}

// Builds the flat json access paths used by compaction and appends one ROOT ColumnAccessPath
// per qualifying JSON column to `read_params.column_access_paths`.
//
// The function is a no-op (returns OK) when:
//   - `config::enable_compaction_flat_json` is disabled,
//   - the reader type is not a compaction type,
//   - `read_params.column_access_paths` is null, or
//   - the vector it points to already contains paths.
//
// A JSON column qualifies only when every segment with at least one row provides a JSON column
// reader that has a non-empty set of sub readers.
//
// Returns the error from ColumnAccessPath::create if creating a root path fails, OK otherwise.
Status TabletReader::init_compaction_column_paths(const TabletReaderParams& read_params) {
    if (!config::enable_compaction_flat_json || !is_compaction(read_params.reader_type) ||
        read_params.column_access_paths == nullptr) {
        return Status::OK();
    }

    if (!read_params.column_access_paths->empty()) {
        VLOG(3) << "Lake Compaction flat json paths exists: " << read_params.column_access_paths->size();
        return Status::OK();
    }

    DCHECK(is_compaction(read_params.reader_type) && read_params.column_access_paths != nullptr &&
           read_params.column_access_paths->empty());

    // Count the segments that hold at least one row. Kept as size_t so that the comparison with
    // readers.size() below has the same signedness on both sides.
    size_t num_readers = 0;
    for (const auto& rowset : _rowsets) {
        for (const auto& segment : rowset->get_segments()) {
            if (segment->num_rows() > 0) {
                ++num_readers;
            }
        }
    }

    // Reused across columns; cleared at the start of every JSON column.
    std::vector<const ColumnReader*> readers;
    for (size_t i = 0; i < _tablet_schema->num_columns(); i++) {
        const auto& col = _tablet_schema->column(i);
        const auto col_name = std::string(col.name());
        if (_schema.get_field_by_name(col_name) == nullptr || col.type() != LogicalType::TYPE_JSON) {
            continue;
        }

        // Collect the readers of this column from every non-empty segment, keeping only JSON
        // readers that expose sub readers.
        readers.clear();
        for (const auto& rowset : _rowsets) {
            for (const auto& segment : rowset->get_segments()) {
                if (segment->num_rows() == 0) {
                    continue;
                }
                auto reader = segment->column_with_uid(col.unique_id());
                if (reader != nullptr && reader->column_type() == LogicalType::TYPE_JSON &&
                    nullptr != reader->sub_readers() && !reader->sub_readers()->empty()) {
                    readers.emplace_back(reader);
                }
            }
        }

        if (readers.size() != num_readers) {
            // at least one non-empty segment has no sub readers for this column: skip it
            continue;
        }

        // must all be flat json type
        JsonPathDeriver deriver;
        deriver.derived(readers);
        // Bind by const reference: avoids a copy if the accessors return references, and is
        // still valid (lifetime-extended) if they return by value.
        const auto& paths = deriver.flat_paths();
        const auto& types = deriver.flat_types();
        VLOG(3) << "Lake Compaction flat json column: " << JsonFlatPath::debug_flat_json(paths, types, true);
        ASSIGN_OR_RETURN(auto res, ColumnAccessPath::create(TAccessPathType::ROOT, col_name, i));
        for (size_t j = 0; j < paths.size(); j++) {
            ColumnAccessPath::insert_json_path(res.get(), types[j], paths[j]);
        }
        res->set_from_compaction(true);
        read_params.column_access_paths->emplace_back(std::move(res));
    }
    return Status::OK();
}

void TabletReader::close() {
if (_collect_iter != nullptr) {
_collect_iter->close();
Expand Down
1 change: 1 addition & 0 deletions be/src/storage/lake/tablet_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ class TabletReader final : public ChunkIterator {
Status init_delete_predicates(const TabletReaderParams& read_params, DeletePredicates* dels);

Status init_collector(const TabletReaderParams& read_params);
Status init_compaction_column_paths(const TabletReaderParams& read_params);

static Status to_seek_tuple(const TabletSchema& tablet_schema, const OlapTuple& input, SeekTuple* tuple,
MemPool* mempool);
Expand Down
1 change: 1 addition & 0 deletions be/src/storage/lake/vertical_compaction_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ Status VerticalCompactionTask::compact_column_group(bool is_key, int column_grou
reader_params.chunk_size = chunk_size;
reader_params.profile = nullptr;
reader_params.use_page_cache = false;
reader_params.column_access_paths = &_column_access_paths;
reader_params.lake_io_opts = {config::lake_enable_vertical_compaction_fill_data_cache,
config::lake_compaction_stream_buffer_size_bytes};
RETURN_IF_ERROR(reader.open(reader_params));
Expand Down
18 changes: 13 additions & 5 deletions be/src/storage/rowset/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,8 @@ StatusOr<std::unique_ptr<ColumnIterator>> ColumnReader::new_iterator(ColumnAcces
}
// dynamic flatten
// we must flatten the json dynamically, because we don't know whether other segments have these paths
return create_json_dynamic_flat_iterator(std::move(json_iter), target_paths, target_types);
return create_json_dynamic_flat_iterator(std::move(json_iter), target_paths, target_types,
path->is_from_compaction());
}

std::vector<std::string> source_paths;
Expand All @@ -716,7 +717,7 @@ StatusOr<std::unique_ptr<ColumnIterator>> ColumnReader::new_iterator(ColumnAcces
ASSIGN_OR_RETURN(null_iter, (*_sub_readers)[0]->new_iterator());
}

if (path == nullptr || path->children().empty()) {
if (path == nullptr || path->children().empty() || path->is_from_compaction()) {
DCHECK(_is_flat_json);
for (size_t i = start; i < end; i++) {
const auto& rd = (*_sub_readers)[i];
Expand All @@ -732,9 +733,16 @@ StatusOr<std::unique_ptr<ColumnIterator>> ColumnReader::new_iterator(ColumnAcces
ASSIGN_OR_RETURN(auto iter, rd->new_iterator());
all_iters.emplace_back(std::move(iter));
}
// access whole json
return create_json_merge_iterator(this, std::move(null_iter), std::move(all_iters), source_paths,
source_types);

if (path == nullptr || path->children().empty()) {
// access whole json
return create_json_merge_iterator(this, std::move(null_iter), std::move(all_iters), source_paths,
source_types);
} else {
DCHECK(path->is_from_compaction());
return create_json_flat_iterator(this, std::move(null_iter), std::move(all_iters), target_paths,
target_types, source_paths, source_types, true);
}
}

bool need_remain = false;
Expand Down
Loading

0 comments on commit f6290fd

Please sign in to comment.