From f6290fdff8ec5c583ac4593d70da5dc62f692252 Mon Sep 17 00:00:00 2001 From: Seaven Date: Thu, 15 Aug 2024 11:45:47 +0800 Subject: [PATCH] [Enhancement][FlatJson] opitmize flat json compaction performance (#49411) Signed-off-by: Seaven --- be/src/column/column_access_path.cpp | 28 +- be/src/column/column_access_path.h | 12 +- be/src/column/json_column.cpp | 8 +- be/src/common/config.h | 7 +- be/src/storage/compaction.cpp | 2 + be/src/storage/compaction.h | 2 + be/src/storage/compaction_task.h | 4 + be/src/storage/horizontal_compaction_task.cpp | 1 + be/src/storage/lake/compaction_task.h | 3 + .../lake/horizontal_compaction_task.cpp | 1 + be/src/storage/lake/tablet_reader.cpp | 60 ++ be/src/storage/lake/tablet_reader.h | 1 + .../storage/lake/vertical_compaction_task.cpp | 1 + be/src/storage/rowset/column_reader.cpp | 18 +- .../storage/rowset/json_column_compactor.cpp | 103 ++-- be/src/storage/rowset/json_column_compactor.h | 4 + .../storage/rowset/json_column_iterator.cpp | 139 ++--- be/src/storage/rowset/json_column_iterator.h | 20 +- be/src/storage/rowset/json_column_writer.cpp | 3 + be/src/storage/rowset/segment.h | 3 + be/src/storage/tablet_reader.cpp | 68 +++ be/src/storage/tablet_reader.h | 1 + be/src/storage/vertical_compaction_task.cpp | 1 + be/src/util/json_flattener.cpp | 141 ++--- be/src/util/json_flattener.h | 39 +- .../rowset/flat_json_column_compact_test.cpp | 526 +++++++++++++++++- .../rowset/flat_json_column_rw_test.cpp | 126 ++--- 27 files changed, 1042 insertions(+), 280 deletions(-) diff --git a/be/src/column/column_access_path.cpp b/be/src/column/column_access_path.cpp index b8a706c7bb043..05acb7fc9a8ae 100644 --- a/be/src/column/column_access_path.cpp +++ b/be/src/column/column_access_path.cpp @@ -193,22 +193,23 @@ StatusOr> ColumnAccessPath::create(const TColu } StatusOr> ColumnAccessPath::create(const TAccessPathType::type& type, - const std::string& path, uint32_t index) { + const std::string& path, uint32_t index, + const std::string& prefix) { auto p = std::make_unique(); p->_type = type; p->_path = path; p->_column_index = index; - p->_absolute_path = path; + if (prefix != "") { + p->_absolute_path = prefix + "." + path; + } else { + p->_absolute_path = path; + } p->_value_type = TypeDescriptor(LogicalType::TYPE_JSON); p->_children.clear(); return std::move(p); } -ColumnAccessPath* insert_json_path_impl(const std::string& path, ColumnAccessPath* root) { - if (path.empty()) { - return root; - } - +std::pair _split_path(const std::string& path) { size_t pos = 0; if (path.starts_with("\"")) { pos = path.find('\"', 1); @@ -224,9 +225,18 @@ ColumnAccessPath* insert_json_path_impl(const std::string& path, ColumnAccessPat next = path.substr(pos + 1); } + return {key, next}; +} + +ColumnAccessPath* insert_json_path_impl(const std::string& path, ColumnAccessPath* root) { + if (path.empty()) { + return root; + } + + auto [key, next] = _split_path(path); auto child = root->get_child(key); if (child == nullptr) { - auto n = ColumnAccessPath::create(TAccessPathType::FIELD, key, 0); + auto n = ColumnAccessPath::create(TAccessPathType::FIELD, key, 0, root->absolute_path()); DCHECK(n.ok()); root->children().emplace_back(std::move(n.value())); child = root->children().back().get(); @@ -238,7 +248,7 @@ void ColumnAccessPath::insert_json_path(ColumnAccessPath* root, LogicalType type auto leaf = insert_json_path_impl(path, root); leaf->_type = TAccessPathType::type::FIELD; leaf->_column_index = 0; - leaf->_absolute_path = path; + leaf->_absolute_path = root->absolute_path() + "." + path; leaf->_value_type = TypeDescriptor(type); } diff --git a/be/src/column/column_access_path.h b/be/src/column/column_access_path.h index d6edddfccd45f..071bd4df987d8 100644 --- a/be/src/column/column_access_path.h +++ b/be/src/column/column_access_path.h @@ -46,11 +46,11 @@ class ColumnAccessPath { Status init(const std::string& parent_path, const TColumnAccessPath& column_path, RuntimeState* state, ObjectPool* pool); - // for test static StatusOr> create(const TAccessPathType::type& type, - const std::string& path, uint32_t index); + const std::string& path, uint32_t index, + const std::string& prefix = ""); + // the path doesn't contains root static void insert_json_path(ColumnAccessPath* root, LogicalType type, const std::string& path); - // end test const std::string& path() const { return _path; } @@ -60,6 +60,10 @@ class ColumnAccessPath { std::vector>& children() { return _children; } + void set_from_compaction(bool from_compaction) { _from_compaction = from_compaction; } + + bool is_from_compaction() const { return _from_compaction; } + bool is_key() const { return _type == TAccessPathType::type::KEY; } bool is_offset() const { return _type == TAccessPathType::type::OFFSET; } @@ -105,6 +109,8 @@ class ColumnAccessPath { bool _from_predicate; + bool _from_compaction = false; + // the data type of the subfield TypeDescriptor _value_type; diff --git a/be/src/column/json_column.cpp b/be/src/column/json_column.cpp index 26a6b3972f1ff..46bdb2dc88f91 100644 --- a/be/src/column/json_column.cpp +++ b/be/src/column/json_column.cpp @@ -476,10 +476,12 @@ std::string JsonColumn::debug_flat_paths() const { } std::ostringstream ss; ss << "["; - for (size_t i = 0; i < _flat_column_paths.size() - 1; i++) { - ss << _flat_column_paths[i] << ", "; + size_t i = 0; + for (; i < _flat_column_paths.size() - 1; i++) { + ss << _flat_column_paths[i] << "(" << type_to_string(_flat_column_types[i]) << "), "; } - ss << _flat_column_paths.back() << "]"; + ss << _flat_column_paths[i] << "(" << type_to_string(_flat_column_types[i]) << ")"; + ss << (has_remain() ? "]" : "}"); return ss.str(); } } // namespace starrocks diff --git a/be/src/common/config.h b/be/src/common/config.h index c69ece7cbe852..fa9dbde7ac10f 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1327,8 +1327,11 @@ CONF_mInt32(dictionary_cache_refresh_threadpool_size, "8"); // json flat flag CONF_mBool(enable_json_flat, "true"); -// extract flat json column when row_num * null_factor < null_row_num -CONF_mDouble(json_flat_null_factor, "0.4"); +// enable compaction is base on flat json, not whole json +CONF_mBool(enable_compaction_flat_json, "true"); + +// extract flat json column when row_num * null_factor > null_row_num +CONF_mDouble(json_flat_null_factor, "0.3"); // extract flat json column when row_num * sparsity_factor < hit_row_num CONF_mDouble(json_flat_sparsity_factor, "0.9"); diff --git a/be/src/storage/compaction.cpp b/be/src/storage/compaction.cpp index 1d805ff578ba7..273a34408182c 100644 --- a/be/src/storage/compaction.cpp +++ b/be/src/storage/compaction.cpp @@ -167,6 +167,7 @@ Status Compaction::_merge_rowsets_horizontally(size_t segment_iterator_num, Stat TabletReaderParams reader_params; reader_params.reader_type = compaction_type(); reader_params.profile = _runtime_profile.create_child("merge_rowsets"); + reader_params.column_access_paths = &_column_access_paths; int64_t total_num_rows = 0; int64_t total_mem_footprint = 0; @@ -253,6 +254,7 @@ Status Compaction::_merge_rowsets_vertically(size_t segment_iterator_num, Statis TabletReaderParams reader_params; reader_params.reader_type = compaction_type(); reader_params.profile = _runtime_profile.create_child("merge_rowsets"); + reader_params.column_access_paths = &_column_access_paths; int64_t total_num_rows = 0; int64_t total_mem_footprint = 0; diff --git a/be/src/storage/compaction.h b/be/src/storage/compaction.h index 658739011a27d..c3ddeaae58142 100644 --- a/be/src/storage/compaction.h +++ b/be/src/storage/compaction.h @@ -86,6 +86,8 @@ class Compaction { Version _output_version; RuntimeProfile _runtime_profile; + // for flat json used + std::vector> _column_access_paths; private: // merge rows from vectorized reader and write into `_output_rs_writer`. diff --git a/be/src/storage/compaction_task.h b/be/src/storage/compaction_task.h index 9f7ab5a187a6e..40d2d6d37403b 100644 --- a/be/src/storage/compaction_task.h +++ b/be/src/storage/compaction_task.h @@ -14,10 +14,12 @@ #pragma once +#include #include #include #include +#include "column/column_access_path.h" #include "storage/background_task.h" #include "storage/compaction_utils.h" #include "storage/olap_common.h" @@ -309,6 +311,8 @@ class CompactionTask : public BackgroundTask { std::shared_lock _compaction_lock; MonotonicStopWatch _watch; MemTracker* _mem_tracker{nullptr}; + // for flat json used + std::vector> _column_access_paths; }; } // namespace starrocks diff --git a/be/src/storage/horizontal_compaction_task.cpp b/be/src/storage/horizontal_compaction_task.cpp index f2378e4975173..4b8e67bfa6f25 100644 --- a/be/src/storage/horizontal_compaction_task.cpp +++ b/be/src/storage/horizontal_compaction_task.cpp @@ -71,6 +71,7 @@ Status HorizontalCompactionTask::_horizontal_compact_data(Statistics* statistics reader_params.reader_type = compaction_type() == BASE_COMPACTION ? READER_BASE_COMPACTION : READER_CUMULATIVE_COMPACTION; reader_params.profile = _runtime_profile.create_child("merge_rowsets"); + reader_params.column_access_paths = &_column_access_paths; int32_t chunk_size = CompactionUtils::get_read_chunk_size( config::compaction_memory_limit_per_worker, config::vector_chunk_size, _task_info.input_rows_num, diff --git a/be/src/storage/lake/compaction_task.h b/be/src/storage/lake/compaction_task.h index a6898f574e8c1..ad4717cdde8d8 100644 --- a/be/src/storage/lake/compaction_task.h +++ b/be/src/storage/lake/compaction_task.h @@ -18,6 +18,7 @@ #include #include +#include "column/column_access_path.h" #include "common/status.h" #include "compaction_task_context.h" #include "runtime/mem_tracker.h" @@ -55,6 +56,8 @@ class CompactionTask { std::unique_ptr _mem_tracker = nullptr; CompactionTaskContext* _context; std::shared_ptr _tablet_schema; + // for flat json used + std::vector> _column_access_paths; }; } // namespace starrocks::lake diff --git a/be/src/storage/lake/horizontal_compaction_task.cpp b/be/src/storage/lake/horizontal_compaction_task.cpp index fcd924fe59ba5..279ca784cc8e4 100644 --- a/be/src/storage/lake/horizontal_compaction_task.cpp +++ b/be/src/storage/lake/horizontal_compaction_task.cpp @@ -51,6 +51,7 @@ Status HorizontalCompactionTask::execute(CancelFunc cancel_func, ThreadPool* flu reader_params.profile = nullptr; reader_params.use_page_cache = false; reader_params.lake_io_opts = {false, config::lake_compaction_stream_buffer_size_bytes}; + reader_params.column_access_paths = &_column_access_paths; RETURN_IF_ERROR(reader.open(reader_params)); ASSIGN_OR_RETURN(auto writer, diff --git a/be/src/storage/lake/tablet_reader.cpp b/be/src/storage/lake/tablet_reader.cpp index c2ee0a09976d1..f9e74ac825fdb 100644 --- a/be/src/storage/lake/tablet_reader.cpp +++ b/be/src/storage/lake/tablet_reader.cpp @@ -38,6 +38,7 @@ #include "storage/tablet_schema_map.h" #include "storage/types.h" #include "storage/union_iterator.h" +#include "util/json_flattener.h" namespace starrocks::lake { @@ -106,6 +107,7 @@ Status TabletReader::open(const TabletReaderParams& read_params) { read_params.reader_type != ReaderType::READER_ALTER_TABLE && !is_compaction(read_params.reader_type)) { return Status::NotSupported("reader type not supported now"); } + RETURN_IF_ERROR(init_compaction_column_paths(read_params)); if (_need_split) { std::vector tablets; @@ -178,6 +180,64 @@ Status TabletReader::open(const TabletReaderParams& read_params) { return Status::OK(); } +Status TabletReader::init_compaction_column_paths(const TabletReaderParams& read_params) { + if (!config::enable_compaction_flat_json || !is_compaction(read_params.reader_type) || + read_params.column_access_paths == nullptr) { + return Status::OK(); + } + + if (!read_params.column_access_paths->empty()) { + VLOG(3) << "Lake Compaction flat json paths exists: " << read_params.column_access_paths->size(); + return Status::OK(); + } + + DCHECK(is_compaction(read_params.reader_type) && read_params.column_access_paths != nullptr && + read_params.column_access_paths->empty()); + int num_readers = 0; + for (const auto& rowset : _rowsets) { + auto segments = rowset->get_segments(); + std::for_each(segments.begin(), segments.end(), + [&](const auto& segment) { num_readers += segment->num_rows() > 0 ? 1 : 0; }); + } + + std::vector readers; + for (size_t i = 0; i < _tablet_schema->num_columns(); i++) { + const auto& col = _tablet_schema->column(i); + auto col_name = std::string(col.name()); + if (_schema.get_field_by_name(col_name) == nullptr || col.type() != LogicalType::TYPE_JSON) { + continue; + } + readers.clear(); + for (const auto& rowset : _rowsets) { + for (const auto& segment : rowset->get_segments()) { + if (segment->num_rows() == 0) { + continue; + } + auto reader = segment->column_with_uid(col.unique_id()); + if (reader != nullptr && reader->column_type() == LogicalType::TYPE_JSON && + nullptr != reader->sub_readers() && !reader->sub_readers()->empty()) { + readers.emplace_back(reader); + } + } + } + if (readers.size() == num_readers) { + // must all be flat json type + JsonPathDeriver deriver; + deriver.derived(readers); + auto paths = deriver.flat_paths(); + auto types = deriver.flat_types(); + VLOG(3) << "Lake Compaction flat json column: " << JsonFlatPath::debug_flat_json(paths, types, true); + ASSIGN_OR_RETURN(auto res, ColumnAccessPath::create(TAccessPathType::ROOT, std::string(col.name()), i)); + for (size_t j = 0; j < paths.size(); j++) { + ColumnAccessPath::insert_json_path(res.get(), types[j], paths[j]); + } + res->set_from_compaction(true); + read_params.column_access_paths->emplace_back(std::move(res)); + } + } + return Status::OK(); +} + void TabletReader::close() { if (_collect_iter != nullptr) { _collect_iter->close(); diff --git a/be/src/storage/lake/tablet_reader.h b/be/src/storage/lake/tablet_reader.h index 9288a7fa8de90..ff0b5af3a206d 100644 --- a/be/src/storage/lake/tablet_reader.h +++ b/be/src/storage/lake/tablet_reader.h @@ -99,6 +99,7 @@ class TabletReader final : public ChunkIterator { Status init_delete_predicates(const TabletReaderParams& read_params, DeletePredicates* dels); Status init_collector(const TabletReaderParams& read_params); + Status init_compaction_column_paths(const TabletReaderParams& read_params); static Status to_seek_tuple(const TabletSchema& tablet_schema, const OlapTuple& input, SeekTuple* tuple, MemPool* mempool); diff --git a/be/src/storage/lake/vertical_compaction_task.cpp b/be/src/storage/lake/vertical_compaction_task.cpp index 52d538c0a32aa..5704e1c5b7cab 100644 --- a/be/src/storage/lake/vertical_compaction_task.cpp +++ b/be/src/storage/lake/vertical_compaction_task.cpp @@ -163,6 +163,7 @@ Status VerticalCompactionTask::compact_column_group(bool is_key, int column_grou reader_params.chunk_size = chunk_size; reader_params.profile = nullptr; reader_params.use_page_cache = false; + reader_params.column_access_paths = &_column_access_paths; reader_params.lake_io_opts = {config::lake_enable_vertical_compaction_fill_data_cache, config::lake_compaction_stream_buffer_size_bytes}; RETURN_IF_ERROR(reader.open(reader_params)); diff --git a/be/src/storage/rowset/column_reader.cpp b/be/src/storage/rowset/column_reader.cpp index 568953c01e192..e7db6b181f582 100644 --- a/be/src/storage/rowset/column_reader.cpp +++ b/be/src/storage/rowset/column_reader.cpp @@ -703,7 +703,8 @@ StatusOr> ColumnReader::new_iterator(ColumnAcces } // dynamic flattern // we must dynamic flat json, because we don't know other segment wasn't the paths - return create_json_dynamic_flat_iterator(std::move(json_iter), target_paths, target_types); + return create_json_dynamic_flat_iterator(std::move(json_iter), target_paths, target_types, + path->is_from_compaction()); } std::vector source_paths; @@ -716,7 +717,7 @@ StatusOr> ColumnReader::new_iterator(ColumnAcces ASSIGN_OR_RETURN(null_iter, (*_sub_readers)[0]->new_iterator()); } - if (path == nullptr || path->children().empty()) { + if (path == nullptr || path->children().empty() || path->is_from_compaction()) { DCHECK(_is_flat_json); for (size_t i = start; i < end; i++) { const auto& rd = (*_sub_readers)[i]; @@ -732,9 +733,16 @@ StatusOr> ColumnReader::new_iterator(ColumnAcces ASSIGN_OR_RETURN(auto iter, rd->new_iterator()); all_iters.emplace_back(std::move(iter)); } - // access whole json - return create_json_merge_iterator(this, std::move(null_iter), std::move(all_iters), source_paths, - source_types); + + if (path == nullptr || path->children().empty()) { + // access whole json + return create_json_merge_iterator(this, std::move(null_iter), std::move(all_iters), source_paths, + source_types); + } else { + DCHECK(path->is_from_compaction()); + return create_json_flat_iterator(this, std::move(null_iter), std::move(all_iters), target_paths, + target_types, source_paths, source_types, true); + } } bool need_remain = false; diff --git a/be/src/storage/rowset/json_column_compactor.cpp b/be/src/storage/rowset/json_column_compactor.cpp index 651089acb9479..0715af0ddc74d 100644 --- a/be/src/storage/rowset/json_column_compactor.cpp +++ b/be/src/storage/rowset/json_column_compactor.cpp @@ -18,6 +18,7 @@ #include #include +#include #include #include @@ -30,6 +31,7 @@ #include "gutil/casts.h" #include "storage/rowset/column_writer.h" #include "types/constexpr.h" +#include "types/logical_type.h" #include "util/json_flattener.h" namespace starrocks { @@ -54,48 +56,82 @@ Status FlatJsonColumnCompactor::_compact_columns(std::vector& json_da _flat_types = deriver.flat_types(); _has_remain = deriver.has_remain_json(); + VLOG(1) << "FlatJsonColumnCompactor compact_columns, json_datas size: " << json_datas.size() + << ", flat json: " << JsonFlatPath::debug_flat_json(_flat_paths, _flat_types, _has_remain); + if (_flat_paths.empty()) { // write json directly - _is_flat = false; - _json_meta->mutable_json_meta()->set_has_remain(false); - _json_meta->mutable_json_meta()->set_is_flat(false); + return _merge_columns(json_datas); + } + return _flatten_columns(json_datas); +} - for (auto& col : json_datas) { - JsonColumn* json_col; - NullColumnPtr null_col; - if (col->is_nullable()) { - auto nullable_column = down_cast(col.get()); - json_col = down_cast(nullable_column->data_column().get()); - null_col = nullable_column->null_column(); - } else { - json_col = down_cast(col.get()); +bool check_is_same_schema(JsonColumn* one, JsonColumn* two) { + if (one == nullptr || two == nullptr) { + return false; + } + + if (one->is_flat_json() && two->is_flat_json()) { + return one->flat_column_paths() == two->flat_column_paths() && + one->flat_column_types() == two->flat_column_types() && one->has_remain() == two->has_remain(); + } + return false; +} + +Status FlatJsonColumnCompactor::_merge_columns(std::vector& json_datas) { + VLOG(1) << "FlatJsonColumnCompactor merge_columns, json_datas: " << json_datas.size(); + _is_flat = false; + _json_meta->mutable_json_meta()->set_has_remain(false); + _json_meta->mutable_json_meta()->set_is_flat(false); + + JsonColumn* pre_col = nullptr; + std::unique_ptr merger = nullptr; + for (auto& col : json_datas) { + JsonColumn* json_col; + NullColumnPtr null_col; + if (col->is_nullable()) { + auto nullable_column = down_cast(col.get()); + json_col = down_cast(nullable_column->data_column().get()); + null_col = nullable_column->null_column(); + } else { + json_col = down_cast(col.get()); + } + + if (!json_col->is_flat_json()) { + VLOG(1) << "FlatJsonColumnCompactor merge_columns direct write"; + RETURN_IF_ERROR(_json_writer->append(*col)); + } else { + VLOG(1) << "FlatJsonColumnCompactor merge_columns merge: " << json_col->debug_flat_paths(); + if (!check_is_same_schema(pre_col, json_col)) { + merger = std::make_unique(json_col->flat_column_paths(), json_col->flat_column_types(), + json_col->has_remain()); + pre_col = json_col; } + auto j = merger->merge(json_col->get_flat_fields()); - if (!json_col->is_flat_json()) { - RETURN_IF_ERROR(_json_writer->append(*col)); + if (col->is_nullable()) { + auto n = NullableColumn::create(j, null_col); + n->set_has_null(col->has_null()); + RETURN_IF_ERROR(_json_writer->append(*n)); } else { - JsonMerger merger(json_col->flat_column_paths(), json_col->flat_column_types(), json_col->has_remain()); - auto j = merger.merge(json_col->get_flat_fields()); - - if (col->is_nullable()) { - auto n = NullableColumn::create(j, null_col); - n->set_has_null(col->has_null()); - RETURN_IF_ERROR(_json_writer->append(*n)); - } else { - RETURN_IF_ERROR(_json_writer->append(*j)); - } + RETURN_IF_ERROR(_json_writer->append(*j)); } - col->resize_uninitialized(0); } - return Status::OK(); + col->resize_uninitialized(0); } + return Status::OK(); +} +Status FlatJsonColumnCompactor::_flatten_columns(std::vector& json_datas) { + VLOG(1) << "FlatJsonColumnCompactor flatten_columns, json_datas: " << json_datas.size(); _is_flat = true; - RETURN_IF_ERROR(_init_flat_writers()); - JsonFlattener flattener(deriver); - HyperJsonTransformer transformer(deriver); + // init flattener first, the flat_paths/types will change in _init_flat_writers + JsonFlattener flattener(_flat_paths, _flat_types, _has_remain); + HyperJsonTransformer transformer(_flat_paths, _flat_types, _has_remain); + RETURN_IF_ERROR(_init_flat_writers()); + JsonColumn* pre_col = nullptr; for (auto& col : json_datas) { JsonColumn* json_col; if (col->is_nullable()) { @@ -106,13 +142,18 @@ Status FlatJsonColumnCompactor::_compact_columns(std::vector& json_da } if (!json_col->is_flat_json()) { + VLOG(1) << "FlatJsonColumnCompactor flatten_columns flat json."; flattener.flatten(json_col); _flat_columns = flattener.mutable_result(); } else { - transformer.init_compaction_task(json_col); + if (!check_is_same_schema(pre_col, json_col)) { + transformer.init_compaction_task(json_col->flat_column_paths(), json_col->flat_column_types(), + json_col->has_remain()); + pre_col = json_col; + } + VLOG(1) << "FlatJsonColumnCompactor flatten_columns hyper-transformer: " << json_col->debug_flat_paths(); RETURN_IF_ERROR(transformer.trans(json_col->get_flat_fields())); _flat_columns = transformer.mutable_result(); - transformer.reset(); } // recode null column in 1st diff --git a/be/src/storage/rowset/json_column_compactor.h b/be/src/storage/rowset/json_column_compactor.h index 06ce9b4929626..ffbf0d7f7f354 100644 --- a/be/src/storage/rowset/json_column_compactor.h +++ b/be/src/storage/rowset/json_column_compactor.h @@ -32,6 +32,10 @@ class FlatJsonColumnCompactor final : public FlatJsonColumnWriter { private: Status _compact_columns(std::vector& json_datas); + + Status _merge_columns(std::vector& json_datas); + + Status _flatten_columns(std::vector& json_datas); }; class JsonColumnCompactor final : public ColumnWriter { diff --git a/be/src/storage/rowset/json_column_iterator.cpp b/be/src/storage/rowset/json_column_iterator.cpp index fc78670bf9d97..ea632eb6920ec 100644 --- a/be/src/storage/rowset/json_column_iterator.cpp +++ b/be/src/storage/rowset/json_column_iterator.cpp @@ -50,14 +50,16 @@ class JsonFlatColumnIterator final : public ColumnIterator { JsonFlatColumnIterator(ColumnReader* reader, std::unique_ptr null_iter, std::vector> field_iters, const std::vector& target_paths, const std::vector& target_types, - const std::vector& source_paths, const std::vector& source_types) + const std::vector& source_paths, const std::vector& source_types, + bool need_remain) : _reader(reader), _null_iter(std::move(null_iter)), _flat_iters(std::move(field_iters)), _target_paths(std::move(target_paths)), _target_types(std::move(target_types)), _source_paths(std::move(source_paths)), - _source_types(std::move(source_types)){}; + _source_types(std::move(source_types)), + _need_remain(need_remain){}; ~JsonFlatColumnIterator() override { if (transformer != nullptr) { @@ -101,6 +103,7 @@ class JsonFlatColumnIterator final : public ColumnIterator { std::vector _target_types; std::vector _source_paths; std::vector _source_types; + bool _need_remain = false; std::vector _source_column_modules; // to avoid create column with find type @@ -119,15 +122,13 @@ Status JsonFlatColumnIterator::init(const ColumnIteratorOptions& opts) { } bool has_remain = _source_paths.size() != _flat_iters.size(); - transformer = std::make_unique(_target_paths, _target_types, false); - { - SCOPED_RAW_TIMER(&_opts.stats->json_init_ns); - transformer->init_read_task(_source_paths, _source_types, has_remain); + VLOG(1) << "JsonFlatColumnIterator init, target: " + << JsonFlatPath::debug_flat_json(_target_paths, _target_types, _need_remain) + << ", source: " << JsonFlatPath::debug_flat_json(_source_paths, _source_types, has_remain); - for (int i = 0; i < _source_paths.size(); i++) { - auto column = ColumnHelper::create_column(TypeDescriptor(_source_types[i]), true); - _source_column_modules.emplace_back(column); - } + for (int i = 0; i < _source_paths.size(); i++) { + auto column = ColumnHelper::create_column(TypeDescriptor(_source_types[i]), true); + _source_column_modules.emplace_back(column); } DCHECK_EQ(_source_column_modules.size(), _source_paths.size()); @@ -135,6 +136,17 @@ Status JsonFlatColumnIterator::init(const ColumnIteratorOptions& opts) { _source_column_modules.emplace_back(JsonColumn::create()); } + { + SCOPED_RAW_TIMER(&_opts.stats->json_init_ns); + if (_need_remain) { + transformer = std::make_unique(_target_paths, _target_types, true); + transformer->init_compaction_task(_source_paths, _source_types, has_remain); + } else { + transformer = std::make_unique(_target_paths, _target_types, false); + transformer->init_read_task(_source_paths, _source_types, has_remain); + } + } + // update stats { auto cp = transformer->cast_paths(); @@ -268,10 +280,11 @@ Status JsonFlatColumnIterator::get_row_ranges_by_zone_map(const std::vector& json_iter, std::vector target_paths, - std::vector target_types) + std::vector target_types, bool need_remain) : _json_iter(std::move(json_iter)), _target_paths(std::move(target_paths)), - _target_types(std::move(target_types)){}; + _target_types(std::move(target_types)), + _need_remain(need_remain){}; ~JsonDynamicFlatIterator() override = default; @@ -297,28 +310,39 @@ class JsonDynamicFlatIterator final : public ColumnIterator { Status fetch_values_by_rowid(const rowid_t* rowids, size_t size, Column* values) override; private: - Status _flat_json(Column* input, Column* output); + template + Status _dynamic_flat(Column* dst, FUNC read_fn); private: std::unique_ptr _json_iter; std::vector _target_paths; std::vector _target_types; + bool _need_remain = false; std::unique_ptr _flattener; }; Status JsonDynamicFlatIterator::init(const ColumnIteratorOptions& opts) { RETURN_IF_ERROR(ColumnIterator::init(opts)); + RETURN_IF_ERROR(_json_iter->init(opts)); + for (auto& p : _target_paths) { opts.stats->dynamic_json_hits[p] += 1; } SCOPED_RAW_TIMER(&_opts.stats->json_init_ns); - _flattener = std::make_unique(_target_paths, _target_types, false); - return _json_iter->init(opts); + _flattener = std::make_unique(_target_paths, _target_types, _need_remain); + VLOG(1) << "JsonDynamicFlatIterator init, target: " + << JsonFlatPath::debug_flat_json(_target_paths, _target_types, _need_remain); + return Status::OK(); } -Status JsonDynamicFlatIterator::_flat_json(Column* input, Column* output) { +template +Status JsonDynamicFlatIterator::_dynamic_flat(Column* output, FUNC read_fn) { + auto proxy = output->clone_empty(); + RETURN_IF_ERROR(read_fn(_json_iter.get(), proxy.get())); + output->set_delete_state(proxy->delete_state()); + SCOPED_RAW_TIMER(&_opts.stats->json_flatten_ns); JsonColumn* json_data = nullptr; @@ -328,7 +352,7 @@ Status JsonDynamicFlatIterator::_flat_json(Column* input, Column* output) { auto* output_nullable = down_cast(output); auto* output_null = down_cast(output_nullable->null_column().get()); - auto* input_nullable = down_cast(input); + auto* input_nullable = down_cast(proxy.get()); auto* input_null = down_cast(input_nullable->null_column().get()); output_null->append(*input_null, 0, input_null->size()); @@ -341,31 +365,25 @@ Status JsonDynamicFlatIterator::_flat_json(Column* input, Column* output) { } // 2. flat - _flattener->flatten(input); + _flattener->flatten(proxy.get()); auto result = _flattener->mutable_result(); json_data->set_flat_columns(_target_paths, _target_types, result); return Status::OK(); } Status JsonDynamicFlatIterator::next_batch(size_t* n, Column* dst) { - auto proxy = dst->clone_empty(); - RETURN_IF_ERROR(_json_iter->next_batch(n, proxy.get())); - dst->set_delete_state(proxy->delete_state()); - return _flat_json(proxy.get(), dst); + auto read = [&](ColumnIterator* iter, Column* column) { return iter->next_batch(n, column); }; + return _dynamic_flat(dst, read); } Status JsonDynamicFlatIterator::next_batch(const SparseRange<>& range, Column* dst) { - auto proxy = dst->clone_empty(); - RETURN_IF_ERROR(_json_iter->next_batch(range, proxy.get())); - dst->set_delete_state(proxy->delete_state()); - return _flat_json(proxy.get(), dst); + auto read = [&](ColumnIterator* iter, Column* column) { return iter->next_batch(range, column); }; + return _dynamic_flat(dst, read); } Status JsonDynamicFlatIterator::fetch_values_by_rowid(const rowid_t* rowids, size_t size, Column* values) { - auto proxy = values->clone_empty(); - RETURN_IF_ERROR(_json_iter->fetch_values_by_rowid(rowids, size, proxy.get())); - values->set_delete_state(proxy->delete_state()); - return _flat_json(proxy.get(), values); + auto read = [&](ColumnIterator* iter, Column* column) { return iter->fetch_values_by_rowid(rowids, size, column); }; + return _dynamic_flat(values, read); } Status JsonDynamicFlatIterator::seek_to_first() { @@ -386,13 +404,12 @@ class JsonMergeIterator final : public ColumnIterator { public: JsonMergeIterator(ColumnReader* reader, std::unique_ptr null_iter, std::vector> all_iter, const std::vector& src_paths, - const std::vector& src_types, bool is_merge) + const std::vector& src_types) : _reader(reader), _null_iter(std::move(null_iter)), _all_iter(std::move(all_iter)), _src_paths(std::move(src_paths)), - _src_types(std::move(src_types)), - _is_merge(is_merge){}; + _src_types(std::move(src_types)){}; ~JsonMergeIterator() override = default; @@ -431,7 +448,6 @@ class JsonMergeIterator final : public ColumnIterator { std::vector _src_column_modules; std::unique_ptr _merger; - bool _is_merge; }; Status JsonMergeIterator::init(const ColumnIteratorOptions& opts) { @@ -444,15 +460,8 @@ Status JsonMergeIterator::init(const ColumnIteratorOptions& opts) { RETURN_IF_ERROR(iter->init(opts)); } - if (_is_merge) { - for (auto& p : _src_paths) { - opts.stats->merge_json_hits[p] += 1; - } - - SCOPED_RAW_TIMER(&_opts.stats->json_init_ns); - _merger = std::make_unique(_src_paths, _src_types, _all_iter.size() != _src_paths.size()); - } - + bool has_remain = _all_iter.size() != _src_paths.size(); + VLOG(1) << "JsonMergeIterator init, source: " << JsonFlatPath::debug_flat_json(_src_paths, _src_types, has_remain); DCHECK(_all_iter.size() == _src_paths.size() || _all_iter.size() == _src_paths.size() + 1); for (int i = 0; i < _src_paths.size(); i++) { auto column = ColumnHelper::create_column(TypeDescriptor(_src_types[i]), true); @@ -464,6 +473,12 @@ Status JsonMergeIterator::init(const ColumnIteratorOptions& opts) { _src_column_modules.emplace_back(JsonColumn::create()); } + for (auto& p : _src_paths) { + opts.stats->merge_json_hits[p] += 1; + } + SCOPED_RAW_TIMER(&_opts.stats->json_init_ns); + _merger = std::make_unique(_src_paths, _src_types, has_remain); + return Status::OK(); } @@ -477,13 +492,9 @@ Status JsonMergeIterator::_merge(JsonColumn* dst, FUNC func) { all_columns.emplace_back(std::move(c)); } - if (_is_merge) { - SCOPED_RAW_TIMER(&_opts.stats->json_merge_ns); - auto json = _merger->merge(all_columns); - dst->append(*json, 0, json->size()); - } else { - dst->set_flat_columns(_src_paths, _src_types, all_columns); - } + SCOPED_RAW_TIMER(&_opts.stats->json_merge_ns); + auto json = _merger->merge(all_columns); + dst->append(*json, 0, json->size()); dst->check_or_die(); return Status::OK(); } @@ -586,21 +597,19 @@ Status JsonMergeIterator::get_row_ranges_by_zone_map(const std::vector> create_json_flat_iterator(ColumnReader* reader, - std::unique_ptr null_iter, - std::vector> iters, - const std::vector& target_paths, - const std::vector& target_types, - const std::vector& source_paths, - const std::vector& source_types) { +StatusOr> create_json_flat_iterator( + ColumnReader* reader, std::unique_ptr null_iter, + std::vector> iters, const std::vector& target_paths, + const std::vector& target_types, const std::vector& source_paths, + const std::vector& source_types, bool need_remain) { return std::make_unique(reader, std::move(null_iter), std::move(iters), target_paths, - target_types, source_paths, source_types); + target_types, source_paths, source_types, need_remain); } StatusOr> create_json_dynamic_flat_iterator( std::unique_ptr json_iter, const std::vector& target_paths, - const std::vector& target_types) { - return std::make_unique(json_iter, target_paths, target_types); + const std::vector& target_types, bool need_remain) { + return std::make_unique(json_iter, target_paths, target_types, need_remain); } StatusOr> create_json_merge_iterator( @@ -608,14 +617,6 @@ StatusOr> create_json_merge_iterator( std::vector> all_iters, const std::vector& merge_paths, const std::vector& merge_types) { return std::make_unique(reader, std::move(null_iter), std::move(all_iters), merge_paths, - merge_types, true); -} - -StatusOr> create_json_direct_iterator( - ColumnReader* reader, std::unique_ptr null_iter, - std::vector> all_iters, const std::vector& merge_paths, - const std::vector& merge_types) { - return std::make_unique(reader, std::move(null_iter), std::move(all_iters), merge_paths, - merge_types, false); + merge_types); } } // namespace starrocks diff --git a/be/src/storage/rowset/json_column_iterator.h b/be/src/storage/rowset/json_column_iterator.h index 317aa723e75cb..e9661ad79ec25 100644 --- a/be/src/storage/rowset/json_column_iterator.h +++ b/be/src/storage/rowset/json_column_iterator.h @@ -22,26 +22,18 @@ namespace starrocks { -StatusOr> create_json_flat_iterator(ColumnReader* reader, - std::unique_ptr null_iter, - std::vector> iters, - const std::vector& target_paths, - const std::vector& target_types, - const std::vector& source_paths, - const std::vector& source_types); +StatusOr> create_json_flat_iterator( + ColumnReader* reader, std::unique_ptr null_iter, + std::vector> iters, const std::vector& target_paths, + const std::vector& target_types, const std::vector& source_paths, + const std::vector& source_types, bool need_remain = false); StatusOr> create_json_dynamic_flat_iterator( std::unique_ptr json_iter, const std::vector& target_paths, - const std::vector& target_types); + const std::vector& target_types, bool need_remain = false); StatusOr> create_json_merge_iterator( ColumnReader* reader, std::unique_ptr null_iter, std::vector> all_iters, const std::vector& merge_paths, const std::vector& merge_types); - -StatusOr> create_json_direct_iterator( - ColumnReader* reader, std::unique_ptr null_iter, - std::vector> all_iters, const std::vector& all_paths, - const std::vector& all_types); - } // namespace starrocks diff --git a/be/src/storage/rowset/json_column_writer.cpp b/be/src/storage/rowset/json_column_writer.cpp index e649d87d6817e..a7ed25184a85c 100644 --- a/be/src/storage/rowset/json_column_writer.cpp +++ b/be/src/storage/rowset/json_column_writer.cpp @@ -92,6 +92,8 @@ Status FlatJsonColumnWriter::_flat_column(std::vector& json_datas) { _flat_types = deriver.flat_types(); _has_remain = deriver.has_remain_json(); + VLOG(1) << "FlatJsonColumnWriter flat_column flat json: " + << JsonFlatPath::debug_flat_json(_flat_paths, _flat_types, _has_remain); if (_flat_paths.empty()) { return Status::InternalError("doesn't have flat column."); } @@ -281,6 +283,7 @@ Status FlatJsonColumnWriter::finish_current_page() { StatusOr> create_json_column_writer(const ColumnWriterOptions& opts, TypeInfoPtr type_info, WritableFile* wfile, std::unique_ptr json_writer) { + VLOG(1) << "Create Json Column Writer is_compaction: " << opts.is_compaction << ", need_flat : " << opts.need_flat; // compaction if (opts.is_compaction) { if (opts.need_flat) { diff --git a/be/src/storage/rowset/segment.h b/be/src/storage/rowset/segment.h index fd164f11cb225..4966a80d341bd 100644 --- a/be/src/storage/rowset/segment.h +++ b/be/src/storage/rowset/segment.h @@ -222,6 +222,9 @@ class Segment : public std::enable_shared_from_this { DISALLOW_COPY_AND_MOVE(Segment); + // for ut test + void set_num_rows(uint32_t num_rows) { _num_rows = num_rows; } + private: friend struct SegmentZoneMapPruner; diff --git a/be/src/storage/tablet_reader.cpp b/be/src/storage/tablet_reader.cpp index fd68e6e5f0950..148bb084e0d92 100644 --- a/be/src/storage/tablet_reader.cpp +++ b/be/src/storage/tablet_reader.cpp @@ -15,8 +15,11 @@ #include "storage/tablet_reader.h" #include +#include #include +#include +#include "column/column_access_path.h" #include "column/datum_convert.h" #include "common/status.h" #include "gen_cpp/tablet_schema.pb.h" @@ -31,13 +34,17 @@ #include "storage/delete_predicates.h" #include "storage/empty_iterator.h" #include "storage/merge_iterator.h" +#include "storage/olap_common.h" #include "storage/predicate_parser.h" +#include "storage/rowset/column_reader.h" #include "storage/rowset/rowid_range_option.h" #include "storage/seek_range.h" #include "storage/tablet.h" #include "storage/tablet_updates.h" #include "storage/types.h" #include "storage/union_iterator.h" +#include "types/logical_type.h" +#include "util/json_flattener.h" namespace starrocks { @@ -127,10 +134,71 @@ Status TabletReader::open(const TabletReaderParams& read_params) { _reader_params = &read_params; return Status::OK(); } + + RETURN_IF_ERROR(_init_compaction_column_paths(read_params)); Status st = _init_collector(read_params); return st; } +Status TabletReader::_init_compaction_column_paths(const TabletReaderParams& read_params) { + if (!config::enable_compaction_flat_json || !is_compaction(read_params.reader_type) || + read_params.column_access_paths == nullptr) { + return Status::OK(); + } + + if (!read_params.column_access_paths->empty()) { + VLOG(3) << "Compaction flat json paths exists: " << read_params.column_access_paths->size(); + return Status::OK(); + } + + DCHECK(is_compaction(read_params.reader_type) && read_params.column_access_paths != nullptr && + read_params.column_access_paths->empty()); + int num_readers = 0; + for (const auto& rowset : _rowsets) { + auto segments = rowset->segments(); + std::for_each(segments.begin(), segments.end(), + [&](const auto& segment) { num_readers += segment->num_rows() > 0 ? 1 : 0; }); + } + + std::vector readers; + for (size_t i = 0; i < _tablet_schema->num_columns(); i++) { + const auto& col = _tablet_schema->column(i); + auto col_name = std::string(col.name()); + if (_schema.get_field_by_name(col_name) == nullptr || col.type() != LogicalType::TYPE_JSON) { + continue; + } + readers.clear(); + for (const auto& rowset : _rowsets) { + for (const auto& segment : rowset->segments()) { + if (segment->num_rows() == 0) { + continue; + } + auto reader = segment->column_with_uid(col.unique_id()); + if (reader != nullptr && reader->column_type() == LogicalType::TYPE_JSON && + nullptr != reader->sub_readers() && !reader->sub_readers()->empty()) { + readers.emplace_back(reader); + } + } + } + if (readers.size() == num_readers) { + // must all be flat json type + JsonPathDeriver deriver; + deriver.derived(readers); + auto paths = deriver.flat_paths(); + auto types = deriver.flat_types(); + + VLOG(3) << "Compaction flat json column: " << JsonFlatPath::debug_flat_json(paths, types, true); + ASSIGN_OR_RETURN(auto res, ColumnAccessPath::create(TAccessPathType::ROOT, col_name, i)); + for (size_t j = 0; j < paths.size(); j++) { + ColumnAccessPath::insert_json_path(res.get(), types[j], paths[j]); + } + res->set_from_compaction(true); + read_params.column_access_paths->emplace_back(std::move(res)); + } + } + return Status::OK(); +} + Status TabletReader::_init_collector_for_pk_index_read() { DCHECK(_reader_params != nullptr); // get pk eq predicates, and convert these predicates to encoded pk column diff --git a/be/src/storage/tablet_reader.h b/be/src/storage/tablet_reader.h index 5dbca3dbcaf99..53979dcde198a 100644 --- a/be/src/storage/tablet_reader.h +++ b/be/src/storage/tablet_reader.h @@ -87,6 +87,7 @@ class TabletReader final : public ChunkIterator { MemPool* mempool); Status _init_collector_for_pk_index_read(); + Status _init_compaction_column_paths(const TabletReaderParams& read_params); TabletSharedPtr _tablet; TabletSchemaCSPtr _tablet_schema; diff --git a/be/src/storage/vertical_compaction_task.cpp b/be/src/storage/vertical_compaction_task.cpp index 9b8639c18f8ed..a4c8c1d6306ef 100644 --- a/be/src/storage/vertical_compaction_task.cpp +++ b/be/src/storage/vertical_compaction_task.cpp @@ -131,6 +131,7 @@ Status VerticalCompactionTask::_compact_column_group(bool is_key, int column_gro reader_params.reader_type = compaction_type() == BASE_COMPACTION ? READER_BASE_COMPACTION : READER_CUMULATIVE_COMPACTION; reader_params.profile = _runtime_profile.create_child("merge_rowsets"); + reader_params.column_access_paths = &_column_access_paths; StatusOr ret = _calculate_chunk_size_for_column_group(column_group); if (!ret.ok()) { diff --git a/be/src/util/json_flattener.cpp b/be/src/util/json_flattener.cpp index 558073907b200..9520a31dd7870 100644 --- a/be/src/util/json_flattener.cpp +++ b/be/src/util/json_flattener.cpp @@ -44,6 +44,7 @@ #include "exprs/expr_context.h" #include "gutil/casts.h" #include "runtime/types.h" +#include "storage/rowset/column_reader.h" #include "types/logical_type.h" #include "util/json.h" #include "util/json_converter.h" @@ -330,6 +331,39 @@ void JsonPathDeriver::derived(const std::vector& json_datas) { _finalize(); } +void JsonPathDeriver::derived(const std::vector& json_readers) { + DCHECK(_paths.empty()); + DCHECK(_types.empty()); + DCHECK(_derived_maps.empty()); + DCHECK(_path_root == nullptr); + + if (json_readers.empty()) { + return; + } + + _path_root = std::make_shared(); + _total_rows = 0; + + // extract flat paths + for (const auto& reader : json_readers) { + DCHECK_EQ(LogicalType::TYPE_JSON, reader->column_type()); + DCHECK(!reader->sub_readers()->empty()); + _total_rows += reader->num_rows(); + + int start = reader->is_nullable() ? 1 : 0; + int end = reader->has_remain_json() ? reader->sub_readers()->size() - 1 : reader->sub_readers()->size(); + _has_remain |= reader->has_remain_json(); + for (size_t i = start; i < end; i++) { + const auto& sub = (*reader->sub_readers())[i]; + auto leaf = JsonFlatPath::normalize_from_path(sub->name(), _path_root.get()); + _derived_maps[leaf].type &= flat_json::LOGICAL_TYPE_TO_JSON_BITS.at(sub->column_type()); + _derived_maps[leaf].hits += reader->num_rows(); + } + } + _json_sparsity_factory = 1; // only extract common schema + _finalize(); +} + void JsonPathDeriver::_derived_on_flat_json(const std::vector& json_datas) { // extract flat paths for (size_t k = 0; k < json_datas.size(); k++) { @@ -338,7 +372,7 @@ void JsonPathDeriver::_derived_on_flat_json(const std::vector& js size_t hits = 0; if (col->is_nullable()) { auto nullable = down_cast(col); - hits = nullable->null_count(); + hits = col->size() - nullable->null_count(); json_col = down_cast(nullable->data_column().get()); } else { hits = col->size(); @@ -349,8 +383,8 @@ void JsonPathDeriver::_derived_on_flat_json(const std::vector& js continue; } - auto paths = json_col->flat_column_paths(); - auto types = json_col->flat_column_types(); + const auto& paths = json_col->flat_column_paths(); + const auto& types = json_col->flat_column_types(); for (size_t i = 0; i < paths.size(); i++) { auto leaf = JsonFlatPath::normalize_from_path(paths[i], _path_root.get()); @@ -455,7 +489,7 @@ void JsonPathDeriver::_finalize() { // leaf node // check sparsity, same key may appear many times in json, so we need avoid duplicate compute hits auto desc = _derived_maps[node]; - if (desc.multi_times <= 0 && desc.hits >= _total_rows * config::json_flat_sparsity_factor) { + if (desc.multi_times <= 0 && desc.hits >= _total_rows * _json_sparsity_factory) { hit_leaf.emplace_back(node, path); node->type = flat_json::JSON_BITS_TO_LOGICAL_TYPE.at(desc.type); node->remain = false; // later update @@ -478,7 +512,7 @@ void JsonPathDeriver::_finalize() { for (auto& [node, path] : hit_leaf) { if (_paths.size() >= limit) { node->remain = true; - _has_remain = true; + _has_remain |= true; continue; } node->index = _paths.size(); @@ -544,6 +578,7 @@ JsonFlattener::JsonFlattener(const std::vector& paths, const std::v void JsonFlattener::flatten(const Column* json_column) { for (auto& col : _flat_columns) { DCHECK_EQ(col->size(), 0); + col->reserve(json_column->size()); } // input const JsonColumn* json_data = nullptr; @@ -701,34 +736,22 @@ std::vector JsonFlattener::mutable_result() { } JsonMerger::JsonMerger(const std::vector& paths, const std::vector& types, bool has_remain) - : _has_remain(has_remain) { + : _src_paths(std::move(paths)), _has_remain(has_remain) { _src_root = std::make_shared(); - for (size_t i = 0; i < paths.size(); i++) { - auto* leaf = JsonFlatPath::normalize_from_path(paths[i], _src_root.get()); + for (size_t i = 0; i < _src_paths.size(); i++) { + auto* leaf = JsonFlatPath::normalize_from_path(_src_paths[i], _src_root.get()); leaf->type = types[i]; leaf->index = i; } } -void dfs_exclude(JsonFlatPath* node) { - if (node->children.empty()) { - return; - } - bool all_exclude = true; - for (auto& [_, child] : node->children) { - dfs_exclude(child.get()); - all_exclude &= (child->op == JsonFlatPath::OP_EXCLUDE); - } - node->op = all_exclude ? JsonFlatPath::OP_EXCLUDE : JsonFlatPath::OP_INCLUDE; -} - void JsonMerger::set_exclude_paths(const std::vector& exclude_paths) { - for (auto& path : exclude_paths) { + this->_exclude_paths = exclude_paths; + for (auto& path : _exclude_paths) { auto* leaf = JsonFlatPath::normalize_from_path(path, _src_root.get()); leaf->op = JsonFlatPath::OP_EXCLUDE; } - dfs_exclude(_src_root.get()); } void JsonMerger::set_root_path(const std::string& base_path) { @@ -742,12 +765,13 @@ ColumnPtr JsonMerger::merge(const std::vector& columns) { _result = NullableColumn::create(JsonColumn::create(), NullColumn::create()); _json_result = down_cast(down_cast(_result.get())->data_column().get()); _null_result = down_cast(down_cast(_result.get())->null_column().get()); + size_t rows = columns[0]->size(); + _result->reserve(rows); for (auto& col : columns) { _src_columns.emplace_back(col.get()); } - size_t rows = columns[0]->size(); if (_src_root->op == JsonFlatPath::OP_INCLUDE) { _merge_impl(rows); } else { @@ -814,24 +838,25 @@ void JsonMerger::_merge_json_with_remain(const JsonFlatPath* root, const vpack:: } continue; } - if (iter->second->op == JsonFlatPath::OP_EXCLUDE) { + auto* child = iter->second.get(); + if (child->op == JsonFlatPath::OP_EXCLUDE) { continue; } if (v.isObject()) { - if (iter->second->op == JsonFlatPath::OP_IGNORE) { - _merge_json_with_remain(iter->second.get(), &v, builder, index); - } else if (iter->second->op == JsonFlatPath::OP_ROOT) { - _merge_json_with_remain(iter->second.get(), &v, builder, index); + if (child->op == JsonFlatPath::OP_IGNORE) { + _merge_json_with_remain(child, &v, builder, index); + } else if (child->op == JsonFlatPath::OP_ROOT) { + _merge_json_with_remain(child, &v, builder, index); } else { - DCHECK(iter->second->op == JsonFlatPath::OP_INCLUDE); + DCHECK(child->op == JsonFlatPath::OP_INCLUDE); builder->add(k, vpack::Value(vpack::ValueType::Object)); - _merge_json_with_remain(iter->second.get(), &v, builder, index); + _merge_json_with_remain(child, &v, builder, index); builder->close(); } continue; } // leaf node - DCHECK(iter->second->op == JsonFlatPath::OP_INCLUDE || iter->second->op == JsonFlatPath::OP_ROOT); + DCHECK(child->op == JsonFlatPath::OP_INCLUDE || child->op == JsonFlatPath::OP_ROOT); builder->add(k, v); } for (auto& [child_name, child] : root->children) { @@ -862,7 +887,7 @@ void JsonMerger::_merge_json(const JsonFlatPath* root, vpack::Builder* builder, } if (child->children.empty()) { - DCHECK(child->op == JsonFlatPath::OP_INCLUDE); + DCHECK(child->op == JsonFlatPath::OP_INCLUDE || child->op == JsonFlatPath::OP_ROOT); auto col = _src_columns[child->index]; if (!col->is_null(index)) { DCHECK(flat_json::JSON_MERGE_FUNC.contains(child->type)); @@ -885,17 +910,6 @@ void JsonMerger::_merge_json(const JsonFlatPath* root, vpack::Builder* builder, } } -HyperJsonTransformer::HyperJsonTransformer(JsonPathDeriver& deriver) - : _dst_remain(deriver.has_remain_json()), _dst_paths(deriver.flat_paths()), _dst_types(deriver.flat_types()) { - for (size_t i = 0; i < _dst_paths.size(); i++) { - _dst_columns.emplace_back(ColumnHelper::create_column(TypeDescriptor(_dst_types[i]), true)); - } - - if (_dst_remain) { - _dst_columns.emplace_back(JsonColumn::create()); - } -} - HyperJsonTransformer::HyperJsonTransformer(const std::vector& paths, const std::vector& types, bool has_remain) : _dst_remain(has_remain), _dst_paths(std::move(paths)), _dst_types(types) { @@ -910,13 +924,12 @@ HyperJsonTransformer::HyperJsonTransformer(const std::vector& paths void HyperJsonTransformer::init_read_task(const std::vector& paths, const std::vector& types, bool has_remain) { - DCHECK(_src_paths.empty()); - DCHECK(_src_types.empty()); + _src_paths = paths; + _src_types = types; - _src_paths.assign(paths.begin(), paths.end()); - _src_types.assign(types.begin(), types.end()); _merge_tasks.clear(); _flat_tasks.clear(); + _pool.clear(); std::vector equals; std::vector merges; @@ -1037,15 +1050,14 @@ void HyperJsonTransformer::init_read_task(const std::vector& paths, } } -void HyperJsonTransformer::init_compaction_task(JsonColumn* column) { - DCHECK(column->is_flat_json()); - _src_paths.clear(); - _src_types.clear(); +void HyperJsonTransformer::init_compaction_task(const std::vector& paths, + const std::vector& types, bool has_remain) { + _src_paths = paths; + _src_types = types; + _merge_tasks.clear(); _flat_tasks.clear(); - - _src_paths.assign(column->flat_column_paths().begin(), column->flat_column_paths().end()); - _src_types.assign(column->flat_column_types().begin(), column->flat_column_types().end()); + _pool.clear(); std::unordered_set _src_set(_src_paths.begin(), _src_paths.end()); @@ -1059,6 +1071,8 @@ void HyperJsonTransformer::init_compaction_task(JsonColumn* column) { for (size_t j = 0; j < _src_paths.size(); j++) { size_t i = 0; for (; i < _dst_paths.size(); i++) { + DCHECK(!_src_paths[j].starts_with(_dst_paths[i] + ".")); + DCHECK(!_dst_paths[i].starts_with(_src_paths[j] + ".")); if (_dst_paths[i] == _src_paths[j]) { auto& mk = _merge_tasks.emplace_back(); mk.is_merge = false; @@ -1082,7 +1096,7 @@ void HyperJsonTransformer::init_compaction_task(JsonColumn* column) { } std::vector all_flat_paths; // for remove from remain - if (column->has_remain()) { + if (has_remain) { for (size_t i = 0; i < _dst_paths.size(); i++) { if (_src_set.find(_dst_paths[i]) == _src_set.end()) { // merge to remain @@ -1117,9 +1131,9 @@ void HyperJsonTransformer::init_compaction_task(JsonColumn* column) { p.emplace_back(_src_paths[index]); t.emplace_back(_src_types[index]); } - mk.merger = std::make_unique(p, t, column->has_remain()); + mk.merger = std::make_unique(p, t, has_remain); mk.merger->set_exclude_paths(all_flat_paths); - if (column->has_remain()) { + if (has_remain) { _merge_tasks[0].src_index.emplace_back(_src_paths.size()); } } @@ -1205,7 +1219,7 @@ Status HyperJsonTransformer::_merge(const MergeTask& task, std::vectorhas_exclude_paths()) { // only use remain _dst_columns[task.dst_index] = columns[task.src_index[0]]; return Status::OK(); @@ -1254,17 +1268,6 @@ void HyperJsonTransformer::_flat(const FlatTask& task, std::vector& c } } -void HyperJsonTransformer::reset() { - _src_paths.clear(); - _src_types.clear(); - _merge_tasks.clear(); - _flat_tasks.clear(); - _pool.clear(); - for (size_t i = 0; i < _dst_columns.size(); i++) { - _dst_columns[i] = _dst_columns[i]->clone_empty(); - } -} - std::vector HyperJsonTransformer::mutable_result() { std::vector res; for (size_t i = 0; i < _dst_columns.size(); i++) { diff --git a/be/src/util/json_flattener.h b/be/src/util/json_flattener.h index 552244d7b29b6..7eac2823139ea 100644 --- a/be/src/util/json_flattener.h +++ b/be/src/util/json_flattener.h @@ -29,6 +29,7 @@ #include "column/nullable_column.h" #include "column/vectorized_fwd.h" +#include "common/config.h" #include "common/object_pool.h" #include "common/status.h" #include "common/statusor.h" @@ -38,6 +39,7 @@ namespace starrocks { namespace vpack = arangodb::velocypack; +class ColumnReader; class JsonFlatPath { public: @@ -64,6 +66,23 @@ class JsonFlatPath { // set new root, other path will set to exclude, the node must include the root path static void set_root(const std::string& new_root_path, JsonFlatPath* node); + static std::string debug_flat_json(const std::vector& paths, const std::vector& types, + bool has_remain) { + if (paths.empty()) { + return "[]"; + } + DCHECK_EQ(paths.size(), types.size()); + std::ostringstream ss; + ss << "["; + size_t i = 0; + for (; i < paths.size() - 1; i++) { + ss << paths[i] << "(" << type_to_string(types[i]) << "), "; + } + ss << paths[i] << "(" << type_to_string(types[i]) << ")"; + ss << (has_remain ? "]" : "}"); + return ss.str(); + } + private: static std::pair _split_path(const std::string& path); }; @@ -79,6 +98,8 @@ class JsonPathDeriver { // dervie paths void derived(const std::vector& json_datas); + void derived(const std::vector& json_readers); + bool has_remain_json() const { return _has_remain; } std::shared_ptr& flat_path_root() { return _path_root; } @@ -118,6 +139,7 @@ class JsonPathDeriver { std::vector _paths; std::vector _types; + double _json_sparsity_factory = config::json_flat_sparsity_factor; size_t _total_rows; std::unordered_map _derived_maps; std::shared_ptr _path_root; @@ -149,10 +171,10 @@ class JsonFlattener { bool _has_remain = false; // note: paths may be less 1 than flat columns std::vector _dst_paths; + std::shared_ptr _dst_root; std::vector _flat_columns; JsonColumn* _remain; - std::shared_ptr _dst_root; }; // merge flat json A,B,C to JsonColumn @@ -171,6 +193,8 @@ class JsonMerger { // for compaction, set exclude paths, to remove the path void set_exclude_paths(const std::vector& exclude_paths); + bool has_exclude_paths() const { return !_exclude_paths.empty(); } + // input nullable-json, output none null json ColumnPtr merge(const std::vector& columns); @@ -185,9 +209,12 @@ class JsonMerger { void _merge_json(const JsonFlatPath* root, vpack::Builder* builder, size_t index); private: + std::vector _src_paths; bool _has_remain = false; + std::shared_ptr _src_root; std::vector _src_columns; + std::vector _exclude_paths; bool _output_nullable = false; ColumnPtr _result; @@ -211,8 +238,6 @@ class JsonMerger { // - D need extract from remain class HyperJsonTransformer { public: - HyperJsonTransformer(JsonPathDeriver& deriver); - HyperJsonTransformer(const std::vector& paths, const std::vector& types, bool has_remain); ~HyperJsonTransformer() = default; @@ -221,7 +246,8 @@ class HyperJsonTransformer { void init_read_task(const std::vector& paths, const std::vector& types, bool has_remain); // init for compaction - void init_compaction_task(JsonColumn* column); + void init_compaction_task(const std::vector& paths, const std::vector& types, + bool has_remain); Status trans(std::vector& columns); @@ -229,8 +255,6 @@ class HyperJsonTransformer { std::vector mutable_result(); - void reset(); - std::vector cast_paths() const; std::vector merge_paths() const; @@ -252,7 +276,6 @@ class HyperJsonTransformer { int dst_index = -1; std::vector src_index; - std::unique_ptr deriver; std::unique_ptr merger; }; @@ -286,4 +309,4 @@ class HyperJsonTransformer { int64_t _merge_ms = 0; }; -} // namespace starrocks \ No newline at end of file +} // namespace starrocks diff --git a/be/test/storage/rowset/flat_json_column_compact_test.cpp b/be/test/storage/rowset/flat_json_column_compact_test.cpp index 8742dce83f5c1..ad5ed75293b41 100644 --- a/be/test/storage/rowset/flat_json_column_compact_test.cpp +++ b/be/test/storage/rowset/flat_json_column_compact_test.cpp @@ -55,7 +55,10 @@ class FlatJsonColumnCompactTest : public testing::Test { protected: void SetUp() override { _meta.reset(new ColumnMetaPB()); } - void TearDown() override {} + void TearDown() override { + config::json_flat_null_factor = 0.3; + config::json_flat_sparsity_factor = 0.9; + } std::shared_ptr create_dummy_segment(const std::shared_ptr& fs, const std::string& fname) { return std::make_shared(fs, FileInfo{fname}, 1, _dummy_segment_schema, nullptr); @@ -104,7 +107,37 @@ class FlatJsonColumnCompactTest : public testing::Test { return json_col; } - void test_json(ColumnWriterOptions& writer_opts, std::vector& jsons, ColumnPtr& read_col) { + ColumnPtr more_flat_json(const std::vector& jsons, bool is_nullable) { + auto json_col = JsonColumn::create(); + + auto flat_col = JsonColumn::create(); + auto* flat_column = down_cast(flat_col.get()); + auto null_col = NullColumn::create(); + + for (const auto& json : jsons) { + if ("NULL" != json) { + ASSIGN_OR_ABORT(auto jv, JsonValue::parse(json)); + flat_column->append(&jv); + } else { + flat_column->append_default(); + } + null_col->append("NULL" == json); + } + + JsonPathDeriver deriver; + deriver.derived({flat_column}); + JsonFlattener flattener(deriver); + flattener.flatten(flat_column); + json_col->set_flat_columns(deriver.flat_paths(), deriver.flat_types(), flattener.mutable_result()); + + if (is_nullable) { + return NullableColumn::create(json_col, null_col); + } + return json_col; + } + + void test_json(ColumnWriterOptions& writer_opts, std::vector& jsons, ColumnPtr& read_col, + ColumnAccessPath* path = nullptr) { auto fs = std::make_shared(); ASSERT_TRUE(fs->create_dir(TEST_DIR).ok()); @@ -143,14 +176,13 @@ class FlatJsonColumnCompactTest : public testing::Test { // close the file ASSERT_TRUE(wfile->close().ok()); } - LOG(INFO) << "Finish writing"; auto res = ColumnReader::create(_meta.get(), segment.get(), nullptr); ASSERT_TRUE(res.ok()); auto reader = std::move(res).value(); { - ASSIGN_OR_ABORT(auto iter, reader->new_iterator(nullptr)); + ASSIGN_OR_ABORT(auto iter, reader->new_iterator(path)); ASSIGN_OR_ABORT(auto read_file, fs->new_random_access_file(fname)); ColumnIteratorOptions iter_opts; @@ -170,6 +202,59 @@ class FlatJsonColumnCompactTest : public testing::Test { } } + void test_compact_path(std::vector& jsons, JsonPathDeriver* deriver) { + auto fs = std::make_shared(); + ASSERT_TRUE(fs->create_dir(TEST_DIR).ok()); + + TabletColumn json_tablet_column = create_with_default_value(""); + TypeInfoPtr type_info = get_type_info(json_tablet_column); + + std::vector> segments; + std::vector> unique_readers; + std::vector readers; + for (size_t k = 0; k < jsons.size(); k++) { + const std::string fname = TEST_DIR + fmt::format("/test_flat_json_compact{}.data", k); + auto segment = create_dummy_segment(fs, fname); + segments.push_back(segment); + ASSIGN_OR_ABORT(auto wfile, fs->new_writable_file(fname)); + // write data + ColumnWriterOptions writer_opts; + writer_opts.need_flat = true; + ColumnMetaPB column_meta; + writer_opts.meta = &column_meta; + writer_opts.meta->set_column_id(0); + writer_opts.meta->set_unique_id(0); + writer_opts.meta->set_type(TYPE_JSON); + writer_opts.meta->set_length(0); + writer_opts.meta->set_encoding(DEFAULT_ENCODING); + writer_opts.meta->set_compression(starrocks::LZ4_FRAME); + writer_opts.meta->set_is_nullable(jsons[0]->is_nullable()); + writer_opts.need_zone_map = false; + writer_opts.is_compaction = true; + + ASSIGN_OR_ABORT(auto writer, ColumnWriter::create(writer_opts, &json_tablet_column, wfile.get())); + ASSERT_OK(writer->init()); + ASSERT_TRUE(writer->append(*jsons[k]).ok()); + + ASSERT_TRUE(writer->finish().ok()); + ASSERT_TRUE(writer->write_data().ok()); + ASSERT_TRUE(writer->write_ordinal_index().ok()); + + // mock segment rows + segment->set_num_rows(jsons[k]->size()); + + // close the file + ASSERT_TRUE(wfile->close().ok()); + + auto res = ColumnReader::create(&column_meta, segment.get(), nullptr); + ASSERT_TRUE(res.ok()); + auto reader = std::move(res).value(); + unique_readers.emplace_back(std::move(reader)); + readers.push_back(unique_readers.back().get()); + } + deriver->derived(readers); + } + JsonColumn* get_json_column(ColumnPtr& col) { if (col->is_nullable()) { return down_cast(down_cast(col.get())->data_column().get()); @@ -199,6 +284,7 @@ TEST_F(FlatJsonColumnCompactTest, testJsonCompactToJson) { test_json(writer_opts, jsons, read_col); auto* read_json = get_json_column(read_col); + EXPECT_FALSE(_meta->json_meta().is_flat()); EXPECT_FALSE(read_json->is_flat_json()); EXPECT_EQ(5, read_json->size()); EXPECT_EQ(0, read_json->get_flat_fields().size()); @@ -224,6 +310,7 @@ TEST_F(FlatJsonColumnCompactTest, testNullJsonCompactToJson) { test_json(writer_opts, jsons, read_col); auto* read_json = get_json_column(read_col); + EXPECT_FALSE(_meta->json_meta().is_flat()); EXPECT_FALSE(read_json->is_flat_json()); EXPECT_EQ(5, read_json->size()); EXPECT_EQ(0, read_json->get_flat_fields().size()); @@ -248,6 +335,7 @@ TEST_F(FlatJsonColumnCompactTest, testFlatJsonCompactToJson) { writer_opts.need_flat = false; test_json(writer_opts, jsons, read_col); + EXPECT_FALSE(_meta->json_meta().is_flat()); auto* read_json = get_json_column(read_col); EXPECT_FALSE(read_json->is_flat_json()); EXPECT_EQ(5, read_json->size()); @@ -276,6 +364,7 @@ TEST_F(FlatJsonColumnCompactTest, testFlatJsonCompactToJson2) { test_json(writer_opts, jsons, read_col); auto* read_json = get_json_column(read_col); + EXPECT_FALSE(_meta->json_meta().is_flat()); EXPECT_FALSE(read_json->is_flat_json()); EXPECT_EQ(5, read_json->size()); EXPECT_EQ(0, read_json->get_flat_fields().size()); @@ -304,6 +393,7 @@ TEST_F(FlatJsonColumnCompactTest, testFlatJsonCompactToJson3) { test_json(writer_opts, jsons, read_col); auto* read_json = get_json_column(read_col); + EXPECT_FALSE(_meta->json_meta().is_flat()); EXPECT_FALSE(read_json->is_flat_json()); EXPECT_EQ(5, read_json->size()); EXPECT_EQ(0, read_json->get_flat_fields().size()); @@ -332,6 +422,8 @@ TEST_F(FlatJsonColumnCompactTest, testFlatJsonCompactToJson4) { test_json(writer_opts, jsons, read_col); auto* read_json = get_json_column(read_col); + EXPECT_FALSE(_meta->json_meta().is_flat()); + EXPECT_FALSE(read_json->is_flat_json()); EXPECT_EQ(5, read_json->size()); EXPECT_EQ(0, read_json->get_flat_fields().size()); @@ -364,6 +456,8 @@ TEST_F(FlatJsonColumnCompactTest, testHyperJsonCompactToJson) { test_json(writer_opts, jsons, read_col); auto* read_json = get_json_column(read_col); + EXPECT_FALSE(_meta->json_meta().is_flat()); + EXPECT_FALSE(read_json->is_flat_json()); EXPECT_EQ(5, read_json->size()); EXPECT_EQ(0, read_json->get_flat_fields().size()); @@ -394,6 +488,8 @@ TEST_F(FlatJsonColumnCompactTest, testNullFlatJsonCompactToJson) { test_json(writer_opts, jsons, read_col); auto* read_json = get_json_column(read_col); + EXPECT_FALSE(_meta->json_meta().is_flat()); + EXPECT_FALSE(read_json->is_flat_json()); EXPECT_EQ(5, read_json->size()); EXPECT_EQ(0, read_json->get_flat_fields().size()); @@ -421,6 +517,7 @@ TEST_F(FlatJsonColumnCompactTest, testNullFlatJsonCompactToJson2) { test_json(writer_opts, jsons, read_col); auto* read_json = get_json_column(read_col); + EXPECT_FALSE(_meta->json_meta().is_flat()); EXPECT_FALSE(read_json->is_flat_json()); EXPECT_EQ(5, read_json->size()); EXPECT_EQ(0, read_json->get_flat_fields().size()); @@ -449,6 +546,7 @@ TEST_F(FlatJsonColumnCompactTest, testNullFlatJsonCompactToJson3) { test_json(writer_opts, jsons, read_col); auto* read_json = get_json_column(read_col); + EXPECT_FALSE(_meta->json_meta().is_flat()); EXPECT_FALSE(read_json->is_flat_json()); EXPECT_EQ(5, read_json->size()); EXPECT_EQ(0, read_json->get_flat_fields().size()); @@ -505,6 +603,7 @@ TEST_F(FlatJsonColumnCompactTest, testNullHyperJsonCompactToJson) { ColumnWriterOptions writer_opts; writer_opts.need_flat = false; test_json(writer_opts, jsons, read_col); + EXPECT_FALSE(_meta->json_meta().is_flat()); auto* read_json = get_json_column(read_col); EXPECT_FALSE(read_json->is_flat_json()); @@ -535,6 +634,9 @@ TEST_F(FlatJsonColumnCompactTest, testJsonCompactToFlatJson) { ColumnWriterOptions writer_opts; writer_opts.need_flat = true; test_json(writer_opts, jsons, read_col); + EXPECT_TRUE(_meta->json_meta().is_flat()); + EXPECT_FALSE(_meta->json_meta().has_remain()); + EXPECT_EQ(2, _meta->children_columns_size()); auto* read_json = get_json_column(read_col); EXPECT_FALSE(read_json->is_flat_json()); @@ -562,6 +664,8 @@ TEST_F(FlatJsonColumnCompactTest, testNullJsonCompactToFlatJson) { ColumnWriterOptions writer_opts; writer_opts.need_flat = true; test_json(writer_opts, jsons, read_col); + EXPECT_FALSE(_meta->json_meta().is_flat()); + EXPECT_FALSE(_meta->json_meta().has_remain()); auto* read_json = get_json_column(read_col); EXPECT_FALSE(read_json->is_flat_json()); @@ -589,6 +693,9 @@ TEST_F(FlatJsonColumnCompactTest, testFlatJsonCompactToFlatJson) { ColumnWriterOptions writer_opts; writer_opts.need_flat = true; test_json(writer_opts, jsons, read_col); + EXPECT_TRUE(_meta->json_meta().is_flat()); + EXPECT_FALSE(_meta->json_meta().has_remain()); + EXPECT_EQ(2, _meta->children_columns_size()); auto* read_json = get_json_column(read_col); EXPECT_FALSE(read_json->is_flat_json()); @@ -616,6 +723,9 @@ TEST_F(FlatJsonColumnCompactTest, testFlatJsonCompactToFlatJson2) { ColumnWriterOptions writer_opts; writer_opts.need_flat = true; test_json(writer_opts, jsons, read_col); + EXPECT_TRUE(_meta->json_meta().is_flat()); + EXPECT_TRUE(_meta->json_meta().has_remain()); + EXPECT_EQ(3, _meta->children_columns_size()); auto* read_json = get_json_column(read_col); EXPECT_FALSE(read_json->is_flat_json()); @@ -643,6 +753,9 @@ TEST_F(FlatJsonColumnCompactTest, testFlatJsonCompactToFlatJson3) { ColumnWriterOptions writer_opts; writer_opts.need_flat = true; test_json(writer_opts, jsons, read_col); + EXPECT_TRUE(_meta->json_meta().is_flat()); + EXPECT_TRUE(_meta->json_meta().has_remain()); + EXPECT_EQ(3, _meta->children_columns_size()); auto* read_json = get_json_column(read_col); EXPECT_FALSE(read_json->is_flat_json()); @@ -670,6 +783,12 @@ TEST_F(FlatJsonColumnCompactTest, testFlatJsonCompactToFlatJson4) { ColumnWriterOptions writer_opts; writer_opts.need_flat = true; test_json(writer_opts, jsons, read_col); + EXPECT_TRUE(_meta->json_meta().is_flat()); + EXPECT_TRUE(_meta->json_meta().has_remain()); + EXPECT_EQ(5, _meta->children_columns_size()); + EXPECT_EQ("b1.b2", _meta->children_columns(2).name()); + EXPECT_EQ("b1.b3.b4", _meta->children_columns(3).name()); + EXPECT_EQ("remain", _meta->children_columns(4).name()); auto* read_json = get_json_column(read_col); EXPECT_FALSE(read_json->is_flat_json()); @@ -701,6 +820,12 @@ TEST_F(FlatJsonColumnCompactTest, testHyperJsonCompactToFlatJson) { ColumnWriterOptions writer_opts; writer_opts.need_flat = true; test_json(writer_opts, jsons, read_col); + EXPECT_TRUE(_meta->json_meta().is_flat()); + EXPECT_TRUE(_meta->json_meta().has_remain()); + EXPECT_EQ(5, _meta->children_columns_size()); + EXPECT_EQ("b1.b2", _meta->children_columns(2).name()); + EXPECT_EQ("b1.b3.b4", _meta->children_columns(3).name()); + EXPECT_EQ("remain", _meta->children_columns(4).name()); auto* read_json = get_json_column(read_col); EXPECT_FALSE(read_json->is_flat_json()); @@ -716,6 +841,8 @@ TEST_F(FlatJsonColumnCompactTest, testHyperJsonCompactToFlatJson) { } TEST_F(FlatJsonColumnCompactTest, testNullFlatJsonCompactToFlatJson) { + config::json_flat_null_factor = 1; + config::json_flat_sparsity_factor = 0.7; // clang-format off Columns jsons = { flat_json(R"({"a": 1, "b": 21})", true), @@ -730,6 +857,9 @@ TEST_F(FlatJsonColumnCompactTest, testNullFlatJsonCompactToFlatJson) { ColumnWriterOptions writer_opts; writer_opts.need_flat = true; test_json(writer_opts, jsons, read_col); + EXPECT_TRUE(_meta->json_meta().is_flat()); + EXPECT_TRUE(_meta->json_meta().has_remain()); + EXPECT_EQ(4, _meta->children_columns_size()); auto* read_json = get_json_column(read_col); EXPECT_FALSE(read_json->is_flat_json()); @@ -743,6 +873,8 @@ TEST_F(FlatJsonColumnCompactTest, testNullFlatJsonCompactToFlatJson) { } TEST_F(FlatJsonColumnCompactTest, testNullFlatJsonCompactToFlatJson2) { + config::json_flat_null_factor = 1; + config::json_flat_sparsity_factor = 0.7; // clang-format off Columns jsons = { flat_json(R"({"a": 1, "b": 21})", true), @@ -757,6 +889,9 @@ TEST_F(FlatJsonColumnCompactTest, testNullFlatJsonCompactToFlatJson2) { ColumnWriterOptions writer_opts; writer_opts.need_flat = true; test_json(writer_opts, jsons, read_col); + EXPECT_TRUE(_meta->json_meta().is_flat()); + EXPECT_TRUE(_meta->json_meta().has_remain()); + EXPECT_EQ(4, _meta->children_columns_size()); auto* read_json = get_json_column(read_col); EXPECT_FALSE(read_json->is_flat_json()); @@ -770,6 +905,8 @@ TEST_F(FlatJsonColumnCompactTest, testNullFlatJsonCompactToFlatJson2) { } TEST_F(FlatJsonColumnCompactTest, testNullFlatJsonCompactToFlatJson3) { + config::json_flat_null_factor = 1; + config::json_flat_sparsity_factor = 0.7; // clang-format off Columns jsons = { flat_json(R"({"a": 1, "b": 21, "g": {}})", true), @@ -785,6 +922,10 @@ TEST_F(FlatJsonColumnCompactTest, testNullFlatJsonCompactToFlatJson3) { writer_opts.need_flat = true; test_json(writer_opts, jsons, read_col); + EXPECT_TRUE(_meta->json_meta().is_flat()); + EXPECT_TRUE(_meta->json_meta().has_remain()); + EXPECT_EQ(4, _meta->children_columns_size()); + auto* read_json = get_json_column(read_col); EXPECT_FALSE(read_json->is_flat_json()); EXPECT_EQ(5, read_json->size()); @@ -797,6 +938,8 @@ TEST_F(FlatJsonColumnCompactTest, testNullFlatJsonCompactToFlatJson3) { } TEST_F(FlatJsonColumnCompactTest, testNullFlatJsonCompactToFlatJson4) { + config::json_flat_null_factor = 1; + config::json_flat_sparsity_factor = 0.7; // clang-format off Columns jsons = { flat_json(R"({"a": 1, "b": 21, "b1": {"b2": 1, "b3": {"b4": "ab1", "b5": [1, 2, 3]}}, "g": {}})", true), @@ -811,6 +954,13 @@ TEST_F(FlatJsonColumnCompactTest, testNullFlatJsonCompactToFlatJson4) { ColumnWriterOptions writer_opts; writer_opts.need_flat = true; test_json(writer_opts, jsons, read_col); + EXPECT_TRUE(_meta->json_meta().is_flat()); + EXPECT_TRUE(_meta->json_meta().has_remain()); + EXPECT_EQ(6, _meta->children_columns_size()); + EXPECT_EQ("nulls", _meta->children_columns(0).name()); + EXPECT_EQ("b1.b2", _meta->children_columns(3).name()); + EXPECT_EQ("b1.b3.b4", _meta->children_columns(4).name()); + EXPECT_EQ("remain", _meta->children_columns(5).name()); auto* read_json = get_json_column(read_col); EXPECT_FALSE(read_json->is_flat_json()); @@ -841,6 +991,13 @@ TEST_F(FlatJsonColumnCompactTest, testNullHyperJsonCompactToFlatJson) { ColumnWriterOptions writer_opts; writer_opts.need_flat = true; test_json(writer_opts, jsons, read_col); + EXPECT_TRUE(_meta->json_meta().is_flat()); + EXPECT_TRUE(_meta->json_meta().has_remain()); + EXPECT_EQ(6, _meta->children_columns_size()); + EXPECT_EQ("nulls", _meta->children_columns(0).name()); + EXPECT_EQ("b1.b2", _meta->children_columns(3).name()); + EXPECT_EQ("b1.b3.b4", _meta->children_columns(4).name()); + EXPECT_EQ("remain", _meta->children_columns(5).name()); auto* read_json = get_json_column(read_col); EXPECT_FALSE(read_json->is_flat_json()); @@ -870,6 +1027,13 @@ TEST_F(FlatJsonColumnCompactTest, testNullHyperJsonCompactToFlatJson2) { ColumnWriterOptions writer_opts; writer_opts.need_flat = true; test_json(writer_opts, jsons, read_col); + EXPECT_TRUE(_meta->json_meta().is_flat()); + EXPECT_TRUE(_meta->json_meta().has_remain()); + EXPECT_EQ(5, _meta->children_columns_size()); + EXPECT_EQ("nulls", _meta->children_columns(0).name()); + EXPECT_EQ("b1.b2", _meta->children_columns(2).name()); + EXPECT_EQ("b1.b3.b4", _meta->children_columns(3).name()); + EXPECT_EQ("remain", _meta->children_columns(4).name()); auto* read_json = get_json_column(read_col); EXPECT_FALSE(read_json->is_flat_json()); @@ -897,6 +1061,12 @@ TEST_F(FlatJsonColumnCompactTest, testNullHyperJsonCompactToFlatJson3) { ColumnWriterOptions writer_opts; writer_opts.need_flat = true; test_json(writer_opts, jsons, read_col); + EXPECT_TRUE(_meta->json_meta().is_flat()); + EXPECT_TRUE(_meta->json_meta().has_remain()); + EXPECT_EQ(5, _meta->children_columns_size()); + EXPECT_EQ("b1.b2", _meta->children_columns(2).name()); + EXPECT_EQ("b1.b3.b4", _meta->children_columns(3).name()); + EXPECT_EQ("remain", _meta->children_columns(4).name()); auto* read_json = get_json_column(read_col); EXPECT_FALSE(read_json->is_flat_json()); @@ -924,6 +1094,9 @@ TEST_F(FlatJsonColumnCompactTest, testNullHyperJsonCompactToFlatJson4) { ColumnWriterOptions writer_opts; writer_opts.need_flat = true; test_json(writer_opts, jsons, read_col); + EXPECT_TRUE(_meta->json_meta().is_flat()); + EXPECT_TRUE(_meta->json_meta().has_remain()); + EXPECT_EQ(4, _meta->children_columns_size()); auto* read_json = get_json_column(read_col); EXPECT_FALSE(read_json->is_flat_json()); @@ -935,4 +1108,349 @@ TEST_F(FlatJsonColumnCompactTest, testNullHyperJsonCompactToFlatJson4) { EXPECT_EQ(jsons[i]->debug_item(0), read_col->debug_item(i)); } } + +TEST_F(FlatJsonColumnCompactTest, testHyperJsonCompactToFlatJsonRemain) { + config::json_flat_sparsity_factor = 0.6; + // clang-format off + Columns jsons = { + more_flat_json({ + R"({"a": 11, "b": "abc1", "c": 11})", + R"({"a": 12, "b": "abc2", "f": 12})", + R"({"a": 13, "b": "abc3", "e": 13})", + }, true), + more_flat_json({ + R"({"a": 21, "b": "efg1", "c": 21})", + R"({"a": 22, "b": "efg2", "c": 22})", + R"({"a": 23, "b": "efg3", "c": 23})", + R"({"a": 24, "b": "efg4", "c": 24})", + R"({"a": 25, "b": "efg5", "c": 25})", + R"({"a": 26, "b": "efg6", "c": 26})", + R"({"a": 27, "b": "efg7", "c": 27})", + R"({"a": 28, "b": "efg8", "c": 28})", + R"({"a": 29, "b": "efg9", "c": 29})", + R"({"a": 20, "b": "qwe1", "c": 20})", + R"({"a": 31, "b": "qwe2", "c": 30})", + R"({"a": 32, "b": "qwe3", "c": 31})", + R"({"a": 33, "b": "qwe4", "c": 32})", + }, true), + more_flat_json({ + R"({"a": 31, "b": "xwy1", "x": 31})", + R"({"a": 32, "b": "xwy2", "x": 32})", + R"({"a": 33, "b": "xwy3", "x": 33})", + R"({"a": 34, "b": "xwy4", "x": 34})", + R"({"a": 35, "b": "xwy5", "x": 35})", + }, true), + }; + // clang-format on + + ColumnPtr read_col = jsons[0]->clone_empty(); + ColumnWriterOptions writer_opts; + writer_opts.need_flat = true; + test_json(writer_opts, jsons, read_col); + + EXPECT_TRUE(_meta->json_meta().is_flat()); + EXPECT_TRUE(_meta->json_meta().has_remain()); + EXPECT_EQ(5, _meta->children_columns_size()); + EXPECT_EQ("nulls", _meta->children_columns(0).name()); + EXPECT_EQ("a", _meta->children_columns(1).name()); + EXPECT_EQ("c", _meta->children_columns(3).name()); + EXPECT_EQ("remain", _meta->children_columns(4).name()); + + EXPECT_EQ(R"({"a": 11, "b": "abc1", "c": 11})", read_col->debug_item(0)); + EXPECT_EQ(R"({"a": 12, "b": "abc2", "f": 12})", read_col->debug_item(1)); + EXPECT_EQ(R"({"a": 25, "b": "efg5", "c": 25})", read_col->debug_item(7)); + EXPECT_EQ(R"({"a": 33, "b": "qwe4", "c": 32})", read_col->debug_item(15)); + EXPECT_EQ(R"({"a": 33, "b": "xwy3", "x": 33})", read_col->debug_item(18)); +} + +TEST_F(FlatJsonColumnCompactTest, testHyperJsonCompactToFlatJsonRemain3) { + config::json_flat_sparsity_factor = 0.6; + // clang-format off + Columns jsons = { + more_flat_json({ + R"({"a": 11, "b": "abc1", "c": {"d": 21, "e": 211}})", + R"({"a": 12, "b": "abc2", "c": {"d": 22, "e": 221}})", + R"({"a": 13, "b": "abc3", "c": {"d": 23, "e": 231}})", + }, true), + more_flat_json({ + R"({"a": 21, "b": "efg1", "c": "c21"})", + R"({"a": 22, "b": "efg2", "c": "c22"})", + R"({"a": 23, "b": "efg3", "c": "c23"})", + R"({"a": 24, "b": "efg4", "c": "c24"})", + R"({"a": 25, "b": "efg5", "c": "c25"})", + R"({"a": 26, "b": "efg6", "c": "c26"})", + R"({"a": 27, "b": "efg7", "c": "c27"})", + R"({"a": 28, "b": "efg8", "c": "c28"})", + R"({"a": 29, "b": "efg9", "c": "c29"})", + R"({"a": 20, "b": "qwe1", "c": "c20"})", + R"({"a": 31, "b": "qwe2", "c": "c30"})", + R"({"a": 32, "b": "qwe3", "c": "c31"})", + R"({"a": 33, "b": "qwe4", "c": "c32"})", + }, true), + more_flat_json({ + R"({"a": 31, "b": "xwy1", "c": "d31"})", + R"({"a": 32, "b": "xwy2", "c": "d32"})", + R"({"a": 33, "b": "xwy3", "c": "d33"})", + R"({"a": 34, "b": "xwy4", "c": "d34"})", + R"({"a": 35, "b": "xwy5", "c": "d35"})", + }, true), + }; + // clang-format on + + ColumnPtr read_col = jsons[0]->clone_empty(); + ColumnWriterOptions writer_opts; + writer_opts.need_flat = true; + test_json(writer_opts, jsons, read_col); + + EXPECT_TRUE(_meta->json_meta().is_flat()); + EXPECT_TRUE(_meta->json_meta().has_remain()); + EXPECT_EQ(4, _meta->children_columns_size()); + EXPECT_EQ("nulls", _meta->children_columns(0).name()); + EXPECT_EQ("a", _meta->children_columns(1).name()); + EXPECT_EQ("remain", _meta->children_columns(3).name()); + + auto* read_json = get_json_column(read_col); + EXPECT_FALSE(read_json->is_flat_json()); + EXPECT_EQ(21, read_json->size()); + EXPECT_EQ(R"({"a": 12, "b": "abc2", "c": {"d": 22, "e": 221}})", read_col->debug_item(1)); + EXPECT_EQ(R"({"a": 29, "b": "efg9", "c": "c29"})", read_col->debug_item(11)); + EXPECT_EQ(R"({"a": 33, "b": "xwy3", "c": "d33"})", read_col->debug_item(18)); +} + +TEST_F(FlatJsonColumnCompactTest, testHyperJsonCompactToFlatJsonRemainRead) { + config::json_flat_sparsity_factor = 0.6; + // clang-format off + Columns jsons = { + more_flat_json({ + R"({"a": 11, "b": "abc1", "c": 11})", + R"({"a": 12, "b": "abc2", "f": 12})", + R"({"a": 13, "b": "abc3", "e": 13})", + }, true), + more_flat_json({ + R"({"a": 21, "b": "efg1", "c": 21})", + R"({"a": 22, "b": "efg2", "c": 22})", + R"({"a": 23, "b": "efg3", "c": 23})", + R"({"a": 24, "b": "efg4", "c": 24})", + R"({"a": 25, "b": "efg5", "c": 25})", + R"({"a": 26, "b": "efg6", "c": 26})", + R"({"a": 27, "b": "efg7", "c": 27})", + R"({"a": 28, "b": "efg8", "c": 28})", + R"({"a": 29, "b": "efg9", "c": 29})", + R"({"a": 20, "b": "qwe1", "c": 20})", + R"({"a": 31, "b": "qwe2", "c": 30})", + R"({"a": 32, "b": "qwe3", "c": 31})", + R"({"a": 33, "b": "qwe4", "c": 32})", + }, true), + more_flat_json({ + R"({"a": 31, "b": "xwy1", "x": 31})", + R"({"a": 32, "b": "xwy2", "x": 32})", + R"({"a": 33, "b": "xwy3", "x": 33})", + R"({"a": 34, "b": "xwy4", "x": 34})", + R"({"a": 35, "b": "xwy5", "x": 35})", + }, true), + }; + // clang-format on + + ASSIGN_OR_ABORT(auto root, ColumnAccessPath::create(TAccessPathType::FIELD, "root", 0)); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_BIGINT, "c"); + + ColumnPtr read_col = jsons[0]->clone_empty(); + ColumnWriterOptions writer_opts; + writer_opts.need_flat = true; + test_json(writer_opts, jsons, read_col, root.get()); + + EXPECT_EQ(5, _meta->children_columns_size()); + EXPECT_EQ("nulls", _meta->children_columns(0).name()); + EXPECT_EQ("a", _meta->children_columns(1).name()); + EXPECT_EQ("c", _meta->children_columns(3).name()); + EXPECT_EQ("remain", _meta->children_columns(4).name()); + + EXPECT_EQ(R"({c: 11})", read_col->debug_item(0)); + EXPECT_EQ(R"({c: NULL})", read_col->debug_item(1)); + EXPECT_EQ(R"({c: 25})", read_col->debug_item(7)); + EXPECT_EQ(R"({c: 32})", read_col->debug_item(15)); + EXPECT_EQ(R"({c: NULL})", read_col->debug_item(18)); +} + +TEST_F(FlatJsonColumnCompactTest, testHyperJsonCompactToFlatJsonRemainRead3) { + config::json_flat_sparsity_factor = 0.6; + // clang-format off + Columns jsons = { + more_flat_json({ + R"({"a": 11, "b": "abc1", "c": {"d": 21, "e": 211}})", + R"({"a": 12, "b": "abc2", "c": {"d": 22, "e": 221}})", + R"({"a": 13, "b": "abc3", "c": {"d": 23, "e": 231}})", + }, true), + more_flat_json({ + R"({"a": 21, "b": "efg1", "c": "c21"})", + R"({"a": 22, "b": "efg2", "c": "c22"})", + R"({"a": 23, "b": "efg3", "c": "c23"})", + R"({"a": 24, "b": "efg4", "c": "c24"})", + R"({"a": 25, "b": "efg5", "c": "c25"})", + R"({"a": 26, "b": "efg6", "c": "c26"})", + R"({"a": 27, "b": "efg7", "c": "c27"})", + R"({"a": 28, "b": "efg8", "c": "c28"})", + R"({"a": 29, "b": "efg9", "c": "c29"})", + R"({"a": 20, "b": "qwe1", "c": "c20"})", + R"({"a": 31, "b": "qwe2", "c": "c30"})", + R"({"a": 32, "b": "qwe3", "c": "c31"})", + R"({"a": 33, "b": "qwe4", "c": "c32"})", + }, true), + more_flat_json({ + R"({"a": 31, "b": "xwy1", "c": "d31"})", + R"({"a": 32, "b": "xwy2", "c": "d32"})", + R"({"a": 33, "b": "xwy3", "c": "d33"})", + R"({"a": 34, "b": "xwy4", "c": "d34"})", + R"({"a": 35, "b": "xwy5", "c": "d35"})", + }, true), + }; + // clang-format on + + ASSIGN_OR_ABORT(auto root, ColumnAccessPath::create(TAccessPathType::FIELD, "root", 0)); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_VARCHAR, "c"); + + ColumnPtr read_col = jsons[0]->clone_empty(); + ColumnWriterOptions writer_opts; + writer_opts.need_flat = true; + test_json(writer_opts, jsons, read_col, root.get()); + + EXPECT_EQ(4, _meta->children_columns_size()); + EXPECT_EQ("nulls", _meta->children_columns(0).name()); + EXPECT_EQ("a", _meta->children_columns(1).name()); + EXPECT_EQ("remain", _meta->children_columns(3).name()); + + auto* read_json = get_json_column(read_col); + EXPECT_TRUE(read_json->is_flat_json()); + EXPECT_EQ(21, read_json->size()); + EXPECT_EQ(1, read_json->get_flat_fields().size()); + EXPECT_EQ(R"({c: '{"d": 22, "e": 221}'})", read_col->debug_item(1)); + EXPECT_EQ(R"({c: 'c29'})", read_col->debug_item(11)); + EXPECT_EQ(R"({c: 'd33'})", read_col->debug_item(18)); +} + +TEST_F(FlatJsonColumnCompactTest, testHyperJsonCompactPaths) { + config::json_flat_sparsity_factor = 0.6; + // clang-format off + Columns jsons = { + more_flat_json({ + R"({"a": 11, "b": "abc1", "c": 11})", + R"({"a": 12, "b": "abc2", "f": 12})", + R"({"a": 13, "b": "abc3", "e": 13})", + }, true), + more_flat_json({ + R"({"a": 21, "b": "efg1", "c": 21})", + R"({"a": 22, "b": "efg2", "c": 22})", + R"({"a": 23, "b": "efg3", "c": 23})", + R"({"a": 24, "b": "efg4", "c": 24})", + R"({"a": 25, "b": "efg5", "c": 25})", + R"({"a": 26, "b": "efg6", "c": 26})", + R"({"a": 27, "b": "efg7", "c": 27})", + R"({"a": 28, "b": "efg8", "c": 28})", + R"({"a": 29, "b": "efg9", "c": 29})", + R"({"a": 20, "b": "qwe1", "c": 20})", + R"({"a": 31, "b": "qwe2", "c": 30})", + R"({"a": 32, "b": "qwe3", "c": 31})", + R"({"a": 33, "b": "qwe4", "c": 32})", + }, true), + more_flat_json({ + R"({"a": 31, "b": "xwy1", "x": 31})", + R"({"a": 32, "b": "xwy2", "x": 32})", + R"({"a": 33, "b": "xwy3", "x": 33})", + R"({"a": 34, "b": "xwy4", "x": 34})", + R"({"a": 35, "b": "xwy5", "x": 35})", + }, true), + }; + // clang-format on + + JsonPathDeriver deriver; + test_compact_path(jsons, &deriver); + + EXPECT_EQ(2, deriver.flat_paths().size()); + EXPECT_EQ(R"([a(BIGINT), b(VARCHAR)])", + JsonFlatPath::debug_flat_json(deriver.flat_paths(), deriver.flat_types(), deriver.has_remain_json())); +} + +TEST_F(FlatJsonColumnCompactTest, testHyperJsonCompactPaths2) { + config::json_flat_sparsity_factor = 0.6; + // clang-format off + Columns jsons = { + more_flat_json({ + R"({"a": 11, "b": "abc1", "c": 11})", + R"({"a": 12, "b": "abc2", "c": 12})", + R"({"a": 13, "b": "abc3", "c": 13})", + }, true), + more_flat_json({ + R"({"a": 21, "b": "efg1", "c": {"d": 21, "e": 211}})", + R"({"a": 22, "b": "efg2", "c": {"d": 22, "e": 221}})", + R"({"a": 23, "b": "efg3", "c": {"d": 23, "e": 231}})", + R"({"a": 24, "b": "efg4", "c": {"d": 24, "e": 241}})", + R"({"a": 25, "b": "efg5", "c": {"d": 25, "e": 251}})", + R"({"a": 26, "b": "efg6", "c": {"d": 26, "e": 261}})", + R"({"a": 27, "b": "efg7", "c": {"d": 27, "e": 271}})", + R"({"a": 28, "b": "efg8", "c": {"d": 28, "e": 281}})", + R"({"a": 29, "b": "efg9", "c": {"d": 29, "e": 291}})", + R"({"a": 20, "b": "qwe1", "c": {"d": 20, "e": 201}})", + R"({"a": 31, "b": "qwe2", "c": {"d": 30, "e": 301}})", + R"({"a": 32, "b": "qwe3", "c": {"d": 31, "e": 311}})", + R"({"a": 33, "b": "qwe4", "c": {"d": 32, "e": 321}})", + }, true), + more_flat_json({ + R"({"a": 31, "b": "xwy1", "c": {"d": 31, "e": 311, "f": 312}})", + R"({"a": 32, "b": "xwy2", "c": {"d": 32, "e": 321, "f": 322}})", + R"({"a": 33, "b": "xwy3", "c": {"d": 33, "e": 331, "f": 332}})", + R"({"a": 34, "b": "xwy4", "c": {"d": 34, "e": 341, "f": 342}})", + R"({"a": 35, "b": "xwy5", "c": {"d": 35, "e": 351, "f": 352}})", + }, true), + }; + // clang-format on + + JsonPathDeriver deriver; + test_compact_path(jsons, &deriver); + + EXPECT_EQ(2, deriver.flat_paths().size()); + EXPECT_EQ(R"([a(BIGINT), b(VARCHAR)])", + JsonFlatPath::debug_flat_json(deriver.flat_paths(), deriver.flat_types(), deriver.has_remain_json())); +} + +TEST_F(FlatJsonColumnCompactTest, testHyperJsonCompactPaths3) { + config::json_flat_sparsity_factor = 0.6; + // clang-format off + Columns jsons = { + more_flat_json({ + R"({"a": 11, "b": "abc1", "c": {"d": 21, "e": 211}})", + R"({"a": 12, "b": "abc2", "c": {"d": 22, "e": 221}})", + R"({"a": 13, "b": "abc3", "c": {"d": 23, "e": 231}})", + }, true), + more_flat_json({ + R"({"a": 21, "b": "efg1", "c": "c21"})", + R"({"a": 22, "b": "efg2", "c": "c22"})", + R"({"a": 23, "b": "efg3", "c": "c23"})", + R"({"a": 24, "b": "efg4", "c": "c24"})", + R"({"a": 25, "b": "efg5", "c": "c25"})", + R"({"a": 26, "b": "efg6", "c": "c26"})", + R"({"a": 27, "b": "efg7", "c": "c27"})", + R"({"a": 28, "b": "efg8", "c": "c28"})", + R"({"a": 29, "b": "efg9", "c": "c29"})", + R"({"a": 20, "b": "qwe1", "c": "c20"})", + R"({"a": 31, "b": "qwe2", "c": "c30"})", + R"({"a": 32, "b": "qwe3", "c": "c31"})", + R"({"a": 33, "b": "qwe4", "c": "c32"})", + }, true), + more_flat_json({ + R"({"a": 31, "b": "xwy1", "c": "d31"})", + R"({"a": 32, "b": "xwy2", "c": "d32"})", + R"({"a": 33, "b": "xwy3", "c": "d33"})", + R"({"a": 34, "b": "xwy4", "c": "d34"})", + R"({"a": 35, "b": "xwy5", "c": "d35"})", + }, true), + }; + // clang-format on + + JsonPathDeriver deriver; + test_compact_path(jsons, &deriver); + + EXPECT_EQ(2, deriver.flat_paths().size()); + EXPECT_EQ(R"([a(BIGINT), b(VARCHAR)])", + JsonFlatPath::debug_flat_json(deriver.flat_paths(), deriver.flat_types(), deriver.has_remain_json())); +} } // namespace starrocks diff --git a/be/test/storage/rowset/flat_json_column_rw_test.cpp b/be/test/storage/rowset/flat_json_column_rw_test.cpp index 1c248dbaaf03e..a523244021979 100644 --- a/be/test/storage/rowset/flat_json_column_rw_test.cpp +++ b/be/test/storage/rowset/flat_json_column_rw_test.cpp @@ -613,8 +613,8 @@ TEST_F(FlatJsonColumnRWTest, testDeepFlatJson) { } ASSIGN_OR_ABORT(auto root, ColumnAccessPath::create(TAccessPathType::FIELD, "root", 0)); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "root.b.b4.b5"); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "root.b.b2.b3"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "b.b4.b5"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "b.b2.b3"); ColumnPtr read_col = JsonColumn::create(); ColumnWriterOptions writer_opts; @@ -658,11 +658,11 @@ TEST_F(FlatJsonColumnRWTest, testHyperFlatJson) { } ASSIGN_OR_ABORT(auto root, ColumnAccessPath::create(TAccessPathType::FIELD, "root", 0)); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "root.b.b4.b5"); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "root.b.b2.b3"); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "root.a"); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "root.ff.f1"); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "root.gg.g1"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "b.b4.b5"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "b.b2.b3"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "a"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "ff.f1"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "gg.g1"); ColumnPtr read_col = JsonColumn::create(); ColumnWriterOptions writer_opts; @@ -825,8 +825,8 @@ TEST_F(FlatJsonColumnRWTest, testDeepJson) { } ASSIGN_OR_ABORT(auto root, ColumnAccessPath::create(TAccessPathType::FIELD, "root", 0)); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "root.b.b4.b5"); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "root.b.b2.b3"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "b.b4.b5"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "b.b2.b3"); ColumnPtr read_col = JsonColumn::create(); ColumnWriterOptions writer_opts; @@ -866,11 +866,11 @@ TEST_F(FlatJsonColumnRWTest, testHyperJson) { } ASSIGN_OR_ABORT(auto root, ColumnAccessPath::create(TAccessPathType::FIELD, "root", 0)); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "root.b.b4.b5"); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "root.b.b2.b3"); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "root.a"); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "root.ff.f1"); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "root.gg.g1"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "b.b4.b5"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "b.b2.b3"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "a"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "ff.f1"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "gg.g1"); ColumnPtr read_col = JsonColumn::create(); ColumnWriterOptions writer_opts; @@ -910,11 +910,11 @@ TEST_F(FlatJsonColumnRWTest, testHyperNoCastTypeJson) { } ASSIGN_OR_ABORT(auto root, ColumnAccessPath::create(TAccessPathType::FIELD, "root", 0)); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_BIGINT, "root.b.b4.b5"); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_VARCHAR, "root.b.b2.b3"); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_BIGINT, "root.a"); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "root.ff.f1"); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_VARCHAR, "root.gg.g1"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_BIGINT, "b.b4.b5"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_VARCHAR, "b.b2.b3"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_BIGINT, "a"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "ff.f1"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_VARCHAR, "gg.g1"); ColumnPtr read_col = JsonColumn::create(); ColumnWriterOptions writer_opts; @@ -950,11 +950,11 @@ TEST_F(FlatJsonColumnRWTest, testHyperCastTypeJson) { } ASSIGN_OR_ABORT(auto root, ColumnAccessPath::create(TAccessPathType::FIELD, "root", 0)); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_DOUBLE, "root.b.b4.b5"); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_BIGINT, "root.b.b2"); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_VARCHAR, "root.a"); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_BIGINT, "root.ff.f1"); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "root.gg.g1"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_DOUBLE, "b.b4.b5"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_BIGINT, "b.b2"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_VARCHAR, "a"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_BIGINT, "ff.f1"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "gg.g1"); ColumnPtr read_col = JsonColumn::create(); ColumnWriterOptions writer_opts; @@ -990,11 +990,11 @@ TEST_F(FlatJsonColumnRWTest, testHyperCastTypeJson2) { } ASSIGN_OR_ABORT(auto root, ColumnAccessPath::create(TAccessPathType::FIELD, "root", 0)); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_DOUBLE, "root.b.b4.b5"); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_VARCHAR, "root.b.b2"); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_VARCHAR, "root.a"); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_BIGINT, "root.ff.f1"); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "root.gg.g1"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_DOUBLE, "b.b4.b5"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_VARCHAR, "b.b2"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_VARCHAR, "a"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_BIGINT, "ff.f1"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "gg.g1"); ColumnPtr read_col = JsonColumn::create(); ColumnWriterOptions writer_opts; @@ -1252,8 +1252,8 @@ TEST_F(FlatJsonColumnRWTest, testDeepNullFlatJson) { ColumnPtr write_col = create_json(json, true); ASSIGN_OR_ABORT(auto root, ColumnAccessPath::create(TAccessPathType::FIELD, "root", 0)); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "root.b.b4.b5"); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "root.b.b2.b3"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "b.b4.b5"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "b.b2.b3"); ColumnPtr read_col = write_col->clone_empty(); ColumnWriterOptions writer_opts; @@ -1292,11 +1292,11 @@ TEST_F(FlatJsonColumnRWTest, testHyperNullFlatJson) { ColumnPtr write_col = create_json(json, true); ASSIGN_OR_ABORT(auto root, ColumnAccessPath::create(TAccessPathType::FIELD, "root", 0)); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_DOUBLE, "root.b.b4.b5"); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_VARCHAR, "root.b.b2"); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_VARCHAR, "root.a"); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_BIGINT, "root.ff.f1"); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "root.gg.g1"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_DOUBLE, "b.b4.b5"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_VARCHAR, "b.b2"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_VARCHAR, "a"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_BIGINT, "ff.f1"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "gg.g1"); ColumnPtr read_col = write_col->clone_empty(); ColumnWriterOptions writer_opts; @@ -1459,8 +1459,8 @@ TEST_F(FlatJsonColumnRWTest, testDeepNullJson) { ColumnPtr write_col = create_json(json, true); ASSIGN_OR_ABORT(auto root, ColumnAccessPath::create(TAccessPathType::FIELD, "root", 0)); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "root.b.b4.b5"); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "root.b.b2.b3"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "b.b4.b5"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "b.b2.b3"); ColumnPtr read_col = write_col->clone_empty(); ColumnWriterOptions writer_opts; @@ -1495,11 +1495,11 @@ TEST_F(FlatJsonColumnRWTest, testHyperNullJson) { auto write_col = create_json(json, true); ASSIGN_OR_ABORT(auto root, ColumnAccessPath::create(TAccessPathType::FIELD, "root", 0)); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "root.b.b4.b5"); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "root.b.b2.b3"); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "root.a"); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "root.ff.f1"); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "root.gg.g1"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "b.b4.b5"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "b.b2.b3"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "a"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "ff.f1"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "gg.g1"); ColumnPtr read_col = write_col->clone_empty(); ColumnWriterOptions writer_opts; @@ -1534,11 +1534,11 @@ TEST_F(FlatJsonColumnRWTest, testHyperNullJson2) { auto write_col = create_json(json, true); ASSIGN_OR_ABORT(auto root, ColumnAccessPath::create(TAccessPathType::FIELD, "root", 0)); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "root.b.b4.b5"); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "root.b.b2.b3"); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "root.a"); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "root.ff.f1"); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "root.gg.g1"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "b.b4.b5"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "b.b2.b3"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "a"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "ff.f1"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "gg.g1"); ColumnPtr read_col = write_col->clone_empty(); ColumnWriterOptions writer_opts; @@ -1572,11 +1572,11 @@ TEST_F(FlatJsonColumnRWTest, testHyperNoCastTypeNullJson) { ColumnPtr write_col = create_json(json, true); ASSIGN_OR_ABORT(auto root, ColumnAccessPath::create(TAccessPathType::FIELD, "root", 0)); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_BIGINT, "root.b.b4.b5"); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_VARCHAR, "root.b.b2.b3"); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_BIGINT, "root.a"); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "root.ff.f1"); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_VARCHAR, "root.gg.g1"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_BIGINT, "b.b4.b5"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_VARCHAR, "b.b2.b3"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_BIGINT, "a"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "ff.f1"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_VARCHAR, "gg.g1"); ColumnPtr read_col = write_col->clone_empty(); ColumnWriterOptions writer_opts; @@ -1605,11 +1605,11 @@ TEST_F(FlatJsonColumnRWTest, testHyperCastTypeNullJson) { ColumnPtr write_col = create_json(json, true); ASSIGN_OR_ABORT(auto root, ColumnAccessPath::create(TAccessPathType::FIELD, "root", 0)); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_DOUBLE, "root.b.b4.b5"); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_BIGINT, "root.b.b2"); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_VARCHAR, "root.a"); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_BIGINT, "root.ff.f1"); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "root.gg.g1"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_DOUBLE, "b.b4.b5"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_BIGINT, "b.b2"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_VARCHAR, "a"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_BIGINT, "ff.f1"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "gg.g1"); ColumnPtr read_col = write_col->clone_empty(); ColumnWriterOptions writer_opts; @@ -1638,11 +1638,11 @@ TEST_F(FlatJsonColumnRWTest, testHyperCastTypeNullJson2) { ColumnPtr write_col = create_json(json, true); ASSIGN_OR_ABORT(auto root, ColumnAccessPath::create(TAccessPathType::FIELD, "root", 0)); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_DOUBLE, "root.b.b4.b5"); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_VARCHAR, "root.b.b2"); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_VARCHAR, "root.a"); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_BIGINT, "root.ff.f1"); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "root.gg.g1"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_DOUBLE, "b.b4.b5"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_VARCHAR, "b.b2"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_VARCHAR, "a"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_BIGINT, "ff.f1"); + ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_JSON, "gg.g1"); ColumnPtr read_col = write_col->clone_empty(); ColumnWriterOptions writer_opts;