Skip to content

Commit

Permalink
[Bug] Fix the bug where Largeint and Decimal JSON load failed. (apache#…
Browse files Browse the repository at this point in the history
…4983)

Add a JSON load parameter, "num_as_string"; when it is set, the JSON data is parsed with the kParseNumbersAsStringsFlag flag.
  • Loading branch information
HappenLee authored Dec 6, 2020
1 parent b1b99ae commit b954dfd
Show file tree
Hide file tree
Showing 18 changed files with 374 additions and 29 deletions.
48 changes: 34 additions & 14 deletions be/src/exec/json_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ Status JsonScanner::open_next_reader() {
std::string json_root = "";
std::string jsonpath = "";
bool strip_outer_array = false;
bool num_as_string = false;

if (range.__isset.jsonpaths) {
jsonpath = range.jsonpaths;
}
Expand All @@ -147,7 +149,10 @@ Status JsonScanner::open_next_reader() {
if (range.__isset.strip_outer_array) {
strip_outer_array = range.strip_outer_array;
}
_cur_file_reader = new JsonReader(_state, _counter, _profile, file, strip_outer_array);
if (range.__isset.num_as_string) {
num_as_string = range.num_as_string;
}
_cur_file_reader = new JsonReader(_state, _counter, _profile, file, strip_outer_array, num_as_string);
RETURN_IF_ERROR(_cur_file_reader->init(jsonpath, json_root));

return Status::OK();
Expand Down Expand Up @@ -178,18 +183,23 @@ rapidjson::Value::ConstValueIterator JsonDataInternal::get_next() {
}

////// class JsonReader
JsonReader::JsonReader(RuntimeState* state, ScannerCounter* counter, RuntimeProfile* profile,
FileReader* file_reader, bool strip_outer_array)
: _handle_json_callback(nullptr),
_next_line(0),
_total_lines(0),
_state(state),
_counter(counter),
_profile(profile),
_file_reader(file_reader),
_closed(false),
_strip_outer_array(strip_outer_array),
_json_doc(nullptr) {
// Constructs a JsonReader over `file_reader`.
//
// The constructor only records its arguments and resets the reader's
// bookkeeping fields; init() must be called before the reader is used.
//
// state, counter, profile: stored as-is in the corresponding members.
// file_reader:       stored in _file_reader.
// strip_outer_array: stored in _strip_outer_array.
// num_as_string:     stored in _num_as_string; when true, _parse_json_doc()
//                    parses the input with rapidjson::kParseNumbersAsStringsFlag.
JsonReader::JsonReader(
RuntimeState* state, ScannerCounter* counter,
RuntimeProfile* profile,
FileReader* file_reader,
bool strip_outer_array,
bool num_as_string) :
_handle_json_callback(nullptr),
_next_line(0),
_total_lines(0),
_state(state),
_counter(counter),
_profile(profile),
_file_reader(file_reader),
_closed(false),
_strip_outer_array(strip_outer_array),
_num_as_string(num_as_string),
_json_doc(nullptr) {
// Set up the "BytesRead" counter and the "FileReadTime" timer through the
// profile macros, using the profile passed in above.
_bytes_read_counter = ADD_COUNTER(_profile, "BytesRead", TUnit::BYTES);
_read_timer = ADD_TIMER(_profile, "FileReadTime");
}
Expand Down Expand Up @@ -270,8 +280,18 @@ Status JsonReader::_parse_json_doc(bool* eof) {
*eof = true;
return Status::OK();
}

bool has_parse_error = false;
// parse jsondata to JsonDoc
if (_origin_json_doc.Parse((char*)json_str, length).HasParseError()) {
// See the issue: https://github.com/Tencent/rapidjson/issues/1458
// rapidjson currently only supports integers up to uint64_t, so loading a largeint causes a bug. We use kParseNumbersAsStringsFlag to work around it.
if (_num_as_string) {
has_parse_error = _origin_json_doc.Parse<rapidjson::kParseNumbersAsStringsFlag>((char*)json_str, length).HasParseError();
} else {
has_parse_error = _origin_json_doc.Parse((char*)json_str, length).HasParseError();
}

if (has_parse_error) {
std::stringstream str_error;
str_error << "Parse json data for JsonDoc failed. code = "
<< _origin_json_doc.GetParseError() << ", error-info:"
Expand Down
6 changes: 4 additions & 2 deletions be/src/exec/json_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,9 @@ struct JsonPath;
// return other error Status if encounter other errors.
class JsonReader {
public:
JsonReader(RuntimeState* state, ScannerCounter* counter, RuntimeProfile* profile,
FileReader* file_reader, bool strip_outer_array);
JsonReader(RuntimeState* state, ScannerCounter* counter, RuntimeProfile* profile, FileReader* file_reader,
bool strip_outer_array, bool num_as_string);

~JsonReader();

Status init(const std::string& jsonpath, const std::string& json_root); // must call before use
Expand Down Expand Up @@ -150,6 +151,7 @@ class JsonReader {
FileReader* _file_reader;
bool _closed;
bool _strip_outer_array;
bool _num_as_string;
RuntimeProfile::Counter* _bytes_read_counter;
RuntimeProfile::Counter* _read_timer;

Expand Down
9 changes: 9 additions & 0 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,15 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext*
} else {
request.__set_strip_outer_array(false);
}
if (!http_req->header(HTTP_NUM_AS_STRING).empty()) {
if (boost::iequals(http_req->header(HTTP_NUM_AS_STRING), "true")) {
request.__set_num_as_string(true);
} else {
request.__set_num_as_string(false);
}
} else {
request.__set_num_as_string(false);
}
if (!http_req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL).empty()) {
request.__set_sequence_col(
http_req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL));
Expand Down
1 change: 1 addition & 0 deletions be/src/http/http_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ static const std::string HTTP_EXEC_MEM_LIMIT = "exec_mem_limit";
static const std::string HTTP_JSONPATHS = "jsonpaths";
static const std::string HTTP_JSONROOT = "json_root";
static const std::string HTTP_STRIP_OUTER_ARRAY = "strip_outer_array";
static const std::string HTTP_NUM_AS_STRING = "num_as_string";
static const std::string HTTP_MERGE_TYPE = "merge_type";
static const std::string HTTP_DELETE_CONDITION = "delete";
static const std::string HTTP_FUNCTION_COLUMN = "function_column";
Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/raw_value.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,11 @@ void RawValue::print_value(const void* value, const TypeDescriptor& type, int sc
break;

case TYPE_DECIMAL:
*stream << *reinterpret_cast<const DecimalValue*>(value);
*stream << reinterpret_cast<const DecimalValue*>(value)->to_string();
break;

case TYPE_DECIMALV2:
*stream << reinterpret_cast<const PackedInt128*>(value)->value;
*stream << DecimalV2Value(reinterpret_cast<const PackedInt128*>(value)->value).to_string();
break;

case TYPE_LARGEINT:
Expand Down
194 changes: 191 additions & 3 deletions be/test/exec/json_scanner_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "exprs/cast_functions.h"
#include "gen_cpp/Descriptors_types.h"
#include "gen_cpp/PlanNodes_types.h"
#include "exprs/decimalv2_operators.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/row_batch.h"
Expand All @@ -49,6 +50,7 @@ class JsonScannerTest : public testing::Test {
UserFunctionCache::instance()->init(
"./be/test/runtime/test_data/user_function_cache/normal");
CastFunctions::init();
DecimalV2Operators::init();
}

protected:
Expand All @@ -70,11 +72,11 @@ class JsonScannerTest : public testing::Test {

#define TUPLE_ID_DST 0
#define TUPLE_ID_SRC 1
#define COLUMN_NUMBERS 4
#define COLUMN_NUMBERS 6
#define DST_TUPLE_SLOT_ID_START 1
#define SRC_TUPLE_SLOT_ID_START 5
#define SRC_TUPLE_SLOT_ID_START 7
int JsonScannerTest::create_src_tuple(TDescriptorTable& t_desc_table, int next_slot_id) {
const char* columnNames[] = {"category", "author", "title", "price"};
const char *columnNames[] = {"category","author","title","price", "largeint", "decimal"};
for (int i = 0; i < COLUMN_NUMBERS; i++) {
TSlotDescriptor slot_desc;

Expand Down Expand Up @@ -223,6 +225,62 @@ int JsonScannerTest::create_dst_tuple(TDescriptorTable& t_desc_table, int next_s
t_desc_table.slotDescriptors.push_back(slot_desc);
}
byteOffset += 8;
{// largeint
TSlotDescriptor slot_desc;

slot_desc.id = next_slot_id++;
slot_desc.parent = 0;
TTypeDesc type;
{
TTypeNode node;
node.__set_type(TTypeNodeType::SCALAR);
TScalarType scalar_type;
scalar_type.__set_type(TPrimitiveType::LARGEINT);
node.__set_scalar_type(scalar_type);
type.types.push_back(node);
}
slot_desc.slotType = type;
slot_desc.columnPos = 4;
slot_desc.byteOffset = byteOffset;
slot_desc.nullIndicatorByte = 0;
slot_desc.nullIndicatorBit = 4;
slot_desc.colName = "lagreint";
slot_desc.slotIdx = 5;
slot_desc.isMaterialized = true;

t_desc_table.slotDescriptors.push_back(slot_desc);
}
byteOffset += 16;
{// decimal
TSlotDescriptor slot_desc;

slot_desc.id = next_slot_id++;
slot_desc.parent = 0;
TTypeDesc type;
{
TTypeNode node;
node.__set_type(TTypeNodeType::SCALAR);
TScalarType scalar_type;
scalar_type.__isset.precision = true;
scalar_type.__isset.scale = true;
scalar_type.__set_precision(-1);
scalar_type.__set_scale(-1);
scalar_type.__set_type(TPrimitiveType::DECIMALV2);
node.__set_scalar_type(scalar_type);
type.types.push_back(node);
}
slot_desc.slotType = type;
slot_desc.columnPos = 5;
slot_desc.byteOffset = byteOffset;
slot_desc.nullIndicatorByte = 0;
slot_desc.nullIndicatorBit = 5;
slot_desc.colName = "decimal";
slot_desc.slotIdx = 6;
slot_desc.isMaterialized = true;

t_desc_table.slotDescriptors.push_back(slot_desc);
}

t_desc_table.__isset.slotDescriptors = true;
{
// TTupleDescriptor dest
Expand Down Expand Up @@ -363,6 +421,94 @@ void JsonScannerTest::create_expr_info() {
_params.expr_of_dest_slot.emplace(DST_TUPLE_SLOT_ID_START + 3, expr);
_params.src_slot_ids.push_back(SRC_TUPLE_SLOT_ID_START + 3);
}
// largeint VARCHAR --> LargeInt
{
TTypeDesc int_type;
{
TTypeNode node;
node.__set_type(TTypeNodeType::SCALAR);
TScalarType scalar_type;
scalar_type.__set_type(TPrimitiveType::LARGEINT);
node.__set_scalar_type(scalar_type);
int_type.types.push_back(node);
}
TExprNode cast_expr;
cast_expr.node_type = TExprNodeType::CAST_EXPR;
cast_expr.type = int_type;
cast_expr.__set_opcode(TExprOpcode::CAST);
cast_expr.__set_num_children(1);
cast_expr.__set_output_scale(-1);
cast_expr.__isset.fn = true;
cast_expr.fn.name.function_name = "casttolargeint";
cast_expr.fn.binary_type = TFunctionBinaryType::BUILTIN;
cast_expr.fn.arg_types.push_back(varchar_type);
cast_expr.fn.ret_type = int_type;
cast_expr.fn.has_var_args = false;
cast_expr.fn.__set_signature("casttolargeint(VARCHAR(*))");
cast_expr.fn.__isset.scalar_fn = true;
cast_expr.fn.scalar_fn.symbol = "doris::CastFunctions::cast_to_large_int_val";

TExprNode slot_ref;
slot_ref.node_type = TExprNodeType::SLOT_REF;
slot_ref.type = varchar_type;
slot_ref.num_children = 0;
slot_ref.__isset.slot_ref = true;
slot_ref.slot_ref.slot_id = SRC_TUPLE_SLOT_ID_START + 4; // largeint id in src tuple
slot_ref.slot_ref.tuple_id = 1;

TExpr expr;
expr.nodes.push_back(cast_expr);
expr.nodes.push_back(slot_ref);

_params.expr_of_dest_slot.emplace(DST_TUPLE_SLOT_ID_START + 4, expr);
_params.src_slot_ids.push_back(SRC_TUPLE_SLOT_ID_START + 4);
}
// decimal VARCHAR --> Decimal
{
TTypeDesc int_type;
{
TTypeNode node;
node.__set_type(TTypeNodeType::SCALAR);
TScalarType scalar_type;
scalar_type.__isset.precision = true;
scalar_type.__isset.scale = true;
scalar_type.__set_precision(-1);
scalar_type.__set_scale(-1);
scalar_type.__set_type(TPrimitiveType::DECIMALV2);
node.__set_scalar_type(scalar_type);
int_type.types.push_back(node);
}
TExprNode cast_expr;
cast_expr.node_type = TExprNodeType::CAST_EXPR;
cast_expr.type = int_type;
cast_expr.__set_opcode(TExprOpcode::CAST);
cast_expr.__set_num_children(1);
cast_expr.__set_output_scale(-1);
cast_expr.__isset.fn = true;
cast_expr.fn.name.function_name = "casttodecimalv2";
cast_expr.fn.binary_type = TFunctionBinaryType::BUILTIN;
cast_expr.fn.arg_types.push_back(varchar_type);
cast_expr.fn.ret_type = int_type;
cast_expr.fn.has_var_args = false;
cast_expr.fn.__set_signature("casttodecimalv2(VARCHAR(*))");
cast_expr.fn.__isset.scalar_fn = true;
cast_expr.fn.scalar_fn.symbol = "doris::DecimalV2Operators::cast_to_decimalv2_val";

TExprNode slot_ref;
slot_ref.node_type = TExprNodeType::SLOT_REF;
slot_ref.type = varchar_type;
slot_ref.num_children = 0;
slot_ref.__isset.slot_ref = true;
slot_ref.slot_ref.slot_id = SRC_TUPLE_SLOT_ID_START + 5; // decimal id in src tuple
slot_ref.slot_ref.tuple_id = 1;

TExpr expr;
expr.nodes.push_back(cast_expr);
expr.nodes.push_back(slot_ref);

_params.expr_of_dest_slot.emplace(DST_TUPLE_SLOT_ID_START + 5, expr);
_params.src_slot_ids.push_back(SRC_TUPLE_SLOT_ID_START + 5);
}
// _params.__isset.expr_of_dest_slot = true;
_params.__set_dest_tuple_id(TUPLE_ID_DST);
_params.__set_src_tuple_id(TUPLE_ID_SRC);
Expand Down Expand Up @@ -420,6 +566,10 @@ TEST_F(JsonScannerTest, normal_simple_arrayjson) {
status = scan_node.get_next(&_runtime_state, &batch, &eof);
ASSERT_TRUE(status.ok());
ASSERT_EQ(2, batch.num_rows());
// num_as_string is not used here, so the too-large largeint becomes null and the decimal value loses precision
auto tuple_str = batch.get_row(1)->get_tuple(0)->to_string(*scan_node.row_desc().tuple_descriptors()[0]);
ASSERT_TRUE(tuple_str.find("1180591620717411303424") == tuple_str.npos);
ASSERT_TRUE(tuple_str.find("9999999999999.999999") == tuple_str.npos);
ASSERT_FALSE(eof);
batch.reset();

Expand All @@ -428,7 +578,45 @@ TEST_F(JsonScannerTest, normal_simple_arrayjson) {
ASSERT_EQ(0, batch.num_rows());
ASSERT_TRUE(eof);

// Use num_as_string load data again
BrokerScanNode scan_node2(&_obj_pool, _tnode, *_desc_tbl);
status = scan_node2.prepare(&_runtime_state);
ASSERT_TRUE(status.ok());
scan_ranges.clear();
{
TScanRangeParams scan_range_params;

TBrokerScanRange broker_scan_range;
broker_scan_range.params = _params;
TBrokerRangeDesc range;
range.start_offset = 0;
range.size = -1;
range.format_type = TFileFormatType::FORMAT_JSON;
range.strip_outer_array = true;
range.num_as_string = true;
range.__isset.strip_outer_array = true;
range.__isset.num_as_string = true;
range.splittable = true;
range.path = "./be/test/exec/test_data/json_scanner/test_simple2.json";
range.file_type = TFileType::FILE_LOCAL;
broker_scan_range.ranges.push_back(range);
scan_range_params.scan_range.__set_broker_scan_range(broker_scan_range);
scan_ranges.push_back(scan_range_params);
}
scan_node2.set_scan_ranges(scan_ranges);
status = scan_node2.open(&_runtime_state);
ASSERT_TRUE(status.ok());

status = scan_node2.get_next(&_runtime_state, &batch, &eof);
ASSERT_TRUE(status.ok());
ASSERT_EQ(2, batch.num_rows());
// With num_as_string enabled, the largeint and decimal values load successfully
tuple_str = batch.get_row(1)->get_tuple(0)->to_string(*scan_node2.row_desc().tuple_descriptors()[0]);
ASSERT_FALSE(tuple_str.find("1180591620717411303424") == tuple_str.npos);
ASSERT_FALSE(tuple_str.find("9999999999999.999999") == tuple_str.npos);

scan_node.close(&_runtime_state);
scan_node2.close(&_runtime_state);
{
std::stringstream ss;
scan_node.runtime_profile()->pretty_print(&ss);
Expand Down
4 changes: 2 additions & 2 deletions be/test/exec/test_data/json_scanner/test_simple2.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[
{"category":"reference","author":"NigelRees","title":"SayingsoftheCentury","price":8.95},
{"category":"fiction","author":"EvelynWaugh","title":"SwordofHonour","price":12.99}
{"category":"reference","author":"NigelRees","title":"SayingsoftheCentury","price":8.95, "largeint":1234, "decimal":1234.1234},
{"category":"fiction","author":"EvelynWaugh","title":"SwordofHonour","price":12.99, "largeint":1180591620717411303424, "decimal":9999999999999.999999}
]

Loading

0 comments on commit b954dfd

Please sign in to comment.