Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/src/arrow/json/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

add_arrow_test(test
SOURCES
chunked_builder_test.cc
chunker_test.cc
converter_test.cc
parser_test.cc
Expand Down
46 changes: 24 additions & 22 deletions cpp/src/arrow/json/chunked_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@
#include "arrow/buffer.h"
#include "arrow/json/converter.h"
#include "arrow/table.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/logging.h"
#include "arrow/util/task_group.h"

namespace arrow {

using internal::checked_cast;
using internal::TaskGroup;

namespace json {
Expand Down Expand Up @@ -199,15 +201,6 @@ class ChunkedListArrayBuilder : public ChunkedArrayBuilder {
const std::shared_ptr<Array>& unconverted) override {
std::unique_lock<std::mutex> lock(mutex_);

auto list_array = static_cast<const ListArray*>(unconverted.get());

if (null_bitmap_chunks_.size() <= static_cast<size_t>(block_index)) {
null_bitmap_chunks_.resize(static_cast<size_t>(block_index) + 1, nullptr);
offset_chunks_.resize(null_bitmap_chunks_.size(), nullptr);
}
null_bitmap_chunks_[block_index] = unconverted->null_bitmap();
offset_chunks_[block_index] = list_array->value_offsets();

if (unconverted->type_id() == Type::NA) {
auto st = InsertNull(block_index, unconverted->length());
if (!st.ok()) {
Expand All @@ -217,8 +210,17 @@ class ChunkedListArrayBuilder : public ChunkedArrayBuilder {
}

DCHECK_EQ(unconverted->type_id(), Type::LIST);
value_builder_->Insert(block_index, list_array->list_type()->value_field(),
list_array->values());
const auto& list_array = checked_cast<const ListArray&>(*unconverted);

if (null_bitmap_chunks_.size() <= static_cast<size_t>(block_index)) {
null_bitmap_chunks_.resize(static_cast<size_t>(block_index) + 1, nullptr);
offset_chunks_.resize(null_bitmap_chunks_.size(), nullptr);
}
null_bitmap_chunks_[block_index] = unconverted->null_bitmap();
offset_chunks_[block_index] = list_array.value_offsets();

value_builder_->Insert(block_index, list_array.list_type()->value_field(),
list_array.values());
}

Status Finish(std::shared_ptr<ChunkedArray>* out) override {
Expand Down Expand Up @@ -305,17 +307,17 @@ class ChunkedStructArrayBuilder : public ChunkedArrayBuilder {
return;
}

auto struct_array = std::static_pointer_cast<StructArray>(unconverted);
const auto& struct_array = checked_cast<const StructArray&>(*unconverted);
if (promotion_graph_ == nullptr) {
// If unexpected fields are ignored or result in an error then all parsers will emit
// columns exclusively in the ordering specified in ParseOptions::explicit_schema,
// so child_builders_ is immutable and no associative lookup is necessary.
for (int i = 0; i < unconverted->num_fields(); ++i) {
child_builders_[i]->Insert(block_index, unconverted->type()->field(i),
struct_array->field(i));
struct_array.field(i));
}
} else {
auto st = InsertChildren(block_index, struct_array.get());
auto st = InsertChildren(block_index, struct_array);
if (!st.ok()) {
return task_group_->Append([st] { return st; });
}
Expand Down Expand Up @@ -383,10 +385,10 @@ class ChunkedStructArrayBuilder : public ChunkedArrayBuilder {
// Insert children associatively by name; the unconverted block may have unexpected or
// differently ordered fields
// call from Insert() only, with mutex_ locked
Status InsertChildren(int64_t block_index, const StructArray* unconverted) {
const auto& fields = unconverted->type()->fields();
Status InsertChildren(int64_t block_index, const StructArray& unconverted) {
const auto& fields = unconverted.type()->fields();

for (int i = 0; i < unconverted->num_fields(); ++i) {
for (int i = 0; i < unconverted.num_fields(); ++i) {
auto it = name_to_index_.find(fields[i]->name());

if (it == name_to_index_.end()) {
Expand All @@ -405,9 +407,9 @@ class ChunkedStructArrayBuilder : public ChunkedArrayBuilder {
child_builders_.emplace_back(std::move(child_builder));
}

auto unconverted_field = unconverted->type()->field(i);
auto unconverted_field = unconverted.type()->field(i);
child_builders_[it->second]->Insert(block_index, unconverted_field,
unconverted->field(i));
unconverted.field(i));

child_absent_[block_index].resize(child_builders_.size(), true);
child_absent_[block_index][it->second] = false;
Expand Down Expand Up @@ -444,12 +446,12 @@ Status MakeChunkedArrayBuilder(const std::shared_ptr<TaskGroup>& task_group,
return Status::OK();
}
if (type->id() == Type::LIST) {
auto list_type = static_cast<const ListType*>(type.get());
const auto& list_type = checked_cast<const ListType&>(*type);
std::shared_ptr<ChunkedArrayBuilder> value_builder;
RETURN_NOT_OK(MakeChunkedArrayBuilder(task_group, pool, promotion_graph,
list_type->value_type(), &value_builder));
list_type.value_type(), &value_builder));
*out = std::make_shared<ChunkedListArrayBuilder>(
task_group, pool, std::move(value_builder), list_type->value_field());
task_group, pool, std::move(value_builder), list_type.value_field());
return Status::OK();
}
std::shared_ptr<Converter> converter;
Expand Down
95 changes: 56 additions & 39 deletions cpp/src/arrow/json/chunked_builder_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ using internal::checked_cast;
using internal::GetCpuThreadPool;
using internal::TaskGroup;

void AssertBuilding(const std::unique_ptr<ChunkedArrayBuilder>& builder,
void AssertBuilding(const std::shared_ptr<ChunkedArrayBuilder>& builder,
const std::vector<std::string>& chunks,
std::shared_ptr<ChunkedArray>* out) {
ArrayVector unconverted;
Expand All @@ -67,9 +67,8 @@ std::shared_ptr<ChunkedArray> ExtractField(const std::string& name,
for (auto& chunk : chunks) {
chunk = checked_cast<const StructArray&>(*chunk).GetFieldByName(name);
}
auto struct_type = static_cast<const StructType*>(columns.type().get());
return std::make_shared<ChunkedArray>(chunks,
struct_type->GetFieldByName(name)->type());
const auto& struct_type = checked_cast<const StructType&>(*columns.type());
return std::make_shared<ChunkedArray>(chunks, struct_type.GetFieldByName(name)->type());
}

void AssertFieldEqual(const std::vector<std::string>& path,
Expand All @@ -83,27 +82,9 @@ void AssertFieldEqual(const std::vector<std::string>& path,
AssertChunkedEqual(expected, *actual);
}

template <typename T>
std::string RowsOfOneColumn(string_view name, std::initializer_list<T> values,
decltype(std::to_string(*values.begin()))* = nullptr) {
std::stringstream ss;
for (auto value : values) {
ss << R"({")" << name << R"(":)" << std::to_string(value) << "}\n";
}
return ss.str();
}

std::string RowsOfOneColumn(string_view name, std::initializer_list<std::string> values) {
std::stringstream ss;
for (auto value : values) {
ss << R"({")" << name << R"(":)" << value << "}\n";
}
return ss.str();
}

TEST(ChunkedArrayBuilder, Empty) {
auto tg = TaskGroup::MakeSerial();
std::unique_ptr<ChunkedArrayBuilder> builder;
std::shared_ptr<ChunkedArrayBuilder> builder;
ASSERT_OK(MakeChunkedArrayBuilder(tg, default_memory_pool(), nullptr,
struct_({field("a", int32())}), &builder));

Expand All @@ -116,7 +97,7 @@ TEST(ChunkedArrayBuilder, Empty) {

TEST(ChunkedArrayBuilder, Basics) {
auto tg = TaskGroup::MakeSerial();
std::unique_ptr<ChunkedArrayBuilder> builder;
std::shared_ptr<ChunkedArrayBuilder> builder;
ASSERT_OK(MakeChunkedArrayBuilder(tg, default_memory_pool(), nullptr,
struct_({field("a", int32())}), &builder));

Expand All @@ -130,7 +111,7 @@ TEST(ChunkedArrayBuilder, Basics) {

TEST(ChunkedArrayBuilder, Insert) {
auto tg = TaskGroup::MakeSerial();
std::unique_ptr<ChunkedArrayBuilder> builder;
std::shared_ptr<ChunkedArrayBuilder> builder;
ASSERT_OK(MakeChunkedArrayBuilder(tg, default_memory_pool(), nullptr,
struct_({field("a", int32())}), &builder));

Expand All @@ -151,7 +132,7 @@ TEST(ChunkedArrayBuilder, Insert) {

TEST(ChunkedArrayBuilder, MultipleChunks) {
auto tg = TaskGroup::MakeSerial();
std::unique_ptr<ChunkedArrayBuilder> builder;
std::shared_ptr<ChunkedArrayBuilder> builder;
ASSERT_OK(MakeChunkedArrayBuilder(tg, default_memory_pool(), nullptr,
struct_({field("a", int32())}), &builder));

Expand All @@ -170,7 +151,7 @@ TEST(ChunkedArrayBuilder, MultipleChunks) {

TEST(ChunkedArrayBuilder, MultipleChunksParallel) {
auto tg = TaskGroup::MakeThreaded(GetCpuThreadPool());
std::unique_ptr<ChunkedArrayBuilder> builder;
std::shared_ptr<ChunkedArrayBuilder> builder;
ASSERT_OK(MakeChunkedArrayBuilder(tg, default_memory_pool(), nullptr,
struct_({field("a", int32())}), &builder));

Expand All @@ -194,7 +175,7 @@ TEST(ChunkedArrayBuilder, MultipleChunksParallel) {

TEST(InferringChunkedArrayBuilder, Empty) {
auto tg = TaskGroup::MakeSerial();
std::unique_ptr<ChunkedArrayBuilder> builder;
std::shared_ptr<ChunkedArrayBuilder> builder;
ASSERT_OK(MakeChunkedArrayBuilder(tg, default_memory_pool(), GetPromotionGraph(),
struct_({}), &builder));

Expand All @@ -207,7 +188,7 @@ TEST(InferringChunkedArrayBuilder, Empty) {

TEST(InferringChunkedArrayBuilder, SingleChunkNull) {
auto tg = TaskGroup::MakeSerial();
std::unique_ptr<ChunkedArrayBuilder> builder;
std::shared_ptr<ChunkedArrayBuilder> builder;
ASSERT_OK(MakeChunkedArrayBuilder(tg, default_memory_pool(), GetPromotionGraph(),
struct_({}), &builder));

Expand All @@ -224,7 +205,7 @@ TEST(InferringChunkedArrayBuilder, SingleChunkNull) {

TEST(InferringChunkedArrayBuilder, MultipleChunkNull) {
auto tg = TaskGroup::MakeSerial();
std::unique_ptr<ChunkedArrayBuilder> builder;
std::shared_ptr<ChunkedArrayBuilder> builder;
ASSERT_OK(MakeChunkedArrayBuilder(tg, default_memory_pool(), GetPromotionGraph(),
struct_({}), &builder));

Expand All @@ -244,7 +225,7 @@ TEST(InferringChunkedArrayBuilder, MultipleChunkNull) {

TEST(InferringChunkedArrayBuilder, SingleChunkInteger) {
auto tg = TaskGroup::MakeSerial();
std::unique_ptr<ChunkedArrayBuilder> builder;
std::shared_ptr<ChunkedArrayBuilder> builder;
ASSERT_OK(MakeChunkedArrayBuilder(tg, default_memory_pool(), GetPromotionGraph(),
struct_({}), &builder));

Expand All @@ -264,7 +245,7 @@ TEST(InferringChunkedArrayBuilder, SingleChunkInteger) {

TEST(InferringChunkedArrayBuilder, MultipleChunkInteger) {
auto tg = TaskGroup::MakeSerial();
std::unique_ptr<ChunkedArrayBuilder> builder;
std::shared_ptr<ChunkedArrayBuilder> builder;
ASSERT_OK(MakeChunkedArrayBuilder(tg, default_memory_pool(), GetPromotionGraph(),
struct_({}), &builder));

Expand All @@ -285,7 +266,7 @@ TEST(InferringChunkedArrayBuilder, MultipleChunkInteger) {

TEST(InferringChunkedArrayBuilder, SingleChunkDouble) {
auto tg = TaskGroup::MakeSerial();
std::unique_ptr<ChunkedArrayBuilder> builder;
std::shared_ptr<ChunkedArrayBuilder> builder;
ASSERT_OK(MakeChunkedArrayBuilder(tg, default_memory_pool(), GetPromotionGraph(),
struct_({}), &builder));

Expand All @@ -305,7 +286,7 @@ TEST(InferringChunkedArrayBuilder, SingleChunkDouble) {

TEST(InferringChunkedArrayBuilder, MultipleChunkDouble) {
auto tg = TaskGroup::MakeSerial();
std::unique_ptr<ChunkedArrayBuilder> builder;
std::shared_ptr<ChunkedArrayBuilder> builder;
ASSERT_OK(MakeChunkedArrayBuilder(tg, default_memory_pool(), GetPromotionGraph(),
struct_({}), &builder));

Expand All @@ -327,7 +308,7 @@ TEST(InferringChunkedArrayBuilder, MultipleChunkDouble) {

TEST(InferringChunkedArrayBuilder, SingleChunkTimestamp) {
auto tg = TaskGroup::MakeSerial();
std::unique_ptr<ChunkedArrayBuilder> builder;
std::shared_ptr<ChunkedArrayBuilder> builder;
ASSERT_OK(MakeChunkedArrayBuilder(tg, default_memory_pool(), GetPromotionGraph(),
struct_({}), &builder));

Expand All @@ -348,7 +329,7 @@ TEST(InferringChunkedArrayBuilder, SingleChunkTimestamp) {

TEST(InferringChunkedArrayBuilder, MultipleChunkTimestamp) {
auto tg = TaskGroup::MakeSerial();
std::unique_ptr<ChunkedArrayBuilder> builder;
std::shared_ptr<ChunkedArrayBuilder> builder;
ASSERT_OK(MakeChunkedArrayBuilder(tg, default_memory_pool(), GetPromotionGraph(),
struct_({}), &builder));

Expand All @@ -371,7 +352,7 @@ TEST(InferringChunkedArrayBuilder, MultipleChunkTimestamp) {

TEST(InferringChunkedArrayBuilder, SingleChunkString) {
auto tg = TaskGroup::MakeSerial();
std::unique_ptr<ChunkedArrayBuilder> builder;
std::shared_ptr<ChunkedArrayBuilder> builder;
ASSERT_OK(MakeChunkedArrayBuilder(tg, default_memory_pool(), GetPromotionGraph(),
struct_({}), &builder));

Expand All @@ -392,7 +373,7 @@ TEST(InferringChunkedArrayBuilder, SingleChunkString) {

TEST(InferringChunkedArrayBuilder, MultipleChunkString) {
auto tg = TaskGroup::MakeSerial();
std::unique_ptr<ChunkedArrayBuilder> builder;
std::shared_ptr<ChunkedArrayBuilder> builder;
ASSERT_OK(MakeChunkedArrayBuilder(tg, default_memory_pool(), GetPromotionGraph(),
struct_({}), &builder));

Expand All @@ -415,7 +396,7 @@ TEST(InferringChunkedArrayBuilder, MultipleChunkString) {

TEST(InferringChunkedArrayBuilder, MultipleChunkIntegerParallel) {
auto tg = TaskGroup::MakeThreaded(GetCpuThreadPool());
std::unique_ptr<ChunkedArrayBuilder> builder;
std::shared_ptr<ChunkedArrayBuilder> builder;
ASSERT_OK(MakeChunkedArrayBuilder(tg, default_memory_pool(), GetPromotionGraph(),
struct_({}), &builder));

Expand All @@ -433,5 +414,41 @@ TEST(InferringChunkedArrayBuilder, MultipleChunkIntegerParallel) {
AssertFieldEqual({"a"}, actual, *expected);
}

TEST(InferringChunkedArrayBuilder, SingleChunkList) {
auto tg = TaskGroup::MakeSerial();
std::shared_ptr<ChunkedArrayBuilder> builder;
ASSERT_OK(MakeChunkedArrayBuilder(tg, default_memory_pool(), GetPromotionGraph(),
struct_({}), &builder));

std::shared_ptr<ChunkedArray> actual;
AssertBuilding(builder,
{
std::string("{}\n") + "{\"a\": []}\n" + "{\"a\": [1, 2]}\n",
},
&actual);

auto expected = ChunkedArrayFromJSON(list(int64()), {"[null, [], [1, 2]]"});
AssertFieldEqual({"a"}, actual, *expected);
}

TEST(InferringChunkedArrayBuilder, MultipleChunkList) {
auto tg = TaskGroup::MakeSerial();
std::shared_ptr<ChunkedArrayBuilder> builder;
ASSERT_OK(MakeChunkedArrayBuilder(tg, default_memory_pool(), GetPromotionGraph(),
struct_({}), &builder));

std::shared_ptr<ChunkedArray> actual;
AssertBuilding(builder,
{
"{}\n",
"{\"a\": []}\n",
"{\"a\": [1, 2]}\n",
},
&actual);

auto expected = ChunkedArrayFromJSON(list(int64()), {"[null]", "[[]]", "[[1, 2]]"});
AssertFieldEqual({"a"}, actual, *expected);
}

} // namespace json
} // namespace arrow
12 changes: 6 additions & 6 deletions cpp/src/arrow/json/converter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,17 @@ namespace {

const DictionaryArray& GetDictionaryArray(const std::shared_ptr<Array>& in) {
DCHECK_EQ(in->type_id(), Type::DICTIONARY);
auto dict_type = static_cast<const DictionaryType*>(in->type().get());
auto dict_type = checked_cast<const DictionaryType*>(in->type().get());
DCHECK_EQ(dict_type->index_type()->id(), Type::INT32);
DCHECK_EQ(dict_type->value_type()->id(), Type::STRING);
return static_cast<const DictionaryArray&>(*in);
return checked_cast<const DictionaryArray&>(*in);
}

template <typename ValidVisitor, typename NullVisitor>
Status VisitDictionaryEntries(const DictionaryArray& dict_array,
ValidVisitor&& visit_valid, NullVisitor&& visit_null) {
const StringArray& dict = static_cast<const StringArray&>(*dict_array.dictionary());
const Int32Array& indices = static_cast<const Int32Array&>(*dict_array.indices());
const StringArray& dict = checked_cast<const StringArray&>(*dict_array.dictionary());
const Int32Array& indices = checked_cast<const Int32Array&>(*dict_array.indices());
for (int64_t i = 0; i < indices.length(); ++i) {
if (indices.IsValid(i)) {
RETURN_NOT_OK(visit_valid(dict.GetView(indices.GetView(i))));
Expand Down Expand Up @@ -281,8 +281,8 @@ const PromotionGraph* GetPromotionGraph() {
return timestamp(TimeUnit::SECOND);

case Kind::kArray: {
auto type = static_cast<const ListType*>(unexpected_field->type().get());
auto value_field = type->value_field();
const auto& type = checked_cast<const ListType&>(*unexpected_field->type());
auto value_field = type.value_field();
return list(value_field->WithType(Infer(value_field)));
}
case Kind::kObject: {
Expand Down
Loading