Skip to content

Commit

Permalink
[BugFix][FlatJson] Fix flat json bugs (backport #50223) (#50292)
Browse files Browse the repository at this point in the history
Co-authored-by: Seaven <seaven_7@qq.com>
  • Loading branch information
mergify[bot] and Seaven authored Aug 29, 2024
1 parent da18f99 commit 2ad4ba4
Show file tree
Hide file tree
Showing 9 changed files with 157 additions and 30 deletions.
34 changes: 26 additions & 8 deletions be/src/connector/lake_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,6 @@ void LakeDataSource::init_counter(RuntimeState* state) {
_prefetch_wait_finish_timer = ADD_CHILD_TIMER(_runtime_profile, "PrefetchWaitFinishTime", io_statistics_name);
_prefetch_pending_timer = ADD_CHILD_TIMER(_runtime_profile, "PrefetchPendingTime", io_statistics_name);

_json_flatten_timer = ADD_CHILD_TIMER(_runtime_profile, "JsonFlattern", segment_read_name);
_access_path_hits_counter = ADD_COUNTER(_runtime_profile, "AccessPathHits", TUnit::UNIT);
_access_path_unhits_counter = ADD_COUNTER(_runtime_profile, "AccessPathUnhits", TUnit::UNIT);
}
Expand Down Expand Up @@ -708,17 +707,19 @@ void LakeDataSource::update_counter() {
std::string access_path_hits = "AccessPathHits";
int64_t total = 0;
for (auto& [k, v] : _reader->stats().flat_json_hits) {
auto* path_counter = _runtime_profile->get_counter(fmt::format("[Hit]{}", k));
std::string path = fmt::format("[Hit]{}", k);
auto* path_counter = _runtime_profile->get_counter(path);
if (path_counter == nullptr) {
path_counter = ADD_CHILD_COUNTER(_runtime_profile, k, TUnit::UNIT, access_path_hits);
path_counter = ADD_CHILD_COUNTER(_runtime_profile, path, TUnit::UNIT, access_path_hits);
}
total += v;
COUNTER_UPDATE(path_counter, v);
}
for (auto& [k, v] : _reader->stats().merge_json_hits) {
auto* path_counter = _runtime_profile->get_counter(fmt::format("[HitMerge]{}", k));
std::string merge_path = fmt::format("[HitMerge]{}", k);
auto* path_counter = _runtime_profile->get_counter(merge_path);
if (path_counter == nullptr) {
path_counter = ADD_CHILD_COUNTER(_runtime_profile, k, TUnit::UNIT, access_path_hits);
path_counter = ADD_CHILD_COUNTER(_runtime_profile, merge_path, TUnit::UNIT, access_path_hits);
}
total += v;
COUNTER_UPDATE(path_counter, v);
Expand All @@ -729,17 +730,34 @@ void LakeDataSource::update_counter() {
std::string access_path_unhits = "AccessPathUnhits";
int64_t total = 0;
for (auto& [k, v] : _reader->stats().dynamic_json_hits) {
auto* path_counter = _runtime_profile->get_counter(fmt::format("[Unhit]{}", k));
std::string path = fmt::format("[Unhit]{}", k);
auto* path_counter = _runtime_profile->get_counter(path);
if (path_counter == nullptr) {
path_counter = ADD_CHILD_COUNTER(_runtime_profile, k, TUnit::UNIT, access_path_unhits);
path_counter = ADD_CHILD_COUNTER(_runtime_profile, path, TUnit::UNIT, access_path_unhits);
}
total += v;
COUNTER_UPDATE(path_counter, v);
}
COUNTER_UPDATE(_access_path_unhits_counter, total);
}

COUNTER_UPDATE(_json_flatten_timer, _reader->stats().json_flatten_ns);
std::string parent_name = "SegmentRead";
if (_reader->stats().json_init_ns > 0) {
RuntimeProfile::Counter* c = ADD_CHILD_TIMER(_runtime_profile, "FlatJsonInit", parent_name);
COUNTER_UPDATE(c, _reader->stats().json_init_ns);
}
if (_reader->stats().json_cast_ns > 0) {
RuntimeProfile::Counter* c = ADD_CHILD_TIMER(_runtime_profile, "FlatJsonCast", parent_name);
COUNTER_UPDATE(c, _reader->stats().json_cast_ns);
}
if (_reader->stats().json_merge_ns > 0) {
RuntimeProfile::Counter* c = ADD_CHILD_TIMER(_runtime_profile, "FlatJsonMerge", parent_name);
COUNTER_UPDATE(c, _reader->stats().json_merge_ns);
}
if (_reader->stats().json_flatten_ns > 0) {
RuntimeProfile::Counter* c = ADD_CHILD_TIMER(_runtime_profile, "FlatJsonFlatten", parent_name);
COUNTER_UPDATE(c, _reader->stats().json_flatten_ns);
}
}

// ================================
Expand Down
1 change: 0 additions & 1 deletion be/src/connector/lake_connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ class LakeDataSource final : public DataSource {
RuntimeProfile::Counter* _prefetch_pending_timer = nullptr;

RuntimeProfile::Counter* _pushdown_access_paths_counter = nullptr;
RuntimeProfile::Counter* _json_flatten_timer = nullptr;
RuntimeProfile::Counter* _access_path_hits_counter = nullptr;
RuntimeProfile::Counter* _access_path_unhits_counter = nullptr;
};
Expand Down
36 changes: 27 additions & 9 deletions be/src/exec/pipeline/scan/olap_chunk_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,6 @@ void OlapChunkSource::_init_counter(RuntimeState* state) {
// IOTime
_io_timer = ADD_CHILD_TIMER(_runtime_profile, "IOTime", IO_TASK_EXEC_TIMER_NAME);

_json_flatten_timer = ADD_CHILD_TIMER(_runtime_profile, "JsonFlattern", segment_read_name);
_access_path_hits_counter = ADD_COUNTER(_runtime_profile, "AccessPathHits", TUnit::UNIT);
_access_path_unhits_counter = ADD_COUNTER(_runtime_profile, "AccessPathUnhits", TUnit::UNIT);
}
Expand Down Expand Up @@ -647,21 +646,23 @@ void OlapChunkSource::_update_counter() {
COUNTER_UPDATE(c2, _reader->stats().rows_del_filtered);
}

if (_reader->stats().flat_json_hits.size() > 0) {
if (_reader->stats().flat_json_hits.size() > 0 || _reader->stats().merge_json_hits.size() > 0) {
std::string access_path_hits = "AccessPathHits";
int64_t total = 0;
for (auto& [k, v] : _reader->stats().flat_json_hits) {
auto* path_counter = _runtime_profile->get_counter(fmt::format("[Hit]{}", k));
std::string path = fmt::format("[Hit]{}", k);
auto* path_counter = _runtime_profile->get_counter(path);
if (path_counter == nullptr) {
path_counter = ADD_CHILD_COUNTER(_runtime_profile, k, TUnit::UNIT, access_path_hits);
path_counter = ADD_CHILD_COUNTER(_runtime_profile, path, TUnit::UNIT, access_path_hits);
}
total += v;
COUNTER_UPDATE(path_counter, v);
}
for (auto& [k, v] : _reader->stats().merge_json_hits) {
auto* path_counter = _runtime_profile->get_counter(fmt::format("[HitMerge]{}", k));
std::string merge_path = fmt::format("[HitMerge]{}", k);
auto* path_counter = _runtime_profile->get_counter(merge_path);
if (path_counter == nullptr) {
path_counter = ADD_CHILD_COUNTER(_runtime_profile, k, TUnit::UNIT, access_path_hits);
path_counter = ADD_CHILD_COUNTER(_runtime_profile, merge_path, TUnit::UNIT, access_path_hits);
}
total += v;
COUNTER_UPDATE(path_counter, v);
Expand All @@ -672,17 +673,34 @@ void OlapChunkSource::_update_counter() {
std::string access_path_unhits = "AccessPathUnhits";
int64_t total = 0;
for (auto& [k, v] : _reader->stats().dynamic_json_hits) {
auto* path_counter = _runtime_profile->get_counter(fmt::format("[Unhit]{}", k));
std::string path = fmt::format("[Unhit]{}", k);
auto* path_counter = _runtime_profile->get_counter(path);
if (path_counter == nullptr) {
path_counter = ADD_CHILD_COUNTER(_runtime_profile, k, TUnit::UNIT, access_path_unhits);
path_counter = ADD_CHILD_COUNTER(_runtime_profile, path, TUnit::UNIT, access_path_unhits);
}
total += v;
COUNTER_UPDATE(path_counter, v);
}
COUNTER_UPDATE(_access_path_unhits_counter, total);
}

COUNTER_UPDATE(_json_flatten_timer, _reader->stats().json_flatten_ns);
std::string parent_name = "SegmentRead";
if (_reader->stats().json_init_ns > 0) {
RuntimeProfile::Counter* c = ADD_CHILD_TIMER(_runtime_profile, "FlatJsonInit", parent_name);
COUNTER_UPDATE(c, _reader->stats().json_init_ns);
}
if (_reader->stats().json_cast_ns > 0) {
RuntimeProfile::Counter* c = ADD_CHILD_TIMER(_runtime_profile, "FlatJsonCast", parent_name);
COUNTER_UPDATE(c, _reader->stats().json_cast_ns);
}
if (_reader->stats().json_merge_ns > 0) {
RuntimeProfile::Counter* c = ADD_CHILD_TIMER(_runtime_profile, "FlatJsonMerge", parent_name);
COUNTER_UPDATE(c, _reader->stats().json_merge_ns);
}
if (_reader->stats().json_flatten_ns > 0) {
RuntimeProfile::Counter* c = ADD_CHILD_TIMER(_runtime_profile, "FlatJsonFlatten", parent_name);
COUNTER_UPDATE(c, _reader->stats().json_flatten_ns);
}
}

} // namespace starrocks::pipeline
1 change: 0 additions & 1 deletion be/src/exec/pipeline/scan/olap_chunk_source.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@ class OlapChunkSource final : public ChunkSource {
RuntimeProfile::Counter* _total_columns_data_page_count = nullptr;
RuntimeProfile::Counter* _read_pk_index_timer = nullptr;
RuntimeProfile::Counter* _pushdown_access_paths_counter = nullptr;
RuntimeProfile::Counter* _json_flatten_timer = nullptr;
RuntimeProfile::Counter* _access_path_hits_counter = nullptr;
RuntimeProfile::Counter* _access_path_unhits_counter = nullptr;
};
Expand Down
4 changes: 2 additions & 2 deletions be/src/exprs/json_functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -536,15 +536,15 @@ static StatusOr<ColumnPtr> _extract_with_hyper(NativeJsonState* state, const std
state->real_path.paths.emplace_back(p);
}

state->init_flat = true;
state->flat_path = flat_path.substr(1);
if (in_flat) {
state->is_partial_match = false;
state->flat_column_type = TargetType;
} else {
state->is_partial_match = true;
state->flat_column_type = TYPE_JSON;
}
state->flat_path = flat_path.substr(1);
state->init_flat = true;
});
}
std::vector<std::string> dst_path{state->flat_path};
Expand Down
4 changes: 0 additions & 4 deletions be/src/storage/meta_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,10 +238,6 @@ Status SegmentMetaCollecter::_collect(const std::string& name, ColumnId cid, Col
}

