64 changes: 48 additions & 16 deletions cpp/src/arrow/python/deserialize.cc
@@ -96,10 +96,9 @@ Status DeserializeDict(PyObject* context, const Array& array, int64_t start_idx,
return Status::OK();
}

Status DeserializeArray(const Array& array, int64_t offset, PyObject* base,
const SerializedPyObject& blobs, PyObject** out) {
int32_t index = checked_cast<const Int32Array&>(array).Value(offset);
RETURN_NOT_OK(py::TensorToNdarray(blobs.tensors[index], base, out));
Status DeserializeArray(int32_t index, PyObject* base, const SerializedPyObject& blobs,
PyObject** out) {
RETURN_NOT_OK(py::TensorToNdarray(blobs.ndarrays[index], base, out));
// Mark the array as immutable
OwnedRef flags(PyObject_GetAttrString(*out, "flags"));
DCHECK(flags.obj() != NULL) << "Could not mark Numpy array immutable";
@@ -178,11 +177,16 @@ Status GetValue(PyObject* context, const UnionArray& parent, const Array& arr,
default: {
const std::string& child_name = parent.type()->child(type)->name();
if (child_name == "tensor") {
return DeserializeArray(arr, index, base, blobs, result);
int32_t ref = checked_cast<const Int32Array&>(arr).Value(index);
*result = wrap_tensor(blobs.tensors[ref]);
return Status::OK();
} else if (child_name == "buffer") {
int32_t ref = checked_cast<const Int32Array&>(arr).Value(index);
*result = wrap_buffer(blobs.buffers[ref]);
return Status::OK();
} else if (child_name == "ndarray") {
int32_t ref = checked_cast<const Int32Array&>(arr).Value(index);
return DeserializeArray(ref, base, blobs, result);
} else {
DCHECK(false) << "union tag " << type << " with child name '" << child_name
<< "' not recognized";
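
For context, a minimal Python sketch of the dispatch above as I read it on this branch (assumptions: pyarrow's serialize()/deserialize() round trip and Tensor.from_numpy(); an explicit pyarrow.Tensor takes the "tensor" child while a NumPy array takes the new "ndarray" child; the expected types are illustrative, not verified output):

import numpy as np
import pyarrow as pa

arr = np.arange(6, dtype=np.float32).reshape(2, 3)
value = {'as_ndarray': arr, 'as_tensor': pa.Tensor.from_numpy(arr)}

buf = pa.serialize(value).to_buffer()
restored = pa.deserialize(buf)

# The "ndarray" child should come back as a NumPy array reconstructed from the
# ndarrays blob list, while the "tensor" child wraps the stored pyarrow.Tensor.
assert isinstance(restored['as_ndarray'], np.ndarray)
assert isinstance(restored['as_tensor'], pa.Tensor)
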
@@ -256,13 +260,19 @@ Status DeserializeSet(PyObject* context, const Array& array, int64_t start_idx,
Status ReadSerializedObject(io::RandomAccessFile* src, SerializedPyObject* out) {
int64_t bytes_read;
int32_t num_tensors;
int32_t num_ndarrays;
int32_t num_buffers;

// Read the number of tensors, ndarrays and buffers
RETURN_NOT_OK(
src->Read(sizeof(int32_t), &bytes_read, reinterpret_cast<uint8_t*>(&num_tensors)));
RETURN_NOT_OK(
src->Read(sizeof(int32_t), &bytes_read, reinterpret_cast<uint8_t*>(&num_ndarrays)));
RETURN_NOT_OK(
src->Read(sizeof(int32_t), &bytes_read, reinterpret_cast<uint8_t*>(&num_buffers)));

// Align stream to 8-byte offset
RETURN_NOT_OK(ipc::AlignStream(src, ipc::kArrowIpcAlignment));
Member: I would like to look more closely at this before this patch is merged. I spent a bunch of time in c9ac869 on this, so I want to make sure that stream alignment is happening at the highest level possible rather than leaking into lower-level implementation details.

Contributor: @wesm Let me know your thoughts on this. Handling the alignment here seems analogous to how the alignment for Tensors is handled down below, in correspondence with c9ac869, so this looks good to me.

Member: I think this should probably be changed to take InputStream* src as input and handle alignment one level higher. I'm going to check out this branch and take a look.

Member: Took a closer look and we can leave this as is (the extra 4 bytes at the start of the object are what created the need for the alignment).

std::shared_ptr<RecordBatchReader> reader;
RETURN_NOT_OK(ipc::RecordBatchStreamReader::Open(src, &reader));
RETURN_NOT_OK(reader->ReadNext(&out->batch));
@@ -280,6 +290,13 @@ Status ReadSerializedObject(io::RandomAccessFile* src, SerializedPyObject* out)
out->tensors.push_back(tensor);
}

for (int i = 0; i < num_ndarrays; ++i) {
std::shared_ptr<Tensor> ndarray;
RETURN_NOT_OK(ipc::ReadTensor(src, &ndarray));
RETURN_NOT_OK(ipc::AlignStream(src, ipc::kTensorAlignment));
out->ndarrays.push_back(ndarray);
}

int64_t offset = -1;
RETURN_NOT_OK(src->Tell(&offset));
for (int i = 0; i < num_buffers; ++i) {
@@ -305,21 +322,14 @@ Status DeserializeObject(PyObject* context, const SerializedPyObject& obj, PyObj
obj, out);
}

Status DeserializeTensor(const SerializedPyObject& object, std::shared_ptr<Tensor>* out) {
if (object.tensors.size() != 1) {
return Status::Invalid("Object is not a Tensor");
}
*out = object.tensors[0];
return Status::OK();
}

Status GetSerializedFromComponents(int num_tensors, int num_buffers, PyObject* data,
SerializedPyObject* out) {
Status GetSerializedFromComponents(int num_tensors, int num_ndarrays, int num_buffers,
PyObject* data, SerializedPyObject* out) {
PyAcquireGIL gil;
const Py_ssize_t data_length = PyList_Size(data);
RETURN_IF_PYERROR();

const Py_ssize_t expected_data_length = 1 + num_tensors * 2 + num_buffers;
const Py_ssize_t expected_data_length =
1 + num_tensors * 2 + num_ndarrays * 2 + num_buffers;
if (data_length != expected_data_length) {
return Status::Invalid("Invalid number of buffers in data");
}
@@ -357,6 +367,20 @@ Status GetSerializedFromComponents(int num_tensors, int num_buffers, PyObject* d
out->tensors.emplace_back(std::move(tensor));
}

// Zero-copy reconstruct tensors for numpy ndarrays
for (int i = 0; i < num_ndarrays; ++i) {
std::shared_ptr<Buffer> metadata;
std::shared_ptr<Buffer> body;
std::shared_ptr<Tensor> tensor;
RETURN_NOT_OK(GetBuffer(buffer_index++, &metadata));
RETURN_NOT_OK(GetBuffer(buffer_index++, &body));

ipc::Message message(metadata, body);

RETURN_NOT_OK(ReadTensor(message, &tensor));
out->ndarrays.emplace_back(std::move(tensor));
}

// Unwrap and append buffers
for (int i = 0; i < num_buffers; ++i) {
std::shared_ptr<Buffer> buffer;
@@ -367,6 +391,14 @@ Status GetSerializedFromComponents(int num_tensors, int num_buffers, PyObject* d
return Status::OK();
}

Status DeserializeTensor(const SerializedPyObject& object, std::shared_ptr<Tensor>* out) {
if (object.tensors.size() != 1) {
return Status::Invalid("Object is not a Tensor");
}
*out = object.tensors[0];
return Status::OK();
}

Status ReadTensor(std::shared_ptr<Buffer> src, std::shared_ptr<Tensor>* out) {
io::BufferReader reader(src);
SerializedPyObject object;
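For reference, a hedged sketch of how the new count surfaces through the component API (assumptions: SerializedPyObject.to_components() reports 'num_ndarrays' alongside the existing counts, mirroring GetComponents() in serialize.cc, and pyarrow.deserialize_components() forwards all three counts to GetSerializedFromComponents()):

import numpy as np
import pyarrow as pa

serialized = pa.serialize({'weights': np.ones((4, 4))})
components = serialized.to_components()

# A NumPy ndarray in the object should now be counted under 'num_ndarrays'
# rather than 'num_tensors' (expected 0, 1, 0 here, assuming this branch).
print(components['num_tensors'],
      components['num_ndarrays'],
      components['num_buffers'])

restored = pa.deserialize_components(components)
assert np.array_equal(restored['weights'], np.ones((4, 4)))
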
5 changes: 3 additions & 2 deletions cpp/src/arrow/python/deserialize.h
@@ -50,14 +50,15 @@ Status ReadSerializedObject(io::RandomAccessFile* src, SerializedPyObject* out);
/// SerializedPyObject::GetComponents.
///
/// \param[in] num_tensors number of tensors in the object
/// \param[in] num_ndarrays number of numpy ndarrays in the object
/// \param[in] num_buffers number of buffers in the object
/// \param[in] data a list containing pyarrow.Buffer instances. Must be 1 +
/// num_tensors * 2 + num_ndarrays * 2 + num_buffers in length
/// \param[out] out the reconstructed object
/// \return Status
ARROW_EXPORT
Status GetSerializedFromComponents(int num_tensors, int num_buffers, PyObject* data,
SerializedPyObject* out);
Status GetSerializedFromComponents(int num_tensors, int num_ndarrays, int num_buffers,
PyObject* data, SerializedPyObject* out);

/// \brief Reconstruct Python object from Arrow-serialized representation
/// \param[in] context Serialization context which contains custom serialization
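A small sketch of the length contract documented above, under the same assumptions about to_components(); the expected component counts are illustrative:

import numpy as np
import pyarrow as pa

components = pa.serialize([np.zeros(3), pa.py_buffer(b'abc')]).to_components()
num_tensors = components['num_tensors']
num_ndarrays = components['num_ndarrays']
num_buffers = components['num_buffers']

# One buffer for the record batch, two (metadata + body) per tensor and per
# ndarray, and one per plain buffer, per the docstring above.
assert len(components['data']) == 1 + num_tensors * 2 + num_ndarrays * 2 + num_buffers
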
42 changes: 40 additions & 2 deletions cpp/src/arrow/python/serialize.cc
@@ -74,6 +74,7 @@ class SequenceBuilder {
doubles_(::arrow::float64(), pool),
date64s_(::arrow::date64(), pool),
tensor_indices_(::arrow::int32(), pool),
ndarray_indices_(::arrow::int32(), pool),
buffer_indices_(::arrow::int32(), pool),
list_offsets_({0}),
tuple_offsets_({0}),
@@ -160,6 +161,14 @@ class SequenceBuilder {
return tensor_indices_.Append(tensor_index);
}

/// Appending a numpy ndarray to the sequence
///
/// \param ndarray_index Index of the ndarray in the object.
Status AppendNdarray(const int32_t ndarray_index) {
RETURN_NOT_OK(Update(ndarray_indices_.length(), &ndarray_tag_));
return ndarray_indices_.Append(ndarray_index);
}

/// Appending a buffer to the sequence
///
/// \param buffer_index Index of the buffer in the object.
@@ -265,6 +274,7 @@ class SequenceBuilder {
RETURN_NOT_OK(AddElement(date64_tag_, &date64s_));
RETURN_NOT_OK(AddElement(tensor_tag_, &tensor_indices_, "tensor"));
RETURN_NOT_OK(AddElement(buffer_tag_, &buffer_indices_, "buffer"));
RETURN_NOT_OK(AddElement(ndarray_tag_, &ndarray_indices_, "ndarray"));

RETURN_NOT_OK(AddSubsequence(list_tag_, list_data, list_offsets_, "list"));
RETURN_NOT_OK(AddSubsequence(tuple_tag_, tuple_data, tuple_offsets_, "tuple"));
@@ -307,6 +317,7 @@ class SequenceBuilder {
Date64Builder date64s_;

Int32Builder tensor_indices_;
Int32Builder ndarray_indices_;
Int32Builder buffer_indices_;

std::vector<int32_t> list_offsets_;
@@ ... @@

int8_t tensor_tag_ = -1;
int8_t buffer_tag_ = -1;
int8_t ndarray_tag_ = -1;
int8_t list_tag_ = -1;
int8_t tuple_tag_ = -1;
int8_t dict_tag_ = -1;
@@ -557,6 +569,11 @@ Status Append(PyObject* context, PyObject* elem, SequenceBuilder* builder,
std::shared_ptr<Buffer> buffer;
RETURN_NOT_OK(unwrap_buffer(elem, &buffer));
blobs_out->buffers.push_back(buffer);
} else if (is_tensor(elem)) {
RETURN_NOT_OK(builder->AppendTensor(static_cast<int32_t>(blobs_out->tensors.size())));
std::shared_ptr<Tensor> tensor;
RETURN_NOT_OK(unwrap_tensor(elem, &tensor));
blobs_out->tensors.push_back(tensor);
} else {
// Attempt to serialize the object using the custom callback.
PyObject* serialized_object;
@@ -584,11 +601,11 @@ Status SerializeArray(PyObject* context, PyArrayObject* array, SequenceBuilder*
case NPY_FLOAT:
case NPY_DOUBLE: {
RETURN_NOT_OK(
builder->AppendTensor(static_cast<int32_t>(blobs_out->tensors.size())));
builder->AppendNdarray(static_cast<int32_t>(blobs_out->ndarrays.size())));
std::shared_ptr<Tensor> tensor;
RETURN_NOT_OK(NdarrayToTensor(default_memory_pool(),
reinterpret_cast<PyObject*>(array), &tensor));
blobs_out->tensors.push_back(tensor);
blobs_out->ndarrays.push_back(tensor);
} break;
default: {
PyObject* serialized_object;
@@ -757,11 +774,17 @@ Status WriteTensorHeader(std::shared_ptr<DataType> dtype,

Status SerializedPyObject::WriteTo(io::OutputStream* dst) {
int32_t num_tensors = static_cast<int32_t>(this->tensors.size());
int32_t num_ndarrays = static_cast<int32_t>(this->ndarrays.size());
int32_t num_buffers = static_cast<int32_t>(this->buffers.size());
RETURN_NOT_OK(
dst->Write(reinterpret_cast<const uint8_t*>(&num_tensors), sizeof(int32_t)));
RETURN_NOT_OK(
dst->Write(reinterpret_cast<const uint8_t*>(&num_ndarrays), sizeof(int32_t)));
RETURN_NOT_OK(
dst->Write(reinterpret_cast<const uint8_t*>(&num_buffers), sizeof(int32_t)));

// Align stream to 8-byte offset
RETURN_NOT_OK(ipc::AlignStream(dst, ipc::kArrowIpcAlignment));
RETURN_NOT_OK(ipc::WriteRecordBatchStream({this->batch}, dst));

// Align stream to 64-byte offset so tensor bodies are 64-byte aligned
@@ ... @@
RETURN_NOT_OK(ipc::AlignStream(dst, ipc::kTensorAlignment));
}

for (const auto& tensor : this->ndarrays) {
RETURN_NOT_OK(ipc::WriteTensor(*tensor, dst, &metadata_length, &body_length));
RETURN_NOT_OK(ipc::AlignStream(dst, ipc::kTensorAlignment));
}

for (const auto& buffer : this->buffers) {
int64_t size = buffer->size();
RETURN_NOT_OK(dst->Write(reinterpret_cast<const uint8_t*>(&size), sizeof(int64_t)));
@@ ... @@
// quite esoteric
PyDict_SetItemString(result.obj(), "num_tensors",
PyLong_FromSize_t(this->tensors.size()));
PyDict_SetItemString(result.obj(), "num_ndarrays",
PyLong_FromSize_t(this->ndarrays.size()));
PyDict_SetItemString(result.obj(), "num_buffers",
PyLong_FromSize_t(this->buffers.size()));
PyDict_SetItemString(result.obj(), "data", buffers);
@@ -835,6 +865,14 @@ Status SerializedPyObject::GetComponents(MemoryPool* memory_pool, PyObject** out
RETURN_NOT_OK(PushBuffer(message->body()));
}

// For each ndarray, get a metadata buffer and a buffer for the body
for (const auto& ndarray : this->ndarrays) {
std::unique_ptr<ipc::Message> message;
RETURN_NOT_OK(ipc::GetTensorMessage(*ndarray, memory_pool, &message));
RETURN_NOT_OK(PushBuffer(message->metadata()));
RETURN_NOT_OK(PushBuffer(message->body()));
}

for (const auto& buf : this->buffers) {
RETURN_NOT_OK(PushBuffer(buf));
}
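As a rough illustration of the stream layout WriteTo() now emits, the sketch below unpacks the three leading counts from a serialized buffer (assumptions: SerializedPyObject.to_buffer() writes exactly the WriteTo() stream, the counts are native-endian int32 values, and a float64 ndarray is tracked in the ndarrays list on this branch):

import struct
import numpy as np
import pyarrow as pa

buf = pa.serialize(np.arange(10, dtype=np.float64)).to_buffer()
data = buf.to_pybytes()

# The stream starts with num_tensors, num_ndarrays, num_buffers (int32 each),
# is padded to 8 bytes before the record batch, and then carries tensor and
# ndarray messages aligned to kTensorAlignment (64 bytes).
num_tensors, num_ndarrays, num_buffers = struct.unpack_from('=iii', data, 0)
print(num_tensors, num_ndarrays, num_buffers)  # expected: 0 1 0
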
1 change: 1 addition & 0 deletions cpp/src/arrow/python/serialize.h
@@ -50,6 +50,7 @@ namespace py {
struct ARROW_EXPORT SerializedPyObject {
std::shared_ptr<RecordBatch> batch;
std::vector<std::shared_ptr<Tensor>> tensors;
std::vector<std::shared_ptr<Tensor>> ndarrays;
std::vector<std::shared_ptr<Buffer>> buffers;

/// \brief Write serialized Python object to OutputStream
3 changes: 2 additions & 1 deletion python/pyarrow/includes/libarrow.pxd
@@ -1121,7 +1121,8 @@ cdef extern from "arrow/python/api.h" namespace 'arrow::py' nogil:
CStatus ReadSerializedObject(RandomAccessFile* src,
CSerializedPyObject* out)

CStatus GetSerializedFromComponents(int num_tensors, int num_buffers,
CStatus GetSerializedFromComponents(int num_tensors, int num_ndarrays,
int num_buffers,
object buffers,
CSerializedPyObject* out)

4 changes: 3 additions & 1 deletion python/pyarrow/serialization.pxi
@@ -281,12 +281,14 @@ cdef class SerializedPyObject:
"""
cdef:
int num_tensors = components['num_tensors']
int num_ndarrays = components['num_ndarrays']
int num_buffers = components['num_buffers']
list buffers = components['data']
SerializedPyObject result = SerializedPyObject()

with nogil:
check_status(GetSerializedFromComponents(num_tensors, num_buffers,
check_status(GetSerializedFromComponents(num_tensors, num_ndarrays,
num_buffers,
buffers, &result.data))

return result
67 changes: 67 additions & 0 deletions python/pyarrow/serialization.py
@@ -21,6 +21,7 @@

import numpy as np

import pyarrow
from pyarrow.compat import builtin_pickle
from pyarrow.lib import (SerializationContext, _default_serialization_context,
py_buffer)
@@ -55,6 +56,57 @@ def _deserialize_numpy_array_list(data):
return np.array(data[0], dtype=np.dtype(data[1]))


# ----------------------------------------------------------------------
# pyarrow.RecordBatch-specific serialization matters

def _serialize_pyarrow_recordbatch(batch):
output_stream = pyarrow.BufferOutputStream()
writer = pyarrow.RecordBatchStreamWriter(output_stream,
schema=batch.schema)
writer.write_batch(batch)
writer.close()
return output_stream.getvalue() # This will also close the stream.


def _deserialize_pyarrow_recordbatch(buf):
reader = pyarrow.RecordBatchStreamReader(buf)
batch = reader.read_next_batch()
return batch


# ----------------------------------------------------------------------
# pyarrow.Array-specific serialization matters

def _serialize_pyarrow_array(array):
    # TODO(suquark): implement more efficient array serialization.
batch = pyarrow.RecordBatch.from_arrays([array], [''])
return _serialize_pyarrow_recordbatch(batch)


def _deserialize_pyarrow_array(buf):
    # TODO(suquark): implement more efficient array deserialization.
batch = _deserialize_pyarrow_recordbatch(buf)
return batch.columns[0]


# ----------------------------------------------------------------------
# pyarrow.Table-specific serialization matters

def _serialize_pyarrow_table(table):
output_stream = pyarrow.BufferOutputStream()
writer = pyarrow.RecordBatchStreamWriter(output_stream,
schema=table.schema)
writer.write_table(table)
writer.close()
return output_stream.getvalue() # This will also close the stream.


def _deserialize_pyarrow_table(buf):
reader = pyarrow.RecordBatchStreamReader(buf)
table = reader.read_all()
return table


def _pickle_to_buffer(x):
pickled = builtin_pickle.dumps(x, protocol=builtin_pickle.HIGHEST_PROTOCOL)
return py_buffer(pickled)
Expand Down Expand Up @@ -205,6 +257,21 @@ def _deserialize_default_dict(data):
custom_serializer=_serialize_numpy_array_list,
custom_deserializer=_deserialize_numpy_array_list)

serialization_context.register_type(
pyarrow.Array, 'pyarrow.Array',
custom_serializer=_serialize_pyarrow_array,
custom_deserializer=_deserialize_pyarrow_array)

serialization_context.register_type(
pyarrow.RecordBatch, 'pyarrow.RecordBatch',
custom_serializer=_serialize_pyarrow_recordbatch,
custom_deserializer=_deserialize_pyarrow_recordbatch)

serialization_context.register_type(
pyarrow.Table, 'pyarrow.Table',
custom_serializer=_serialize_pyarrow_table,
custom_deserializer=_deserialize_pyarrow_table)

_register_custom_pandas_handlers(serialization_context)


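A usage sketch for the new pyarrow.Array / RecordBatch / Table handlers, assuming they are registered on the default serialization context (as register_default_serialization_handlers appears to do) so that pa.serialize() and pa.deserialize() pick them up:

import pyarrow as pa

table = pa.Table.from_arrays(
    [pa.array([1, 2, 3]), pa.array(['a', 'b', 'c'])],
    names=['ints', 'strs'])

# Tables are written as a record batch stream and read back with
# RecordBatchStreamReader, per _serialize_pyarrow_table above.
restored_table = pa.deserialize(pa.serialize(table).to_buffer())
assert restored_table.equals(table)

# Arrays round-trip through the single-column record batch shim.
arr = pa.array([1.5, 2.5, None])
restored_arr = pa.deserialize(pa.serialize(arr).to_buffer())
assert restored_arr.equals(arr)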