Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Optimize](simd json reader) Cached search results for previous row (keyed as index in JSON object) - used as a hint. #17124

Merged
merged 2 commits into from
Feb 27, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
[Optimize](simd json reader) Cached search results for previous row (…
…keyed as index in JSON object) - used as a hint.

`_simdjson_set_column_value` could become a hot spot while parsing json in simdjson mode,
introduce `_prev_positions` to cache results for previous row (keyed as index in JSON object) due to the json name field order,
should be quite the same between each lines
  • Loading branch information
eldenmoon committed Feb 24, 2023
commit 67a40be87cc7e45581a3f620f3e2edf0587670f3
71 changes: 45 additions & 26 deletions be/src/vec/exec/format/json/new_json_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1042,9 +1042,10 @@ Status NewJsonReader::_simdjson_init_reader() {
}
_ondemand_json_parser = std::make_unique<simdjson::ondemand::parser>();
for (int i = 0; i < _file_slot_descs.size(); ++i) {
_slot_desc_index.emplace(_file_slot_descs[i]->col_name(), i);
_slot_desc_index[_file_slot_descs[i]->col_name()] = i;
}
_simdjson_ondemand_padding_buffer.resize(_padded_size);
_prev_positions.resize(_file_slot_descs.size());
return Status::OK();
}

Expand Down Expand Up @@ -1303,27 +1304,50 @@ Status NewJsonReader::_simdjson_handle_nested_complex_json(
return Status::OK();
}

size_t NewJsonReader::_column_index(const StringRef& name, size_t key_index) {
/// Optimization by caching the order of fields (which is almost always the same)
/// and a quick check to match the next expected field, instead of searching the hash table.
if (_prev_positions.size() > key_index && _prev_positions[key_index] &&
name == _prev_positions[key_index]->get_first()) {
return _prev_positions[key_index]->get_second();
} else {
auto* it = _slot_desc_index.find(name);
if (it) {
if (key_index < _prev_positions.size()) {
_prev_positions[key_index] = it;
}
return it->get_second();
} else {
return size_t(-1);
}
}
}

Status NewJsonReader::_simdjson_set_column_value(simdjson::ondemand::object* value,
std::vector<MutableColumnPtr>& columns,
const std::vector<SlotDescriptor*>& slot_descs,
bool* valid) {
// set
_seen_columns.assign(columns.size(), false);
size_t cur_row_count = columns[0]->size();
bool has_valid_value = false;
// iterate through object, simdjson::ondemond will parsing on the fly
size_t key_index = 0;
for (auto field : *value) {
std::string_view key = field.unescaped_key();
auto iter = _slot_desc_index.find(std::string(key));
if (iter == _slot_desc_index.end()) {
StringRef name_ref(key.data(), key.size());
const size_t column_index = _column_index(name_ref, key_index++);
if (UNLIKELY(ssize_t(column_index) < 0)) {
// This key is not exist in slot desc, just ignore
continue;
}
simdjson::ondemand::value val = field.value();
RETURN_IF_ERROR(_simdjson_write_data_to_column(val, slot_descs[iter->second],
columns[iter->second].get(), valid));
RETURN_IF_ERROR(_simdjson_write_data_to_column(val, slot_descs[column_index],
columns[column_index].get(), valid));
if (!(*valid)) {
return Status::OK();
}
_seen_columns[column_index] = true;
has_valid_value = true;
}
if (!has_valid_value) {
Expand All @@ -1334,22 +1358,31 @@ Status NewJsonReader::_simdjson_set_column_value(simdjson::ondemand::object* val

// fill missing slot
int nullcount = 0;
int ctx_idx = 0;
for (auto slot_desc : slot_descs) {
for (size_t i = 0; i < slot_descs.size(); ++i) {
if (_seen_columns[i]) {
continue;
}
auto slot_desc = slot_descs[i];
if (!slot_desc->is_materialized()) {
continue;
}
int dest_index = ctx_idx++;
auto* column_ptr = columns[dest_index].get();
auto* column_ptr = columns[i].get();
if (column_ptr->size() < cur_row_count + 1) {
DCHECK(column_ptr->size() == cur_row_count);
column_ptr->assume_mutable()->insert_default();
++nullcount;
}
DCHECK(column_ptr->size() == cur_row_count + 1);
}

#ifndef NDEBUG
// Check all columns rows matched
for (size_t i = 0; i < columns.size(); ++i) {
DCHECK_EQ(columns[i]->size(), cur_row_count + 1);
}
// There is at least one valid value here
DCHECK(nullcount < columns.size());
#endif
*valid = true;
return Status::OK();
}
Expand Down Expand Up @@ -1389,27 +1422,13 @@ Status NewJsonReader::_simdjson_write_data_to_column(simdjson::ondemand::value&
}
break;
}
case simdjson::ondemand::json_type::object:
case simdjson::ondemand::json_type::array: {
auto str_view = simdjson::to_json_string(value).value();
std::string value_str(str_view.data(), str_view.size());
// compact json value
value_str.erase(std::remove_if(value_str.begin(), value_str.end(),
[](const char& c) {
// white space
return c == ' ' || c == '\t' || c == '\n' || c == '\r' ||
c == '\f' || c == '\v';
}),
value_str.end());
nullable_column->get_null_map_data().push_back(0);
column_string->insert_data(value_str.data(), value_str.length());
break;
}
default: {
auto str_view = simdjson::to_json_string(value).value();
if (value.type() == simdjson::ondemand::json_type::string) {
nullable_column->get_null_map_data().push_back(0);
// trim
str_view = str_view.substr(1, str_view.length() - 2);
column_string->insert_data(str_view.data() + 1, str_view.length() - 2);
break;
}
nullable_column->get_null_map_data().push_back(0);
column_string->insert_data(str_view.data(), str_view.length());
Expand Down
13 changes: 12 additions & 1 deletion be/src/vec/exec/format/json/new_json_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include "io/fs/file_reader.h"
#include "olap/iterators.h"
#include "vec/common/hash_table/hash_map.h"
#include "vec/exec/format/generic_reader.h"

namespace doris {
Expand Down Expand Up @@ -138,6 +139,8 @@ class NewJsonReader : public GenericReader {
Status _append_error_msg(simdjson::ondemand::object* obj, std::string error_msg,
std::string col_name, bool* valid);

size_t _column_index(const StringRef& name, size_t key_index);

Status (NewJsonReader::*_vhandle_json_callback)(
std::vector<vectorized::MutableColumnPtr>& columns,
const std::vector<SlotDescriptor*>& slot_descs, bool* is_empty_row, bool* eof);
Expand Down Expand Up @@ -194,8 +197,16 @@ class NewJsonReader : public GenericReader {
RuntimeProfile::Counter* _file_read_timer;

bool _is_dynamic_schema = false;

// ======SIMD JSON======
// name mapping
phmap::flat_hash_map<String, size_t> _slot_desc_index;
/// Hash table match `field name -> position in the block`. NOTE You can use perfect hash map.
using NameMap = HashMap<StringRef, size_t, StringRefHash>;
NameMap _slot_desc_index;
/// Cached search results for previous row (keyed as index in JSON object) - used as a hint.
std::vector<NameMap::LookupResult> _prev_positions;
/// Set of columns which already met in row. Exception is thrown if there are more than one column with the same name.
std::vector<UInt8> _seen_columns;
// simdjson
static constexpr size_t _init_buffer_size = 1024 * 1024 * 8;
size_t _padded_size = _init_buffer_size + simdjson::SIMDJSON_PADDING;
Expand Down