Skip to content

Commit 2c93cce

Browse files
committed
WIP JSON array reader code path
Change-Id: I7c52a441edcef57a7a868c80f0caa2f4ac734f22
1 parent 932ba7a commit 2c93cce

File tree

8 files changed

+299
-90
lines changed

8 files changed

+299
-90
lines changed

cpp/src/arrow/ipc/json-internal.cc

Lines changed: 166 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
#include "arrow/types/primitive.h"
3434
#include "arrow/types/string.h"
3535
#include "arrow/types/struct.h"
36+
#include "arrow/util/memory-pool.h"
3637
#include "arrow/util/status.h"
3738

3839
namespace arrow {
@@ -847,23 +848,163 @@ class JsonSchemaReader {
847848
const rj::Value& json_schema_;
848849
};
849850

850-
// class JsonArrayReader {
851-
// public:
852-
// explicit JsonArrayReader(const rj::Value& json_array, const Schema& schema)
853-
// : json_array_(json_array), schema_(schema) {}
851+
class JsonArrayReader {
852+
public:
853+
explicit JsonArrayReader(
854+
MemoryPool* pool, const rj::Value& json_array, const Schema& schema)
855+
: pool_(pool), json_array_(json_array), schema_(schema) {}
856+
857+
Status GetResult(std::shared_ptr<Array>* array) {
858+
if (!json_array_.IsObject()) {
859+
return Status::Invalid("Array was not a JSON object");
860+
}
861+
const auto& json_array = json_array_.GetObject();
862+
863+
const auto& json_name = json_array.FindMember("name");
864+
RETURN_NOT_STRING("name", json_name, json_array);
865+
866+
return GetArrayFromStruct(
867+
json_array_, json_name.GetString(), schema_.fields(), array);
868+
}
869+
870+
Status GetArrayFromStruct(const rj::Value& obj, const std::string& name,
871+
const std::vector<std::shared_ptr<Field>>& fields, std::shared_ptr<Array>* array) {
872+
std::shared_ptr<Field> result = nullptr;
873+
874+
for (const std::shared_ptr<Field>& field : fields) {
875+
if (field->name == name) {
876+
result = field;
877+
break;
878+
}
879+
}
880+
881+
if (result == nullptr) {
882+
std::stringstream ss;
883+
ss << "Field named " << name << " not found in struct/schema";
884+
return Status::KeyError(ss.str());
885+
}
886+
887+
return GetArray(obj, result->type, array);
888+
}
889+
890+
template <typename T>
891+
Status ReadArray(const rj::Value& obj, const std::vector<bool>& is_valid,
892+
const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* array) {
893+
typename TypeTraits<T>::BuilderType builder(pool_, type);
894+
const auto& json_array = obj.GetObject();
854895

855-
// Status GetArray(std::shared_ptr<Array>* array) {
856-
// if (!json_array_.IsObject()) {
857-
// return Status::Invalid("Array was not a JSON object");
858-
// }
896+
const auto& json_data = json_array.FindMember("DATA");
897+
RETURN_NOT_ARRAY("DATA", json_data, json_array);
859898

860-
// return Status::OK();
861-
// }
899+
const auto& json_data_arr = json_type_ids->value.GetArray();
862900

863-
// private:
864-
// const rj::Value& json_array_;
865-
// const Schema& schema_;
866-
// };
901+
for (auto i = 0; i < json_data_arr.Size(); ++i) {
902+
if (!is_valid[i]) {
903+
builder.AppendNull();
904+
continue;
905+
}
906+
907+
const rj::Value& val = json_data_arr[i];
908+
if (IsSignedInt<T>::value) {
909+
builder.Append(val.GetInt64());
910+
} else if (IsUnsignedInt<T>::value) {
911+
builder.Append(val.GetUint64());
912+
} else if (IsFloatingPoint<T>::value) {
913+
builder.Append(val.GetFloat());
914+
} else if (std::is_base_of<BooleanType, T>::value) {
915+
builder.Append(val.GetBool());
916+
} else {
917+
// We are in the wrong function
918+
return Status::Invalid(type->ToString());
919+
}
920+
}
921+
922+
return builder.Finish(array);
923+
}
924+
925+
template <typename T>
926+
typename std::enable_if<std::is_base_of<BinaryType, T>::value, Status>::type
927+
ReadArray(const rj::Value& obj, const std::shared_ptr<DataType>& type,
928+
std::shared_ptr<Array>* array) {}
929+
930+
template <typename T>
931+
typename std::enable_if<std::is_base_of<ListType, T>::value, Status>::type
932+
ReadArray(const rj::Value& obj, const std::shared_ptr<DataType>& type,
933+
std::shared_ptr<Array>* array) {}
934+
935+
template <typename T>
936+
typename std::enable_if<std::is_base_of<StructType, T>::value, Status>::type
937+
ReadArray(const rj::Value& obj, const std::shared_ptr<DataType>& type,
938+
std::shared_ptr<Array>* array) {}
939+
940+
template <typename T>
941+
typename std::enable_if<std::is_base_of<NullType, T>::value, Status>::type
942+
ReadArray(const rj::Value& obj, const std::shared_ptr<DataType>& type,
943+
std::shared_ptr<Array>* array) {}
944+
945+
Status GetArray(const rj::Value& obj, const std::shared_ptr<DataType>& type,
946+
std::shared_ptr<Array>* array) {
947+
if (!obj.IsObject()) { return Status::Invalid("Array was not a JSON object"); }
948+
const auto& json_array = obj.GetObject();
949+
950+
const auto& json_length = json_array.FindMember("count");
951+
RETURN_NOT_INT("count", json_length, json_array);
952+
953+
const auto& json_validity = json_array.FindMember("VALIDITY");
954+
RETURN_NOT_ARRAY("VALIDITY", json_validity, json_array);
955+
956+
std::vector<bool> is_valid(count);
957+
958+
#define TYPE_CASE(TYPE) \
959+
case TYPE::type_enum: \
960+
return ReadArray<TYPE>(obj, type, array);
961+
962+
#define NOT_IMPLEMENTED_CASE(TYPE_ENUM) \
963+
case Type::TYPE_ENUM: \
964+
std::stringstream ss; \
965+
ss << type->ToString(); \
966+
return Status::NotImplemented(ss.str());
967+
968+
switch (type->type) {
969+
TYPE_CASE(NullType);
970+
TYPE_CASE(BooleanType);
971+
TYPE_CASE(UInt8Type);
972+
TYPE_CASE(Int8Type);
973+
TYPE_CASE(UInt16Type);
974+
TYPE_CASE(Int16Type);
975+
TYPE_CASE(UInt32Type);
976+
TYPE_CASE(Int32Type);
977+
TYPE_CASE(UInt64Type);
978+
TYPE_CASE(Int64Type);
979+
TYPE_CASE(HalfFloatType);
980+
TYPE_CASE(FloatType);
981+
TYPE_CASE(DoubleType);
982+
TYPE_CASE(StringType);
983+
TYPE_CASE(BinaryType);
984+
NOT_IMPLEMENTED_CASE(DATE);
985+
NOT_IMPLEMENTED_CASE(TIMESTAMP);
986+
NOT_IMPLEMENTED_CASE(TIME);
987+
NOT_IMPLEMENTED_CASE(INTERVAL);
988+
TYPE_CASE(ListType);
989+
TYPE_CASE(StructType);
990+
NOT_IMPLEMENTED_CASE(UNION);
991+
default:
992+
std::stringstream ss;
993+
ss << type->ToString();
994+
return Status::NotImplemented(ss.str());
995+
};
996+
997+
#undef TYPE_CASE
998+
#undef NOT_IMPLEMENTED_CASE
999+
1000+
return Status::OK();
1001+
}
1002+
1003+
private:
1004+
MemoryPool* pool;
1005+
const rj::Value& json_array_;
1006+
const Schema& schema_;
1007+
};
8671008

8681009
Status WriteJsonSchema(const Schema& schema, RjWriter* json_writer) {
8691010
JsonSchemaWriter converter(schema, json_writer);
@@ -875,17 +1016,17 @@ Status ReadJsonSchema(const rj::Value& json_schema, std::shared_ptr<Schema>* sch
8751016
return converter.GetSchema(schema);
8761017
}
8771018

878-
// Status WriteJsonArray(
879-
// const std::string& name, const Array& array, RjWriter* json_writer) {
880-
// JsonArrayWriter converter(name, array, json_writer);
881-
// converter.Write();
882-
// }
883-
884-
// Status ReadJsonArray(
885-
// const rj::Value& json_array, const Schema& schema, std::shared_ptr<Array>* array) {
886-
// JsonArrayReader converter(json_array, schema);
887-
// return converter.GetArray(array);
888-
// }
1019+
Status WriteJsonArray(
1020+
const std::string& name, const Array& array, RjWriter* json_writer) {
1021+
JsonArrayWriter converter(name, array, json_writer);
1022+
return converter.Write();
1023+
}
1024+
1025+
Status ReadJsonArray(MemoryPool* pool, const rj::Value& json_array, const Schema& schema,
1026+
std::shared_ptr<Array>* array) {
1027+
JsonArrayReader converter(pool, json_array, schema);
1028+
return converter.GetArray(array);
1029+
}
8891030

8901031
} // namespace ipc
8911032
} // namespace arrow

