Skip to content

Commit 6566343

Browse files
committed
Recursively construct children for list/struct
Change-Id: Iab4687ef38f889100a8e83fad59c1bec3772810a
1 parent 35c2f85 commit 6566343

File tree

1 file changed

+103
-52
lines changed

1 file changed

+103
-52
lines changed

cpp/src/arrow/ipc/json-internal.cc

Lines changed: 103 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include "arrow/ipc/json-internal.h"
1919

2020
#include <cstdint>
21+
#include <memory>
2122
#include <sstream>
2223
#include <string>
2324
#include <type_traits>
@@ -41,6 +42,9 @@
4142
namespace arrow {
4243
namespace ipc {
4344

45+
using RjArray = rj::Value::ConstArray;
46+
using RjObject = rj::Value::ConstObject;
47+
4448
enum class BufferType : char { DATA, OFFSET, TYPE, VALIDITY };
4549

4650
static std::string GetBufferTypeName(BufferType type) {
@@ -59,6 +63,20 @@ static std::string GetBufferTypeName(BufferType type) {
5963
return "UNKNOWN";
6064
}
6165

66+
// Maps a floating-point precision enumerator to the string used for it in the
// JSON schema ("HALF", "SINGLE" or "DOUBLE"). Any other value yields "UNKNOWN".
static std::string GetFloatingPrecisionName(FloatingPointMeta::Precision precision) {
  if (precision == FloatingPointMeta::HALF) { return "HALF"; }
  if (precision == FloatingPointMeta::SINGLE) { return "SINGLE"; }
  if (precision == FloatingPointMeta::DOUBLE) { return "DOUBLE"; }
  return "UNKNOWN";
}
79+
6280
static std::string GetTimeUnitName(TimeUnit unit) {
6381
switch (unit) {
6482
case TimeUnit::SECOND:
@@ -155,19 +173,7 @@ class JsonSchemaWriter : public TypeVisitor {
155173
typename std::enable_if<std::is_base_of<FloatingPointMeta, T>::value, void>::type
156174
WriteTypeMetadata(const T& type) {
157175
writer_->Key("precision");
158-
switch (type.precision()) {
159-
case FloatingPointMeta::HALF:
160-
writer_->String("HALF");
161-
break;
162-
case FloatingPointMeta::SINGLE:
163-
writer_->String("SINGLE");
164-
break;
165-
case FloatingPointMeta::DOUBLE:
166-
writer_->String("DOUBLE");
167-
break;
168-
default:
169-
break;
170-
}
176+
writer_->String(GetFloatingPrecisionName(type.precision()));
171177
}
172178

173179
template <typename T>
@@ -722,8 +728,7 @@ class JsonSchemaReader {
722728
return Status::OK();
723729
}
724730

725-
Status GetFloatingPoint(
726-
const rj::Value::ConstObject& json_type, std::shared_ptr<DataType>* type) {
731+
Status GetFloatingPoint(const RjObject& json_type, std::shared_ptr<DataType>* type) {
727732
const auto& json_precision = json_type.FindMember("precision");
728733
RETURN_NOT_STRING("precision", json_precision, json_type);
729734

@@ -744,8 +749,7 @@ class JsonSchemaReader {
744749
}
745750

746751
template <typename T>
747-
Status GetTimeLike(
748-
const rj::Value::ConstObject& json_type, std::shared_ptr<DataType>* type) {
752+
Status GetTimeLike(const RjObject& json_type, std::shared_ptr<DataType>* type) {
749753
const auto& json_unit = json_type.FindMember("unit");
750754
RETURN_NOT_STRING("unit", json_unit, json_type);
751755

@@ -772,7 +776,7 @@ class JsonSchemaReader {
772776
return Status::OK();
773777
}
774778

775-
Status GetUnion(const rj::Value::ConstObject& json_type,
779+
Status GetUnion(const RjObject& json_type,
776780
const std::vector<std::shared_ptr<Field>>& children,
777781
std::shared_ptr<DataType>* type) {
778782
const auto& json_mode = json_type.FindMember("mode");
@@ -797,6 +801,7 @@ class JsonSchemaReader {
797801
std::vector<uint8_t> type_ids;
798802
const auto& id_array = json_type_ids->value.GetArray();
799803
for (const rj::Value& val : id_array) {
804+
DCHECK(val.IsUint());
800805
type_ids.push_back(val.GetUint());
801806
}
802807

@@ -805,7 +810,7 @@ class JsonSchemaReader {
805810
return Status::OK();
806811
}
807812

808-
Status GetType(const rj::Value::ConstObject& json_type,
813+
Status GetType(const RjObject& json_type,
809814
const std::vector<std::shared_ptr<Field>>& children,
810815
std::shared_ptr<DataType>* type) {
811816
const auto& json_type_name = json_type.FindMember("name");
@@ -852,9 +857,6 @@ class JsonArrayReader {
852857
: pool_(pool), json_array_(json_array), schema_(schema) {}
853858

854859
Status GetResult(std::shared_ptr<Array>* array) {
855-
if (!json_array_.IsObject()) {
856-
return Status::Invalid("Array was not a JSON object");
857-
}
858860
const auto& json_array = json_array_.GetObject();
859861

860862
const auto& json_name = json_array.FindMember("name");
@@ -884,11 +886,33 @@ class JsonArrayReader {
884886
return GetArray(obj, result->type, array);
885887
}
886888

889+
// Builds a validity bitmap from a per-slot validity vector.
//
// Allocates a buffer from pool_ large enough for is_valid.size() bits, zeroes
// it, then sets bit i for every slot where is_valid[i] is true.
//
// \param[in] is_valid one entry per slot; false marks a null
// \param[out] null_count number of false entries in is_valid
// \param[out] validity_buffer the populated bitmap buffer
// \return the Resize error if allocation fails (outputs are left untouched),
//   otherwise Status::OK()
Status GetValidityBuffer(const std::vector<bool>& is_valid, int32_t* null_count,
    std::shared_ptr<Buffer>* validity_buffer) {
  const int num_slots = static_cast<int>(is_valid.size());

  auto bitmap_buffer = std::make_shared<PoolBuffer>(pool_);
  RETURN_NOT_OK(bitmap_buffer->Resize(BitUtil::BytesForBits(num_slots)));

  // Start from an all-null bitmap and flip on only the valid slots.
  uint8_t* bits = reinterpret_cast<uint8_t*>(bitmap_buffer->mutable_data());
  memset(bits, 0, bitmap_buffer->size());

  int32_t nulls_seen = 0;
  for (int slot = 0; slot < num_slots; ++slot) {
    if (is_valid[slot]) {
      BitUtil::SetBit(bits, slot);
    } else {
      ++nulls_seen;
    }
  }

  *null_count = nulls_seen;
  *validity_buffer = bitmap_buffer;
  return Status::OK();
}
910+
887911
template <typename T>
888912
typename std::enable_if<std::is_base_of<PrimitiveCType, T>::value ||
889913
std::is_base_of<BooleanType, T>::value,
890914
Status>::type
891-
ReadArray(const rj::Value::ConstObject& json_array, const std::vector<bool>& is_valid,
915+
ReadArray(const RjObject& json_array, int32_t length, const std::vector<bool>& is_valid,
892916
const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* array) {
893917
typename TypeTraits<T>::BuilderType builder(pool_, type);
894918

@@ -897,7 +921,8 @@ class JsonArrayReader {
897921

898922
const auto& json_data_arr = json_data->value.GetArray();
899923

900-
for (auto i = 0; i < json_data_arr.Size(); ++i) {
924+
DCHECK_EQ(static_cast<int32_t>(json_data_arr.Size()), length);
925+
for (auto i = 0; i < length; ++i) {
901926
if (!is_valid[i]) {
902927
builder.AppendNull();
903928
continue;
@@ -927,7 +952,7 @@ class JsonArrayReader {
927952

928953
template <typename T>
929954
typename std::enable_if<std::is_base_of<BinaryType, T>::value, Status>::type ReadArray(
930-
const rj::Value::ConstObject& json_array, const std::vector<bool>& is_valid,
955+
const RjObject& json_array, int32_t length, const std::vector<bool>& is_valid,
931956
const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* array) {
932957
typename TypeTraits<T>::BuilderType builder(pool_, type);
933958

@@ -936,7 +961,8 @@ class JsonArrayReader {
936961

937962
const auto& json_data_arr = json_data->value.GetArray();
938963

939-
for (auto i = 0; i < json_data_arr.Size(); ++i) {
964+
DCHECK_EQ(static_cast<int32_t>(json_data_arr.Size()), length);
965+
for (auto i = 0; i < length; ++i) {
940966
if (!is_valid[i]) {
941967
builder.AppendNull();
942968
continue;
@@ -952,65 +978,89 @@ class JsonArrayReader {
952978

953979
template <typename T>
954980
typename std::enable_if<std::is_base_of<ListType, T>::value, Status>::type ReadArray(
955-
const rj::Value::ConstObject& json_array, const std::vector<bool>& is_valid,
981+
const RjObject& json_array, int32_t length, const std::vector<bool>& is_valid,
956982
const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* array) {
957983
const auto& json_offsets = json_array.FindMember("OFFSETS");
958984
RETURN_NOT_ARRAY("OFFSETS", json_offsets, json_array);
959985
const auto& json_offsets_arr = json_offsets->value.GetArray();
960986

961-
int length = static_cast<int>(is_valid.size());
962-
963-
auto validity_buffer = std::make_shared<PoolBuffer>(pool_);
964-
RETURN_NOT_OK(validity_buffer->Resize(BitUtil::BytesForBits(length)));
987+
int32_t null_count = 0;
988+
std::shared_ptr<Buffer> validity_buffer;
989+
RETURN_NOT_OK(GetValidityBuffer(is_valid, &null_count, &validity_buffer));
965990

966991
auto offsets_buffer = std::make_shared<PoolBuffer>(pool_);
967992
RETURN_NOT_OK(offsets_buffer->Resize((length + 1) * sizeof(int32_t)));
968-
969-
int32_t null_count = 0;
970-
uint8_t* bitmap = reinterpret_cast<uint8_t*>(validity_buffer->mutable_data());
971-
memset(bitmap, 0, validity_buffer->size());
972-
973993
int32_t* offsets = reinterpret_cast<int32_t*>(offsets_buffer->mutable_data());
974994

975-
for (int i = 0; i < length; ++i) {
995+
for (int i = 0; i < length + 1; ++i) {
976996
const rj::Value& val = json_offsets_arr[i];
977-
978997
DCHECK(val.IsInt());
979998
offsets[i] = val.GetInt();
980-
981-
if (!is_valid[i]) {
982-
++null_count;
983-
continue;
984-
}
985-
BitUtil::SetBit(bitmap, i);
986999
}
9871000

988-
// auto list_type = dynamic_cast<const ListType*>(type.get());
989-
std::shared_ptr<Array> values;
1001+
std::vector<std::shared_ptr<Array>> children;
1002+
RETURN_NOT_OK(GetChildren(json_array, type, &children));
1003+
DCHECK_EQ(children.size(), 1);
9901004

9911005
*array = std::make_shared<ListArray>(
992-
type, length, offsets_buffer, values, null_count, validity_buffer);
1006+
type, length, offsets_buffer, children[0], null_count, validity_buffer);
9931007

9941008
return Status::OK();
9951009
}
9961010

9971011
template <typename T>
9981012
typename std::enable_if<std::is_base_of<StructType, T>::value, Status>::type ReadArray(
999-
const rj::Value::ConstObject& json_array, const std::vector<bool>& is_valid,
1013+
const RjObject& json_array, int32_t length, const std::vector<bool>& is_valid,
10001014
const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* array) {
1015+
int32_t null_count = 0;
1016+
std::shared_ptr<Buffer> validity_buffer;
1017+
RETURN_NOT_OK(GetValidityBuffer(is_valid, &null_count, &validity_buffer));
1018+
1019+
std::vector<std::shared_ptr<Array>> fields;
1020+
RETURN_NOT_OK(GetChildren(json_array, type, &fields));
1021+
1022+
*array =
1023+
std::make_shared<StructArray>(type, length, fields, null_count, validity_buffer);
1024+
10011025
return Status::OK();
10021026
}
10031027

10041028
template <typename T>
10051029
typename std::enable_if<std::is_base_of<NullType, T>::value, Status>::type ReadArray(
1006-
const rj::Value::ConstObject& json_array, const std::vector<bool>& is_valid,
1030+
const RjObject& json_array, int32_t length, const std::vector<bool>& is_valid,
10071031
const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* array) {
1008-
return Status::NotImplemented("null");
1032+
*array = std::make_shared<NullArray>(type, length);
1033+
return Status::OK();
1034+
}
1035+
1036+
// Reads every child array listed under the "children" member of a JSON array
// object and appends them, in order, to the output vector.
//
// \param[in] json_array JSON object that must contain a "children" array
// \param[in] type parent type; child i is read with type->child(i)->type
// \param[out] array vector the child arrays are appended to
// \return Status::Invalid if the number of JSON children differs from
//   type->num_children(); otherwise the first error returned by GetArray for a
//   child (including a child that is not a JSON object); Status::OK() on
//   success
Status GetChildren(const RjObject& json_array, const std::shared_ptr<DataType>& type,
    std::vector<std::shared_ptr<Array>>* array) {
  const auto& json_children = json_array.FindMember("children");
  RETURN_NOT_ARRAY("children", json_children, json_array);
  const auto& json_children_arr = json_children->value.GetArray();

  const int num_children = type->num_children();
  if (num_children != static_cast<int>(json_children_arr.Size())) {
    std::stringstream ss;
    ss << "Expected " << num_children << " children, but got "
       << json_children_arr.Size();
    return Status::Invalid(ss.str());
  }

  // Iterate over the validated signed count so the index has the same
  // signedness as the bound and as the argument to type->child().
  for (int i = 0; i < num_children; ++i) {
    // GetArray itself rejects a non-object child with Status::Invalid, so no
    // debug-only assertion is made here that would abort before that error
    // can be reported.
    std::shared_ptr<Array> child;
    RETURN_NOT_OK(GetArray(json_children_arr[i], type->child(i)->type, &child));
    array->emplace_back(child);
  }

  return Status::OK();
}
10101058

10111059
Status GetArray(const rj::Value& obj, const std::shared_ptr<DataType>& type,
10121060
std::shared_ptr<Array>* array) {
1013-
if (!obj.IsObject()) { return Status::Invalid("Array was not a JSON object"); }
1061+
if (!obj.IsObject()) {
1062+
return Status::Invalid("Array element was not a JSON object");
1063+
}
10141064
const auto& json_array = obj.GetObject();
10151065

10161066
const auto& json_length = json_array.FindMember("count");
@@ -1032,7 +1082,7 @@ class JsonArrayReader {
10321082

10331083
#define TYPE_CASE(TYPE) \
10341084
case TYPE::type_id: \
1035-
return ReadArray<TYPE>(json_array, is_valid, type, array);
1085+
return ReadArray<TYPE>(json_array, length, is_valid, type, array);
10361086

10371087
#define NOT_IMPLEMENTED_CASE(TYPE_ENUM) \
10381088
case Type::TYPE_ENUM: { \
@@ -1100,6 +1150,7 @@ Status WriteJsonArray(
11001150

11011151
// Reads a single Arrow array from a JSON value.
//
// \param[in] pool memory pool handed to the reader
// \param[in] json_array JSON value expected to be an object
// \param[in] schema schema handed to the reader
// \param[out] array the resulting array
// \return Status::Invalid if json_array is not a JSON object, otherwise the
//   result of JsonArrayReader::GetResult
Status ReadJsonArray(MemoryPool* pool, const rj::Value& json_array, const Schema& schema,
    std::shared_ptr<Array>* array) {
  if (json_array.IsObject()) {
    JsonArrayReader converter(pool, json_array, schema);
    return converter.GetResult(array);
  }
  return Status::Invalid("Element was not a JSON object");
}

0 commit comments

Comments
 (0)