Skip to content

Commit 3d9fcc2

Browse files
committed
Complete round trip json file test with multiple record batches
Change-Id: I56d3222db251c99af5c8a3536909e45b429c8150
1 parent 2753449 commit 3d9fcc2

File tree

4 files changed

+120
-32
lines changed

4 files changed

+120
-32
lines changed

cpp/src/arrow/ipc/ipc-json-test.cc

Lines changed: 68 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include "arrow/array.h"
2929
#include "arrow/ipc/json-internal.h"
3030
#include "arrow/ipc/json.h"
31+
#include "arrow/table.h"
3132
#include "arrow/test-util.h"
3233
#include "arrow/type.h"
3334
#include "arrow/type_traits.h"
@@ -96,11 +97,12 @@ void CheckPrimitive(const std::shared_ptr<DataType>& type,
9697
}
9798

9899
template <typename TYPE, typename C_TYPE>
99-
void MakeArray(const std::shared_ptr<DataType>& type,
100-
const std::vector<bool>& is_valid, const std::vector<C_TYPE>& values,
101-
std::shared_ptr<Array>* out) {
102-
std::shared_ptr<Buffer> values_buffer = test::GetBufferFromVector(values);
100+
void MakeArray(const std::shared_ptr<DataType>& type, const std::vector<bool>& is_valid,
101+
const std::vector<C_TYPE>& values, std::shared_ptr<Array>* out) {
102+
std::shared_ptr<Buffer> values_buffer;
103103
std::shared_ptr<Buffer> values_bitmap;
104+
105+
ASSERT_OK(test::CopyBufferFromVector(values, &values_buffer));
104106
ASSERT_OK(test::GetBitmapFromBoolVector(is_valid, &values_bitmap));
105107

106108
using ArrayType = typename TypeTraits<TYPE>::ArrayType;
@@ -193,42 +195,84 @@ TEST(TestJsonArrayWriter, NestedTypes) {
193195
TestArrayRoundTrip(struct_array);
194196
}
195197

196-
TEST(TestJsonFileReadWrite, BasicRoundTrip) {
197-
auto v1_type = int8();
198-
auto v2_type = int32();
199-
auto v3_type = utf8();
198+
// Data generation for test case below
199+
void MakeBatchArrays(const std::shared_ptr<Schema>& schema, const int num_rows,
200+
std::vector<std::shared_ptr<Array>>* arrays) {
201+
std::vector<bool> is_valid;
202+
test::random_is_valid(num_rows, 0.25, &is_valid);
203+
204+
std::vector<int8_t> v1_values;
205+
std::vector<int32_t> v2_values;
200206

201-
std::vector<bool> is_valid = {true, false, true, true, false, true, true};
207+
test::randint<int8_t>(num_rows, 0, 100, &v1_values);
208+
test::randint<int32_t>(num_rows, 0, 100, &v2_values);
202209

203-
std::vector<int8_t> v1_values = {0, 1, 2, 3, 4, 5, 6};
204210
std::shared_ptr<Array> v1;
205-
MakeArray<Int8Type, int8_t>(v1_type, is_valid, v1_values, &v1);
211+
MakeArray<Int8Type, int8_t>(schema->field(0)->type, is_valid, v1_values, &v1);
206212

207-
std::vector<int32_t> v2_values = {0, 1, 2, 3, 4, 5, 6};
208213
std::shared_ptr<Array> v2;
209-
MakeArray<Int32Type, int32_t>(v2_type, is_valid, v2_values, &v2);
210-
211-
std::vector<std::string> v3_values = {"foo", "bar", "", "", "", "baz", "qux"};
214+
MakeArray<Int32Type, int32_t>(schema->field(1)->type, is_valid, v2_values, &v2);
215+
216+
static const int kBufferSize = 10;
217+
static uint8_t buffer[kBufferSize];
218+
static uint32_t seed = 0;
219+
StringBuilder string_builder(default_memory_pool(), utf8());
220+
for (int i = 0; i < num_rows; ++i) {
221+
if (!is_valid[i]) {
222+
string_builder.AppendNull();
223+
} else {
224+
test::random_ascii(kBufferSize, seed++, buffer);
225+
string_builder.Append(buffer, kBufferSize);
226+
}
227+
}
212228
std::shared_ptr<Array> v3;
213-
MakeArray<StringType, std::string>(v3_type, is_valid, v3_values, &v3);
229+
ASSERT_OK(string_builder.Finish(&v3));
214230

215-
std::shared_ptr<Schema> schema({field("f1", v1_type), field("f2", v2_type),
216-
field("f3", v3_type)});
231+
arrays->emplace_back(v1);
232+
arrays->emplace_back(v2);
233+
arrays->emplace_back(v3);
234+
}
217235

218-
std::vector<std::shared_ptr<Array>> arrays = {v1, v2, v3}
236+
TEST(TestJsonFileReadWrite, BasicRoundTrip) {
237+
auto v1_type = int8();
238+
auto v2_type = int32();
239+
auto v3_type = utf8();
240+
241+
std::shared_ptr<Schema> schema(
242+
new Schema({field("f1", v1_type), field("f2", v2_type), field("f3", v3_type)}));
219243

220244
std::unique_ptr<JsonWriter> writer;
221245
ASSERT_OK(JsonWriter::Open(schema, &writer));
222246

223247
const int nbatches = 3;
224-
const int32_t num_rows = static_cast<int32_t>(v1_values.size());
225-
248+
std::vector<std::shared_ptr<RecordBatch>> batches;
226249
for (int i = 0; i < nbatches; ++i) {
227-
ASSERT_OK(writer_->WriteRecordBatch(arrays, num_rows));
250+
int32_t num_rows = 5 + i * 5;
251+
std::vector<std::shared_ptr<Array>> arrays;
252+
253+
MakeBatchArrays(schema, num_rows, &arrays);
254+
batches.emplace_back(std::make_shared<RecordBatch>(schema, num_rows, arrays));
255+
ASSERT_OK(writer->WriteRecordBatch(arrays, num_rows));
228256
}
229257

230-
std::shared_ptr<Buffer> data;
231-
ASSERT_OK(writer->Finish(&data));
258+
std::string result;
259+
ASSERT_OK(writer->Finish(&result));
260+
261+
std::unique_ptr<JsonReader> reader;
262+
263+
auto buffer = std::make_shared<Buffer>(
264+
reinterpret_cast<const uint8_t*>(result.c_str()), static_cast<int>(result.size()));
265+
266+
ASSERT_OK(JsonReader::Open(buffer, &reader));
267+
ASSERT_TRUE(reader->schema()->Equals(*schema.get()));
268+
269+
ASSERT_EQ(nbatches, reader->num_record_batches());
270+
271+
for (int i = 0; i < nbatches; ++i) {
272+
std::shared_ptr<RecordBatch> batch;
273+
ASSERT_OK(reader->GetRecordBatch(i, &batch));
274+
ASSERT_TRUE(batch->Equals(*batches[i].get()));
275+
}
232276
}
233277

234278
} // namespace ipc

cpp/src/arrow/ipc/json.cc

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,11 @@ class JsonWriter::JsonWriterImpl {
5454
return Status::OK();
5555
}
5656

57-
Status Finish() {
57+
// Closes the JSON document and hands the serialized text back to the caller.
//
// The two End* calls must come before the string is read: they close the
// record-batch array and then the enclosing object on writer_.
//
// result: output parameter; it is dereferenced unconditionally, so it must
//         not be null. Receives a copy of string_buffer_'s contents.
// Returns Status::OK() on every path visible here.
Status Finish(std::string* result) {
5858
writer_->EndArray(); // Closes the record batches array
5959
writer_->EndObject();
60+
61+
// Copy the accumulated text out of the writer-owned string buffer.
*result = string_buffer_.GetString();
6062
return Status::OK();
6163
}
6264

@@ -75,7 +77,7 @@ class JsonWriter::JsonWriterImpl {
7577
const std::shared_ptr<Array>& column = columns[i];
7678

7779
DCHECK_EQ(num_rows, column->length())
78-
<< "Array length did not match record batch length";
80+
<< "Array length did not match record batch length";
7981

8082
RETURN_NOT_OK(
8183
WriteJsonArray(schema_->field(i)->name, *column.get(), writer_.get()));
@@ -97,14 +99,16 @@ JsonWriter::JsonWriter(const std::shared_ptr<Schema>& schema) {
9799
impl_.reset(new JsonWriterImpl(schema));
98100
}
99101

102+
// Intentionally empty, but defined here in the .cc rather than implicitly in
// the header. NOTE(review): presumably this is so impl_ (assigned with
// `new JsonWriterImpl(...)` in the constructor) is destroyed where
// JsonWriterImpl is a complete type -- confirm impl_ is a std::unique_ptr.
JsonWriter::~JsonWriter() {}
103+
100104
// Factory for JsonWriter: builds a writer bound to `schema`, stores it in
// *writer, and returns the status of the implementation's Start() call.
//
// schema: schema handed to the new writer's constructor.
// writer: output parameter; any writer it previously owned is released and
//         replaced by the newly constructed one. Must not be null.
Status JsonWriter::Open(
    const std::shared_ptr<Schema>& schema, std::unique_ptr<JsonWriter>* writer) {
  // The constructor is private, so the object is created with `new` here and
  // ownership is transferred to the caller's unique_ptr straight away.
  writer->reset(new JsonWriter(schema));
  return writer->get()->impl_->Start();
}
105109

106-
Status JsonWriter::Finish(std::shared_ptr<Buffer>* out) {
107-
return impl_->Finish();
110+
// Finishes the JSON document by forwarding to the implementation object.
//
// result: output parameter passed through unchanged to impl_->Finish().
// Returns whatever Status the implementation returns.
Status JsonWriter::Finish(std::string* result) {
111+
return impl_->Finish(result);
108112
}
109113

110114
Status JsonWriter::WriteRecordBatch(
@@ -137,7 +141,7 @@ class JsonReader::JsonReaderImpl {
137141
}
138142

139143
Status GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) const {
140-
DCHECK_GT(i, 0) << "i out of bounds";
144+
DCHECK_GE(i, 0) << "i out of bounds";
141145
DCHECK_LT(i, record_batches_->GetArray().Size()) << "i out of bounds";
142146

143147
const auto& batch_val = record_batches_->GetArray()[i];
@@ -183,6 +187,8 @@ JsonReader::JsonReader(MemoryPool* pool, const std::shared_ptr<Buffer>& data) {
183187
impl_.reset(new JsonReaderImpl(pool, data));
184188
}
185189

190+
// Intentionally empty, but defined here in the .cc rather than implicitly in
// the header. NOTE(review): presumably this is so impl_ (assigned with
// `new JsonReaderImpl(...)` in the constructor) is destroyed where
// JsonReaderImpl is a complete type -- confirm impl_ is a std::unique_ptr.
JsonReader::~JsonReader() {}
191+
186192
Status JsonReader::Open(
187193
const std::shared_ptr<Buffer>& data, std::unique_ptr<JsonReader>* reader) {
188194
return Open(default_memory_pool(), data, reader);
@@ -202,5 +208,9 @@ int JsonReader::num_record_batches() const {
202208
return impl_->num_record_batches();
203209
}
204210

211+
// Retrieves record batch number `i` by forwarding to the implementation
// object.
//
// i:     index of the batch, passed through unchanged to the implementation.
// batch: output parameter, passed through unchanged to the implementation.
// Returns whatever Status the implementation returns.
Status JsonReader::GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) const {
212+
return impl_->GetRecordBatch(i, batch);
213+
}
214+
205215
} // namespace ipc
206216
} // namespace arrow

cpp/src/arrow/ipc/json.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ namespace ipc {
3838

3939
class ARROW_EXPORT JsonWriter {
4040
public:
41+
~JsonWriter();
42+
4143
static Status Open(
4244
const std::shared_ptr<Schema>& schema, std::unique_ptr<JsonWriter>* out);
4345

@@ -46,7 +48,7 @@ class ARROW_EXPORT JsonWriter {
4648
Status WriteRecordBatch(
4749
const std::vector<std::shared_ptr<Array>>& columns, int32_t num_rows);
4850

49-
Status Finish(std::shared_ptr<Buffer>* result);
51+
Status Finish(std::string* result);
5052

5153
private:
5254
explicit JsonWriter(const std::shared_ptr<Schema>& schema);
@@ -59,6 +61,8 @@ class ARROW_EXPORT JsonWriter {
5961
// TODO(wesm): Read from a file stream rather than an in-memory buffer
6062
class ARROW_EXPORT JsonReader {
6163
public:
64+
~JsonReader();
65+
6266
static Status Open(MemoryPool* pool, const std::shared_ptr<Buffer>& data,
6367
std::unique_ptr<JsonReader>* reader);
6468

@@ -71,7 +75,7 @@ class ARROW_EXPORT JsonReader {
7175
int num_record_batches() const;
7276

7377
// Read a record batch from the file
74-
Status GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch);
78+
Status GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) const;
7579

7680
private:
7781
JsonReader(MemoryPool* pool, const std::shared_ptr<Buffer>& data);

cpp/src/arrow/test-util.h

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,19 @@ std::shared_ptr<Buffer> GetBufferFromVector(const std::vector<T>& values) {
108108
reinterpret_cast<const uint8_t*>(values.data()), values.size() * sizeof(T));
109109
}
110110

111+
template <typename T>
112+
inline Status CopyBufferFromVector(
113+
const std::vector<T>& values, std::shared_ptr<Buffer>* result) {
114+
int64_t nbytes = static_cast<int>(values.size()) * sizeof(T);
115+
116+
auto buffer = std::make_shared<PoolBuffer>(default_memory_pool());
117+
RETURN_NOT_OK(buffer->Resize(nbytes));
118+
memcpy(buffer->mutable_data(), values.data(), nbytes);
119+
120+
*result = buffer;
121+
return Status::OK();
122+
}
123+
111124
static inline Status GetBitmapFromBoolVector(
112125
const std::vector<bool>& is_valid, std::shared_ptr<Buffer>* result) {
113126
int length = static_cast<int>(is_valid.size());
@@ -126,13 +139,21 @@ static inline Status GetBitmapFromBoolVector(
126139

127140
// Sets approximately pct_null of the first n bytes in null_bytes to zero
128141
// and the rest to non-zero (true) values.
129-
void random_null_bytes(int64_t n, double pct_null, uint8_t* null_bytes) {
142+
static inline void random_null_bytes(int64_t n, double pct_null, uint8_t* null_bytes) {
130143
Random rng(random_seed());
131144
for (int i = 0; i < n; ++i) {
132145
null_bytes[i] = rng.NextDoubleFraction() > pct_null;
133146
}
134147
}
135148

149+
static inline void random_is_valid(
150+
int64_t n, double pct_null, std::vector<bool>* is_valid) {
151+
Random rng(random_seed());
152+
for (int i = 0; i < n; ++i) {
153+
is_valid->push_back(rng.NextDoubleFraction() > pct_null);
154+
}
155+
}
156+
136157
static inline void random_bytes(int n, uint32_t seed, uint8_t* out) {
137158
std::mt19937 gen(seed);
138159
std::uniform_int_distribution<int> d(0, 255);
@@ -142,6 +163,15 @@ static inline void random_bytes(int n, uint32_t seed, uint8_t* out) {
142163
}
143164
}
144165

166+
// Writes n pseudo-random bytes to `out`, each drawn uniformly from the code
// range [65, 122]. The generator is seeded with `seed`, so the same seed
// always yields the same bytes. Nothing is written when n <= 0.
//
// Note that the range covers 'A'..'Z' and 'a'..'z' but also the six
// punctuation codes 91..96 that sit between them.
static inline void random_ascii(int n, uint32_t seed, uint8_t* out) {
  std::mt19937 engine(seed);
  std::uniform_int_distribution<int> ascii_code(65, 122);

  int remaining = n;
  while (remaining-- > 0) {
    *out++ = static_cast<uint8_t>(ascii_code(engine) & 0xFF);
  }
}
174+
145175
template <typename T>
146176
void rand_uniform_int(int n, uint32_t seed, T min_value, T max_value, T* out) {
147177
DCHECK(out);

0 commit comments

Comments
 (0)