cpp/src/arrow/schema-test.cc

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,4 +98,25 @@ f3: list<item: int16>)";
9898
ASSERT_EQ(expected, result);
9999
}
100100

101+
TEST_F(TestSchema, GetFieldByName) {
102+
auto f0 = field("f0", int32());
103+
auto f1 = field("f1", uint8(), false);
104+
auto f2 = field("f2", utf8());
105+
auto f3 = field("f3", list(int16()));
106+
107+
vector<shared_ptr<Field>> fields = {f0, f1, f2, f3};
108+
auto schema = std::make_shared<Schema>(fields);
109+
110+
std::shared_ptr<Field> result;
111+
112+
result = schema->GetFieldByName("f1");
113+
ASSERT_TRUE(f1->Equals(result));
114+
115+
result = schema->GetFieldByName("f3");
116+
ASSERT_TRUE(f3->Equals(result));
117+
118+
result = schema->GetFieldByName("not-found");
119+
ASSERT_TRUE(result == nullptr);
120+
}
121+
101122
} // namespace arrow

cpp/src/arrow/schema.cc

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,21 @@ bool Schema::Equals(const std::shared_ptr<Schema>& other) const {
4242
return Equals(*other.get());
4343
}
4444

45+
std::shared_ptr<Field> Schema::GetFieldByName(const std::string& name) {
46+
if (fields_.size() > 0 && name_to_index_.size() == 0) {
47+
for (size_t i = 0; i < fields_.size(); ++i) {
48+
name_to_index_[fields_[i]->name] = i;
49+
}
50+
}
51+
52+
auto it = name_to_index_.find(name);
53+
if (it == name_to_index_.end()) {
54+
return nullptr;
55+
} else {
56+
return fields_[it->second];
57+
}
58+
}
59+
4560
std::string Schema::ToString() const {
4661
std::stringstream buffer;
4762

cpp/src/arrow/schema.h

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,14 @@
2020

2121
#include <memory>
2222
#include <string>
23+
#include <unordered_map>
2324
#include <vector>
2425

26+
#include "arrow/type.h"
2527
#include "arrow/util/visibility.h"
2628

2729
namespace arrow {
2830

29-
struct Field;
30-
3131
class ARROW_EXPORT Schema {
3232
public:
3333
explicit Schema(const std::vector<std::shared_ptr<Field>>& fields);
@@ -37,7 +37,10 @@ class ARROW_EXPORT Schema {
3737
bool Equals(const std::shared_ptr<Schema>& other) const;
3838

3939
// Return the ith schema element. Does not boundscheck
40-
const std::shared_ptr<Field>& field(int i) const { return fields_[i]; }
40+
std::shared_ptr<Field> field(int i) const { return fields_[i]; }
41+
42+
// Returns nullptr if name not found
43+
std::shared_ptr<Field> GetFieldByName(const std::string& name);
4144

4245
const std::vector<std::shared_ptr<Field>>& fields() const { return fields_; }
4346

@@ -48,6 +51,7 @@ class ARROW_EXPORT Schema {
4851

4952
private:
5053
std::vector<std::shared_ptr<Field>> fields_;
54+
std::unordered_map<std::string, int> name_to_index_;
5155
};
5256

5357
} // namespace arrow

cpp/src/arrow/type.h

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -74,21 +74,15 @@ struct Type {
7474
// Default unit millisecond
7575
TIMESTAMP = 17,
7676

77-
// Timestamp as double seconds since the UNIX epoch
78-
TIMESTAMP_DOUBLE = 18,
79-
8077
// Exact time encoded with int64, default unit millisecond
81-
TIME = 19,
78+
TIME = 18,
8279

8380
// YEAR_MONTH or DAY_TIME interval in SQL style
84-
INTERVAL = 20,
81+
INTERVAL = 19,
8582

8683
// Precision- and scale-based decimal type. Storage type depends on the
8784
// parameters.
88-
DECIMAL = 21,
89-
90-
// Decimal value encoded as a text string
91-
DECIMAL_TEXT = 22,
85+
DECIMAL = 20,
9286

9387
// A list of some logical data type
9488
LIST = 30,
@@ -98,6 +92,12 @@ struct Type {
9892

9993
// Unions of logical types
10094
UNION = 32,
95+
96+
// Timestamp as double seconds since the UNIX epoch
97+
TIMESTAMP_DOUBLE = 33,
98+
99+
// Decimal value encoded as a text string
100+
DECIMAL_TEXT = 34,
101101
};
102102
};
103103

@@ -155,7 +155,7 @@ struct ARROW_EXPORT Field {
155155
std::string name;
156156

157157
// The field's data type
158-
TypePtr type;
158+
std::shared_ptr<DataType> type;
159159

160160
// Fields can be nullable
161161
bool nullable;
@@ -164,8 +164,8 @@ struct ARROW_EXPORT Field {
164164
// 0 means it's not dictionary encoded
165165
int64_t dictionary;
166166

167-
Field(const std::string& name, const TypePtr& type, bool nullable = true,
168-
int64_t dictionary = 0)
167+
Field(const std::string& name, const std::shared_ptr<DataType>& type,
168+
bool nullable = true, int64_t dictionary = 0)
169169
: name(name), type(type), nullable(nullable), dictionary(dictionary) {}
170170

171171
bool operator==(const Field& other) const { return this->Equals(other); }
@@ -459,8 +459,8 @@ std::shared_ptr<DataType> ARROW_EXPORT union_(
459459
const std::vector<std::shared_ptr<Field>>& child_fields,
460460
const std::vector<uint8_t>& type_ids, UnionMode mode = UnionMode::SPARSE);
461461

462-
std::shared_ptr<Field> ARROW_EXPORT field(const std::string& name, const TypePtr& type,
463-
bool nullable = true, int64_t dictionary = 0);
462+
std::shared_ptr<Field> ARROW_EXPORT field(const std::string& name,
463+
const std::shared_ptr<DataType>& type, bool nullable = true, int64_t dictionary = 0);
464464

465465
} // namespace arrow
466466

0 commit comments

Comments
 (0)