Skip to content

Commit 35c2f85

Browse files
committed
Refactoring. Start drafting string/list reader
Change-Id: I3e7508c435e4a9f5fe76c1df3951338a24d81839
1 parent f26402a commit 35c2f85

File tree

5 files changed

+132
-65
lines changed

5 files changed

+132
-65
lines changed

cpp/src/arrow/ipc/json-internal.cc

Lines changed: 98 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
#include "arrow/types/primitive.h"
3535
#include "arrow/types/string.h"
3636
#include "arrow/types/struct.h"
37+
#include "arrow/util/bit-util.h"
3738
#include "arrow/util/memory-pool.h"
3839
#include "arrow/util/status.h"
3940

@@ -58,6 +59,22 @@ static std::string GetBufferTypeName(BufferType type) {
5859
return "UNKNOWN";
5960
}
6061

62+
// Returns the upper-case string name of a TimeUnit value.
//
// SECOND, MILLI, MICRO and NANO map to "SECOND", "MILLISECOND",
// "MICROSECOND" and "NANOSECOND" respectively; any other value yields
// "UNKNOWN".
static std::string GetTimeUnitName(TimeUnit unit) {
  if (unit == TimeUnit::SECOND) { return "SECOND"; }
  if (unit == TimeUnit::MILLI) { return "MILLISECOND"; }
  if (unit == TimeUnit::MICRO) { return "MICROSECOND"; }
  if (unit == TimeUnit::NANO) { return "NANOSECOND"; }
  // Anything that is not one of the four known units.
  return "UNKNOWN";
}
77+
6178
class BufferLayout {
6279
public:
6380
BufferLayout(BufferType type, int bit_width) : type_(type), bit_width_(bit_width) {}
@@ -173,20 +190,7 @@ class JsonSchemaWriter : public TypeVisitor {
173190
void>::type
174191
WriteTypeMetadata(const T& type) {
175192
writer_->Key("unit");
176-
switch (type.unit) {
177-
case TimeUnit::SECOND:
178-
writer_->String("SECOND");
179-
break;
180-
case TimeUnit::MILLI:
181-
writer_->String("MILLISECOND");
182-
break;
183-
case TimeUnit::MICRO:
184-
writer_->String("MICROSECOND");
185-
break;
186-
case TimeUnit::NANO:
187-
writer_->String("NANOSECOND");
188-
break;
189-
}
193+
writer_->String(GetTimeUnitName(type.unit));
190194
}
191195

192196
template <typename T>
@@ -679,16 +683,15 @@ class JsonSchemaReader {
679683
RETURN_NOT_OK(GetFieldsFromArray(json_children->value, &children));
680684

681685
std::shared_ptr<DataType> type;
682-
RETURN_NOT_OK(GetType(json_type->value, children, &type));
686+
RETURN_NOT_OK(GetType(json_type->value.GetObject(), children, &type));
683687

684688
*field = std::make_shared<Field>(
685689
json_name->value.GetString(), type, json_nullable->value.GetBool());
686690
return Status::OK();
687691
}
688692

689-
Status GetInteger(const rj::Value& obj, std::shared_ptr<DataType>* type) {
690-
const auto& json_type = obj.GetObject();
691-
693+
Status GetInteger(
694+
const rj::Value::ConstObject& json_type, std::shared_ptr<DataType>* type) {
692695
const auto& json_bit_width = json_type.FindMember("bitWidth");
693696
RETURN_NOT_INT("bitWidth", json_bit_width, json_type);
694697

@@ -719,9 +722,8 @@ class JsonSchemaReader {
719722
return Status::OK();
720723
}
721724

722-
Status GetFloatingPoint(const rj::Value& obj, std::shared_ptr<DataType>* type) {
723-
const auto& json_type = obj.GetObject();
724-
725+
Status GetFloatingPoint(
726+
const rj::Value::ConstObject& json_type, std::shared_ptr<DataType>* type) {
725727
const auto& json_precision = json_type.FindMember("precision");
726728
RETURN_NOT_STRING("precision", json_precision, json_type);
727729

@@ -742,9 +744,8 @@ class JsonSchemaReader {
742744
}
743745

744746
template <typename T>
745-
Status GetTimeLike(const rj::Value& obj, std::shared_ptr<DataType>* type) {
746-
const auto& json_type = obj.GetObject();
747-
747+
Status GetTimeLike(
748+
const rj::Value::ConstObject& json_type, std::shared_ptr<DataType>* type) {
748749
const auto& json_unit = json_type.FindMember("unit");
749750
RETURN_NOT_STRING("unit", json_unit, json_type);
750751

@@ -771,11 +772,9 @@ class JsonSchemaReader {
771772
return Status::OK();
772773
}
773774

774-
Status GetUnion(const rj::Value& obj,
775+
Status GetUnion(const rj::Value::ConstObject& json_type,
775776
const std::vector<std::shared_ptr<Field>>& children,
776777
std::shared_ptr<DataType>* type) {
777-
const auto& json_type = obj.GetObject();
778-
779778
const auto& json_mode = json_type.FindMember("mode");
780779
RETURN_NOT_STRING("mode", json_mode, json_type);
781780

@@ -806,20 +805,18 @@ class JsonSchemaReader {
806805
return Status::OK();
807806
}
808807

809-
Status GetType(const rj::Value& obj,
808+
Status GetType(const rj::Value::ConstObject& json_type,
810809
const std::vector<std::shared_ptr<Field>>& children,
811810
std::shared_ptr<DataType>* type) {
812-
const auto& json_type = obj.GetObject();
813-
814811
const auto& json_type_name = json_type.FindMember("name");
815812
RETURN_NOT_STRING("name", json_type_name, json_type);
816813

817814
std::string type_name = json_type_name->value.GetString();
818815

819816
if (type_name == "int") {
820-
return GetInteger(obj, type);
817+
return GetInteger(json_type, type);
821818
} else if (type_name == "floatingpoint") {
822-
return GetFloatingPoint(obj, type);
819+
return GetFloatingPoint(json_type, type);
823820
} else if (type_name == "bool") {
824821
*type = boolean();
825822
} else if (type_name == "utf8") {
@@ -831,15 +828,15 @@ class JsonSchemaReader {
831828
} else if (type_name == "date") {
832829
*type = date();
833830
} else if (type_name == "time") {
834-
return GetTimeLike<TimeType>(obj, type);
831+
return GetTimeLike<TimeType>(json_type, type);
835832
} else if (type_name == "timestamp") {
836-
return GetTimeLike<TimestampType>(obj, type);
833+
return GetTimeLike<TimestampType>(json_type, type);
837834
} else if (type_name == "list") {
838835
*type = list(children[0]);
839836
} else if (type_name == "struct") {
840837
*type = struct_(children);
841838
} else {
842-
return GetUnion(obj, children, type);
839+
return GetUnion(json_type, children, type);
843840
}
844841
return Status::OK();
845842
}
@@ -891,10 +888,9 @@ class JsonArrayReader {
891888
typename std::enable_if<std::is_base_of<PrimitiveCType, T>::value ||
892889
std::is_base_of<BooleanType, T>::value,
893890
Status>::type
894-
ReadArray(const rj::Value& obj, const std::vector<bool>& is_valid,
891+
ReadArray(const rj::Value::ConstObject& json_array, const std::vector<bool>& is_valid,
895892
const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* array) {
896893
typename TypeTraits<T>::BuilderType builder(pool_, type);
897-
const auto& json_array = obj.GetObject();
898894

899895
const auto& json_data = json_array.FindMember("DATA");
900896
RETURN_NOT_ARRAY("DATA", json_data, json_array);
@@ -909,12 +905,16 @@ class JsonArrayReader {
909905

910906
const rj::Value& val = json_data_arr[i];
911907
if (IsSignedInt<T>::value) {
908+
DCHECK(val.IsInt());
912909
builder.Append(val.GetInt64());
913910
} else if (IsUnsignedInt<T>::value) {
911+
DCHECK(val.IsUint());
914912
builder.Append(val.GetUint64());
915913
} else if (IsFloatingPoint<T>::value) {
914+
DCHECK(val.IsFloat());
916915
builder.Append(val.GetFloat());
917916
} else if (std::is_base_of<BooleanType, T>::value) {
917+
DCHECK(val.IsBool());
918918
builder.Append(val.GetBool());
919919
} else {
920920
// We are in the wrong function
@@ -927,28 +927,83 @@ class JsonArrayReader {
927927

928928
template <typename T>
929929
typename std::enable_if<std::is_base_of<BinaryType, T>::value, Status>::type ReadArray(
930-
const rj::Value& obj, const std::vector<bool>& is_valid,
930+
const rj::Value::ConstObject& json_array, const std::vector<bool>& is_valid,
931931
const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* array) {
932-
return Status::OK();
932+
typename TypeTraits<T>::BuilderType builder(pool_, type);
933+
934+
const auto& json_data = json_array.FindMember("DATA");
935+
RETURN_NOT_ARRAY("DATA", json_data, json_array);
936+
937+
const auto& json_data_arr = json_data->value.GetArray();
938+
939+
for (auto i = 0; i < json_data_arr.Size(); ++i) {
940+
if (!is_valid[i]) {
941+
builder.AppendNull();
942+
continue;
943+
}
944+
945+
const rj::Value& val = json_data_arr[i];
946+
DCHECK(val.IsString());
947+
builder.Append(val.GetString());
948+
}
949+
950+
return builder.Finish(array);
933951
}
934952

935953
template <typename T>
936954
typename std::enable_if<std::is_base_of<ListType, T>::value, Status>::type ReadArray(
937-
const rj::Value& obj, const std::vector<bool>& is_valid,
955+
const rj::Value::ConstObject& json_array, const std::vector<bool>& is_valid,
938956
const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* array) {
957+
const auto& json_offsets = json_array.FindMember("OFFSETS");
958+
RETURN_NOT_ARRAY("OFFSETS", json_offsets, json_array);
959+
const auto& json_offsets_arr = json_offsets->value.GetArray();
960+
961+
int length = static_cast<int>(is_valid.size());
962+
963+
auto validity_buffer = std::make_shared<PoolBuffer>(pool_);
964+
RETURN_NOT_OK(validity_buffer->Resize(BitUtil::BytesForBits(length)));
965+
966+
auto offsets_buffer = std::make_shared<PoolBuffer>(pool_);
967+
RETURN_NOT_OK(offsets_buffer->Resize((length + 1) * sizeof(int32_t)));
968+
969+
int32_t null_count = 0;
970+
uint8_t* bitmap = reinterpret_cast<uint8_t*>(validity_buffer->mutable_data());
971+
memset(bitmap, 0, validity_buffer->size());
972+
973+
int32_t* offsets = reinterpret_cast<int32_t*>(offsets_buffer->mutable_data());
974+
975+
for (int i = 0; i < length; ++i) {
976+
const rj::Value& val = json_offsets_arr[i];
977+
978+
DCHECK(val.IsInt());
979+
offsets[i] = val.GetInt();
980+
981+
if (!is_valid[i]) {
982+
++null_count;
983+
continue;
984+
}
985+
BitUtil::SetBit(bitmap, i);
986+
}
987+
988+
// auto list_type = dynamic_cast<const ListType*>(type.get());
989+
std::shared_ptr<Array> values;
990+
991+
*array = std::make_shared<ListArray>(
992+
type, length, offsets_buffer, values, null_count, validity_buffer);
993+
939994
return Status::OK();
940995
}
941996

942997
template <typename T>
943998
typename std::enable_if<std::is_base_of<StructType, T>::value, Status>::type ReadArray(
944-
const rj::Value& obj, const std::vector<bool>& is_valid,
999+
const rj::Value::ConstObject& json_array, const std::vector<bool>& is_valid,
9451000
const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* array) {
9461001
return Status::OK();
9471002
}
9481003

9491004
template <typename T>
9501005
typename std::enable_if<std::is_base_of<NullType, T>::value, Status>::type ReadArray(
951-
const rj::Value& obj, const std::vector<bool>& is_valid,
1006+
const rj::Value::ConstObject& json_array, const std::vector<bool>& is_valid,
9521007
const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* array) {
9531008
return Status::NotImplemented("null");
9541009
}
@@ -977,7 +1032,7 @@ class JsonArrayReader {
9771032

9781033
#define TYPE_CASE(TYPE) \
9791034
case TYPE::type_id: \
980-
return ReadArray<TYPE>(obj, is_valid, type, array);
1035+
return ReadArray<TYPE>(json_array, is_valid, type, array);
9811036

9821037
#define NOT_IMPLEMENTED_CASE(TYPE_ENUM) \
9831038
case Type::TYPE_ENUM: { \

cpp/src/arrow/ipc/json.cc

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,5 @@
2222
#include "arrow/util/status.h"
2323

2424
namespace arrow {
25-
namespace ipc {
26-
27-
namespace rj = rapidjson;
28-
29-
} // namespace ipc
25+
namespace ipc {} // namespace ipc
3026
} // namespace arrow

cpp/src/arrow/ipc/json.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,13 @@
2424
#include <vector>
2525

2626
#include "arrow/type_fwd.h"
27-
#include "arrow/visibility.h"
27+
#include "arrow/util/visibility.h"
2828

2929
namespace arrow {
3030

3131
class MemoryPool;
32+
class RecordBatch;
33+
class Schema;
3234

3335
namespace io {
3436

@@ -42,7 +44,7 @@ namespace ipc {
4244
class ARROW_EXPORT JsonWriter {
4345
public:
4446
static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
45-
std::shared_ptr<FileWriter>* out);
47+
std::shared_ptr<JsonWriter>* out);
4648

4749
// TODO(wesm): Write dictionaries
4850

cpp/src/arrow/type_traits.h

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -132,22 +132,34 @@ struct TypeTraits<BooleanType> {
132132
}
133133
};
134134

135+
template <>
136+
struct TypeTraits<StringType> {
137+
using ArrayType = StringArray;
138+
using BuilderType = StringBuilder;
139+
};
140+
141+
template <>
142+
struct TypeTraits<BinaryType> {
143+
using ArrayType = BinaryArray;
144+
using BuilderType = BinaryBuilder;
145+
};
146+
135147
// Not all type classes have a c_type
136148
template <typename T>
137149
struct as_void {
138150
using type = void;
139151
};
140152

141153
// The partial specialization will match if T has the ATTR_NAME member
142-
#define GET_ATTR(ATTR_NAME, DEFAULT) \
143-
template <typename T, typename Enable = void> \
144-
struct GetAttr_##ATTR_NAME { \
145-
using type = DEFAULT; \
146-
}; \
147-
\
148-
template <typename T> \
154+
#define GET_ATTR(ATTR_NAME, DEFAULT) \
155+
template <typename T, typename Enable = void> \
156+
struct GetAttr_##ATTR_NAME { \
157+
using type = DEFAULT; \
158+
}; \
159+
\
160+
template <typename T> \
149161
struct GetAttr_##ATTR_NAME<T, typename as_void<typename T::ATTR_NAME>::type> { \
150-
using type = typename T::ATTR_NAME; \
162+
using type = typename T::ATTR_NAME; \
151163
};
152164

153165
GET_ATTR(c_type, void);
@@ -157,7 +169,7 @@ GET_ATTR(TypeClass, void);
157169

158170
#define PRIMITIVE_TRAITS(T) \
159171
using TypeClass = typename std::conditional<std::is_base_of<DataType, T>::value, T, \
160-
typename GetAttr_TypeClass<T>::type>::type; \
172+
typename GetAttr_TypeClass<T>::type>::type; \
161173
using c_type = typename GetAttr_c_type<TypeClass>::type;
162174

163175
template <typename T>

cpp/src/arrow/types/string.h

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,12 @@ class ARROW_EXPORT BinaryBuilder : public ListBuilder {
119119
return byte_builder_->Append(value, length);
120120
}
121121

122+
// Appends `length` bytes starting at `value`, forwarding the char data to the
// uint8_t overload as raw bytes.
Status Append(const char* value, int32_t length) {
  const uint8_t* bytes = reinterpret_cast<const uint8_t*>(value);
  return Append(bytes, length);
}
125+
126+
// Appends the full contents of `value`; the length comes from value.size(),
// converted to the int32_t the (pointer, length) overload takes.
Status Append(const std::string& value) {
  return Append(value.c_str(), static_cast<int32_t>(value.size()));
}
127+
122128
Status Finish(std::shared_ptr<Array>* out) override;
123129

124130
protected:
@@ -131,13 +137,9 @@ class ARROW_EXPORT StringBuilder : public BinaryBuilder {
131137
explicit StringBuilder(MemoryPool* pool, const TypePtr& type)
132138
: BinaryBuilder(pool, type) {}
133139

134-
Status Finish(std::shared_ptr<Array>* out) override;
140+
using BinaryBuilder::Append;
135141

136-
Status Append(const std::string& value) { return Append(value.c_str(), value.size()); }
137-
138-
Status Append(const char* value, int32_t length) {
139-
return BinaryBuilder::Append(reinterpret_cast<const uint8_t*>(value), length);
140-
}
142+
Status Finish(std::shared_ptr<Array>* out) override;
141143

142144
Status Append(const std::vector<std::string>& values, uint8_t* null_bytes);
143145
};

0 commit comments

Comments
 (0)