[Feature] Stream load support JSON to STRUCT/MAP/ARRAY (#45406)
Signed-off-by: ricky <rickif@qq.com>
Co-authored-by: wyb <wybb86@gmail.com>
(cherry picked from commit a4f53d3)
rickif authored and mergify[bot] committed May 30, 2024
1 parent 8909cff commit 3a18269
Showing 14 changed files with 482 additions and 86 deletions.
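
For orientation, a minimal sketch (editorial; the field names and column types are assumptions, not part of this commit) of the kind of JSON row the scanner can now load directly into nested columns:

// Illustrative payload only: each nested field maps to a complex column, e.g.
// tags -> ARRAY<VARCHAR>, props -> MAP<VARCHAR, INT>, addr -> STRUCT<city VARCHAR, zip INT>.
#include <string>

int main() {
    const std::string row =
            R"({"tags": ["a", "b"], "props": {"k1": 1, "k2": 2}, "addr": {"city": "SF", "zip": 94107}})";
    (void)row; // the scanner-side handling of such values is what the diff below adds
    return 0;
}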
128 changes: 51 additions & 77 deletions be/src/exec/json_scanner.cpp
@@ -121,6 +121,46 @@ void JsonScanner::close() {
FileScanner::close();
}

static TypeDescriptor construct_json_type(const TypeDescriptor& src_type) {
switch (src_type.type) {
case TYPE_ARRAY: {
TypeDescriptor json_type(TYPE_ARRAY);
const auto& child_type = src_type.children[0];
json_type.children.emplace_back(construct_json_type(child_type));
return json_type;
}
case TYPE_STRUCT: {
TypeDescriptor json_type(TYPE_STRUCT);
json_type.field_names = src_type.field_names;
for (auto& child_type : src_type.children) {
json_type.children.emplace_back(construct_json_type(child_type));
}
return json_type;
}
case TYPE_MAP: {
TypeDescriptor json_type(TYPE_MAP);
const auto& key_type = src_type.children[0];
const auto& value_type = src_type.children[1];
json_type.children.emplace_back(construct_json_type(key_type));
json_type.children.emplace_back(construct_json_type(value_type));
return json_type;
}
case TYPE_FLOAT:
case TYPE_DOUBLE:
case TYPE_BIGINT:
case TYPE_INT:
case TYPE_SMALLINT:
case TYPE_TINYINT:
case TYPE_VARCHAR:
case TYPE_JSON: {
return src_type;
}
default:
// Treat other types as VARCHAR.
return TypeDescriptor::create_varchar_type(TypeDescriptor::MAX_VARCHAR_LENGTH);
}
}
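
// Example (editorial sketch, not part of the patch): construct_json_type() keeps the container
// shape plus the scalar leaves listed above, and falls back to VARCHAR for everything else, so
// those leaves are parsed as strings first and cast to the slot's real type afterwards.
// For a slot typed MAP<VARCHAR(10), ARRAY<DECIMAL>>:
//
//   TypeDescriptor src(TYPE_MAP);
//   src.children.emplace_back(TypeDescriptor::create_varchar_type(10));  // key
//   TypeDescriptor arr(TYPE_ARRAY);
//   arr.children.emplace_back(TypeDescriptor(TYPE_DECIMAL128));          // assumed decimal type tag
//   src.children.emplace_back(arr);                                      // value
//   TypeDescriptor json_type = construct_json_type(src);
//   // json_type is MAP<VARCHAR(10), ARRAY<VARCHAR(MAX_VARCHAR_LENGTH)>>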

Status JsonScanner::_construct_json_types() {
size_t slot_size = _src_slot_descriptors.size();
_json_types.resize(slot_size);
Expand All @@ -130,77 +170,7 @@ Status JsonScanner::_construct_json_types() {
continue;
}

switch (slot_desc->type().type) {
case TYPE_ARRAY: {
TypeDescriptor json_type(TYPE_ARRAY);
TypeDescriptor* child_type = &json_type;

const TypeDescriptor* slot_type = &(slot_desc->type().children[0]);
while (slot_type->type == TYPE_ARRAY) {
slot_type = &(slot_type->children[0]);

child_type->children.emplace_back(TYPE_ARRAY);
child_type = &(child_type->children[0]);
}

// the json lib don't support get_int128_t(), so we load with BinaryColumn and then convert to LargeIntColumn
if (slot_type->type == TYPE_FLOAT || slot_type->type == TYPE_DOUBLE || slot_type->type == TYPE_BIGINT ||
slot_type->type == TYPE_INT || slot_type->type == TYPE_SMALLINT || slot_type->type == TYPE_TINYINT) {
// Treat these types as what they are.
child_type->children.emplace_back(slot_type->type);
} else if (slot_type->type == TYPE_VARCHAR) {
auto varchar_type = TypeDescriptor::create_varchar_type(slot_type->len);
child_type->children.emplace_back(varchar_type);
} else if (slot_type->type == TYPE_CHAR) {
auto char_type = TypeDescriptor::create_char_type(slot_type->len);
child_type->children.emplace_back(char_type);
} else if (slot_type->type == TYPE_JSON) {
child_type->children.emplace_back(TypeDescriptor::create_json_type());
} else {
// Treat other types as VARCHAR.
auto varchar_type = TypeDescriptor::create_varchar_type(TypeDescriptor::MAX_VARCHAR_LENGTH);
child_type->children.emplace_back(varchar_type);
}

_json_types[column_pos] = std::move(json_type);
break;
}

// Treat these types as what they are.
case TYPE_FLOAT:
case TYPE_DOUBLE:
case TYPE_BIGINT:
case TYPE_INT:
case TYPE_SMALLINT:
case TYPE_TINYINT: {
_json_types[column_pos] = TypeDescriptor{slot_desc->type().type};
break;
}

case TYPE_CHAR: {
auto char_type = TypeDescriptor::create_char_type(slot_desc->type().len);
_json_types[column_pos] = std::move(char_type);
break;
}

case TYPE_VARCHAR: {
auto varchar_type = TypeDescriptor::create_varchar_type(slot_desc->type().len);
_json_types[column_pos] = std::move(varchar_type);
break;
}

case TYPE_JSON: {
_json_types[column_pos] = TypeDescriptor::create_json_type();
break;
}

// Treat other types as VARCHAR.
default: {
auto varchar_type = TypeDescriptor::create_varchar_type(TypeDescriptor::MAX_VARCHAR_LENGTH);
_json_types[column_pos] = std::move(varchar_type);
break;
}
}
_json_types[column_pos] = construct_json_type(slot_desc->type());
}
return Status::OK();
}
@@ -306,8 +276,8 @@ Status JsonScanner::_open_next_reader() {
LOG(WARNING) << "Failed to create sequential files: " << st.to_string();
return st;
}
_cur_file_reader =
std::make_unique<JsonReader>(_state, _counter, this, file, _strict_mode, _src_slot_descriptors, range_desc);
_cur_file_reader = std::make_unique<JsonReader>(_state, _counter, this, file, _strict_mode, _src_slot_descriptors,
_json_types, range_desc);
RETURN_IF_ERROR(_cur_file_reader->open());
_next_range++;
return Status::OK();
Expand All @@ -334,17 +304,19 @@ StatusOr<ChunkPtr> JsonScanner::_cast_chunk(const starrocks::ChunkPtr& src_chunk

JsonReader::JsonReader(starrocks::RuntimeState* state, starrocks::ScannerCounter* counter, JsonScanner* scanner,
std::shared_ptr<SequentialFile> file, bool strict_mode, std::vector<SlotDescriptor*> slot_descs,
const TBrokerRangeDesc& range_desc)
std::vector<TypeDescriptor> type_descs, const TBrokerRangeDesc& range_desc)
: _state(state),
_counter(counter),
_scanner(scanner),
_strict_mode(strict_mode),
_file(std::move(file)),
_slot_descs(std::move(slot_descs)),
_type_descs(type_descs),
_op_col_index(-1),
_range_desc(range_desc) {
int index = 0;
for (const auto& desc : _slot_descs) {
for (size_t i = 0; i < _slot_descs.size(); ++i) {
const auto& desc = _slot_descs[i];
if (desc == nullptr) {
continue;
}
Expand All @@ -353,6 +325,7 @@ JsonReader::JsonReader(starrocks::RuntimeState* state, starrocks::ScannerCounter
}
index++;
_slot_desc_dict.emplace(desc->col_name(), desc);
_type_desc_dict.emplace(desc->col_name(), _type_descs[i]);
}
}

@@ -565,15 +538,16 @@ Status JsonReader::_construct_row_without_jsonpath(simdjson::ondemand::object* r
}

auto slot_desc = itr->second;
auto type_desc = _type_desc_dict[key];

// update the prev parsed position
column_index = chunk->get_index_by_slot_id(slot_desc->id());
if (_prev_parsed_position.size() <= key_index) {
_prev_parsed_position.emplace_back(key, column_index, slot_desc->type());
_prev_parsed_position.emplace_back(key, column_index, type_desc);
} else {
_prev_parsed_position[key_index].key = key;
_prev_parsed_position[key_index].column_index = column_index;
_prev_parsed_position[key_index].type = slot_desc->type();
_prev_parsed_position[key_index].type = type_desc;
}
}
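// _prev_parsed_position caches, for each parsed key, its column index and the intermediate
// JSON parsing type taken from _type_desc_dict (nested leaves may be read as VARCHAR first
// and converted to the slot's real type when the chunk is cast later, see JsonScanner::_cast_chunk).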

5 changes: 4 additions & 1 deletion be/src/exec/json_scanner.h
@@ -83,7 +83,8 @@ class JsonScanner : public FileScanner {
class JsonReader {
public:
JsonReader(RuntimeState* state, ScannerCounter* counter, JsonScanner* scanner, std::shared_ptr<SequentialFile> file,
bool strict_mode, std::vector<SlotDescriptor*> slot_descs, const TBrokerRangeDesc& range_desc);
bool strict_mode, std::vector<SlotDescriptor*> slot_descs, std::vector<TypeDescriptor> types,
const TBrokerRangeDesc& range_desc);

~JsonReader();

@@ -131,9 +132,11 @@ class JsonReader {
std::shared_ptr<SequentialFile> _file;
bool _closed = false;
std::vector<SlotDescriptor*> _slot_descs;
std::vector<TypeDescriptor> _type_descs;
// Attention: the keys of _slot_desc_dict (and _type_desc_dict) are string_views into the
// column names of _slot_descs, so _slot_descs must outlive both maps.
std::unordered_map<std::string_view, SlotDescriptor*> _slot_desc_dict;
std::unordered_map<std::string_view, TypeDescriptor> _type_desc_dict;

// For performance reasons, the simdjson parser should be reused across several files.
//https://github.com/simdjson/simdjson/blob/master/doc/performance.md
2 changes: 2 additions & 0 deletions be/src/formats/CMakeLists.txt
@@ -36,6 +36,8 @@ add_library(Formats STATIC
json/nullable_column.cpp
json/numeric_column.cpp
json/binary_column.cpp
json/struct_column.cpp
json/map_column.cpp
avro/nullable_column.cpp
avro/numeric_column.cpp
avro/binary_column.cpp
66 changes: 66 additions & 0 deletions be/src/formats/json/map_column.cpp
@@ -0,0 +1,66 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "column/map_column.h"

#include "fmt/format.h"
#include "formats/json/map_column.h"
#include "formats/json/nullable_column.h"
#include "gutil/strings/substitute.h"

namespace starrocks {

Status add_map_column(Column* column, const TypeDescriptor& type_desc, const std::string& name,
simdjson::ondemand::value* value) {
auto map_column = down_cast<MapColumn*>(column);

try {
if (value->type() != simdjson::ondemand::json_type::object) {
std::ostringstream ss;
ss << "Expected value type [object], got [" << value->type() << "]";
return Status::DataQualityError(ss.str());
}
simdjson::ondemand::object obj = value->get_object();
simdjson::ondemand::parser parser;
size_t field_count = 0;
for (auto field : obj) {
{
// Wrap the key in a dummy JSON document so the std::string_view key can be handed to
// add_nullable_column() as a simdjson::ondemand::value.
std::string_view field_name_str = field.unescaped_key();
auto dummy_json = simdjson::padded_string(R"({"dummy_key": ")" + std::string(field_name_str) + R"("})");
simdjson::ondemand::document doc = parser.iterate(dummy_json);
simdjson::ondemand::object obj = doc.get_object();
simdjson::ondemand::value field_key = obj.find_field("dummy_key");

RETURN_IF_ERROR(add_nullable_column(map_column->keys_column().get(), type_desc.children[0], name,
&field_key, true));
}

{
simdjson::ondemand::value field_value = field.value();
RETURN_IF_ERROR(add_nullable_column(map_column->values_column().get(), type_desc.children[1], name,
&field_value, true));
}
++field_count;
}
map_column->offsets_column()->append(map_column->offsets_column()->get_data().back() + field_count);

return Status::OK();
} catch (simdjson::simdjson_error& e) {
auto err_msg = strings::Substitute("Failed to parse value as object, column=$0, error=$1", name,
simdjson::error_message(e.error()));
return Status::DataQualityError(err_msg);
}
}
} // namespace starrocks
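
A small worked example (editorial sketch, assuming a destination column of type MAP<VARCHAR, INT>) of what add_map_column appends for one parsed object:

// Input JSON value for the row:  {"a": 1, "b": 2}
// keys_column()    : appends "a", "b"   (add_nullable_column with type_desc.children[0])
// values_column()  : appends 1, 2       (add_nullable_column with type_desc.children[1])
// offsets_column() : appends previous back() + 2, since field_count == 2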
27 changes: 27 additions & 0 deletions be/src/formats/json/map_column.h
@@ -0,0 +1,27 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <string>

#include "column/column.h"
#include "common/status.h"
#include "runtime/types.h"
#include "simdjson.h"

namespace starrocks {
Status add_map_column(Column* column, const TypeDescriptor& type_desc, const std::string& name,
simdjson::ondemand::value* value);
} // namespace starrocks
(The remaining 9 changed files in this commit are not shown here.)
