Commit d290538

suquark authored and pcmoritz committed

ARROW-3587: [Python] Efficient serialization for Arrow Objects (array, table, tensor, etc)

This PR enables efficient serialization for Arrow objects (array, table, tensor, record batch).

Author: suquark <zsy9509@mail.ustc.edu.cn>

Closes #2832 from suquark/serialization and squashes the following commits:

5a2d2c6 <suquark> Fix the outdated test.
4824c57 <suquark> Add serialization hooks for pyarrow object
1 parent 8e9cb87 commit d290538

File tree

9 files changed: +200 -24 lines changed
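For orientation, here is a minimal round trip exercising the new hooks (a sketch; it assumes the `pyarrow.serialize`/`pyarrow.deserialize` API of this release, which uses the default serialization context that this commit extends):

```python
import pyarrow as pa

arr = pa.array([1, 2, 3])
assert pa.deserialize(pa.serialize(arr).to_buffer()).equals(arr)

table = pa.Table.from_arrays([arr], ['x'])
assert pa.deserialize(pa.serialize(table).to_buffer()).equals(table)
```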

cpp/src/arrow/python/deserialize.cc

Lines changed: 48 additions & 16 deletions
@@ -96,10 +96,9 @@ Status DeserializeDict(PyObject* context, const Array& array, int64_t start_idx,
   return Status::OK();
 }

-Status DeserializeArray(const Array& array, int64_t offset, PyObject* base,
-                        const SerializedPyObject& blobs, PyObject** out) {
-  int32_t index = checked_cast<const Int32Array&>(array).Value(offset);
-  RETURN_NOT_OK(py::TensorToNdarray(blobs.tensors[index], base, out));
+Status DeserializeArray(int32_t index, PyObject* base, const SerializedPyObject& blobs,
+                        PyObject** out) {
+  RETURN_NOT_OK(py::TensorToNdarray(blobs.ndarrays[index], base, out));
   // Mark the array as immutable
   OwnedRef flags(PyObject_GetAttrString(*out, "flags"));
   DCHECK(flags.obj() != NULL) << "Could not mark Numpy array immutable";
@@ -178,11 +177,16 @@ Status GetValue(PyObject* context, const UnionArray& parent, const Array& arr,
     default: {
       const std::string& child_name = parent.type()->child(type)->name();
       if (child_name == "tensor") {
-        return DeserializeArray(arr, index, base, blobs, result);
+        int32_t ref = checked_cast<const Int32Array&>(arr).Value(index);
+        *result = wrap_tensor(blobs.tensors[ref]);
+        return Status::OK();
       } else if (child_name == "buffer") {
         int32_t ref = checked_cast<const Int32Array&>(arr).Value(index);
         *result = wrap_buffer(blobs.buffers[ref]);
         return Status::OK();
+      } else if (child_name == "ndarray") {
+        int32_t ref = checked_cast<const Int32Array&>(arr).Value(index);
+        return DeserializeArray(ref, base, blobs, result);
       } else {
         DCHECK(false) << "union tag " << type << " with child name '" << child_name
                       << "' not recognized";
@@ -256,13 +260,19 @@ Status DeserializeSet(PyObject* context, const Array& array, int64_t start_idx,
 Status ReadSerializedObject(io::RandomAccessFile* src, SerializedPyObject* out) {
   int64_t bytes_read;
   int32_t num_tensors;
+  int32_t num_ndarrays;
   int32_t num_buffers;
+
   // Read number of tensors
   RETURN_NOT_OK(
       src->Read(sizeof(int32_t), &bytes_read, reinterpret_cast<uint8_t*>(&num_tensors)));
+  RETURN_NOT_OK(
+      src->Read(sizeof(int32_t), &bytes_read, reinterpret_cast<uint8_t*>(&num_ndarrays)));
   RETURN_NOT_OK(
       src->Read(sizeof(int32_t), &bytes_read, reinterpret_cast<uint8_t*>(&num_buffers)));

+  // Align stream to 8-byte offset
+  RETURN_NOT_OK(ipc::AlignStream(src, ipc::kArrowIpcAlignment));
   std::shared_ptr<RecordBatchReader> reader;
   RETURN_NOT_OK(ipc::RecordBatchStreamReader::Open(src, &reader));
   RETURN_NOT_OK(reader->ReadNext(&out->batch));
@@ -280,6 +290,13 @@ Status ReadSerializedObject(io::RandomAccessFile* src, SerializedPyObject* out)
     out->tensors.push_back(tensor);
   }

+  for (int i = 0; i < num_ndarrays; ++i) {
+    std::shared_ptr<Tensor> ndarray;
+    RETURN_NOT_OK(ipc::ReadTensor(src, &ndarray));
+    RETURN_NOT_OK(ipc::AlignStream(src, ipc::kTensorAlignment));
+    out->ndarrays.push_back(ndarray);
+  }
+
   int64_t offset = -1;
   RETURN_NOT_OK(src->Tell(&offset));
   for (int i = 0; i < num_buffers; ++i) {
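The header that `ReadSerializedObject` now expects can be pictured with a small Python sketch; `read_counts` is a hypothetical helper, not pyarrow API, and it assumes the counts are native little-endian int32 values as written by `SerializedPyObject::WriteTo` on common platforms:

```python
import struct

def read_counts(stream):
    # Three int32 counts come first: tensors, ndarrays, buffers.
    num_tensors, num_ndarrays, num_buffers = struct.unpack('<3i', stream.read(12))
    # The writer then pads to an 8-byte boundary (ipc::kArrowIpcAlignment)
    # before the record batch stream, so skip that padding here as well.
    stream.read((8 - stream.tell() % 8) % 8)
    return num_tensors, num_ndarrays, num_buffers
```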
@@ -305,21 +322,14 @@ Status DeserializeObject(PyObject* context, const SerializedPyObject& obj, PyObj
                          obj, out);
 }

-Status DeserializeTensor(const SerializedPyObject& object, std::shared_ptr<Tensor>* out) {
-  if (object.tensors.size() != 1) {
-    return Status::Invalid("Object is not a Tensor");
-  }
-  *out = object.tensors[0];
-  return Status::OK();
-}
-
-Status GetSerializedFromComponents(int num_tensors, int num_buffers, PyObject* data,
-                                   SerializedPyObject* out) {
+Status GetSerializedFromComponents(int num_tensors, int num_ndarrays, int num_buffers,
+                                   PyObject* data, SerializedPyObject* out) {
   PyAcquireGIL gil;
   const Py_ssize_t data_length = PyList_Size(data);
   RETURN_IF_PYERROR();

-  const Py_ssize_t expected_data_length = 1 + num_tensors * 2 + num_buffers;
+  const Py_ssize_t expected_data_length =
+      1 + num_tensors * 2 + num_ndarrays * 2 + num_buffers;
   if (data_length != expected_data_length) {
     return Status::Invalid("Invalid number of buffers in data");
   }
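The updated length check mirrors the component layout produced by `SerializedPyObject::GetComponents`: one buffer for the record batch, a (metadata, body) pair per tensor and per ndarray, then the raw buffers. A sketch using the public API of this release (`serialize`, `to_components`, `deserialize_components`):

```python
import numpy as np
import pyarrow as pa

components = pa.serialize(np.zeros((4, 5))).to_components()
expected = (1 + 2 * components['num_tensors'] +
            2 * components['num_ndarrays'] + components['num_buffers'])
assert len(components['data']) == expected
restored = pa.deserialize_components(components)
```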
@@ -357,6 +367,20 @@ Status GetSerializedFromComponents(int num_tensors, int num_buffers, PyObject* d
     out->tensors.emplace_back(std::move(tensor));
   }

+  // Zero-copy reconstruct tensors for numpy ndarrays
+  for (int i = 0; i < num_ndarrays; ++i) {
+    std::shared_ptr<Buffer> metadata;
+    std::shared_ptr<Buffer> body;
+    std::shared_ptr<Tensor> tensor;
+    RETURN_NOT_OK(GetBuffer(buffer_index++, &metadata));
+    RETURN_NOT_OK(GetBuffer(buffer_index++, &body));
+
+    ipc::Message message(metadata, body);
+
+    RETURN_NOT_OK(ReadTensor(message, &tensor));
+    out->ndarrays.emplace_back(std::move(tensor));
+  }
+
   // Unwrap and append buffers
   for (int i = 0; i < num_buffers; ++i) {
     std::shared_ptr<Buffer> buffer;
@@ -367,6 +391,14 @@ Status GetSerializedFromComponents(int num_tensors, int num_buffers, PyObject* d
   return Status::OK();
 }

+Status DeserializeTensor(const SerializedPyObject& object, std::shared_ptr<Tensor>* out) {
+  if (object.tensors.size() != 1) {
+    return Status::Invalid("Object is not a Tensor");
+  }
+  *out = object.tensors[0];
+  return Status::OK();
+}
+
 Status ReadTensor(std::shared_ptr<Buffer> src, std::shared_ptr<Tensor>* out) {
   io::BufferReader reader(src);
   SerializedPyObject object;

cpp/src/arrow/python/deserialize.h

Lines changed: 3 additions & 2 deletions
@@ -50,14 +50,15 @@ Status ReadSerializedObject(io::RandomAccessFile* src, SerializedPyObject* out);
 /// SerializedPyObject::GetComponents.
 ///
 /// \param[in] num_tensors number of tensors in the object
+/// \param[in] num_ndarrays number of numpy ndarrays in the object
 /// \param[in] num_buffers number of buffers in the object
 /// \param[in] data a list containing pyarrow.Buffer instances. Must be 1 +
-/// num_tensors * 2 + num_buffers in length
+/// num_tensors * 2 + num_ndarrays * 2 + num_buffers in length
 /// \param[out] out the reconstructed object
 /// \return Status
 ARROW_EXPORT
-Status GetSerializedFromComponents(int num_tensors, int num_buffers, PyObject* data,
-                                   SerializedPyObject* out);
+Status GetSerializedFromComponents(int num_tensors, int num_ndarrays, int num_buffers,
+                                   PyObject* data, SerializedPyObject* out);

 /// \brief Reconstruct Python object from Arrow-serialized representation
 /// \param[in] context Serialization context which contains custom serialization

cpp/src/arrow/python/serialize.cc

Lines changed: 40 additions & 2 deletions
@@ -74,6 +74,7 @@ class SequenceBuilder {
         doubles_(::arrow::float64(), pool),
         date64s_(::arrow::date64(), pool),
         tensor_indices_(::arrow::int32(), pool),
+        ndarray_indices_(::arrow::int32(), pool),
         buffer_indices_(::arrow::int32(), pool),
         list_offsets_({0}),
         tuple_offsets_({0}),
@@ -160,6 +161,14 @@ class SequenceBuilder {
     return tensor_indices_.Append(tensor_index);
   }

+  /// Appending a numpy ndarray to the sequence
+  ///
+  /// \param ndarray_index Index of the ndarray in the object.
+  Status AppendNdarray(const int32_t ndarray_index) {
+    RETURN_NOT_OK(Update(ndarray_indices_.length(), &ndarray_tag_));
+    return ndarray_indices_.Append(ndarray_index);
+  }
+
   /// Appending a buffer to the sequence
   ///
   /// \param buffer_index Index of the buffer in the object.
@@ -265,6 +274,7 @@ class SequenceBuilder {
     RETURN_NOT_OK(AddElement(date64_tag_, &date64s_));
     RETURN_NOT_OK(AddElement(tensor_tag_, &tensor_indices_, "tensor"));
     RETURN_NOT_OK(AddElement(buffer_tag_, &buffer_indices_, "buffer"));
+    RETURN_NOT_OK(AddElement(ndarray_tag_, &ndarray_indices_, "ndarray"));

     RETURN_NOT_OK(AddSubsequence(list_tag_, list_data, list_offsets_, "list"));
     RETURN_NOT_OK(AddSubsequence(tuple_tag_, tuple_data, tuple_offsets_, "tuple"));
@@ -307,6 +317,7 @@ class SequenceBuilder {
   Date64Builder date64s_;

   Int32Builder tensor_indices_;
+  Int32Builder ndarray_indices_;
   Int32Builder buffer_indices_;

   std::vector<int32_t> list_offsets_;
@@ -331,6 +342,7 @@ class SequenceBuilder {

   int8_t tensor_tag_ = -1;
   int8_t buffer_tag_ = -1;
+  int8_t ndarray_tag_ = -1;
   int8_t list_tag_ = -1;
   int8_t tuple_tag_ = -1;
   int8_t dict_tag_ = -1;
@@ -557,6 +569,11 @@ Status Append(PyObject* context, PyObject* elem, SequenceBuilder* builder,
     std::shared_ptr<Buffer> buffer;
     RETURN_NOT_OK(unwrap_buffer(elem, &buffer));
     blobs_out->buffers.push_back(buffer);
+  } else if (is_tensor(elem)) {
+    RETURN_NOT_OK(builder->AppendTensor(static_cast<int32_t>(blobs_out->tensors.size())));
+    std::shared_ptr<Tensor> tensor;
+    RETURN_NOT_OK(unwrap_tensor(elem, &tensor));
+    blobs_out->tensors.push_back(tensor);
   } else {
     // Attempt to serialize the object using the custom callback.
     PyObject* serialized_object;
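With the new `is_tensor` branch, a `pyarrow.Tensor` placed inside a Python container is captured out-of-band rather than falling through to the pickle fallback; a sketch:

```python
import numpy as np
import pyarrow as pa

tensor = pa.Tensor.from_numpy(np.arange(6.0).reshape(2, 3))
tensor_back, payload = pa.deserialize(pa.serialize([tensor, b'raw']).to_buffer())
assert tensor_back.equals(tensor) and payload == b'raw'
```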
@@ -584,11 +601,11 @@ Status SerializeArray(PyObject* context, PyArrayObject* array, SequenceBuilder*
     case NPY_FLOAT:
     case NPY_DOUBLE: {
       RETURN_NOT_OK(
-          builder->AppendTensor(static_cast<int32_t>(blobs_out->tensors.size())));
+          builder->AppendNdarray(static_cast<int32_t>(blobs_out->ndarrays.size())));
       std::shared_ptr<Tensor> tensor;
       RETURN_NOT_OK(NdarrayToTensor(default_memory_pool(),
                                     reinterpret_cast<PyObject*>(array), &tensor));
-      blobs_out->tensors.push_back(tensor);
+      blobs_out->ndarrays.push_back(tensor);
     } break;
     default: {
       PyObject* serialized_object;
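Since `SerializeArray` now routes NumPy arrays into the `ndarrays` side table, leaving `tensors` for explicit `pyarrow.Tensor` values, the component counts reflect the split; a sketch that leans on that implementation detail:

```python
import numpy as np
import pyarrow as pa

components = pa.serialize(np.random.randn(3, 4)).to_components()
assert components['num_ndarrays'] == 1
assert components['num_tensors'] == 0
```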
@@ -757,11 +774,17 @@ Status WriteTensorHeader(std::shared_ptr<DataType> dtype,

 Status SerializedPyObject::WriteTo(io::OutputStream* dst) {
   int32_t num_tensors = static_cast<int32_t>(this->tensors.size());
+  int32_t num_ndarrays = static_cast<int32_t>(this->ndarrays.size());
   int32_t num_buffers = static_cast<int32_t>(this->buffers.size());
   RETURN_NOT_OK(
       dst->Write(reinterpret_cast<const uint8_t*>(&num_tensors), sizeof(int32_t)));
+  RETURN_NOT_OK(
+      dst->Write(reinterpret_cast<const uint8_t*>(&num_ndarrays), sizeof(int32_t)));
   RETURN_NOT_OK(
       dst->Write(reinterpret_cast<const uint8_t*>(&num_buffers), sizeof(int32_t)));
+
+  // Align stream to 8-byte offset
+  RETURN_NOT_OK(ipc::AlignStream(dst, ipc::kArrowIpcAlignment));
   RETURN_NOT_OK(ipc::WriteRecordBatchStream({this->batch}, dst));

   // Align stream to 64-byte offset so tensor bodies are 64-byte aligned
@@ -774,6 +797,11 @@ Status SerializedPyObject::WriteTo(io::OutputStream* dst) {
     RETURN_NOT_OK(ipc::AlignStream(dst, ipc::kTensorAlignment));
   }

+  for (const auto& tensor : this->ndarrays) {
+    RETURN_NOT_OK(ipc::WriteTensor(*tensor, dst, &metadata_length, &body_length));
+    RETURN_NOT_OK(ipc::AlignStream(dst, ipc::kTensorAlignment));
+  }
+
   for (const auto& buffer : this->buffers) {
     int64_t size = buffer->size();
     RETURN_NOT_OK(dst->Write(reinterpret_cast<const uint8_t*>(&size), sizeof(int64_t)));
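Taken together, `WriteTo` now emits the following stream layout (a sketch; alignment constants from `arrow/ipc`):

```
int32 num_tensors | int32 num_ndarrays | int32 num_buffers
padding to an 8-byte boundary (ipc::kArrowIpcAlignment)
record batch IPC stream
padding to a 64-byte boundary (ipc::kTensorAlignment)
tensors, each via ipc::WriteTensor, each padded to 64 bytes
ndarrays, each via ipc::WriteTensor, each padded to 64 bytes
buffers, each as an int64 size followed by the raw bytes
```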
@@ -795,6 +823,8 @@ Status SerializedPyObject::GetComponents(MemoryPool* memory_pool, PyObject** out
   // quite esoteric
   PyDict_SetItemString(result.obj(), "num_tensors",
                        PyLong_FromSize_t(this->tensors.size()));
+  PyDict_SetItemString(result.obj(), "num_ndarrays",
+                       PyLong_FromSize_t(this->ndarrays.size()));
   PyDict_SetItemString(result.obj(), "num_buffers",
                        PyLong_FromSize_t(this->buffers.size()));
   PyDict_SetItemString(result.obj(), "data", buffers);
@@ -835,6 +865,14 @@ Status SerializedPyObject::GetComponents(MemoryPool* memory_pool, PyObject** out
     RETURN_NOT_OK(PushBuffer(message->body()));
   }

+  // For each ndarray, get a metadata buffer and a buffer for the body
+  for (const auto& ndarray : this->ndarrays) {
+    std::unique_ptr<ipc::Message> message;
+    RETURN_NOT_OK(ipc::GetTensorMessage(*ndarray, memory_pool, &message));
+    RETURN_NOT_OK(PushBuffer(message->metadata()));
+    RETURN_NOT_OK(PushBuffer(message->body()));
+  }
+
   for (const auto& buf : this->buffers) {
     RETURN_NOT_OK(PushBuffer(buf));
   }

cpp/src/arrow/python/serialize.h

Lines changed: 1 addition & 0 deletions
@@ -50,6 +50,7 @@ namespace py {
 struct ARROW_EXPORT SerializedPyObject {
   std::shared_ptr<RecordBatch> batch;
   std::vector<std::shared_ptr<Tensor>> tensors;
+  std::vector<std::shared_ptr<Tensor>> ndarrays;
   std::vector<std::shared_ptr<Buffer>> buffers;

   /// \brief Write serialized Python object to OutputStream

python/pyarrow/includes/libarrow.pxd

Lines changed: 2 additions & 1 deletion
@@ -1121,7 +1121,8 @@ cdef extern from "arrow/python/api.h" namespace 'arrow::py' nogil:
     CStatus ReadSerializedObject(RandomAccessFile* src,
                                  CSerializedPyObject* out)

-    CStatus GetSerializedFromComponents(int num_tensors, int num_buffers,
+    CStatus GetSerializedFromComponents(int num_tensors, int num_ndarrays,
+                                        int num_buffers,
                                         object buffers,
                                         CSerializedPyObject* out)

python/pyarrow/serialization.pxi

Lines changed: 3 additions & 1 deletion
@@ -281,12 +281,14 @@ cdef class SerializedPyObject:
         """
         cdef:
             int num_tensors = components['num_tensors']
+            int num_ndarrays = components['num_ndarrays']
             int num_buffers = components['num_buffers']
             list buffers = components['data']
             SerializedPyObject result = SerializedPyObject()

         with nogil:
-            check_status(GetSerializedFromComponents(num_tensors, num_buffers,
+            check_status(GetSerializedFromComponents(num_tensors, num_ndarrays,
+                                                     num_buffers,
                                                      buffers, &result.data))

         return result

python/pyarrow/serialization.py

Lines changed: 67 additions & 0 deletions
@@ -21,6 +21,7 @@

 import numpy as np

+import pyarrow
 from pyarrow.compat import builtin_pickle
 from pyarrow.lib import (SerializationContext, _default_serialization_context,
                          py_buffer)
@@ -55,6 +56,57 @@ def _deserialize_numpy_array_list(data):
     return np.array(data[0], dtype=np.dtype(data[1]))


+# ----------------------------------------------------------------------
+# pyarrow.RecordBatch-specific serialization matters
+
+def _serialize_pyarrow_recordbatch(batch):
+    output_stream = pyarrow.BufferOutputStream()
+    writer = pyarrow.RecordBatchStreamWriter(output_stream,
+                                             schema=batch.schema)
+    writer.write_batch(batch)
+    writer.close()
+    return output_stream.getvalue()  # This will also close the stream.
+
+
+def _deserialize_pyarrow_recordbatch(buf):
+    reader = pyarrow.RecordBatchStreamReader(buf)
+    batch = reader.read_next_batch()
+    return batch
+
+
+# ----------------------------------------------------------------------
+# pyarrow.Array-specific serialization matters
+
+def _serialize_pyarrow_array(array):
+    # TODO(suquark): implement more efficient array serialization.
+    batch = pyarrow.RecordBatch.from_arrays([array], [''])
+    return _serialize_pyarrow_recordbatch(batch)
+
+
+def _deserialize_pyarrow_array(buf):
+    # TODO(suquark): implement more efficient array deserialization.
+    batch = _deserialize_pyarrow_recordbatch(buf)
+    return batch.columns[0]
+
+
+# ----------------------------------------------------------------------
+# pyarrow.Table-specific serialization matters
+
+def _serialize_pyarrow_table(table):
+    output_stream = pyarrow.BufferOutputStream()
+    writer = pyarrow.RecordBatchStreamWriter(output_stream,
+                                             schema=table.schema)
+    writer.write_table(table)
+    writer.close()
+    return output_stream.getvalue()  # This will also close the stream.
+
+
+def _deserialize_pyarrow_table(buf):
+    reader = pyarrow.RecordBatchStreamReader(buf)
+    table = reader.read_all()
+    return table
+
+
 def _pickle_to_buffer(x):
     pickled = builtin_pickle.dumps(x, protocol=builtin_pickle.HIGHEST_PROTOCOL)
     return py_buffer(pickled)
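These helpers are private and are normally reached through `pyarrow.serialize`; calling them directly (a sketch) shows the intended round trip. Wrapping an Array in a single-column RecordBatch with an empty field name is a pragmatic stopgap, as the TODOs note:

```python
import pyarrow
from pyarrow.serialization import (_deserialize_pyarrow_table,
                                   _serialize_pyarrow_table)

table = pyarrow.Table.from_arrays([pyarrow.array([1.0, 2.0])], ['col'])
buf = _serialize_pyarrow_table(table)  # a pyarrow.Buffer
assert _deserialize_pyarrow_table(buf).equals(table)
```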
@@ -205,6 +257,21 @@ def _deserialize_default_dict(data):
         custom_serializer=_serialize_numpy_array_list,
         custom_deserializer=_deserialize_numpy_array_list)

+    serialization_context.register_type(
+        pyarrow.Array, 'pyarrow.Array',
+        custom_serializer=_serialize_pyarrow_array,
+        custom_deserializer=_deserialize_pyarrow_array)
+
+    serialization_context.register_type(
+        pyarrow.RecordBatch, 'pyarrow.RecordBatch',
+        custom_serializer=_serialize_pyarrow_recordbatch,
+        custom_deserializer=_deserialize_pyarrow_recordbatch)
+
+    serialization_context.register_type(
+        pyarrow.Table, 'pyarrow.Table',
+        custom_serializer=_serialize_pyarrow_table,
+        custom_deserializer=_deserialize_pyarrow_table)
+
     _register_custom_pandas_handlers(serialization_context)
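With these registrations in the default context, the top-level API picks up Arrow objects automatically; a sketch:

```python
import pyarrow as pa

batch = pa.RecordBatch.from_arrays([pa.array([1, 2, 3])], ['x'])
assert pa.deserialize(pa.serialize(batch).to_buffer()).equals(batch)
```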