Status SegmentMetaCollecter::_collect_flat_json(ColumnId cid, Column* column) {
if (cid >= _segment->num_columns()) {
return Status::NotFound("error column id");
}

const ColumnReader* col_reader = _segment->column(cid);
if (col_reader == nullptr) {
return Status::NotFound("don't found column");
Expand Down
9 changes: 9 additions & 0 deletions be/src/storage/rowset/json_column_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ Status JsonFlatColumnIterator::init(const ColumnIteratorOptions& opts) {
for (int i = 0; i < _source_paths.size(); i++) {
opts.stats->flat_json_hits[_source_paths[i]] += 1;
}
if (has_remain) {
opts.stats->flat_json_hits["remain"] += 1;
}
return Status::OK();
}

Expand Down Expand Up @@ -162,6 +165,9 @@ Status JsonFlatColumnIterator::init(const ColumnIteratorOptions& opts) {
for (int i = 0; i < fp.size(); i++) {
opts.stats->dynamic_json_hits[fp[i]] += 1;
}
if (has_remain) {
opts.stats->flat_json_hits["remain"] += 1;
}
}

return Status::OK();
Expand Down Expand Up @@ -490,6 +496,9 @@ Status JsonMergeIterator::init(const ColumnIteratorOptions& opts) {
for (auto& p : _src_paths) {
opts.stats->merge_json_hits[p] += 1;
}
if (has_remain) {
opts.stats->merge_json_hits["remain"] += 1;
}
SCOPED_RAW_TIMER(&_opts.stats->json_init_ns);
_merger = std::make_unique<JsonMerger>(_src_paths, _src_types, has_remain);

Expand Down
17 changes: 12 additions & 5 deletions be/src/util/json_flattener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -505,13 +505,20 @@ void JsonPathDeriver::_finalize() {

// sort by name, just for stable order
size_t limit = config::json_flat_column_max > 0 ? config::json_flat_column_max : std::numeric_limits<size_t>::max();
std::sort(hit_leaf.begin(), hit_leaf.end(), [&](const auto& a, const auto& b) {
auto desc_a = _derived_maps[a.first];
auto desc_b = _derived_maps[b.first];
return desc_a.hits > desc_b.hits;
});
for (size_t i = limit; i < hit_leaf.size(); i++) {
hit_leaf[i].first->remain = true;
}
if (hit_leaf.size() > limit) {
_has_remain |= true;
hit_leaf.resize(limit);
}
std::sort(hit_leaf.begin(), hit_leaf.end(), [](const auto& a, const auto& b) { return a.second < b.second; });
for (auto& [node, path] : hit_leaf) {
if (_paths.size() >= limit) {
node->remain = true;
_has_remain |= true;
continue;
}
node->index = _paths.size();
_paths.emplace_back(path.substr(1));
_types.emplace_back(node->type);
Expand Down
81 changes: 81 additions & 0 deletions be/test/util/json_flattener_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,4 +270,85 @@ TEST_F(JsonFlattenerTest, testMiddleJson) {
EXPECT_EQ(R"({"g1": 123})", result[1]->debug_item(1));
}

TEST_F(JsonFlattenerTest, testSortHitNums) {
// clang-format off
std::vector<std::string> jsons = {
R"({"k1": 10, "k2": 20, "k3": 30, "k4": 40, "k5": 50, "k6": 60, "k7": 70, "k8": 80, "k9": 90, "a10": 100, "k11": 110, "h12": 120, "k13": 130, "g14": 140, "k15": 150, "z16": 160, "e17": 170, "c18": 180, "a19": 190, "b20": 200})",
R"({"k2": 25, "k3": 35, "k4": 45, "k5": 55, "k6": 65, "k7": 75, "k8": 85, "k9": 95, "a10": 105, "k11": 115, "h12": 125, "k13": 135, "g14": 145, "k15": 155, "z16": 165, "e17": 175, "c18": 185, "a19": 195, "b20": 205})",
R"({"k3": 32, "k4": 42, "k5": 52, "k6": 62, "k7": 72, "k8": 82, "k9": 92, "a10": 102, "k11": 112, "h12": 122, "k13": 132, "g14": 142, "k15": 152, "z16": 162, "e17": 172, "c18": 182, "a19": 192, "b20": 202})",
R"({"k4": 48, "k5": 58, "k6": 68, "k7": 78, "k8": 88, "k9": 98, "a10": 108, "k11": 118, "h12": 128, "k13": 138, "g14": 148, "k15": 158, "z16": 168, "e17": 178, "c18": 188, "a19": 198, "b20": 208})",
R"({"k5": 54, "k6": 64, "k7": 74, "k8": 84, "k9": 94, "a10": 104, "k11": 114, "h12": 124, "k13": 134, "g14": 144, "k15": 154, "z16": 164, "e17": 174, "c18": 184, "a19": 194, "b20": 204})",
R"({"k6": 66, "k7": 76, "k8": 86, "k9": 96, "a10": 106, "k11": 116, "h12": 126, "k13": 136, "g14": 146, "k15": 156, "z16": 166, "e17": 176, "c18": 186, "a19": 196, "b20": 206})",
R"({"k7": 79, "k8": 89, "k9": 99, "a10": 109, "k11": 119, "h12": 129, "k13": 139, "g14": 149, "k15": 159, "z16": 169, "e17": 179, "c18": 189, "a19": 199, "b20": 209})",
R"({"k8": 81, "k9": 91, "a10": 101, "k11": 111, "h12": 121, "k13": 131, "g14": 141, "k15": 151, "z16": 161, "e17": 171, "c18": 181, "a19": 191, "b20": 201})",
R"({"k9": 93, "a10": 103, "k11": 113, "h12": 123, "k13": 133, "g14": 143, "k15": 153, "z16": 163, "e17": 173, "c18": 183, "a19": 193, "b20": 203})",
R"({"a10": 107, "k11": 117, "h12": 127, "k13": 137, "g14": 147, "k15": 157, "z16": 167, "e17": 177, "c18": 187, "a19": 197, "b20": 207})",
R"({"k11": 130, "h12": 140, "k13": 150, "g14": 160, "k15": 170, "z16": 180, "e17": 190, "c18": 200, "a19": 210, "b20": 220})",
R"({"h12": 145, "k13": 155, "g14": 165, "k15": 175, "z16": 185, "e17": 195, "c18": 205, "a19": 215, "b20": 225})",
R"({"k13": 152, "g14": 162, "k15": 172, "z16": 182, "e17": 192, "c18": 202, "a19": 212, "b20": 222})",
R"({"g14": 168, "k15": 178, "z16": 188, "e17": 198, "c18": 208, "a19": 218, "b20": 228})",
R"({"k15": 174, "z16": 184, "e17": 194, "c18": 204, "a19": 214, "b20": 224})",
R"({"z16": 186, "e17": 196, "c18": 206, "a19": 216, "b20": 226})",
R"({"e17": 199, "c18": 209, "a19": 219, "b20": 229})",
R"({"c18": 201, "a19": 211, "b20": 221})",
R"({"a19": 213, "b20": 223})",
R"({"b20": 227})"
};
// clang-format on

ColumnPtr input = JsonColumn::create();
JsonColumn* json_input = down_cast<JsonColumn*>(input.get());
for (const auto& json : jsons) {
ASSIGN_OR_ABORT(auto json_value, JsonValue::parse(json));
json_input->append(&json_value);
}

{
config::json_flat_sparsity_factor = 0.3;
JsonPathDeriver jf;
jf.derived({json_input});

auto& result = jf.flat_paths();
std::vector<std::string> paths = {"a10", "a19", "b20", "c18", "e17", "g14", "h12", "k11",
"k13", "k15", "k6", "k7", "k8", "k9", "z16"};
EXPECT_EQ(true, jf.has_remain_json());
EXPECT_EQ(paths, result);
}

{
config::json_flat_sparsity_factor = 0.4;
JsonPathDeriver jf;
jf.derived({json_input});

auto& result = jf.flat_paths();
std::vector<std::string> paths = {"a10", "a19", "b20", "c18", "e17", "g14", "h12",
"k11", "k13", "k15", "k8", "k9", "z16"};
EXPECT_EQ(true, jf.has_remain_json());
EXPECT_EQ(paths, result);
}

{
config::json_flat_sparsity_factor = 0.8;
JsonPathDeriver jf;
jf.derived({json_input});

auto& result = jf.flat_paths();
std::vector<std::string> paths = {"a19", "b20", "c18", "e17", "z16"};
EXPECT_EQ(true, jf.has_remain_json());
EXPECT_EQ(paths, result);
}

{
config::json_flat_sparsity_factor = 0;
JsonPathDeriver jf;
jf.derived({json_input});

auto& result = jf.flat_paths();
std::vector<std::string> paths = {"a10", "a19", "b20", "c18", "e17", "g14", "h12", "k1", "k11", "k13",
"k15", "k2", "k3", "k4", "k5", "k6", "k7", "k8", "k9", "z16"};
EXPECT_EQ(true, jf.has_remain_json());
EXPECT_EQ(paths, result);
}
}

} // namespace starrocks

0 comments on commit 2ad4ba4

Please sign in to comment.